Compare commits
1 Commits
main
...
mulit-matc
Author | SHA1 | Date | |
---|---|---|---|
cf026dc02c |
@ -1,28 +1,28 @@
|
|||||||
module Rex
|
module Rex
|
||||||
module Book
|
module Book
|
||||||
# Trade Tracker implemented as a simple ring buffer
|
|
||||||
class TradeTracker
|
class TradeTracker
|
||||||
include Enumerable
|
|
||||||
DEFAULT_TRADE_HISTORY_LIMIT = 200
|
DEFAULT_TRADE_HISTORY_LIMIT = 200
|
||||||
|
|
||||||
attr_reader :trades
|
attr_reader :trades
|
||||||
|
|
||||||
def initialize(limit: DEFAULT_TRADE_HISTORY_LIMIT)
|
def initialize(limit: DEFAULT_TRADE_HISTORY_LIMIT)
|
||||||
@limit = limit
|
@limit = limit
|
||||||
@trades = Array.new(limit)
|
@trades = []
|
||||||
@current_id = -1
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def add(trade)
|
def add(trade)
|
||||||
@current_id += 1
|
@trades.push(trade)
|
||||||
idx = (@current_id % @limit)
|
cap
|
||||||
@trades[idx] = trade
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def each(&block)
|
def fetch_trades(limit)
|
||||||
(([@current_id - @limit, 0].max)..@current_id ).each do |id|
|
@trades.last(limit)
|
||||||
yield @trades[id % @limit]
|
end
|
||||||
end
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def cap
|
||||||
|
@trades = @trades.last(@limit)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
require "json-schema"
|
require "json-schema"
|
||||||
|
|
||||||
|
require_relative "server/publisher"
|
||||||
require_relative "server/messages"
|
require_relative "server/messages"
|
||||||
|
|
||||||
require_relative "server/request_schemas"
|
require_relative "server/request_schemas"
|
||||||
@ -14,6 +15,7 @@ require_relative "server/websocket_server"
|
|||||||
|
|
||||||
require_relative "server/json_message_serializer"
|
require_relative "server/json_message_serializer"
|
||||||
require_relative "server/simple_server_setup"
|
require_relative "server/simple_server_setup"
|
||||||
|
require_relative "server/multi_matching_engine_server_setup"
|
||||||
|
|
||||||
module Rex
|
module Rex
|
||||||
module Server
|
module Server
|
||||||
|
@ -4,12 +4,14 @@ module Rex
|
|||||||
def initialize(inbox, outbox,
|
def initialize(inbox, outbox,
|
||||||
order_book: Rex::Book::LimitOrderBook.new,
|
order_book: Rex::Book::LimitOrderBook.new,
|
||||||
limit_volume_tracker: Rex::Book::LimitVolumeTracker.new,
|
limit_volume_tracker: Rex::Book::LimitVolumeTracker.new,
|
||||||
trade_tracker: Rex::Book::TradeTracker.new)
|
trade_tracker: Rex::Book::TradeTracker.new,
|
||||||
|
id: 1)
|
||||||
@inbox = inbox
|
@inbox = inbox
|
||||||
@outbox = outbox
|
@outbox = outbox
|
||||||
@order_book = order_book
|
@order_book = order_book
|
||||||
@limit_volume_tracker = limit_volume_tracker
|
@limit_volume_tracker = limit_volume_tracker
|
||||||
@trade_tracker = trade_tracker
|
@trade_tracker = trade_tracker
|
||||||
|
@id = id
|
||||||
end
|
end
|
||||||
|
|
||||||
def start
|
def start
|
||||||
@ -50,7 +52,7 @@ module Rex
|
|||||||
end
|
end
|
||||||
|
|
||||||
def fetch_trades(user_id)
|
def fetch_trades(user_id)
|
||||||
@trade_tracker.each do |trade|
|
@trade_tracker.fetch_trades(50).each do |trade|
|
||||||
@outbox.push(
|
@outbox.push(
|
||||||
Messages::TradeFetchEvent.new(
|
Messages::TradeFetchEvent.new(
|
||||||
trade.id,
|
trade.id,
|
||||||
|
63
lib/rex/server/multi_matching_engine_server_setup.rb
Normal file
63
lib/rex/server/multi_matching_engine_server_setup.rb
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
module Rex
|
||||||
|
module Server
|
||||||
|
# Implementation of a simple one node server setup
|
||||||
|
#
|
||||||
|
# Setups/Starts
|
||||||
|
#
|
||||||
|
# - a message processor to parse and forward
|
||||||
|
# messages to the matching engine
|
||||||
|
# - a matching engine instance to process any order book related
|
||||||
|
# messages
|
||||||
|
# - a websocket server to accept websocket base connections and
|
||||||
|
# process incoming messages
|
||||||
|
class MultiMatchingEngineServerSetup
|
||||||
|
def initialize(host, port)
|
||||||
|
matching_engine_inbox_pipe = EventMachine::Channel.new
|
||||||
|
|
||||||
|
matching_engine_out_channels = Hash[(1..3).map { |i| [i, EventMachine::Channel.new] }]
|
||||||
|
|
||||||
|
@matching_engines = matching_engine_out_channels.map do |id,channel|
|
||||||
|
MatchingEngine.new(
|
||||||
|
matching_engine_inbox_pipe,
|
||||||
|
channel,
|
||||||
|
id: id
|
||||||
|
)
|
||||||
|
end
|
||||||
|
|
||||||
|
matching_engine_published_channel = EventMachine::Channel.new
|
||||||
|
@publisher = Publisher.new(matching_engine_out_channels.values, matching_engine_published_channel)
|
||||||
|
|
||||||
|
|
||||||
|
message_processor_inflow_pipe = EventMachine::Channel.new
|
||||||
|
message_broker = MessageBroker.new(JsonMessageSerializer.new)
|
||||||
|
|
||||||
|
@message_processor = Rex::Server::MessageProcessor.new(
|
||||||
|
matching_engine_inbox_pipe,
|
||||||
|
matching_engine_published_channel,
|
||||||
|
message_broker,
|
||||||
|
message_processor_inflow_pipe
|
||||||
|
)
|
||||||
|
|
||||||
|
@websocket_server = Rex::Server::WebsocketServer.new(
|
||||||
|
message_broker,
|
||||||
|
message_processor_inflow_pipe,
|
||||||
|
host: host,
|
||||||
|
port: port
|
||||||
|
)
|
||||||
|
end
|
||||||
|
|
||||||
|
def start
|
||||||
|
@message_processor.start
|
||||||
|
@publisher.start
|
||||||
|
@matching_engines.each(&:start)
|
||||||
|
@websocket_server.start
|
||||||
|
end
|
||||||
|
|
||||||
|
def stop
|
||||||
|
@matching_engines.each(&:stop)
|
||||||
|
@message_processor.stop
|
||||||
|
@websocket_server.stop
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
33
lib/rex/server/publisher.rb
Normal file
33
lib/rex/server/publisher.rb
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
module Rex
|
||||||
|
module Server
|
||||||
|
class Publisher
|
||||||
|
def initialize(inputs, output)
|
||||||
|
@inputs = inputs
|
||||||
|
@output = output
|
||||||
|
@buffer = Hash.new { |hash,key| hash[key] = [] }
|
||||||
|
end
|
||||||
|
|
||||||
|
def start
|
||||||
|
@input_subscription_ids = @inputs.each_with_index.map do |input,idex|
|
||||||
|
input.subscribe do |message|
|
||||||
|
process_message(message, idex)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def process_message(message, index)
|
||||||
|
@buffer[index].push(message)
|
||||||
|
|
||||||
|
first_values = @buffer.map { |_,v| v.first }.compact
|
||||||
|
return if first_values.length != @inputs.length
|
||||||
|
|
||||||
|
if first_values.compact.all? { |val| val == first_values[0] }
|
||||||
|
@output.push(first_values[0])
|
||||||
|
@buffer.each do |key,_|
|
||||||
|
@buffer[key].shift
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
@ -1,5 +1,5 @@
|
|||||||
# frozen_string_literal: true
|
# frozen_string_literal: true
|
||||||
|
|
||||||
module Rex
|
module Rex
|
||||||
VERSION = "0.1.1"
|
VERSION = "0.1.0"
|
||||||
end
|
end
|
||||||
|
@ -12,7 +12,7 @@ Gem::Specification.new do |spec|
|
|||||||
spec.description = "A simple limit order book with a matching engine implementation"
|
spec.description = "A simple limit order book with a matching engine implementation"
|
||||||
spec.homepage = "https://git.timkaechele.me/timkaechele/rex"
|
spec.homepage = "https://git.timkaechele.me/timkaechele/rex"
|
||||||
spec.required_ruby_version = ">= 2.6.0"
|
spec.required_ruby_version = ">= 2.6.0"
|
||||||
spec.licenses = ["AGPL-3.0-only"]
|
|
||||||
spec.metadata["homepage_uri"] = spec.homepage
|
spec.metadata["homepage_uri"] = spec.homepage
|
||||||
|
|
||||||
# Specify which files should be added to the gem when it is released.
|
# Specify which files should be added to the gem when it is released.
|
||||||
@ -28,10 +28,10 @@ Gem::Specification.new do |spec|
|
|||||||
|
|
||||||
spec.add_dependency "rbtree", "~> 0.4.6"
|
spec.add_dependency "rbtree", "~> 0.4.6"
|
||||||
|
|
||||||
spec.add_dependency "eventmachine", "~> 1.2"
|
spec.add_runtime_dependency "eventmachine", "~> 1.2"
|
||||||
spec.add_dependency "em-websocket", "~> 0.5"
|
spec.add_runtime_dependency "em-websocket", "~> 0.5"
|
||||||
|
|
||||||
spec.add_dependency "json-schema", "~> 4"
|
spec.add_runtime_dependency "json-schema", "~> 4"
|
||||||
# For more information and examples about making a new gem, check out our
|
# For more information and examples about making a new gem, check out our
|
||||||
# guide at: https://bundler.io/guides/creating_gem.html
|
# guide at: https://bundler.io/guides/creating_gem.html
|
||||||
end
|
end
|
||||||
|
@ -15,10 +15,10 @@ RSpec.describe Rex::Book::TradeTracker do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
describe "#each" do
|
describe "#fetch_trades" do
|
||||||
let(:n) { 3 }
|
let(:n) { 3 }
|
||||||
it "returns only the last n trades" do
|
it "returns only the last n trades" do
|
||||||
expect(instance.to_a).to eq([10, 11])
|
expect(instance.fetch_trades(1)).to eq([11])
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
Loading…
Reference in New Issue
Block a user