using System.Dynamic; using System.Text.Json; using ZeroFramework.EventBus.Abstractions; using ZeroFramework.EventBus.Events; namespace ZeroFramework.EventBus.MemoryQueue { public class InMemoryEventBus(IEventBusSubscriptionsManager subsManager, IServiceProvider serviceProvider) : IEventBus { private readonly IEventBusSubscriptionsManager _subsManager = subsManager; private readonly IServiceProvider _serviceProvider = serviceProvider; public async Task PublishAsync(IntegrationEvent @event, CancellationToken cancellationToken = default) { string eventName = @event.GetType().Name; string message = JsonSerializer.Serialize(@event, @event.GetType()); await ProcessEvent(eventName, message); } public void Subscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler { _subsManager.AddSubscription(); } public void SubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler { _subsManager.AddDynamicSubscription(eventName); } public void Unsubscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler { _subsManager.RemoveSubscription(); } public void UnsubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler { _subsManager.RemoveDynamicSubscription(eventName); } private async Task ProcessEvent(string eventName, string message) { if (_subsManager.HasSubscriptionsForEvent(eventName)) { var subscriptions = _subsManager.GetHandlersForEvent(eventName); foreach (var subscription in subscriptions) { if (subscription.IsDynamic) { if (_serviceProvider.GetService(subscription.HandlerType) is IDynamicIntegrationEventHandler handler) { dynamic? eventData = JsonSerializer.Deserialize(message); await handler.HandleAsync(eventData); } } else { var handler = _serviceProvider.GetService(subscription.HandlerType); if (handler is not null) { var eventType = _subsManager.GetEventTypeByName(eventName); object? integrationEvent = JsonSerializer.Deserialize(message, eventType); var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); if (integrationEvent is not null) { Task? task = concreteType.GetMethod("HandleAsync")?.Invoke(handler, new object[] { integrationEvent }) as Task; task ??= Task.CompletedTask; await task; } } } } } } } }