Zum Hauptinhalt springen

part 8 - Message Bus with RabbitMQ

Goal is to implement the Message-Bus and add the PlatformService as a Publisher and the CommandService as a Subscriber.

notes about RabbitMQ

  • Message Broker: accepts, forwards messages
  • Messages are stored on Queues. (in real production those would be persisted if RabbitMQ crashes etc...)
  • uses AMQP - Advanced Message Queuing Protocl (among others)
  • 4 types of exchanges
    • direct exchange - delivers messages to queues based on a routing key. ideal for direct/unicasting messaging
    • fanout exchange (used here) - delivers messages to all queues bound to the exchange. ideal for broadcast messages.
    • topic exchance - routes messages to 1 or more queues based on routingkey/patterns. ideal for multicasting messages
    • header exchange

starting up RabbitMQ in Kubernetes

  • K8S/rabbitmq-depl.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: rabbitmq-depl
spec:
replicas: 1
selector:
matchLabels:
app: rabbitmq
template:
metadata:
labels:
app: rabbitmq
spec:
containers:
- name: rabbitmq
image: rabbitmq:3-management
ports:
## first port is just to access the "management" webinterface
- containerPort: 15672
name: rbmq-mgmt-port
## this is the used port for the Bus itself
- containerPort: 5672
name: rbmq-msg-port
---
# the Bus needs to be accessible from the Services inside Kubernetes, so we create a ClusterIP for it
apiVersion: v1
kind: Service
metadata:
name: rabbitmq-clusterip-srv
spec:
type: ClusterIP
selector:
app: rabbitmq
ports:
- name: rbmq-mgmt-port
protocol: TCP
port: 15672
targetPort: 15672
- name: rbmq-msg-port
protocol: TCP
port: 5672
targetPort: 5672
---
# the Bus also needs to be accessible from outside the Kubernetes (at least for development)
# so we create a LoadbalancerService for it
apiVersion: v1
kind: Service
metadata:
name: rabbitmq-loadbalancer
spec:
type: LoadBalancer
selector:
app: rabbitmq
ports:
- name: rbmq-mgmt-port
protocol: TCP
port: 15672
targetPort: 15672
- name: rbmq-msg-port
protocol: TCP
port: 5672
targetPort: 5672
  • then we deploy our messagebus
kubectl apply -f K8S/rabbitmq-depl.yaml
  • now we can reach out messagebus webinterface with localhost:15672 username: guest password: guest

Code in PlatformService - The Publisher

dotnet add package RabbitMQ.Client
  • we add to appsettings.Development.json
"RabbitMQHost": "localhost",
"RabbitMQPort": "5672"
  • we add to appsettings.Production.json
"RabbitMQHost": "rabbitmq-clusterip-srv",
"RabbitMQPort": "5672"
  • we create Dtos/PlatformPublishedDto This is the Event that gets pushed onto the MessageBus
public class PlatformPublishdDto {
public required int Id { get; set; }
public required string Name { get; set; }
public required string Event { get; set; }
}

Implementing the Message Bus Client

  • we create an interface for the following RabbitMQ Message Bus implementation. AsyncDataServices/IMessageBusClient.cs
public interface IMessageBusClient {
void PublishNewPlatform(PlatformPublishdDto newCreatedPlatform);
}
  • we inject our Bus in Program.cs. Here as a Singleton as we assume it always stays the "same" connection.
builder.Services.AddSingleton<IMessageBusClient, MessageBusClient>();

AsyncDataServices/IMessageBusClient.cs

