From b9ea7cca308f6f26a5cfe12425336fd03510aad8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20K=C3=A4chele?= Date: Sat, 3 Feb 2024 22:25:37 +0100 Subject: [PATCH] Implement basic message broker --- lib/rex.rb | 1 + lib/rex/server.rb | 8 +++ lib/rex/server/message_broker.rb | 72 ++++++++++++++++++++ spec/server/message_broker_spec.rb | 104 +++++++++++++++++++++++++++++ spec/support/mock_connection.rb | 17 +++++ spec/support/negated_matchers.rb | 1 + spec/support/no_op_serializer.rb | 5 ++ 7 files changed, 208 insertions(+) create mode 100644 lib/rex/server.rb create mode 100644 lib/rex/server/message_broker.rb create mode 100644 spec/server/message_broker_spec.rb create mode 100644 spec/support/mock_connection.rb create mode 100644 spec/support/negated_matchers.rb create mode 100644 spec/support/no_op_serializer.rb diff --git a/lib/rex.rb b/lib/rex.rb index a127952..f0b20da 100644 --- a/lib/rex.rb +++ b/lib/rex.rb @@ -2,6 +2,7 @@ require_relative "rex/version" require_relative "rex/book" +require_relative "rex/server" module Rex class Error < StandardError; end diff --git a/lib/rex/server.rb b/lib/rex/server.rb new file mode 100644 index 0000000..9568a26 --- /dev/null +++ b/lib/rex/server.rb @@ -0,0 +1,8 @@ +# frozen_string_literal: true + +require_relative "server/message_broker" +module Rex + module Server + VERSION = "0.1.0" + end +end diff --git a/lib/rex/server/message_broker.rb b/lib/rex/server/message_broker.rb new file mode 100644 index 0000000..2c65cb6 --- /dev/null +++ b/lib/rex/server/message_broker.rb @@ -0,0 +1,72 @@ +module Rex + module Server + # Responsible for brokering outgoing messages to the correct clients + class MessageBroker + def initialize(message_serializer) + @connections = {} + @connection_message_counter = {} + @connection_id = 0 + @user_connection_associations = {} + @connection_user_associations = {} + @message_serializer = message_serializer + end + + def register(connection) + @connection_id += 1 + + @connections[@connection_id] = connection + @connection_message_counter[@connection_id] = 0 + + @connection_id + end + + def send_to_all(message) + @connections.each do |connection_id, _| + send_to_connection(connection_id, message) + end + end + + def send_to_user(user_id, message) + connection_ids = @user_connection_associations[user_id] + return unless connection_ids + + connection_ids.each do |connection_id| + send_to_connection(connection_id, message) + end + end + + def send_to_connection(connection_id, message) + connection = @connections[connection_id] + return unless connection + + @connection_message_counter[connection_id] += 1 + connection.send(serialize(message)) + end + + def user_id_for_connection(connection_id) + @connection_user_associations[connection_id] + end + + def associate_connection_with_user(connection_id, user_id) + @user_connection_associations[user_id] ||= [] + @user_connection_associations[user_id].push(connection_id) + @connection_user_associations[connection_id] = user_id + end + + def unregister(connection_id) + @connections.delete(connection_id) + @connection_message_counter.delete(connection_id) + @user_connection_associations[ + @connection_user_associations[connection_id] + ].delete(connection_id) + @connection_user_associations.delete(connection_id) + end + + private + + def serialize(message) + @message_serializer.serialize(message) + end + end + end +end diff --git a/spec/server/message_broker_spec.rb b/spec/server/message_broker_spec.rb new file mode 100644 index 0000000..999153c --- /dev/null +++ b/spec/server/message_broker_spec.rb @@ -0,0 +1,104 @@ +# require "eventmachine" +RSpec.describe Rex::Server::MessageBroker do + subject(:instance) { described_class.new(NoOpSerializer.new) } + + let(:connection_a) { MockConnection.new } + let(:connection_b) { MockConnection.new } + + let!(:connection_id_a) { instance.register(connection_a) } + let!(:connection_id_b) { instance.register(connection_b) } + + describe "#send_to_all" do + subject { instance.send_to_all("Hello, World") } + + it "sends the message to all connections" do + expect { subject }.to( + change { connection_a.messages }.from([]).to(["Hello, World"]) + .and( + change { connection_b.messages }.from([]).to(["Hello, World"]) + ) + ) + end + end + + describe "#send_to_user" do + before do + instance.associate_connection_with_user(connection_id_a, "user_id") + end + + subject { instance.send_to_user("user_id", "Hello, World") } + + it "sends the message to the connection associated with the user" do + expect { subject }.to( + change { connection_a.messages }.to(["Hello, World"]) + .and( + not_change { connection_b.messages } + ) + ) + end + + context "when user id is unknown" do + subject { instance.send_to_user("unknown_user_id", "Hello, World") } + it "sends no messages" do + expect { subject }.to( + not_change { connection_a.messages } + .and( + not_change { connection_b.messages } + ) + ) + end + end + end + + describe "#send_to_connection" do + subject { instance.send_to_connection(connection_id_a, "Hello, World") } + + it "sends the message to the connection" do + expect { subject }.to( + change { connection_a.messages }.to(["Hello, World"]) + .and( + not_change { connection_b.messages } + ) + ) + end + + context "when the connection id is unknown" do + subject { instance.send_to_connection(-99, "Hello, World") } + + it "sends no messages" do + expect { subject }.to( + not_change { connection_a.messages } + .and( + not_change { connection_b.messages } + ) + ) + end + end + end + + describe "#user_id_for_connection" do + subject { instance.user_id_for_connection(connection_id_a) } + context "when connection has user associated with it" do + before do + instance.associate_connection_with_user(connection_id_a, "user_id") + end + + it "returns the user id" do + expect(subject).to eq("user_id") + end + end + + context "when connection has no user associated with it" do + it { is_expected.to be_nil } + end + + context "when connection was unregistered after being associated" do + before do + instance.associate_connection_with_user(connection_id_a, "user_id") + instance.unregister(connection_id_a) + end + + it { is_expected.to be_nil } + end + end +end diff --git a/spec/support/mock_connection.rb b/spec/support/mock_connection.rb new file mode 100644 index 0000000..5b33622 --- /dev/null +++ b/spec/support/mock_connection.rb @@ -0,0 +1,17 @@ +class MockConnection + def initialize + @inbox = [] + end + + def send(message) + @inbox.push(message) + end + + def messages + @inbox + end + + def clear_inbox! + @inbox = [] + end +end diff --git a/spec/support/negated_matchers.rb b/spec/support/negated_matchers.rb new file mode 100644 index 0000000..93f5b66 --- /dev/null +++ b/spec/support/negated_matchers.rb @@ -0,0 +1 @@ +RSpec::Matchers.define_negated_matcher :not_change, :change diff --git a/spec/support/no_op_serializer.rb b/spec/support/no_op_serializer.rb new file mode 100644 index 0000000..e8cd63e --- /dev/null +++ b/spec/support/no_op_serializer.rb @@ -0,0 +1,5 @@ +class NoOpSerializer + def serialize(object) + object + end +end