diff --git a/lib/rex/server.rb b/lib/rex/server.rb index e72b590..0b6b997 100644 --- a/lib/rex/server.rb +++ b/lib/rex/server.rb @@ -6,6 +6,8 @@ require_relative "server/messages" require_relative "server/request_schemas" require_relative "server/message_parser" + +require_relative "server/matching_engine" require_relative "server/message_broker" module Rex module Server diff --git a/lib/rex/server/matching_engine.rb b/lib/rex/server/matching_engine.rb new file mode 100644 index 0000000..040374f --- /dev/null +++ b/lib/rex/server/matching_engine.rb @@ -0,0 +1,195 @@ +module Rex + module Server + class MatchingEngine + def initialize(inbox, outbox, + order_book: Rex::Book::LimitOrderBook.new, + limit_volume_tracker: Rex::Book::LimitVolumeTracker.new, + trade_tracker: Rex::Book::TradeTracker.new) + @inbox = inbox + @outbox = outbox + @order_book = order_book + @limit_volume_tracker = limit_volume_tracker + @trade_tracker = trade_tracker + end + + def start + @subscription_id = @inbox.subscribe do |message| + process_message(message) + end + end + + def stop + return unless @subscription_id + + @inbox.unsubscribe(@subscription_id) + @subscription_id = nil + end + + private + + def process_message(message) + case message + when Rex::Server::Messages::CreateOrderRequest + create_order( + Rex::Book::Order.new( + user_id: message.user_id, + price: message.price, + quantity: message.quantity, + is_buy: message.side == :buy + ) + ) + when Rex::Server::Messages::CancelOrderRequest + cancel_order(message.order_id) + when Rex::Server::Messages::FetchOrderBookRequest + fetch_orderbook(message.user_id) + when Rex::Server::Messages::FetchTradesRequest + fetch_trades(message.user_id) + when Rex::Server::Messages::FetchOrdersRequest + fetch_orders(message.user_id) + end + end + + def fetch_trades(user_id) + @trade_tracker.fetch_trades(50).each do |trade| + @outbox.push( + Messages::TradeFetchEvent.new( + trade.id, + user_id, + trade.price, + trade.quantity + ) + ) + end + end + + def fetch_orders(user_id) + @order_book.orders_for_user(user_id).each do |order| + @outbox.push( + Messages::OrderFetchEvent.new( + order.id, + order.user_id, + order_side(order), + order.quantity, + order.remaining_quantity, + order.price + ) + ) + end + end + + def fetch_orderbook(user_id) + @limit_volume_tracker.each do |limit_volume_change| + @outbox.push( + Messages::OrderBookFetchEvent.new( + user_id, + limit_volume_change.side, + limit_volume_change.price, + limit_volume_change.quantity + ) + ) + end + end + + def create_order(order) + @order_book.add_order(order) + @outbox.push( + Messages::OrderCreatedEvent.new( + order.id, + order.user_id, + order_side(order), + order.quantity, + order.remaining_quantity, + order.price + ) + ) + + changes = @limit_volume_tracker.add_order(order) + + changes.each do |limit_volume_change| + @outbox.push( + Messages::OrderBookUpdateEvent.new( + limit_volume_change.side, + limit_volume_change.price, + limit_volume_change.quantity + ) + ) + end + + trades = @order_book.match + + trades.each do |trade| + @trade_tracker.add(trade) + + @outbox.push( + Messages::TradeEvent.new( + trade.id, + trade.price, + trade.quantity + ) + ) + + @outbox.push( + Messages::OrderFillEvent.new( + trade.buy_order.id, + trade.buy_order.user_id, + trade.buy_order.price, + order_side(trade.buy_order), + trade.buy_order.remaining_quantity + ) + ) + + @outbox.push( + Messages::OrderFillEvent.new( + trade.sell_order.id, + trade.sell_order.user_id, + trade.sell_order.price, + order_side(trade.sell_order), + trade.sell_order.remaining_quantity + ) + ) + + @limit_volume_tracker.process_trade(trade).each do |limit_volume_change| + @outbox.push( + Messages::OrderBookUpdateEvent.new( + limit_volume_change.side, + limit_volume_change.price, + limit_volume_change.quantity + ) + ) + end + end + end + + def cancel_order(order_id) + order = @order_book.remove_order(order_id) + return unless order + + @outbox.push( + Messages::OrderCancelledEvent.new( + order.id, + order.user_id, + order_side(order), + order.remaining_quantity, + order.price + ) + ) + + changes = @limit_volume_tracker.remove_order(order) + + changes.each do |limit_volume_change| + @outbox.push( + Messages::OrderBookUpdateEvent.new( + limit_volume_change.side, + limit_volume_change.price, + limit_volume_change.quantity + ) + ) + end + end + + def order_side(order) + order.is_buy ? :buy : :sell + end + end + end +end diff --git a/spec/server/matching_engine_spec.rb b/spec/server/matching_engine_spec.rb new file mode 100644 index 0000000..bdae07f --- /dev/null +++ b/spec/server/matching_engine_spec.rb @@ -0,0 +1,223 @@ +# require "eventmachine" +RSpec.describe Rex::Server::MatchingEngine do + let(:inbox) { MockQueue.new } + let(:outbox) { MockQueue.new } + + subject(:instance) { described_class.new(inbox, outbox) } + + around do |example| + instance.start + example.run + instance.stop + end + + before do + messages.each do |message| + inbox.push(message) + end + end + + describe "order creation message" do + let(:messages) do + [ + Rex::Server::Messages::CreateOrderRequest.new( + user_id: 1, + price: 10, + quantity: 20, + side: :buy + ) + ] + end + + it "sends order created event followed by order book update event" do + expect(outbox.messages).to eq([ + Rex::Server::Messages::OrderCreatedEvent.new(1, 1, :buy, 20, 20, 10), + Rex::Server::Messages::OrderBookUpdateEvent.new(:buy, 10, 20) + ]) + end + + context "when my order matches an order" do + let(:messages) do + [ + Rex::Server::Messages::CreateOrderRequest.new( + user_id: 1, + price: 10, + quantity: 20, + side: :buy + ), + Rex::Server::Messages::CreateOrderRequest.new( + user_id: 2, + price: 9, + quantity: 22, + side: :sell + ) + ] + end + + it "returns order book updates, order fill updates and trades" do + expect(outbox.messages).to eq([ + Rex::Server::Messages::OrderCreatedEvent.new(id: 1, user_id: 1, side: :buy, quantity: 20, remaining_quantity: 20, price: 10), + Rex::Server::Messages::OrderBookUpdateEvent.new(side: :buy, price: 10, quantity: 20), + Rex::Server::Messages::OrderCreatedEvent.new(id: 2, user_id: 2, side: :sell, quantity: 22, remaining_quantity: 22, price: 9), + Rex::Server::Messages::OrderBookUpdateEvent.new(side: :sell, price: 9, quantity: 22), + Rex::Server::Messages::TradeEvent.new(id: 1, price: 9, quantity: 20), + Rex::Server::Messages::OrderFillEvent.new(id: 1, user_id: 1, price: 10, side: :buy, remaining_quantity: 0), + Rex::Server::Messages::OrderFillEvent.new(id: 2, user_id: 2, price: 9, side: :sell, remaining_quantity: 2), + Rex::Server::Messages::OrderBookUpdateEvent.new(side: :buy, price: 10, quantity: 0), + Rex::Server::Messages::OrderBookUpdateEvent.new(side: :sell, price: 9, quantity: 2) + ]) + end + end + end + + describe "cancel order request" do + context "when order is not existent" do + let(:messages) do + [ + Rex::Server::Messages::CancelOrderRequest.new(1, 1) + ] + end + + it "returns nothing" do + expect(outbox.messages).to be_empty + end + end + + context "when order exists" do + let(:messages) do + [ + Rex::Server::Messages::CreateOrderRequest.new( + user_id: 1, + price: 10, + quantity: 20, + side: :buy + ), + Rex::Server::Messages::CancelOrderRequest.new(1, 1) + ] + end + + it "returns a cancelled order event and updates the order book" do + expect(outbox.messages.last(2)).to eq([ + Rex::Server::Messages::OrderCancelledEvent.new(id: 1, user_id: 1, side: :buy, remaining_quantity: 20, price: 10), + Rex::Server::Messages::OrderBookUpdateEvent.new(side: :buy, price: 10, quantity: 0) + ]) + end + end + end + + describe "fetch order book request" do + let(:messages) do + [ + Rex::Server::Messages::CreateOrderRequest.new( + user_id: 1, + price: 10, + quantity: 20, + side: :buy + ), + Rex::Server::Messages::CreateOrderRequest.new( + user_id: 2, + price: 11, + quantity: 22, + side: :sell + ), + Rex::Server::Messages::FetchOrderBookRequest.new(3) + ] + end + + it "returns a list limit order book updates" do + expect(outbox.messages.last(2)).to eq([ + Rex::Server::Messages::OrderBookFetchEvent.new( + 3, + :buy, + 10, + 20 + ), + Rex::Server::Messages::OrderBookFetchEvent.new( + 3, + :sell, + 11, + 22 + ) + ]) + end + end + + describe "fetch trades request" do + context "when no trades exist" do + let(:messages) do + [ + Rex::Server::Messages::FetchTradesRequest.new(1) + ] + end + + it "returns nothing" do + expect(outbox.messages).to be_empty + end + end + + context "when trades exist" do + let(:messages) do + [ + Rex::Server::Messages::CreateOrderRequest.new( + user_id: 1, + price: 10, + quantity: 15, + side: :buy + ), + Rex::Server::Messages::CreateOrderRequest.new( + user_id: 2, + price: 10, + quantity: 15, + side: :sell + ), + Rex::Server::Messages::FetchTradesRequest.new(1) + ] + end + + it "returns trade fetch events" do + expect(outbox.messages.last(1)).to eq([ + Rex::Server::Messages::TradeFetchEvent.new( + id: 1, + user_id: 1, + price: 10, + quantity: 15 + ) + ]) + end + end + end + + describe "fetch orders request" do + context "when user does not have any orders" do + let(:messages) do + [ + Rex::Server::Messages::FetchOrdersRequest.new(1) + ] + end + + it "returns nothing" do + expect(outbox.messages).to be_empty + end + end + + context "when user has orders" do + let(:messages) do + [ + Rex::Server::Messages::CreateOrderRequest.new( + user_id: 1, + price: 10, + quantity: 15, + side: :sell + ), + Rex::Server::Messages::FetchOrdersRequest.new(1) + ] + end + + it "returns the orders" do + expect(outbox.messages.last).to eq( + Rex::Server::Messages::OrderFetchEvent.new(id: 1, user_id: 1, side: :sell, quantity: 15, remaining_quantity: 15, price: 10) + ) + end + end + end +end