public class MessageBusClient : IMessageBusClient
{
private readonly IConfiguration _config;
private readonly RabbitMQ.Client.IConnection _connection;
private readonly RabbitMQ.Client.IModel _channel;

public MessageBusClient(IConfiguration configuration) {
_config = configuration;
// RabbitMQ 1. wants the factory with config data
var factory = new RabbitMQ.Client.ConnectionFactory() {
HostName = _config["RabbitMQHost"],
Port = int.Parse(_config["RabbitMQPort"]!),
};

try {
// RabbitMQ 2. wants us to create the connection itself
_connection = factory.CreateConnection();

// RabbitMQ 3. wants us to create our channel
_channel = _connection.CreateModel();

// RabbitMQ 4. wants us to create the Exchange( in this case the fanout-type)
_channel.ExchangeDeclare(exchange: "trigger", type: ExchangeType.Fanout);

_connection.ConnectionShutdown += RabbitMQ_ConnectionShutdown;

Console.WriteLine("--> Connected to MessageBus");

} catch (Exception e) {
Console.WriteLine($"--> Could not connect to the Messagebus! {e.Message}");
}
}

public void PublishNewPlatform(PlatformPublishdDto newCreatedPlatformDto) {
var message = JsonSerializer.Serialize(newCreatedPlatformDto);

if (_connection.IsOpen) {
Console.WriteLine("--> RabbitMQ Connection Open, sending message.");
SendMessage(message);
} else {
Console.WriteLine("--> RabbitMQ Connection CLOSED, NOT sending!");
}
}

private void SendMessage(string message) {
var body = Encoding.UTF8.GetBytes(message);
_channel.BasicPublish(
exchange: "trigger",
routingKey: "",
basicProperties: null,
body: body);

Console.WriteLine($"--> We have sent {message}");
}

// properly close ressources when this class leaves scope/dies
public void Dispose() {
Console.WriteLine("--> MessageBus Disposed");
if (_channel.IsOpen) {
_channel.Close();
_connection.Close();
}
}

// triggers every time the connection to the Bus gets shut down
private void RabbitMQ_ConnectionShutdown(object? sender, ShutdownEventArgs args) {
Console.WriteLine($"--> RabbitMQ Connection Shut Down. args={args}");
}
}

We Use that Bus to send in our Controller

  • for a first test we just spin up both Services while having Kubernetes active (for the RabbitMQ Bus).
  • When we send a postrequest to http://localhost:5062/api/platforms/ we see in our logs for PlatformService:
--> Connected to MessageBus
info: Microsoft.EntityFrameworkCore.Update[30100]
Saved 1 entities to in-memory store.
// ...
--> Sync POST to CommandService was OK.
--> RabbitMQ Connection Open, sending message.
--> We have sent {"Id":4,"Name":"Docker","Event":"New_Platform_Published"}
  • And a peek into the RabbitMQ manager web interface shows the recent message: Alt text

Code in CommandsService - The Subscriber

Setup

  • add the package
dotnet add package RabbitMQ.Client
  • add RabbitMQ-configSettings to appsettings.Development.json and add appsettings.Production.json like previous Service

  • we add two new Dtos: Dtos/GenericEventDto.cs Dtos/PlatformPublishedDto.cs

public class GenericEventDto
{
public required string Event { get; set; }
}
public class PlatformPublishedDto
{
public required int Id { get; set; }
public required string Name { get; set; }
public required string Event { get; set; }
}
  • we add to our Mappings:
CreateMap<PlatformPublishedDto, Platform>()
.ForMember(dest => dest.ExternalId, opt => opt.MapFrom(src => src.Id));
// Basically we want to take PlatformPublishedDto.Id and map it to our Platform.ExternalID
  • we add to our Data/ICommandRepo.cs
/// <summary>
/// Checks if we already added this ExternalPlatform to our data. If so were synced up. Makes sure we dont duplicate data.
/// </summary>
bool ExternalPlatformExist(int ExternalPlatformId);
  • we add to our Data/CommandRepo.cs
public bool ExternalPlatformExist(int ExternalPlatformId)
{
return _ctx.Platforms.Any(p => p.ExternalId == ExternalPlatformId);
}

Note about Dependency Injection:

The Implementation listening on our Bus will be added as a Singleton. - so for the lifetime of the app.

  • That Singleton-Service will create "instances" of the EventProcessor (available via dependency-injection)

    • so the EventProcessor MUST have a lifetime the same or greater than it's "parent" - so it MUST be a Singleton aswell.
    • A Consequence of this is, that we can not inject our repository here in the constructor of EventProcessor(). Because it will only get created once.
      • so we need to pass in the reference to repo another way.
  • we add EventProcessing/IEventProcessor.cs. Basically whenever we get an Event, we use this To Map over our Reactions. Ex we Deserialize it, add it to localPlatformsData if neccessary etc... Ignore it if its a wrong kind of Event...

public interface IEventProcessor
{
void ProcessEvent(string message);
}
  • and we inject it as singleton in our main:
builder.Services.AddSingleton<IEventProcessor, EventProcessor>();
  • finally we write the implementation EventProcessing/EventProcessor.cs
