Implement basic message broker

This commit is contained in:
Tim Kächele 2024-02-03 22:25:37 +01:00
parent 14557d2a34
commit b9ea7cca30
7 changed files with 208 additions and 0 deletions

View File

@ -2,6 +2,7 @@
require_relative "rex/version"
require_relative "rex/book"
require_relative "rex/server"
module Rex
class Error < StandardError; end

8
lib/rex/server.rb Normal file
View File

@ -0,0 +1,8 @@
# frozen_string_literal: true
require_relative "server/message_broker"
module Rex
module Server
VERSION = "0.1.0"
end
end

View File

@ -0,0 +1,72 @@
module Rex
module Server
# Responsible for brokering outgoing messages to the correct clients
class MessageBroker
def initialize(message_serializer)
@connections = {}
@connection_message_counter = {}
@connection_id = 0
@user_connection_associations = {}
@connection_user_associations = {}
@message_serializer = message_serializer
end
def register(connection)
@connection_id += 1
@connections[@connection_id] = connection
@connection_message_counter[@connection_id] = 0
@connection_id
end
def send_to_all(message)
@connections.each do |connection_id, _|
send_to_connection(connection_id, message)
end
end
def send_to_user(user_id, message)
connection_ids = @user_connection_associations[user_id]
return unless connection_ids
connection_ids.each do |connection_id|
send_to_connection(connection_id, message)
end
end
def send_to_connection(connection_id, message)
connection = @connections[connection_id]
return unless connection
@connection_message_counter[connection_id] += 1
connection.send(serialize(message))
end
def user_id_for_connection(connection_id)
@connection_user_associations[connection_id]
end
def associate_connection_with_user(connection_id, user_id)
@user_connection_associations[user_id] ||= []
@user_connection_associations[user_id].push(connection_id)
@connection_user_associations[connection_id] = user_id
end
def unregister(connection_id)
@connections.delete(connection_id)
@connection_message_counter.delete(connection_id)
@user_connection_associations[
@connection_user_associations[connection_id]
].delete(connection_id)
@connection_user_associations.delete(connection_id)
end
private
def serialize(message)
@message_serializer.serialize(message)
end
end
end
end

View File

@ -0,0 +1,104 @@
# require "eventmachine"
RSpec.describe Rex::Server::MessageBroker do
subject(:instance) { described_class.new(NoOpSerializer.new) }
let(:connection_a) { MockConnection.new }
let(:connection_b) { MockConnection.new }
let!(:connection_id_a) { instance.register(connection_a) }
let!(:connection_id_b) { instance.register(connection_b) }
describe "#send_to_all" do
subject { instance.send_to_all("Hello, World") }
it "sends the message to all connections" do
expect { subject }.to(
change { connection_a.messages }.from([]).to(["Hello, World"])
.and(
change { connection_b.messages }.from([]).to(["Hello, World"])
)
)
end
end
describe "#send_to_user" do
before do
instance.associate_connection_with_user(connection_id_a, "user_id")
end
subject { instance.send_to_user("user_id", "Hello, World") }
it "sends the message to the connection associated with the user" do
expect { subject }.to(
change { connection_a.messages }.to(["Hello, World"])
.and(
not_change { connection_b.messages }
)
)
end
context "when user id is unknown" do
subject { instance.send_to_user("unknown_user_id", "Hello, World") }
it "sends no messages" do
expect { subject }.to(
not_change { connection_a.messages }
.and(
not_change { connection_b.messages }
)
)
end
end
end
describe "#send_to_connection" do
subject { instance.send_to_connection(connection_id_a, "Hello, World") }
it "sends the message to the connection" do
expect { subject }.to(
change { connection_a.messages }.to(["Hello, World"])
.and(
not_change { connection_b.messages }
)
)
end
context "when the connection id is unknown" do
subject { instance.send_to_connection(-99, "Hello, World") }
it "sends no messages" do
expect { subject }.to(
not_change { connection_a.messages }
.and(
not_change { connection_b.messages }
)
)
end
end
end
describe "#user_id_for_connection" do
subject { instance.user_id_for_connection(connection_id_a) }
context "when connection has user associated with it" do
before do
instance.associate_connection_with_user(connection_id_a, "user_id")
end
it "returns the user id" do
expect(subject).to eq("user_id")
end
end
context "when connection has no user associated with it" do
it { is_expected.to be_nil }
end
context "when connection was unregistered after being associated" do
before do
instance.associate_connection_with_user(connection_id_a, "user_id")
instance.unregister(connection_id_a)
end
it { is_expected.to be_nil }
end
end
end

View File

@ -0,0 +1,17 @@
class MockConnection
def initialize
@inbox = []
end
def send(message)
@inbox.push(message)
end
def messages
@inbox
end
def clear_inbox!
@inbox = []
end
end

View File

@ -0,0 +1 @@
RSpec::Matchers.define_negated_matcher :not_change, :change

View File

@ -0,0 +1,5 @@
class NoOpSerializer
def serialize(object)
object
end
end