diff --git a/lib/rex/server.rb b/lib/rex/server.rb index fffd5ef..3b52411 100644 --- a/lib/rex/server.rb +++ b/lib/rex/server.rb @@ -2,6 +2,7 @@ require "json-schema" +require_relative "server/publisher" require_relative "server/messages" require_relative "server/request_schemas" @@ -14,6 +15,7 @@ require_relative "server/websocket_server" require_relative "server/json_message_serializer" require_relative "server/simple_server_setup" +require_relative "server/multi_matching_engine_server_setup" module Rex module Server diff --git a/lib/rex/server/matching_engine.rb b/lib/rex/server/matching_engine.rb index 040374f..4e585f0 100644 --- a/lib/rex/server/matching_engine.rb +++ b/lib/rex/server/matching_engine.rb @@ -4,12 +4,14 @@ module Rex def initialize(inbox, outbox, order_book: Rex::Book::LimitOrderBook.new, limit_volume_tracker: Rex::Book::LimitVolumeTracker.new, - trade_tracker: Rex::Book::TradeTracker.new) + trade_tracker: Rex::Book::TradeTracker.new, + id: 1) @inbox = inbox @outbox = outbox @order_book = order_book @limit_volume_tracker = limit_volume_tracker @trade_tracker = trade_tracker + @id = id end def start diff --git a/lib/rex/server/multi_matching_engine_server_setup.rb b/lib/rex/server/multi_matching_engine_server_setup.rb new file mode 100644 index 0000000..054f28f --- /dev/null +++ b/lib/rex/server/multi_matching_engine_server_setup.rb @@ -0,0 +1,63 @@ +module Rex + module Server + # Implementation of a simple one node server setup + # + # Setups/Starts + # + # - a message processor to parse and forward + # messages to the matching engine + # - a matching engine instance to process any order book related + # messages + # - a websocket server to accept websocket base connections and + # process incoming messages + class MultiMatchingEngineServerSetup + def initialize(host, port) + matching_engine_inbox_pipe = EventMachine::Channel.new + + matching_engine_out_channels = Hash[(1..3).map { |i| [i, EventMachine::Channel.new] }] + + @matching_engines = matching_engine_out_channels.map do |id,channel| + MatchingEngine.new( + matching_engine_inbox_pipe, + channel, + id: id + ) + end + + matching_engine_published_channel = EventMachine::Channel.new + @publisher = Publisher.new(matching_engine_out_channels.values, matching_engine_published_channel) + + + message_processor_inflow_pipe = EventMachine::Channel.new + message_broker = MessageBroker.new(JsonMessageSerializer.new) + + @message_processor = Rex::Server::MessageProcessor.new( + matching_engine_inbox_pipe, + matching_engine_published_channel, + message_broker, + message_processor_inflow_pipe + ) + + @websocket_server = Rex::Server::WebsocketServer.new( + message_broker, + message_processor_inflow_pipe, + host: host, + port: port + ) + end + + def start + @message_processor.start + @publisher.start + @matching_engines.each(&:start) + @websocket_server.start + end + + def stop + @matching_engines.each(&:stop) + @message_processor.stop + @websocket_server.stop + end + end + end +end diff --git a/lib/rex/server/publisher.rb b/lib/rex/server/publisher.rb new file mode 100644 index 0000000..b80fcfe --- /dev/null +++ b/lib/rex/server/publisher.rb @@ -0,0 +1,33 @@ +module Rex + module Server + class Publisher + def initialize(inputs, output) + @inputs = inputs + @output = output + @buffer = Hash.new { |hash,key| hash[key] = [] } + end + + def start + @input_subscription_ids = @inputs.each_with_index.map do |input,idex| + input.subscribe do |message| + process_message(message, idex) + end + end + end + + def process_message(message, index) + @buffer[index].push(message) + + first_values = @buffer.map { |_,v| v.first }.compact + return if first_values.length != @inputs.length + + if first_values.compact.all? { |val| val == first_values[0] } + @output.push(first_values[0]) + @buffer.each do |key,_| + @buffer[key].shift + end + end + end + end + end +end