public class EventProcessor : IEventProcessor
{
private readonly IServiceScopeFactory _scopeFactory;
private readonly IMapper _mapper;

public EventProcessor(
IServiceScopeFactory scopeFactory,
IMapper mapper
)
{
_scopeFactory = scopeFactory;
_mapper = mapper;
}
public void ProcessEvent(string message)
{
var eventType = DetermineEvent(message);
switch (eventType)
{
case EventType.PlatformPublished:
AddPlatform(message);
break;
default:
break;
}
}

private EventType DetermineEvent(string notificationMessage)
{
Console.WriteLine("--> Determining event");
var eventType = JsonSerializer.Deserialize<GenericEventDto>(notificationMessage);

if (eventType is null)
{
Console.WriteLine("--> Serializing event-type wrent wrong. Is Null");
return EventType.Undetermined;
}

switch (eventType.Event)
{
case "New_Platform_Published":
Console.WriteLine("--> New_Platform_Published event-type detected.");
return EventType.PlatformPublished;
default:
Console.WriteLine("--> Could not determine event-type.");
return EventType.Undetermined;
}
}

// TODO use AddPlatform
private void AddPlatform(string platformPublishedMessage)
{
// use the scopeFactory to get access to our repository
// this is neccessary because of the different lifetimes of our repo vs our Singleton-EventProcessing
using (var scope = _scopeFactory.CreateScope())
{
var repo = scope.ServiceProvider.GetRequiredService<ICommandRepo>();
var platformPublishedDto = JsonSerializer.Deserialize<PlatformPublishedDto>(platformPublishedMessage);

try
{
var plat = _mapper.Map<Platform>(platformPublishedDto);
if (!repo.ExternalPlatformExist(plat.ExternalId))
{
repo.CreatePlatform(plat);
repo.SaveChanges();
Console.WriteLine($" --> Added Platform=[{plat.Name}] to local-Server");
}
else
{
Console.WriteLine(" --> Platform already exists in local db...");
}
}
catch (Exception e)
{
Console.WriteLine($"--> Could not add Platform do DB; {e.Message}");
}
}
}
}
enum EventType
{
PlatformPublished, // <= "New_Platform_Published" as Event string
Undetermined // <= any other Event
}
  • AsyncDataServices/MessageBusSubscribers.cs
// A Background Service. A long running Task. That probably will run over the whole duration of the App. If nothing goes wrong
public class MessageBusSubscriber : BackgroundService
{
private readonly IConfiguration _config;
private readonly IEventProcessor _eventProcessor;
private IConnection? _connection;
private IModel? _channel;
private string? _queueName;

public MessageBusSubscriber(IConfiguration config, IEventProcessor eventProcessor)
{
_config = config;
_eventProcessor = eventProcessor;
InitializeMQ();
}

private void InitializeMQ()
{
var factory = new ConnectionFactory()
{
HostName = _config["RabbitMQHost"],
Port = int.Parse(_config["RabbitMQPort"]!),
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(exchange: "trigger", type: ExchangeType.Fanout);
_queueName = _channel.QueueDeclare().QueueName;
_channel.QueueBind(queue: _queueName,
exchange: "trigger",
routingKey: "");

Console.WriteLine("--> Listening on the Message Bus.");

_connection.ConnectionShutdown += RabbitMQ_ConnectionShutdown;
}

private void RabbitMQ_ConnectionShutdown(object? sender, ShutdownEventArgs e)
{
Console.WriteLine("--> connection Shutdown");
}

public override void Dispose()
{
if (_channel is not null && _channel.IsOpen)
{
_channel.Close();
_connection?.Close();
}
base.Dispose();
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
stoppingToken.ThrowIfCancellationRequested();

var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (Modulehandle, ea) =>
{
Console.WriteLine("--> Event Received,");
var body = ea.Body;
var notificationMessage = Encoding.UTF8.GetString(body.ToArray());
_eventProcessor.ProcessEvent(notificationMessage);
};
_channel.BasicConsume(queue: _queueName, autoAck: true, consumer: consumer);
return Task.CompletedTask;
}
}
  • we inject it to our main:
builder.Services.AddHostedService<MessageBusSubscriber>();
  • now we should be able to test the whole pipeline locally.
    • start both servers up
  • create a new Platform -> see the data hit our Subscriber:
  • Create commands for the new platform and get those back.
>> LOG FROM CommandsService
--> Inbound POST # Command Service
--> Event Received,
--> Determining event
--> New_Platform_Published event-type detected.
info: Microsoft.EntityFrameworkCore.Update[30100]
Saved 1 entities to in-memory store.
--> Added Platform=[Docker] to local-Server

We Publish the above steps to Kubernetes

// (using my Makefile)
make dev


// (manually)
docker build -t vincepr/platformservice ./PlatformService
docker push vincepr/platformservice
kubectl rollout restart deployment platforms-depl

docker build -t vincepr/commandservice ./CommandsService
docker push vincepr/commandservice
kubectl rollout restart deployment commands-depl
  • now everything should be working up there.