From 16386a8ea5c7dcf234e547f451de5bbb791e4bc8 Mon Sep 17 00:00:00 2001 From: hello Date: Wed, 9 Oct 2024 20:56:30 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E4=BA=8B=E5=8A=A1=E6=80=A7?= =?UTF-8?q?=E5=8F=91=E4=BB=B6=E7=AE=B1=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../HelloShop.ApiService.csproj | 2 +- .../HelloShop.AppHost.csproj | 4 +- .../HelloShop.BasketService.csproj | 10 +-- .../HelloShop.HybridApp.csproj | 8 +- .../HelloShop.IdentityService.csproj | 8 +- .../Behaviors/TransactionBehavior.cs | 5 +- .../Orders/CreateOrderCommandHandler.cs | 8 +- ...PaymentSucceededDistributedEventHandler.cs | 11 ++- .../Entities/EventLogs/DistributedEventLog.cs | 42 +++++++++ .../Extensions/Extensions.cs | 2 + .../HelloShop.OrderingService.csproj | 8 +- ...tributedEventLogEntityTypeConfiguration.cs | 27 ++++++ .../Services/DistributedEventService.cs | 89 +++++++++++++++++++ .../Services/IDistributedEventService.cs | 19 ++++ .../HelloShop.ProductService.csproj | 6 +- .../HelloShop.ServiceDefaults.csproj | 6 +- ...oShop.BasketService.FunctionalTests.csproj | 12 +-- .../HelloShop.BasketService.UnitTests.csproj | 6 +- .../HelloShop.FunctionalTests.csproj | 6 +- ...Shop.ProductService.FunctionalTests.csproj | 8 +- .../HelloShop.ProductService.UnitTests.csproj | 8 +- 21 files changed, 241 insertions(+), 54 deletions(-) create mode 100644 src/HelloShop.OrderingService/Entities/EventLogs/DistributedEventLog.cs create mode 100644 src/HelloShop.OrderingService/Infrastructure/EntityConfigurations/EventLogs/DistributedEventLogEntityTypeConfiguration.cs create mode 100644 src/HelloShop.OrderingService/Services/DistributedEventService.cs create mode 100644 src/HelloShop.OrderingService/Services/IDistributedEventService.cs diff --git a/src/HelloShop.ApiService/HelloShop.ApiService.csproj b/src/HelloShop.ApiService/HelloShop.ApiService.csproj index 66e34c5..b3ff7dc 100644 --- a/src/HelloShop.ApiService/HelloShop.ApiService.csproj +++ b/src/HelloShop.ApiService/HelloShop.ApiService.csproj @@ -9,6 +9,6 @@ - + \ No newline at end of file diff --git a/src/HelloShop.AppHost/HelloShop.AppHost.csproj b/src/HelloShop.AppHost/HelloShop.AppHost.csproj index 0d68099..8ce83a0 100644 --- a/src/HelloShop.AppHost/HelloShop.AppHost.csproj +++ b/src/HelloShop.AppHost/HelloShop.AppHost.csproj @@ -15,7 +15,7 @@ - - + + \ No newline at end of file diff --git a/src/HelloShop.BasketService/HelloShop.BasketService.csproj b/src/HelloShop.BasketService/HelloShop.BasketService.csproj index 6aea2ac..125ea08 100644 --- a/src/HelloShop.BasketService/HelloShop.BasketService.csproj +++ b/src/HelloShop.BasketService/HelloShop.BasketService.csproj @@ -13,12 +13,12 @@ - + - - - - + + + + diff --git a/src/HelloShop.HybridApp/HelloShop.HybridApp.csproj b/src/HelloShop.HybridApp/HelloShop.HybridApp.csproj index 63578c4..4368cfd 100644 --- a/src/HelloShop.HybridApp/HelloShop.HybridApp.csproj +++ b/src/HelloShop.HybridApp/HelloShop.HybridApp.csproj @@ -58,10 +58,10 @@ - - - - + + + + diff --git a/src/HelloShop.IdentityService/HelloShop.IdentityService.csproj b/src/HelloShop.IdentityService/HelloShop.IdentityService.csproj index b610f84..997f47f 100644 --- a/src/HelloShop.IdentityService/HelloShop.IdentityService.csproj +++ b/src/HelloShop.IdentityService/HelloShop.IdentityService.csproj @@ -8,12 +8,12 @@ - - - + + + runtime; build; native; contentfiles; analyzers; buildtransitive all - + \ No newline at end of file diff --git a/src/HelloShop.OrderingService/Behaviors/TransactionBehavior.cs b/src/HelloShop.OrderingService/Behaviors/TransactionBehavior.cs index fda8b7e..568e4ac 100644 --- a/src/HelloShop.OrderingService/Behaviors/TransactionBehavior.cs +++ b/src/HelloShop.OrderingService/Behaviors/TransactionBehavior.cs @@ -3,13 +3,14 @@ using HelloShop.OrderingService.Extensions; using HelloShop.OrderingService.Infrastructure; +using HelloShop.OrderingService.Services; using MediatR; using Microsoft.EntityFrameworkCore; using System.Data; namespace HelloShop.OrderingService.Behaviors { - public class TransactionBehavior(OrderingServiceDbContext dbContext, ILoggerFactory loggerFactory) : IPipelineBehavior where TRequest : IRequest + public class TransactionBehavior(OrderingServiceDbContext dbContext, ILoggerFactory loggerFactory, IDistributedEventService eventService) : IPipelineBehavior where TRequest : IRequest { private readonly ILogger> _logger = loggerFactory.CreateLogger>(); @@ -42,6 +43,8 @@ namespace HelloShop.OrderingService.Behaviors } await transaction.CommitAsync(); + + await eventService.PublishEventsThroughEventBusAsync(transaction.TransactionId, cancellationToken); }); return response ?? throw new ApplicationException($"Command {typeName} returned null response"); diff --git a/src/HelloShop.OrderingService/Commands/Orders/CreateOrderCommandHandler.cs b/src/HelloShop.OrderingService/Commands/Orders/CreateOrderCommandHandler.cs index c10c65c..38e5b89 100644 --- a/src/HelloShop.OrderingService/Commands/Orders/CreateOrderCommandHandler.cs +++ b/src/HelloShop.OrderingService/Commands/Orders/CreateOrderCommandHandler.cs @@ -14,10 +14,14 @@ using Microsoft.EntityFrameworkCore; namespace HelloShop.OrderingService.Commands.Orders { - public class CreateOrderCommandHandler(IMediator mediator, OrderingServiceDbContext dbContext, IMapper mapper, IDistributedEventBus distributedEventBus) : IRequestHandler + public class CreateOrderCommandHandler(IMediator mediator, IDistributedEventService distributedEventService, OrderingServiceDbContext dbContext, IMapper mapper) : IRequestHandler { public async Task Handle(CreateOrderCommand request, CancellationToken cancellationToken) { + var orderStartedIntegrationEvent = new OrderStartedDistributedEvent(request.UserId); + + await distributedEventService.AddAndSaveEventAsync(orderStartedIntegrationEvent, cancellationToken); + Address address = mapper.Map
(request); IEnumerable orderItems = mapper.Map>(request.OrderItems); @@ -47,8 +51,6 @@ namespace HelloShop.OrderingService.Commands.Orders await mediator.Publish(new OrderStartedLocalEvent(order), cancellationToken); - await distributedEventBus.PublishAsync(new OrderStartedDistributedEvent(request.UserId), cancellationToken); - return await Task.FromResult(true); } } diff --git a/src/HelloShop.OrderingService/DistributedEvents/EventHandling/OrderPaymentSucceededDistributedEventHandler.cs b/src/HelloShop.OrderingService/DistributedEvents/EventHandling/OrderPaymentSucceededDistributedEventHandler.cs index 34c6cc0..6e27c61 100644 --- a/src/HelloShop.OrderingService/DistributedEvents/EventHandling/OrderPaymentSucceededDistributedEventHandler.cs +++ b/src/HelloShop.OrderingService/DistributedEvents/EventHandling/OrderPaymentSucceededDistributedEventHandler.cs @@ -4,12 +4,13 @@ using HelloShop.OrderingService.DistributedEvents.Events; using HelloShop.OrderingService.Entities.Orders; using HelloShop.OrderingService.Infrastructure; +using HelloShop.OrderingService.Services; using HelloShop.ServiceDefaults.DistributedEvents.Abstractions; using Microsoft.EntityFrameworkCore; namespace HelloShop.OrderingService.DistributedEvents.EventHandling { - public class OrderPaymentSucceededDistributedEventHandler(OrderingServiceDbContext dbContext, IDistributedEventBus distributedEventBus) : IDistributedEventHandler + public class OrderPaymentSucceededDistributedEventHandler(OrderingServiceDbContext dbContext, IDistributedEventService distributedEventService) : IDistributedEventHandler { public async Task HandleAsync(OrderPaymentSucceededDistributedEvent @event) { @@ -29,13 +30,15 @@ namespace HelloShop.OrderingService.DistributedEvents.EventHandling await dbContext.SaveChangesAsync(); - await transaction.CommitAsync(); - var orderStockList = order.OrderItems.Select(orderItem => new OrderStockItem(orderItem.ProductId, orderItem.Units)); var integrationEvent = new OrderPaidDistributedEvent(order.Id, orderStockList); - await distributedEventBus.PublishAsync(integrationEvent); + await distributedEventService.AddAndSaveEventAsync(integrationEvent); + + await distributedEventService.PublishEventsThroughEventBusAsync(transaction.TransactionId); + + await transaction.CommitAsync(); }); } } diff --git a/src/HelloShop.OrderingService/Entities/EventLogs/DistributedEventLog.cs b/src/HelloShop.OrderingService/Entities/EventLogs/DistributedEventLog.cs new file mode 100644 index 0000000..0c66301 --- /dev/null +++ b/src/HelloShop.OrderingService/Entities/EventLogs/DistributedEventLog.cs @@ -0,0 +1,42 @@ +// Copyright (c) HelloShop Corporation. All rights reserved. +// See the license file in the project root for more information. + +using HelloShop.ServiceDefaults.DistributedEvents.Abstractions; +using System.Diagnostics.CodeAnalysis; + +namespace HelloShop.OrderingService.Entities.EventLogs +{ + public enum DistributedEventStatus { NotPublished, InProgress, Published, PublishedFailed } + + public class DistributedEventLog + { + public Guid EventId { get; set; } + + public required string EventTypeName { get; set; } + + public required DistributedEvent DistributedEvent { get; set; } + + public DistributedEventStatus Status { get; set; } = DistributedEventStatus.NotPublished; + + public int TimesSent { get; set; } + + public DateTimeOffset CreationTime { get; set; } = DateTimeOffset.UtcNow; + + public required Guid TransactionId { get; set; } + + /// + /// EF Core cannot set navigation properties using a constructor. + /// The constructor can be public, private, or have any other accessibility. + /// + private DistributedEventLog() { } + + [SetsRequiredMembers] + public DistributedEventLog(DistributedEvent @event, Guid transactionId) + { + EventId = @event.Id; + EventTypeName = @event.GetType().Name; + DistributedEvent = @event; + TransactionId = transactionId; + } + } +} diff --git a/src/HelloShop.OrderingService/Extensions/Extensions.cs b/src/HelloShop.OrderingService/Extensions/Extensions.cs index 2acae5e..53253d0 100644 --- a/src/HelloShop.OrderingService/Extensions/Extensions.cs +++ b/src/HelloShop.OrderingService/Extensions/Extensions.cs @@ -48,6 +48,8 @@ namespace HelloShop.OrderingService.Extensions builder.Services.AddHostedService(); builder.Services.AddHostedService(); + + builder.Services.AddTransient>(); } public static WebApplication MapApplicationEndpoints(this WebApplication app) diff --git a/src/HelloShop.OrderingService/HelloShop.OrderingService.csproj b/src/HelloShop.OrderingService/HelloShop.OrderingService.csproj index b92c965..210ccca 100644 --- a/src/HelloShop.OrderingService/HelloShop.OrderingService.csproj +++ b/src/HelloShop.OrderingService/HelloShop.OrderingService.csproj @@ -8,12 +8,12 @@ - - - + + + runtime; build; native; contentfiles; analyzers; buildtransitive all - + \ No newline at end of file diff --git a/src/HelloShop.OrderingService/Infrastructure/EntityConfigurations/EventLogs/DistributedEventLogEntityTypeConfiguration.cs b/src/HelloShop.OrderingService/Infrastructure/EntityConfigurations/EventLogs/DistributedEventLogEntityTypeConfiguration.cs new file mode 100644 index 0000000..4e17d4d --- /dev/null +++ b/src/HelloShop.OrderingService/Infrastructure/EntityConfigurations/EventLogs/DistributedEventLogEntityTypeConfiguration.cs @@ -0,0 +1,27 @@ +// Copyright (c) HelloShop Corporation. All rights reserved. +// See the license file in the project root for more information. + +using HelloShop.OrderingService.Entities.EventLogs; +using Microsoft.EntityFrameworkCore.Metadata.Builders; +using Microsoft.EntityFrameworkCore; +using System.Text.Json; +using HelloShop.ServiceDefaults.DistributedEvents.Abstractions; + +namespace HelloShop.OrderingService.Infrastructure.EntityConfigurations.EventLogs +{ + public class DistributedEventLogEntityTypeConfiguration : IEntityTypeConfiguration + { + private static readonly JsonSerializerOptions s_jsonSerializerOptions = new(JsonSerializerDefaults.Web); + + public void Configure(EntityTypeBuilder builder) + { + builder.ToTable("DistributedEventLogs"); + + builder.HasKey(x => x.EventId); + builder.Property(x => x.EventTypeName).HasMaxLength(32); + builder.Property(x => x.Status).HasConversion(); + + builder.Property(x => x.DistributedEvent).HasConversion(v => JsonSerializer.Serialize(v, v.GetType(), s_jsonSerializerOptions), v => JsonSerializer.Deserialize(v, s_jsonSerializerOptions)!); + } + } +} diff --git a/src/HelloShop.OrderingService/Services/DistributedEventService.cs b/src/HelloShop.OrderingService/Services/DistributedEventService.cs new file mode 100644 index 0000000..8d02035 --- /dev/null +++ b/src/HelloShop.OrderingService/Services/DistributedEventService.cs @@ -0,0 +1,89 @@ +// Copyright (c) HelloShop Corporation. All rights reserved. +// See the license file in the project root for more information. + +using HelloShop.OrderingService.Entities.EventLogs; +using HelloShop.ServiceDefaults.DistributedEvents.Abstractions; +using Microsoft.EntityFrameworkCore; + +namespace HelloShop.OrderingService.Services +{ + public class DistributedEventService(TContext dbContext, IDistributedEventBus distributedEventBus, ILogger> logger) : IDistributedEventService, IDisposable where TContext : DbContext + { + private volatile bool _disposedValue; + + public async Task UpdateEventStatusAsync(Guid eventId, DistributedEventStatus status, CancellationToken cancellationToken = default) + { + var eventLogEntry = dbContext.Set().Single(ie => ie.EventId == eventId); + + eventLogEntry.Status = status; + + if (status == DistributedEventStatus.InProgress) + { + eventLogEntry.TimesSent++; + } + + await dbContext.SaveChangesAsync(cancellationToken); + } + + public async Task> RetrieveEventLogsPendingToPublishAsync(Guid transactionId, CancellationToken cancellationToken = default) + { + var result = await dbContext.Set().Where(e => e.TransactionId == transactionId && e.Status == DistributedEventStatus.NotPublished).ToListAsync(cancellationToken: cancellationToken); + + return result.Count != 0 ? result.OrderBy(o => o.CreationTime) : []; + } + + public async Task AddAndSaveEventAsync(DistributedEvent @event, CancellationToken cancellationToken = default) + { + var transaction = dbContext.Database.CurrentTransaction ?? throw new InvalidOperationException("This method must be called within a transaction scope."); + + var eventLog = new DistributedEventLog(@event, transaction.TransactionId); + + await dbContext.AddAsync(eventLog, cancellationToken); + + await dbContext.SaveChangesAsync(cancellationToken); + } + + public async Task PublishEventsThroughEventBusAsync(Guid transactionId, CancellationToken cancellationToken = default) + { + var pendingEventLogs = await RetrieveEventLogsPendingToPublishAsync(transactionId, cancellationToken); + + foreach (var eventLog in pendingEventLogs) + { + logger.LogInformation("Publishing integration event {EventId} {DistributedEvent}", eventLog.EventId, eventLog.DistributedEvent); + + try + { + await UpdateEventStatusAsync(eventLog.EventId, DistributedEventStatus.InProgress, cancellationToken); + await distributedEventBus.PublishAsync(eventLog.DistributedEvent, cancellationToken); + await UpdateEventStatusAsync(eventLog.EventId, DistributedEventStatus.Published, cancellationToken); + } + catch (Exception ex) + { + logger.LogError(ex, "Error publishing distributed event {EventId}", eventLog.EventId); + + await UpdateEventStatusAsync(eventLog.EventId, DistributedEventStatus.PublishedFailed, cancellationToken); + } + } + } + + protected virtual void Dispose(bool disposing) + { + if (!_disposedValue) + { + if (disposing) + { + dbContext.Dispose(); + } + + _disposedValue = true; + } + } + + public void Dispose() + { + Dispose(disposing: true); + GC.SuppressFinalize(this); + } + } + +} diff --git a/src/HelloShop.OrderingService/Services/IDistributedEventService.cs b/src/HelloShop.OrderingService/Services/IDistributedEventService.cs new file mode 100644 index 0000000..2e24319 --- /dev/null +++ b/src/HelloShop.OrderingService/Services/IDistributedEventService.cs @@ -0,0 +1,19 @@ +// Copyright (c) HelloShop Corporation. All rights reserved. +// See the license file in the project root for more information. + +using HelloShop.OrderingService.Entities.EventLogs; +using HelloShop.ServiceDefaults.DistributedEvents.Abstractions; + +namespace HelloShop.OrderingService.Services +{ + public interface IDistributedEventService + { + Task> RetrieveEventLogsPendingToPublishAsync(Guid transactionId, CancellationToken cancellationToken = default); + + Task AddAndSaveEventAsync(DistributedEvent @event, CancellationToken cancellationToken = default); + + Task UpdateEventStatusAsync(Guid eventId, DistributedEventStatus status, CancellationToken cancellationToken = default); + + Task PublishEventsThroughEventBusAsync(Guid transactionId, CancellationToken cancellationToken = default); + } +} diff --git a/src/HelloShop.ProductService/HelloShop.ProductService.csproj b/src/HelloShop.ProductService/HelloShop.ProductService.csproj index af10394..125b29b 100644 --- a/src/HelloShop.ProductService/HelloShop.ProductService.csproj +++ b/src/HelloShop.ProductService/HelloShop.ProductService.csproj @@ -8,11 +8,11 @@ - - + + runtime; build; native; contentfiles; analyzers; buildtransitive all - + \ No newline at end of file diff --git a/src/HelloShop.ServiceDefaults/HelloShop.ServiceDefaults.csproj b/src/HelloShop.ServiceDefaults/HelloShop.ServiceDefaults.csproj index 955f878..032b2a7 100644 --- a/src/HelloShop.ServiceDefaults/HelloShop.ServiceDefaults.csproj +++ b/src/HelloShop.ServiceDefaults/HelloShop.ServiceDefaults.csproj @@ -8,14 +8,14 @@ - - + + - + diff --git a/tests/HelloShop.BasketService.FunctionalTests/HelloShop.BasketService.FunctionalTests.csproj b/tests/HelloShop.BasketService.FunctionalTests/HelloShop.BasketService.FunctionalTests.csproj index df16895..4f293d5 100644 --- a/tests/HelloShop.BasketService.FunctionalTests/HelloShop.BasketService.FunctionalTests.csproj +++ b/tests/HelloShop.BasketService.FunctionalTests/HelloShop.BasketService.FunctionalTests.csproj @@ -15,15 +15,15 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - - - + + + all runtime; build; native; contentfiles; analyzers; buildtransitive - - - + + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/tests/HelloShop.BasketService.UnitTests/HelloShop.BasketService.UnitTests.csproj b/tests/HelloShop.BasketService.UnitTests/HelloShop.BasketService.UnitTests.csproj index 0c01a46..131e442 100644 --- a/tests/HelloShop.BasketService.UnitTests/HelloShop.BasketService.UnitTests.csproj +++ b/tests/HelloShop.BasketService.UnitTests/HelloShop.BasketService.UnitTests.csproj @@ -14,9 +14,9 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - - - + + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/tests/HelloShop.FunctionalTests/HelloShop.FunctionalTests.csproj b/tests/HelloShop.FunctionalTests/HelloShop.FunctionalTests.csproj index 5396ee6..a949376 100644 --- a/tests/HelloShop.FunctionalTests/HelloShop.FunctionalTests.csproj +++ b/tests/HelloShop.FunctionalTests/HelloShop.FunctionalTests.csproj @@ -9,13 +9,13 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive - - + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/tests/HelloShop.ProductService.FunctionalTests/HelloShop.ProductService.FunctionalTests.csproj b/tests/HelloShop.ProductService.FunctionalTests/HelloShop.ProductService.FunctionalTests.csproj index b1e92fe..9068ac2 100644 --- a/tests/HelloShop.ProductService.FunctionalTests/HelloShop.ProductService.FunctionalTests.csproj +++ b/tests/HelloShop.ProductService.FunctionalTests/HelloShop.ProductService.FunctionalTests.csproj @@ -14,10 +14,10 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - - - - + + + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/tests/HelloShop.ProductService.UnitTests/HelloShop.ProductService.UnitTests.csproj b/tests/HelloShop.ProductService.UnitTests/HelloShop.ProductService.UnitTests.csproj index 1e39637..0454e9f 100644 --- a/tests/HelloShop.ProductService.UnitTests/HelloShop.ProductService.UnitTests.csproj +++ b/tests/HelloShop.ProductService.UnitTests/HelloShop.ProductService.UnitTests.csproj @@ -14,10 +14,10 @@ all runtime; build; native; contentfiles; analyzers; buildtransitive - - - - + + + + all runtime; build; native; contentfiles; analyzers; buildtransitive