rex/lib/rex/server/publisher.rb

34 lines
842 B
Ruby

module Rex
module Server
class Publisher
def initialize(inputs, output)
@inputs = inputs
@output = output
@buffer = Hash.new { |hash,key| hash[key] = [] }
end
def start
@input_subscription_ids = @inputs.each_with_index.map do |input,idex|
input.subscribe do |message|
process_message(message, idex)
end
end
end
def process_message(message, index)
@buffer[index].push(message)
first_values = @buffer.map { |_,v| v.first }.compact
return if first_values.length != @inputs.length
if first_values.compact.all? { |val| val == first_values[0] }
@output.push(first_values[0])
@buffer.each do |key,_|
@buffer[key].shift
end
end
end
end
end
end