using Microsoft.Extensions.Logging; using RabbitMQ.Client; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using System.Dynamic; using System.Net.Sockets; using System.Text; using System.Text.Json; using ZeroFramework.EventBus.Abstractions; using ZeroFramework.EventBus.Events; using ZeroFramework.EventBus.Extensions; namespace ZeroFramework.EventBus.RabbitMQ { public class EventBusRabbitMQ : IEventBus, IDisposable { const string ExchangeName = "my_event_bus"; private readonly IRabbitMQPersistentConnection _persistentConnection; private readonly ILogger _logger; private readonly IEventBusSubscriptionsManager _subsManager; private readonly IServiceProvider _serviceProvider; private readonly int _retryCount; private IModel _consumerChannel; private string? _queueName; public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger logger, IServiceProvider serviceProvider, IEventBusSubscriptionsManager subsManager, string? queueName, int retryCount = 5) { _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager(); _queueName = queueName; _consumerChannel = CreateConsumerChannel(); _serviceProvider = serviceProvider; _retryCount = retryCount; _subsManager.OnEventRemoved += SubsManager_OnEventRemoved!; } private void SubsManager_OnEventRemoved(object sender, string eventName) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } using var channel = _persistentConnection.CreateModel(); channel.QueueUnbind(queue: _queueName, exchange: ExchangeName, routingKey: eventName); if (_subsManager.IsEmpty) { _queueName = string.Empty; _consumerChannel.Close(); } } public Task PublishAsync(IntegrationEvent @event, CancellationToken cancellationToken = default) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } var eventName = @event.GetType().Name; _logger.LogTrace("Creating RabbitMQ channel to publish event: {EventId} ({EventName})", @event.Id, eventName); using var channel = _persistentConnection.CreateModel(); _logger.LogTrace("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id); channel.ExchangeDeclare(exchange: ExchangeName, type: ExchangeType.Direct); var message = JsonSerializer.Serialize(@event, @event.GetType()); var body = Encoding.UTF8.GetBytes(message); for (int retryAttempt = 1; retryAttempt <= _retryCount; retryAttempt++) { var time = TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)); try { var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; // persistent _logger.LogTrace("Publishing event to RabbitMQ: {EventId}", @event.Id); channel.BasicPublish(exchange: ExchangeName, routingKey: eventName, mandatory: true, basicProperties: properties, body: body); break; } catch (SystemException ex) when (ex is BrokerUnreachableException || ex is SocketException) { _logger.LogWarning(ex, "Could not publish event: {EventId} after {Timeout}s ({ExceptionMessage})", @event.Id, $"{time.TotalSeconds:n1}", ex.Message); } Task.Delay(time, cancellationToken).Wait(cancellationToken); } return Task.CompletedTask; } public void SubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler { _logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); DoInternalSubscription(eventName); _subsManager.AddDynamicSubscription(eventName); StartBasicConsume(); } public void Subscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler { var eventName = _subsManager.GetEventKey(); DoInternalSubscription(eventName); _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); _subsManager.AddSubscription(); StartBasicConsume(); } private void DoInternalSubscription(string eventName) { var containsKey = _subsManager.HasSubscriptionsForEvent(eventName); if (!containsKey) { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } using var channel = _persistentConnection.CreateModel(); channel.QueueBind(queue: _queueName, exchange: ExchangeName, routingKey: eventName); } } public void Unsubscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler { var eventName = _subsManager.GetEventKey(); _logger.LogInformation("Unsubscribing from event {EventName}", eventName); _subsManager.RemoveSubscription(); } public void UnsubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler { _subsManager.RemoveDynamicSubscription(eventName); } private bool disposed = false; // Protected implementation of Dispose pattern. protected virtual void Dispose(bool disposing) { if (disposed) { return; } // Release any managed resources here. if (disposing) { // dispose managed state (managed objects). _consumerChannel?.Dispose(); _subsManager.Clear(); } // free unmanaged resources (unmanaged objects) and override a finalizer below. // set large fields to null. disposed = true; // Call the base class implementation. //base.Dispose(disposing); } // Public implementation of Dispose pattern callable by consumers. public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } ~EventBusRabbitMQ() => Dispose(false); private void StartBasicConsume() { _logger.LogTrace("Starting RabbitMQ basic consume"); if (_consumerChannel != null) { var consumer = new AsyncEventingBasicConsumer(_consumerChannel); consumer.Received += Consumer_Received; _consumerChannel.BasicConsume(queue: _queueName, autoAck: false, consumer: consumer); } else { _logger.LogError("StartBasicConsume can't call on _consumerChannel == null"); } } private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs) { var eventName = eventArgs.RoutingKey; var message = Encoding.UTF8.GetString(eventArgs.Body.ToArray()); try { if (message.ToLowerInvariant().Contains("throw-fake-exception")) { throw new InvalidOperationException($"Fake exception requested: \"{message}\""); } await ProcessEvent(eventName, message); } catch (Exception ex) { _logger.LogWarning(ex, "----- ERROR Processing message \"{Message}\"", message); } // Even on exception we take the message off the queue. // in a REAL WORLD app this should be handled with a Dead Letter Exchange (DLX). // For more information see: https://www.rabbitmq.com/dlx.html _consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false); } private IModel CreateConsumerChannel() { if (!_persistentConnection.IsConnected) { _persistentConnection.TryConnect(); } _logger.LogTrace("Creating RabbitMQ consumer channel"); var channel = _persistentConnection.CreateModel(); channel.ExchangeDeclare(exchange: ExchangeName, type: ExchangeType.Direct); channel.QueueDeclare(queue: _queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); channel.CallbackException += (sender, ea) => { _logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel"); _consumerChannel.Dispose(); _consumerChannel = CreateConsumerChannel(); StartBasicConsume(); }; return channel; } private async Task ProcessEvent(string eventName, string message) { _logger.LogTrace("Processing RabbitMQ event: {EventName}", eventName); if (_subsManager.HasSubscriptionsForEvent(eventName)) { var subscriptions = _subsManager.GetHandlersForEvent(eventName); foreach (var subscription in subscriptions) { if (subscription.IsDynamic) { if (_serviceProvider.GetService(subscription.HandlerType) is IDynamicIntegrationEventHandler handler) { dynamic? eventData = JsonSerializer.Deserialize(message); await handler.HandleAsync(eventData); } } else { var handler = _serviceProvider.GetService(subscription.HandlerType); if (handler is not null) { var eventType = _subsManager.GetEventTypeByName(eventName); object? integrationEvent = JsonSerializer.Deserialize(message, eventType); var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); if (integrationEvent is not null) { Task? task = concreteType.GetMethod("HandleAsync")?.Invoke(handler, new object[] { integrationEvent }) as Task; task ??= Task.CompletedTask; await task; } } } } } else { _logger.LogWarning("No subscription for RabbitMQ event: {EventName}", eventName); } } } }