From f007572e7ff1693ce2a3169d4dc1f493eb5adf81 Mon Sep 17 00:00:00 2001 From: hello Date: Fri, 28 Mar 2025 22:27:12 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BA=8B=E5=8A=A1=E6=80=A7=E5=8F=91=E4=BB=B6?= =?UTF-8?q?=E7=AE=B1=E6=A8=A1=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Directory.Packages.props | 2 + HelloShop.sln | 7 + .../DistributedEventLog.cs | 4 +- .../DistributedEventLogExtensions.cs | 20 + .../DistributedEventLogService.cs | 51 +++ .../DistributedEventWorker.cs | 54 +++ .../EventLogEntityTypeConfiguration.cs | 9 +- .../HelloShop.EventBus.Logging.csproj | 17 + .../IDistributedEventLogService.cs | 23 ++ .../ResilientTransaction.cs | 23 ++ .../RabbitMQEventBusOptions.cs | 2 +- .../Behaviors/TransactionBehavior.cs | 2 +- .../Extensions/Extensions.cs | 3 +- .../HelloShop.OrderingService.csproj | 1 + .../HelloShop.OrderingService.http | 27 +- ...0250328130944_AddEventLogTable.Designer.cs | 347 ++++++++++++++++++ .../20250328130944_AddEventLogTable.cs | 41 +++ .../OrderingServiceDbContextModelSnapshot.cs | 82 ++--- .../OrderingServiceDbContext.cs | 2 + .../Services/DistributedEventService.cs | 90 ++--- .../Services/IDistributedEventService.cs | 7 +- ...aitingValidationDistributedEventHandler.cs | 6 +- .../HelloShop.ProductService.csproj | 1 + ...0250328131028_AddEventLogTable.Designer.cs | 155 ++++++++ .../20250328131028_AddEventLogTable.cs | 41 +++ .../ProductServiceDbContextModelSnapshot.cs | 41 +++ .../Infrastructure/ProductServiceDbContext.cs | 2 + src/HelloShop.ProductService/Program.cs | 2 + .../Services/DistributedEventService.cs | 37 ++ .../Services/IDistributedEventService.cs | 14 + 30 files changed, 978 insertions(+), 135 deletions(-) rename {src/HelloShop.OrderingService/Entities/EventLogs => libraries/HelloShop.EventBus.Logging}/DistributedEventLog.cs (90%) create mode 100644 libraries/HelloShop.EventBus.Logging/DistributedEventLogExtensions.cs create mode 100644 libraries/HelloShop.EventBus.Logging/DistributedEventLogService.cs create mode 100644 libraries/HelloShop.EventBus.Logging/DistributedEventWorker.cs rename src/HelloShop.OrderingService/Infrastructure/EntityConfigurations/EventLogs/DistributedEventLogEntityTypeConfiguration.cs => libraries/HelloShop.EventBus.Logging/EventLogEntityTypeConfiguration.cs (78%) create mode 100644 libraries/HelloShop.EventBus.Logging/HelloShop.EventBus.Logging.csproj create mode 100644 libraries/HelloShop.EventBus.Logging/IDistributedEventLogService.cs create mode 100644 libraries/HelloShop.EventBus.Logging/ResilientTransaction.cs create mode 100644 src/HelloShop.OrderingService/Infrastructure/Migrations/20250328130944_AddEventLogTable.Designer.cs create mode 100644 src/HelloShop.OrderingService/Infrastructure/Migrations/20250328130944_AddEventLogTable.cs create mode 100644 src/HelloShop.ProductService/Infrastructure/Migrations/20250328131028_AddEventLogTable.Designer.cs create mode 100644 src/HelloShop.ProductService/Infrastructure/Migrations/20250328131028_AddEventLogTable.cs create mode 100644 src/HelloShop.ProductService/Services/DistributedEventService.cs create mode 100644 src/HelloShop.ProductService/Services/IDistributedEventService.cs diff --git a/Directory.Packages.props b/Directory.Packages.props index adc1b31..0cd8288 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -29,9 +29,11 @@ + + diff --git a/HelloShop.sln b/HelloShop.sln index 31a6e5c..4790144 100644 --- a/HelloShop.sln +++ b/HelloShop.sln @@ -45,6 +45,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HelloShop.DistributedLock", EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HelloShop.DistributedLock.Dapr", "libraries\HelloShop.DistributedLock.Dapr\HelloShop.DistributedLock.Dapr.csproj", "{37F01A0B-50D6-48BA-8A20-FBADB5524CD7}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HelloShop.EventBus.Logging", "libraries\HelloShop.EventBus.Logging\HelloShop.EventBus.Logging.csproj", "{ACBA2FA9-ADBB-4FCD-A303-6D8D4B2131AA}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -125,6 +127,10 @@ Global {37F01A0B-50D6-48BA-8A20-FBADB5524CD7}.Debug|Any CPU.Build.0 = Debug|Any CPU {37F01A0B-50D6-48BA-8A20-FBADB5524CD7}.Release|Any CPU.ActiveCfg = Release|Any CPU {37F01A0B-50D6-48BA-8A20-FBADB5524CD7}.Release|Any CPU.Build.0 = Release|Any CPU + {ACBA2FA9-ADBB-4FCD-A303-6D8D4B2131AA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {ACBA2FA9-ADBB-4FCD-A303-6D8D4B2131AA}.Debug|Any CPU.Build.0 = Debug|Any CPU + {ACBA2FA9-ADBB-4FCD-A303-6D8D4B2131AA}.Release|Any CPU.ActiveCfg = Release|Any CPU + {ACBA2FA9-ADBB-4FCD-A303-6D8D4B2131AA}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -148,6 +154,7 @@ Global {5D403BF6-9267-4B0F-AEB3-2143C8BBA8CD} = {02EA681E-C7D8-13C7-8484-4AC65E1B71E8} {86729635-8E31-4C53-81AE-7C410C848219} = {02EA681E-C7D8-13C7-8484-4AC65E1B71E8} {37F01A0B-50D6-48BA-8A20-FBADB5524CD7} = {02EA681E-C7D8-13C7-8484-4AC65E1B71E8} + {ACBA2FA9-ADBB-4FCD-A303-6D8D4B2131AA} = {02EA681E-C7D8-13C7-8484-4AC65E1B71E8} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {845545A8-2006-46C3-ABD7-5BDF63F3858C} diff --git a/src/HelloShop.OrderingService/Entities/EventLogs/DistributedEventLog.cs b/libraries/HelloShop.EventBus.Logging/DistributedEventLog.cs similarity index 90% rename from src/HelloShop.OrderingService/Entities/EventLogs/DistributedEventLog.cs rename to libraries/HelloShop.EventBus.Logging/DistributedEventLog.cs index d6d11a7..f3b8f3c 100644 --- a/src/HelloShop.OrderingService/Entities/EventLogs/DistributedEventLog.cs +++ b/libraries/HelloShop.EventBus.Logging/DistributedEventLog.cs @@ -4,7 +4,7 @@ using HelloShop.EventBus.Abstractions; using System.Diagnostics.CodeAnalysis; -namespace HelloShop.OrderingService.Entities.EventLogs +namespace HelloShop.EventBus.Logging { public enum DistributedEventStatus { NotPublished, InProgress, Published, PublishedFailed } @@ -20,7 +20,7 @@ namespace HelloShop.OrderingService.Entities.EventLogs public int TimesSent { get; set; } - public DateTimeOffset CreationTime { get; set; } = DateTimeOffset.UtcNow; + public DateTimeOffset CreationTime { get; init; } = TimeProvider.System.GetUtcNow(); public required Guid TransactionId { get; set; } diff --git a/libraries/HelloShop.EventBus.Logging/DistributedEventLogExtensions.cs b/libraries/HelloShop.EventBus.Logging/DistributedEventLogExtensions.cs new file mode 100644 index 0000000..10def70 --- /dev/null +++ b/libraries/HelloShop.EventBus.Logging/DistributedEventLogExtensions.cs @@ -0,0 +1,20 @@ +// Copyright (c) HelloShop Corporation. All rights reserved. +// See the license file in the project root for more information. + +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using System.Diagnostics.CodeAnalysis; + +namespace HelloShop.EventBus.Logging +{ + public static class DistributedEventLogExtensions + { + public static void UseDistributedEventLogs(this ModelBuilder builder) => builder.ApplyConfiguration(new EventLogEntityTypeConfiguration()); + + public static IServiceCollection AddDistributedEventLogs([NotNull] this IServiceCollection services) where TContext : DbContext + { + services.AddTransient>().AddHostedService(); + return services; + } + } +} diff --git a/libraries/HelloShop.EventBus.Logging/DistributedEventLogService.cs b/libraries/HelloShop.EventBus.Logging/DistributedEventLogService.cs new file mode 100644 index 0000000..b8b36e7 --- /dev/null +++ b/libraries/HelloShop.EventBus.Logging/DistributedEventLogService.cs @@ -0,0 +1,51 @@ +// Copyright (c) HelloShop Corporation. All rights reserved. +// See the license file in the project root for more information. + +using HelloShop.EventBus.Abstractions; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Storage; + +namespace HelloShop.EventBus.Logging +{ + public class DistributedEventLogService(TContext dbContext) : IDistributedEventLogService where TContext : DbContext + { + public async Task> RetrieveEventLogsPendingToPublishAsync(Guid transactionId, CancellationToken cancellationToken = default) + { + return await dbContext.Set().Where(e => e.TransactionId == transactionId && e.Status == DistributedEventStatus.NotPublished).ToListAsync(cancellationToken: cancellationToken); + } + + public async Task> RetrieveEventLogsFailedToPublishAsync(CancellationToken cancellationToken = default) + { + return await dbContext.Set().Where(e => e.Status == DistributedEventStatus.PublishedFailed).ToListAsync(cancellationToken); + } + + public async Task SaveEventAsync(DistributedEvent @event, IDbContextTransaction transaction, CancellationToken cancellationToken = default) + { + var eventLog = new DistributedEventLog(@event, transaction.TransactionId); + dbContext.Database.UseTransaction(transaction.GetDbTransaction()); + await dbContext.Set().AddAsync(eventLog, cancellationToken); + + await dbContext.SaveChangesAsync(cancellationToken); + } + + public async Task UpdateEventStatusAsync(Guid eventId, DistributedEventStatus status, CancellationToken cancellationToken = default) + { + var eventLogEntry = dbContext.Set().Single(ie => ie.EventId == eventId); + + eventLogEntry.Status = status; + + if (status == DistributedEventStatus.InProgress) + { + eventLogEntry.TimesSent++; + } + + await dbContext.SaveChangesAsync(cancellationToken); + } + + public Task MarkEventAsPublishedAsync(Guid eventId, CancellationToken cancellationToken = default) => UpdateEventStatusAsync(eventId, DistributedEventStatus.Published, cancellationToken); + + public Task MarkEventAsInProgressAsync(Guid eventId, CancellationToken cancellationToken = default) => UpdateEventStatusAsync(eventId, DistributedEventStatus.InProgress, cancellationToken); + + public Task MarkEventAsFailedAsync(Guid eventId, CancellationToken cancellationToken = default) => UpdateEventStatusAsync(eventId, DistributedEventStatus.PublishedFailed, cancellationToken); + } +} diff --git a/libraries/HelloShop.EventBus.Logging/DistributedEventWorker.cs b/libraries/HelloShop.EventBus.Logging/DistributedEventWorker.cs new file mode 100644 index 0000000..2a04dfb --- /dev/null +++ b/libraries/HelloShop.EventBus.Logging/DistributedEventWorker.cs @@ -0,0 +1,54 @@ +// Copyright (c) HelloShop Corporation. All rights reserved. +// See the license file in the project root for more information. + + +using HelloShop.EventBus.Abstractions; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace HelloShop.EventBus.Logging +{ + public class DistributedEventWorker(IServiceScopeFactory serviceScopeFactory) : BackgroundService + { + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + { + using var scope = serviceScopeFactory.CreateScope(); + + var eventBus = scope.ServiceProvider.GetRequiredService(); + var eventLogService = scope.ServiceProvider.GetRequiredService(); + var logger = scope.ServiceProvider.GetRequiredService>(); + + try + { + var failedEventLogs = await eventLogService.RetrieveEventLogsFailedToPublishAsync(stoppingToken); + + foreach (var eventLog in failedEventLogs) + { + DistributedEvent @event = eventLog.DistributedEvent; + + try + { + await eventLogService.MarkEventAsInProgressAsync(@event.Id, stoppingToken); + await eventBus.PublishAsync(@event, stoppingToken); + await eventLogService.MarkEventAsPublishedAsync(@event.Id, stoppingToken); + } + catch (Exception ex) + { + logger.LogError(ex, "Publish through event bus failed for {EventId}.", @event.Id); + await eventLogService.MarkEventAsFailedAsync(@event.Id, stoppingToken); + } + } + } + catch (Exception ex) + { + logger.LogError(ex, "An error occurred while retrieving failed event logs."); + } + + await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken); + } + } + } +} diff --git a/src/HelloShop.OrderingService/Infrastructure/EntityConfigurations/EventLogs/DistributedEventLogEntityTypeConfiguration.cs b/libraries/HelloShop.EventBus.Logging/EventLogEntityTypeConfiguration.cs similarity index 78% rename from src/HelloShop.OrderingService/Infrastructure/EntityConfigurations/EventLogs/DistributedEventLogEntityTypeConfiguration.cs rename to libraries/HelloShop.EventBus.Logging/EventLogEntityTypeConfiguration.cs index 35b00a5..0cbe1f6 100644 --- a/src/HelloShop.OrderingService/Infrastructure/EntityConfigurations/EventLogs/DistributedEventLogEntityTypeConfiguration.cs +++ b/libraries/HelloShop.EventBus.Logging/EventLogEntityTypeConfiguration.cs @@ -2,24 +2,23 @@ // See the license file in the project root for more information. using HelloShop.EventBus.Abstractions; -using HelloShop.OrderingService.Entities.EventLogs; using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore.Metadata.Builders; using System.Text.Json; -namespace HelloShop.OrderingService.Infrastructure.EntityConfigurations.EventLogs +namespace HelloShop.EventBus.Logging { - public class DistributedEventLogEntityTypeConfiguration : IEntityTypeConfiguration + public class EventLogEntityTypeConfiguration : IEntityTypeConfiguration { private static readonly JsonSerializerOptions s_jsonSerializerOptions = new(JsonSerializerDefaults.Web); public void Configure(EntityTypeBuilder builder) { builder.HasKey(x => x.EventId); - builder.Property(x => x.EventTypeName).HasMaxLength(32); + builder.Property(x => x.EventTypeName).HasMaxLength(64); builder.Property(x => x.Status).HasConversion(); builder.Property(x => x.DistributedEvent).HasConversion(v => JsonSerializer.Serialize(v, v.GetType(), s_jsonSerializerOptions), v => JsonSerializer.Deserialize(v, s_jsonSerializerOptions)!); } } -} +} \ No newline at end of file diff --git a/libraries/HelloShop.EventBus.Logging/HelloShop.EventBus.Logging.csproj b/libraries/HelloShop.EventBus.Logging/HelloShop.EventBus.Logging.csproj new file mode 100644 index 0000000..8794144 --- /dev/null +++ b/libraries/HelloShop.EventBus.Logging/HelloShop.EventBus.Logging.csproj @@ -0,0 +1,17 @@ + + + + net9.0 + enable + enable + + + + + + + + + + + diff --git a/libraries/HelloShop.EventBus.Logging/IDistributedEventLogService.cs b/libraries/HelloShop.EventBus.Logging/IDistributedEventLogService.cs new file mode 100644 index 0000000..50afb91 --- /dev/null +++ b/libraries/HelloShop.EventBus.Logging/IDistributedEventLogService.cs @@ -0,0 +1,23 @@ +// Copyright (c) HelloShop Corporation. All rights reserved. +// See the license file in the project root for more information. + +using HelloShop.EventBus.Abstractions; +using Microsoft.EntityFrameworkCore.Storage; + +namespace HelloShop.EventBus.Logging +{ + public interface IDistributedEventLogService + { + Task> RetrieveEventLogsPendingToPublishAsync(Guid transactionId, CancellationToken cancellationToken = default); + + Task> RetrieveEventLogsFailedToPublishAsync(CancellationToken cancellationToken = default); + + Task SaveEventAsync(DistributedEvent @event, IDbContextTransaction transaction, CancellationToken cancellationToken = default); + + Task MarkEventAsPublishedAsync(Guid eventId, CancellationToken cancellationToken = default); + + Task MarkEventAsInProgressAsync(Guid eventId, CancellationToken cancellationToken = default); + + Task MarkEventAsFailedAsync(Guid eventId, CancellationToken cancellationToken = default); + } +} \ No newline at end of file diff --git a/libraries/HelloShop.EventBus.Logging/ResilientTransaction.cs b/libraries/HelloShop.EventBus.Logging/ResilientTransaction.cs new file mode 100644 index 0000000..7ad0c29 --- /dev/null +++ b/libraries/HelloShop.EventBus.Logging/ResilientTransaction.cs @@ -0,0 +1,23 @@ +// Copyright (c) HelloShop Corporation. All rights reserved. +// See the license file in the project root for more information. + +using Microsoft.EntityFrameworkCore; + +namespace HelloShop.EventBus.Logging +{ + public class ResilientTransaction(DbContext dbContext) + { + public static ResilientTransaction New(DbContext context) => new(context); + + public async Task ExecuteAsync(Func action) + { + var strategy = dbContext.Database.CreateExecutionStrategy(); + await strategy.ExecuteAsync(async () => + { + await using var transaction = await dbContext.Database.BeginTransactionAsync(); + await action(); + await transaction.CommitAsync(); + }); + } + } +} diff --git a/libraries/HelloShop.EventBus.RabbitMQ/RabbitMQEventBusOptions.cs b/libraries/HelloShop.EventBus.RabbitMQ/RabbitMQEventBusOptions.cs index e679898..16d5d50 100644 --- a/libraries/HelloShop.EventBus.RabbitMQ/RabbitMQEventBusOptions.cs +++ b/libraries/HelloShop.EventBus.RabbitMQ/RabbitMQEventBusOptions.cs @@ -7,7 +7,7 @@ namespace HelloShop.EventBus.RabbitMQ { public string ExchangeName { get; set; } = "event-bus-exchange"; - public required string QueueName { get; set; } + public required string QueueName { get; set; } = "event-bus-queue"; public int RetryCount { get; set; } = 10; } diff --git a/src/HelloShop.OrderingService/Behaviors/TransactionBehavior.cs b/src/HelloShop.OrderingService/Behaviors/TransactionBehavior.cs index 568e4ac..dfecb59 100644 --- a/src/HelloShop.OrderingService/Behaviors/TransactionBehavior.cs +++ b/src/HelloShop.OrderingService/Behaviors/TransactionBehavior.cs @@ -41,7 +41,7 @@ namespace HelloShop.OrderingService.Behaviors _logger.LogInformation("Commit transaction {TransactionId} for {CommandName}", transaction.TransactionId, typeName); } - + await dbContext.SaveChangesAsync(cancellationToken); await transaction.CommitAsync(); await eventService.PublishEventsThroughEventBusAsync(transaction.TransactionId, cancellationToken); diff --git a/src/HelloShop.OrderingService/Extensions/Extensions.cs b/src/HelloShop.OrderingService/Extensions/Extensions.cs index f81a088..d564458 100644 --- a/src/HelloShop.OrderingService/Extensions/Extensions.cs +++ b/src/HelloShop.OrderingService/Extensions/Extensions.cs @@ -3,6 +3,7 @@ using HelloShop.EventBus.Abstractions; using HelloShop.EventBus.Dapr; +using HelloShop.EventBus.Logging; using HelloShop.OrderingService.Behaviors; using HelloShop.OrderingService.Constants; using HelloShop.OrderingService.Infrastructure; @@ -67,7 +68,7 @@ namespace HelloShop.OrderingService.Extensions builder.Services.AddHostedService(); builder.Services.AddHostedService(); - builder.Services.AddTransient>(); + builder.Services.AddDistributedEventLogs().AddTransient(); builder.Services.AddOpenApi(); diff --git a/src/HelloShop.OrderingService/HelloShop.OrderingService.csproj b/src/HelloShop.OrderingService/HelloShop.OrderingService.csproj index 5ad96cf..e0ddc97 100644 --- a/src/HelloShop.OrderingService/HelloShop.OrderingService.csproj +++ b/src/HelloShop.OrderingService/HelloShop.OrderingService.csproj @@ -6,6 +6,7 @@ + diff --git a/src/HelloShop.OrderingService/HelloShop.OrderingService.http b/src/HelloShop.OrderingService/HelloShop.OrderingService.http index 900eb89..79147f5 100644 --- a/src/HelloShop.OrderingService/HelloShop.OrderingService.http +++ b/src/HelloShop.OrderingService/HelloShop.OrderingService.http @@ -1,9 +1,12 @@ -@HelloShop.OrderingService_HostAddress = https://localhost:8105 +@HelloWorld.OrderingService_HostAddress = https://localhost:8105 +@AccessToken = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1laWQiOiIxIiwidW5pcXVlX25hbWUiOiJhZG1pbiIsInJvbGVpZCI6IjEiLCJuYmYiOjE3NDI4Njg1ODAsImV4cCI6MTc2NDkwMDU4MCwiaWF0IjoxNzQyODY4NTgwfQ.6Wm6S4CNtKi9lGqxam4_ZnDebnTXVxycDubbv0DLy2c -POST {{HelloShop.OrderingService_HostAddress}}/api/orders +### + +POST {{HelloWorld.OrderingService_HostAddress}}/api/orders Content-Type: application/json -x-request-id: 8ddc1737-6a31-4b4f-bccd-63d0966348a1 -Authorization : Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1laWQiOiIxIiwidW5pcXVlX25hbWUiOiJhZG1pbiIsInJvbGVpZCI6IjEiLCJuYmYiOjE3MjA1NzY3NDYsImV4cCI6MTc0MjYwODc0NiwiaWF0IjoxNzIwNTc2NzQ2fQ.ju_D3zeGLKqJYVckbb8Y3yNkp40nOqRAJrdOsISs4d4 +x-request-id: 1ddc1737-6a32-4b4f-bccd-63d0966348a3 +Authorization : Bearer {{AccessToken}} { "country": "USA", @@ -36,12 +39,20 @@ Authorization : Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1laWQiOiIxIiwi ### -GET {{HelloShop.OrderingService_HostAddress}}/api/orders +GET {{HelloWorld.OrderingService_HostAddress}}/api/orders Content-Type: application/json -Authorization : Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1laWQiOiIxIiwidW5pcXVlX25hbWUiOiJhZG1pbiIsInJvbGVpZCI6IjEiLCJuYmYiOjE3MjA1NzY3NDYsImV4cCI6MTc0MjYwODc0NiwiaWF0IjoxNzIwNTc2NzQ2fQ.ju_D3zeGLKqJYVckbb8Y3yNkp40nOqRAJrdOsISs4d4 +Authorization : Bearer {{AccessToken}} ### -GET {{HelloShop.OrderingService_HostAddress}}/api/orders/1 +GET {{HelloWorld.OrderingService_HostAddress}}/api/orders/1 Content-Type: application/json -Authorization : Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1laWQiOiIxIiwidW5pcXVlX25hbWUiOiJhZG1pbiIsInJvbGVpZCI6IjEiLCJuYmYiOjE3MjA1NzY3NDYsImV4cCI6MTc0MjYwODc0NiwiaWF0IjoxNzIwNTc2NzQ2fQ.ju_D3zeGLKqJYVckbb8Y3yNkp40nOqRAJrdOsISs4d4 +Authorization : Bearer {{AccessToken}} + +### + + +PUT {{HelloWorld.OrderingService_HostAddress}}/api/orders/ship/1 +Content-Type: application/json +x-request-id: 1ddc1737-6a32-4b4f-bccd-63d0966348a1 +Authorization : Bearer {{AccessToken}} diff --git a/src/HelloShop.OrderingService/Infrastructure/Migrations/20250328130944_AddEventLogTable.Designer.cs b/src/HelloShop.OrderingService/Infrastructure/Migrations/20250328130944_AddEventLogTable.Designer.cs new file mode 100644 index 0000000..9f3f429 --- /dev/null +++ b/src/HelloShop.OrderingService/Infrastructure/Migrations/20250328130944_AddEventLogTable.Designer.cs @@ -0,0 +1,347 @@ +// +using System; +using HelloShop.OrderingService.Infrastructure; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata; + +#nullable disable + +namespace HelloShop.OrderingService.Infrastructure.Migrations +{ + [DbContext(typeof(OrderingServiceDbContext))] + [Migration("20250328130944_AddEventLogTable")] + partial class AddEventLogTable + { + /// + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasAnnotation("ProductVersion", "9.0.3") + .HasAnnotation("Relational:MaxIdentifierLength", 63); + + NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder); + + modelBuilder.Entity("HelloShop.EventBus.Logging.DistributedEventLog", b => + { + b.Property("EventId") + .ValueGeneratedOnAdd() + .HasColumnType("uuid") + .HasColumnName("event_id"); + + b.Property("CreationTime") + .HasColumnType("timestamp with time zone") + .HasColumnName("creation_time"); + + b.Property("DistributedEvent") + .IsRequired() + .HasColumnType("text") + .HasColumnName("distributed_event"); + + b.Property("EventTypeName") + .IsRequired() + .HasMaxLength(64) + .HasColumnType("character varying(64)") + .HasColumnName("event_type_name"); + + b.Property("Status") + .IsRequired() + .HasColumnType("text") + .HasColumnName("status"); + + b.Property("TimesSent") + .HasColumnType("integer") + .HasColumnName("times_sent"); + + b.Property("TransactionId") + .HasColumnType("uuid") + .HasColumnName("transaction_id"); + + b.HasKey("EventId") + .HasName("pk_distributed_event_log"); + + b.ToTable("distributed_event_log", (string)null); + }); + + modelBuilder.Entity("HelloShop.OrderingService.Entities.Buyers.Buyer", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer") + .HasColumnName("id"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("Name") + .IsRequired() + .HasMaxLength(16) + .HasColumnType("character varying(16)") + .HasColumnName("name"); + + b.HasKey("Id") + .HasName("pk_buyer"); + + b.ToTable("buyer", (string)null); + }); + + modelBuilder.Entity("HelloShop.OrderingService.Entities.Buyers.PaymentMethod", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer") + .HasColumnName("id"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("Alias") + .IsRequired() + .HasMaxLength(16) + .HasColumnType("character varying(16)") + .HasColumnName("alias"); + + b.Property("BuyerId") + .HasColumnType("integer") + .HasColumnName("buyer_id"); + + b.Property("CardHolderName") + .IsRequired() + .HasMaxLength(16) + .HasColumnType("character varying(16)") + .HasColumnName("card_holder_name"); + + b.Property("CardNumber") + .IsRequired() + .HasMaxLength(16) + .HasColumnType("character varying(16)") + .HasColumnName("card_number"); + + b.Property("Expiration") + .HasColumnType("timestamp with time zone") + .HasColumnName("expiration"); + + b.Property("SecurityNumber") + .HasMaxLength(6) + .HasColumnType("character varying(6)") + .HasColumnName("security_number"); + + b.HasKey("Id") + .HasName("pk_payment_method"); + + b.HasIndex("BuyerId") + .HasDatabaseName("ix_payment_method_buyer_id"); + + b.ToTable("payment_method", (string)null); + }); + + modelBuilder.Entity("HelloShop.OrderingService.Entities.Idempotency.ClientRequest", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("uuid") + .HasColumnName("id"); + + b.Property("Name") + .IsRequired() + .HasMaxLength(64) + .HasColumnType("character varying(64)") + .HasColumnName("name"); + + b.Property("Time") + .HasColumnType("timestamp with time zone") + .HasColumnName("time"); + + b.HasKey("Id") + .HasName("pk_client_request"); + + b.ToTable("client_request", (string)null); + }); + + modelBuilder.Entity("HelloShop.OrderingService.Entities.Orders.Order", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer") + .HasColumnName("id"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("BuyerId") + .HasColumnType("integer") + .HasColumnName("buyer_id"); + + b.Property("Description") + .HasMaxLength(64) + .HasColumnType("character varying(64)") + .HasColumnName("description"); + + b.Property("OrderDate") + .HasColumnType("timestamp with time zone") + .HasColumnName("order_date"); + + b.Property("OrderStatus") + .IsRequired() + .HasColumnType("text") + .HasColumnName("order_status"); + + b.Property("PaymentMethodId") + .HasColumnType("integer") + .HasColumnName("payment_method_id"); + + b.HasKey("Id") + .HasName("pk_order"); + + b.HasIndex("BuyerId") + .HasDatabaseName("ix_order_buyer_id"); + + b.HasIndex("PaymentMethodId") + .HasDatabaseName("ix_order_payment_method_id"); + + b.ToTable("order", (string)null); + }); + + modelBuilder.Entity("HelloShop.OrderingService.Entities.Orders.OrderItem", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer") + .HasColumnName("id"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("OrderId") + .HasColumnType("integer") + .HasColumnName("order_id"); + + b.Property("PictureUrl") + .IsRequired() + .HasMaxLength(256) + .HasColumnType("character varying(256)") + .HasColumnName("picture_url"); + + b.Property("ProductId") + .HasColumnType("integer") + .HasColumnName("product_id"); + + b.Property("ProductName") + .IsRequired() + .HasMaxLength(16) + .HasColumnType("character varying(16)") + .HasColumnName("product_name"); + + b.Property("UnitPrice") + .HasColumnType("numeric") + .HasColumnName("unit_price"); + + b.Property("Units") + .HasColumnType("integer") + .HasColumnName("units"); + + b.HasKey("Id") + .HasName("pk_order_item"); + + b.HasIndex("OrderId") + .HasDatabaseName("ix_order_item_order_id"); + + b.ToTable("order_item", (string)null); + }); + + modelBuilder.Entity("HelloShop.OrderingService.Entities.Buyers.PaymentMethod", b => + { + b.HasOne("HelloShop.OrderingService.Entities.Buyers.Buyer", null) + .WithMany("PaymentMethods") + .HasForeignKey("BuyerId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired() + .HasConstraintName("fk_payment_method_buyer_buyer_id"); + }); + + modelBuilder.Entity("HelloShop.OrderingService.Entities.Orders.Order", b => + { + b.HasOne("HelloShop.OrderingService.Entities.Buyers.Buyer", null) + .WithMany() + .HasForeignKey("BuyerId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired() + .HasConstraintName("fk_order_buyer_buyer_id"); + + b.HasOne("HelloShop.OrderingService.Entities.Buyers.PaymentMethod", null) + .WithMany() + .HasForeignKey("PaymentMethodId") + .OnDelete(DeleteBehavior.Restrict) + .HasConstraintName("fk_order_payment_method_payment_method_id"); + + b.OwnsOne("HelloShop.OrderingService.Entities.Orders.Address", "Address", b1 => + { + b1.Property("OrderId") + .HasColumnType("integer") + .HasColumnName("id"); + + b1.Property("City") + .IsRequired() + .HasMaxLength(16) + .HasColumnType("character varying(16)") + .HasColumnName("City"); + + b1.Property("Country") + .IsRequired() + .HasMaxLength(8) + .HasColumnType("character varying(8)") + .HasColumnName("Country"); + + b1.Property("State") + .IsRequired() + .HasMaxLength(16) + .HasColumnType("character varying(16)") + .HasColumnName("State"); + + b1.Property("Street") + .IsRequired() + .HasMaxLength(32) + .HasColumnType("character varying(32)") + .HasColumnName("Street"); + + b1.Property("ZipCode") + .IsRequired() + .HasMaxLength(6) + .HasColumnType("character varying(6)") + .HasColumnName("ZipCode"); + + b1.HasKey("OrderId"); + + b1.ToTable("order"); + + b1.WithOwner() + .HasForeignKey("OrderId") + .HasConstraintName("fk_order_order_id"); + }); + + b.Navigation("Address") + .IsRequired(); + }); + + modelBuilder.Entity("HelloShop.OrderingService.Entities.Orders.OrderItem", b => + { + b.HasOne("HelloShop.OrderingService.Entities.Orders.Order", null) + .WithMany("OrderItems") + .HasForeignKey("OrderId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired() + .HasConstraintName("fk_order_item_order_order_id"); + }); + + modelBuilder.Entity("HelloShop.OrderingService.Entities.Buyers.Buyer", b => + { + b.Navigation("PaymentMethods"); + }); + + modelBuilder.Entity("HelloShop.OrderingService.Entities.Orders.Order", b => + { + b.Navigation("OrderItems"); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/HelloShop.OrderingService/Infrastructure/Migrations/20250328130944_AddEventLogTable.cs b/src/HelloShop.OrderingService/Infrastructure/Migrations/20250328130944_AddEventLogTable.cs new file mode 100644 index 0000000..44e1344 --- /dev/null +++ b/src/HelloShop.OrderingService/Infrastructure/Migrations/20250328130944_AddEventLogTable.cs @@ -0,0 +1,41 @@ +// Copyright (c) HelloShop Corporation. All rights reserved. +// See the license file in the project root for more information. + +using Microsoft.EntityFrameworkCore.Migrations; + +#nullable disable + +namespace HelloShop.OrderingService.Infrastructure.Migrations +{ + /// + public partial class AddEventLogTable : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.AlterColumn( + name: "event_type_name", + table: "distributed_event_log", + type: "character varying(64)", + maxLength: 64, + nullable: false, + oldClrType: typeof(string), + oldType: "character varying(32)", + oldMaxLength: 32); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.AlterColumn( + name: "event_type_name", + table: "distributed_event_log", + type: "character varying(32)", + maxLength: 32, + nullable: false, + oldClrType: typeof(string), + oldType: "character varying(64)", + oldMaxLength: 64); + } + } +} diff --git a/src/HelloShop.OrderingService/Infrastructure/Migrations/OrderingServiceDbContextModelSnapshot.cs b/src/HelloShop.OrderingService/Infrastructure/Migrations/OrderingServiceDbContextModelSnapshot.cs index ac2a830..958bf51 100644 --- a/src/HelloShop.OrderingService/Infrastructure/Migrations/OrderingServiceDbContextModelSnapshot.cs +++ b/src/HelloShop.OrderingService/Infrastructure/Migrations/OrderingServiceDbContextModelSnapshot.cs @@ -22,6 +22,47 @@ namespace HelloShop.OrderingService.Infrastructure.Migrations NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder); + modelBuilder.Entity("HelloShop.EventBus.Logging.DistributedEventLog", b => + { + b.Property("EventId") + .ValueGeneratedOnAdd() + .HasColumnType("uuid") + .HasColumnName("event_id"); + + b.Property("CreationTime") + .HasColumnType("timestamp with time zone") + .HasColumnName("creation_time"); + + b.Property("DistributedEvent") + .IsRequired() + .HasColumnType("text") + .HasColumnName("distributed_event"); + + b.Property("EventTypeName") + .IsRequired() + .HasMaxLength(64) + .HasColumnType("character varying(64)") + .HasColumnName("event_type_name"); + + b.Property("Status") + .IsRequired() + .HasColumnType("text") + .HasColumnName("status"); + + b.Property("TimesSent") + .HasColumnType("integer") + .HasColumnName("times_sent"); + + b.Property("TransactionId") + .HasColumnType("uuid") + .HasColumnName("transaction_id"); + + b.HasKey("EventId") + .HasName("pk_distributed_event_log"); + + b.ToTable("distributed_event_log", (string)null); + }); + modelBuilder.Entity("HelloShop.OrderingService.Entities.Buyers.Buyer", b => { b.Property("Id") @@ -92,47 +133,6 @@ namespace HelloShop.OrderingService.Infrastructure.Migrations b.ToTable("payment_method", (string)null); }); - modelBuilder.Entity("HelloShop.OrderingService.Entities.EventLogs.DistributedEventLog", b => - { - b.Property("EventId") - .ValueGeneratedOnAdd() - .HasColumnType("uuid") - .HasColumnName("event_id"); - - b.Property("CreationTime") - .HasColumnType("timestamp with time zone") - .HasColumnName("creation_time"); - - b.Property("DistributedEvent") - .IsRequired() - .HasColumnType("text") - .HasColumnName("distributed_event"); - - b.Property("EventTypeName") - .IsRequired() - .HasMaxLength(32) - .HasColumnType("character varying(32)") - .HasColumnName("event_type_name"); - - b.Property("Status") - .IsRequired() - .HasColumnType("text") - .HasColumnName("status"); - - b.Property("TimesSent") - .HasColumnType("integer") - .HasColumnName("times_sent"); - - b.Property("TransactionId") - .HasColumnType("uuid") - .HasColumnName("transaction_id"); - - b.HasKey("EventId") - .HasName("pk_distributed_event_log"); - - b.ToTable("distributed_event_log", (string)null); - }); - modelBuilder.Entity("HelloShop.OrderingService.Entities.Idempotency.ClientRequest", b => { b.Property("Id") diff --git a/src/HelloShop.OrderingService/Infrastructure/OrderingServiceDbContext.cs b/src/HelloShop.OrderingService/Infrastructure/OrderingServiceDbContext.cs index 16b3ed6..aad3043 100644 --- a/src/HelloShop.OrderingService/Infrastructure/OrderingServiceDbContext.cs +++ b/src/HelloShop.OrderingService/Infrastructure/OrderingServiceDbContext.cs @@ -1,6 +1,7 @@ // Copyright (c) HelloShop Corporation. All rights reserved. // See the license file in the project root for more information. +using HelloShop.EventBus.Logging; using Microsoft.EntityFrameworkCore; using System.Reflection; @@ -13,6 +14,7 @@ namespace HelloShop.OrderingService.Infrastructure base.OnModelCreating(builder); builder.ApplyConfigurationsFromAssembly(Assembly.GetExecutingAssembly()); + builder.UseDistributedEventLogs(); } } } diff --git a/src/HelloShop.OrderingService/Services/DistributedEventService.cs b/src/HelloShop.OrderingService/Services/DistributedEventService.cs index 5d6ce0e..99c4aa3 100644 --- a/src/HelloShop.OrderingService/Services/DistributedEventService.cs +++ b/src/HelloShop.OrderingService/Services/DistributedEventService.cs @@ -2,88 +2,42 @@ // See the license file in the project root for more information. using HelloShop.EventBus.Abstractions; -using HelloShop.OrderingService.Entities.EventLogs; -using Microsoft.EntityFrameworkCore; +using HelloShop.EventBus.Logging; +using HelloShop.OrderingService.Infrastructure; namespace HelloShop.OrderingService.Services { - public class DistributedEventService(TContext dbContext, IEventBus distributedEventBus, ILogger> logger) : IDistributedEventService, IDisposable where TContext : DbContext + public class DistributedEventService(ILogger logger, IEventBus eventBus, OrderingServiceDbContext dbContext, IDistributedEventLogService eventLogService) : IDistributedEventService { - private volatile bool _disposedValue; - - public async Task UpdateEventStatusAsync(Guid eventId, DistributedEventStatus status, CancellationToken cancellationToken = default) + public async Task PublishEventsThroughEventBusAsync(Guid transactionId, CancellationToken cancellationToken = default) { - var eventLogEntry = dbContext.Set().Single(ie => ie.EventId == eventId); + var pendingLogEvents = await eventLogService.RetrieveEventLogsPendingToPublishAsync(transactionId, cancellationToken); - eventLogEntry.Status = status; - - if (status == DistributedEventStatus.InProgress) + foreach (var eventLog in pendingLogEvents) { - eventLogEntry.TimesSent++; + logger.LogInformation("Publishing distributed event: {EventId} from {EventType}.", eventLog.EventId, eventLog.EventTypeName); + + try + { + await eventLogService.MarkEventAsInProgressAsync(eventLog.EventId, cancellationToken); + await eventBus.PublishAsync(eventLog.DistributedEvent, cancellationToken); + await eventLogService.MarkEventAsPublishedAsync(eventLog.EventId, cancellationToken); + } + catch (Exception ex) + { + logger.LogError(ex, "Error publishing distributed event: {EventId}.", eventLog.EventId); + await eventLogService.MarkEventAsFailedAsync(eventLog.EventId, cancellationToken); + } } - - await dbContext.SaveChangesAsync(cancellationToken); - } - - public async Task> RetrieveEventLogsPendingToPublishAsync(Guid transactionId, CancellationToken cancellationToken = default) - { - var result = await dbContext.Set().Where(e => e.TransactionId == transactionId && e.Status == DistributedEventStatus.NotPublished).ToListAsync(cancellationToken: cancellationToken); - - return result.Count != 0 ? result.OrderBy(o => o.CreationTime) : []; } public async Task AddAndSaveEventAsync(DistributedEvent @event, CancellationToken cancellationToken = default) { - var transaction = dbContext.Database.CurrentTransaction ?? throw new InvalidOperationException("This method must be called within a transaction scope."); + logger.LogInformation("Creating and saving distributed event: {EventId} from {EventType}.", @event.Id, @event.GetType().Name); - var eventLog = new DistributedEventLog(@event, transaction.TransactionId); + var transaction = dbContext.Database.CurrentTransaction ?? await dbContext.Database.BeginTransactionAsync(cancellationToken); - await dbContext.AddAsync(eventLog, cancellationToken); - - await dbContext.SaveChangesAsync(cancellationToken); - } - - public async Task PublishEventsThroughEventBusAsync(Guid transactionId, CancellationToken cancellationToken = default) - { - var pendingEventLogs = await RetrieveEventLogsPendingToPublishAsync(transactionId, cancellationToken); - - foreach (var eventLog in pendingEventLogs) - { - logger.LogInformation("Publishing integration event {EventId} {DistributedEvent}", eventLog.EventId, eventLog.DistributedEvent); - - try - { - await UpdateEventStatusAsync(eventLog.EventId, DistributedEventStatus.InProgress, cancellationToken); - await distributedEventBus.PublishAsync(eventLog.DistributedEvent, cancellationToken); - await UpdateEventStatusAsync(eventLog.EventId, DistributedEventStatus.Published, cancellationToken); - } - catch (Exception ex) - { - logger.LogError(ex, "Error publishing distributed event {EventId}", eventLog.EventId); - - await UpdateEventStatusAsync(eventLog.EventId, DistributedEventStatus.PublishedFailed, cancellationToken); - } - } - } - - protected virtual void Dispose(bool disposing) - { - if (!_disposedValue) - { - if (disposing) - { - dbContext.Dispose(); - } - - _disposedValue = true; - } - } - - public void Dispose() - { - Dispose(disposing: true); - GC.SuppressFinalize(this); + await eventLogService.SaveEventAsync(@event, transaction, cancellationToken); } } - } diff --git a/src/HelloShop.OrderingService/Services/IDistributedEventService.cs b/src/HelloShop.OrderingService/Services/IDistributedEventService.cs index 40f70b8..3e7038d 100644 --- a/src/HelloShop.OrderingService/Services/IDistributedEventService.cs +++ b/src/HelloShop.OrderingService/Services/IDistributedEventService.cs @@ -2,18 +2,13 @@ // See the license file in the project root for more information. using HelloShop.EventBus.Abstractions; -using HelloShop.OrderingService.Entities.EventLogs; namespace HelloShop.OrderingService.Services { public interface IDistributedEventService { - Task> RetrieveEventLogsPendingToPublishAsync(Guid transactionId, CancellationToken cancellationToken = default); + Task PublishEventsThroughEventBusAsync(Guid transactionId, CancellationToken cancellationToken = default); Task AddAndSaveEventAsync(DistributedEvent @event, CancellationToken cancellationToken = default); - - Task UpdateEventStatusAsync(Guid eventId, DistributedEventStatus status, CancellationToken cancellationToken = default); - - Task PublishEventsThroughEventBusAsync(Guid transactionId, CancellationToken cancellationToken = default); } } diff --git a/src/HelloShop.ProductService/DistributedEvents/EventHandling/OrderAwaitingValidationDistributedEventHandler.cs b/src/HelloShop.ProductService/DistributedEvents/EventHandling/OrderAwaitingValidationDistributedEventHandler.cs index 46e6cc6..a34a217 100644 --- a/src/HelloShop.ProductService/DistributedEvents/EventHandling/OrderAwaitingValidationDistributedEventHandler.cs +++ b/src/HelloShop.ProductService/DistributedEvents/EventHandling/OrderAwaitingValidationDistributedEventHandler.cs @@ -6,10 +6,11 @@ using HelloShop.EventBus.Abstractions; using HelloShop.ProductService.DistributedEvents.Events; using HelloShop.ProductService.Entities.Products; using HelloShop.ProductService.Infrastructure; +using HelloShop.ProductService.Services; namespace HelloShop.ProductService.DistributedEvents.EventHandling { - public class OrderAwaitingValidationDistributedEventHandler(ProductServiceDbContext dbContext, IEventBus distributedEventBus, IDistributedLock distributedLock, ILogger logger) : IDistributedEventHandler + public class OrderAwaitingValidationDistributedEventHandler(ProductServiceDbContext dbContext, IDistributedEventService distributedEventService, IDistributedLock distributedLock, ILogger logger) : IDistributedEventHandler { public async Task HandleAsync(OrderAwaitingValidationDistributedEvent @event) { @@ -30,7 +31,8 @@ namespace HelloShop.ProductService.DistributedEvents.EventHandling DistributedEvent confirmedEvent = confirmedOrderStockItems.All(c => c.Value) ? new OrderStockConfirmedDistributedEvent(@event.OrderId) : new OrderStockRejectedDistributedEvent(@event.OrderId); - await distributedEventBus.PublishAsync(confirmedEvent); + await distributedEventService.SaveEventAndDbContextChangesAsync(confirmedEvent); + await distributedEventService.PublishThroughEventBusAsync(confirmedEvent); } } } diff --git a/src/HelloShop.ProductService/HelloShop.ProductService.csproj b/src/HelloShop.ProductService/HelloShop.ProductService.csproj index 9795c77..c10b914 100644 --- a/src/HelloShop.ProductService/HelloShop.ProductService.csproj +++ b/src/HelloShop.ProductService/HelloShop.ProductService.csproj @@ -7,6 +7,7 @@ + diff --git a/src/HelloShop.ProductService/Infrastructure/Migrations/20250328131028_AddEventLogTable.Designer.cs b/src/HelloShop.ProductService/Infrastructure/Migrations/20250328131028_AddEventLogTable.Designer.cs new file mode 100644 index 0000000..5d52a36 --- /dev/null +++ b/src/HelloShop.ProductService/Infrastructure/Migrations/20250328131028_AddEventLogTable.Designer.cs @@ -0,0 +1,155 @@ +// +using System; +using HelloShop.ProductService.Infrastructure; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Infrastructure; +using Microsoft.EntityFrameworkCore.Migrations; +using Microsoft.EntityFrameworkCore.Storage.ValueConversion; +using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata; + +#nullable disable + +namespace HelloShop.ProductService.Infrastructure.Migrations +{ + [DbContext(typeof(ProductServiceDbContext))] + [Migration("20250328131028_AddEventLogTable")] + partial class AddEventLogTable + { + /// + protected override void BuildTargetModel(ModelBuilder modelBuilder) + { +#pragma warning disable 612, 618 + modelBuilder + .HasAnnotation("ProductVersion", "9.0.3") + .HasAnnotation("Relational:MaxIdentifierLength", 63); + + NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder); + + modelBuilder.Entity("HelloShop.EventBus.Logging.DistributedEventLog", b => + { + b.Property("EventId") + .ValueGeneratedOnAdd() + .HasColumnType("uuid") + .HasColumnName("event_id"); + + b.Property("CreationTime") + .HasColumnType("timestamp with time zone") + .HasColumnName("creation_time"); + + b.Property("DistributedEvent") + .IsRequired() + .HasColumnType("text") + .HasColumnName("distributed_event"); + + b.Property("EventTypeName") + .IsRequired() + .HasMaxLength(64) + .HasColumnType("character varying(64)") + .HasColumnName("event_type_name"); + + b.Property("Status") + .IsRequired() + .HasColumnType("text") + .HasColumnName("status"); + + b.Property("TimesSent") + .HasColumnType("integer") + .HasColumnName("times_sent"); + + b.Property("TransactionId") + .HasColumnType("uuid") + .HasColumnName("transaction_id"); + + b.HasKey("EventId") + .HasName("pk_distributed_event_log"); + + b.ToTable("distributed_event_log", (string)null); + }); + + modelBuilder.Entity("HelloShop.ProductService.Entities.Products.Brand", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer") + .HasColumnName("id"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("Name") + .IsRequired() + .HasMaxLength(32) + .HasColumnType("character varying(32)") + .HasColumnName("name"); + + b.HasKey("Id") + .HasName("pk_brand"); + + b.ToTable("brand", (string)null); + }); + + modelBuilder.Entity("HelloShop.ProductService.Entities.Products.Product", b => + { + b.Property("Id") + .ValueGeneratedOnAdd() + .HasColumnType("integer") + .HasColumnName("id"); + + NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id")); + + b.Property("AvailableStock") + .HasColumnType("integer") + .HasColumnName("available_stock"); + + b.Property("BrandId") + .HasColumnType("integer") + .HasColumnName("brand_id"); + + b.Property("CreationTime") + .HasColumnType("timestamp with time zone") + .HasColumnName("creation_time"); + + b.Property("Description") + .IsRequired() + .HasColumnType("text") + .HasColumnName("description"); + + b.Property("ImageUrl") + .IsRequired() + .HasMaxLength(256) + .HasColumnType("character varying(256)") + .HasColumnName("image_url"); + + b.Property("Name") + .IsRequired() + .HasMaxLength(64) + .HasColumnType("character varying(64)") + .HasColumnName("name"); + + b.Property("Price") + .HasColumnType("numeric") + .HasColumnName("price"); + + b.HasKey("Id") + .HasName("pk_product"); + + b.HasIndex("BrandId") + .HasDatabaseName("ix_product_brand_id"); + + b.ToTable("product", (string)null); + }); + + modelBuilder.Entity("HelloShop.ProductService.Entities.Products.Product", b => + { + b.HasOne("HelloShop.ProductService.Entities.Products.Brand", "Brand") + .WithMany() + .HasForeignKey("BrandId") + .OnDelete(DeleteBehavior.Cascade) + .IsRequired() + .HasConstraintName("fk_product_brand_brand_id"); + + b.Navigation("Brand"); + }); +#pragma warning restore 612, 618 + } + } +} diff --git a/src/HelloShop.ProductService/Infrastructure/Migrations/20250328131028_AddEventLogTable.cs b/src/HelloShop.ProductService/Infrastructure/Migrations/20250328131028_AddEventLogTable.cs new file mode 100644 index 0000000..cd80bed --- /dev/null +++ b/src/HelloShop.ProductService/Infrastructure/Migrations/20250328131028_AddEventLogTable.cs @@ -0,0 +1,41 @@ +// Copyright (c) HelloShop Corporation. All rights reserved. +// See the license file in the project root for more information. + +using Microsoft.EntityFrameworkCore.Migrations; + +#nullable disable + +namespace HelloShop.ProductService.Infrastructure.Migrations +{ + /// + public partial class AddEventLogTable : Migration + { + /// + protected override void Up(MigrationBuilder migrationBuilder) + { + migrationBuilder.CreateTable( + name: "distributed_event_log", + columns: table => new + { + event_id = table.Column(type: "uuid", nullable: false), + event_type_name = table.Column(type: "character varying(64)", maxLength: 64, nullable: false), + distributed_event = table.Column(type: "text", nullable: false), + status = table.Column(type: "text", nullable: false), + times_sent = table.Column(type: "integer", nullable: false), + creation_time = table.Column(type: "timestamp with time zone", nullable: false), + transaction_id = table.Column(type: "uuid", nullable: false) + }, + constraints: table => + { + table.PrimaryKey("pk_distributed_event_log", x => x.event_id); + }); + } + + /// + protected override void Down(MigrationBuilder migrationBuilder) + { + migrationBuilder.DropTable( + name: "distributed_event_log"); + } + } +} diff --git a/src/HelloShop.ProductService/Infrastructure/Migrations/ProductServiceDbContextModelSnapshot.cs b/src/HelloShop.ProductService/Infrastructure/Migrations/ProductServiceDbContextModelSnapshot.cs index a7aae9b..3fd003d 100644 --- a/src/HelloShop.ProductService/Infrastructure/Migrations/ProductServiceDbContextModelSnapshot.cs +++ b/src/HelloShop.ProductService/Infrastructure/Migrations/ProductServiceDbContextModelSnapshot.cs @@ -22,6 +22,47 @@ namespace HelloShop.ProductService.Infrastructure.Migrations NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder); + modelBuilder.Entity("HelloShop.EventBus.Logging.DistributedEventLog", b => + { + b.Property("EventId") + .ValueGeneratedOnAdd() + .HasColumnType("uuid") + .HasColumnName("event_id"); + + b.Property("CreationTime") + .HasColumnType("timestamp with time zone") + .HasColumnName("creation_time"); + + b.Property("DistributedEvent") + .IsRequired() + .HasColumnType("text") + .HasColumnName("distributed_event"); + + b.Property("EventTypeName") + .IsRequired() + .HasMaxLength(64) + .HasColumnType("character varying(64)") + .HasColumnName("event_type_name"); + + b.Property("Status") + .IsRequired() + .HasColumnType("text") + .HasColumnName("status"); + + b.Property("TimesSent") + .HasColumnType("integer") + .HasColumnName("times_sent"); + + b.Property("TransactionId") + .HasColumnType("uuid") + .HasColumnName("transaction_id"); + + b.HasKey("EventId") + .HasName("pk_distributed_event_log"); + + b.ToTable("distributed_event_log", (string)null); + }); + modelBuilder.Entity("HelloShop.ProductService.Entities.Products.Brand", b => { b.Property("Id") diff --git a/src/HelloShop.ProductService/Infrastructure/ProductServiceDbContext.cs b/src/HelloShop.ProductService/Infrastructure/ProductServiceDbContext.cs index 69cfff7..cc04d29 100644 --- a/src/HelloShop.ProductService/Infrastructure/ProductServiceDbContext.cs +++ b/src/HelloShop.ProductService/Infrastructure/ProductServiceDbContext.cs @@ -1,6 +1,7 @@ // Copyright (c) HelloShop Corporation. All rights reserved. // See the license file in the project root for more information. +using HelloShop.EventBus.Logging; using Microsoft.EntityFrameworkCore; using System.Reflection; @@ -13,5 +14,6 @@ public class ProductServiceDbContext(DbContextOptions o base.OnModelCreating(builder); builder.ApplyConfigurationsFromAssembly(Assembly.GetExecutingAssembly()); + builder.UseDistributedEventLogs(); } } diff --git a/src/HelloShop.ProductService/Program.cs b/src/HelloShop.ProductService/Program.cs index 2792c3f..ac1ee6a 100644 --- a/src/HelloShop.ProductService/Program.cs +++ b/src/HelloShop.ProductService/Program.cs @@ -4,6 +4,7 @@ using HelloShop.DistributedLock.Dapr; using HelloShop.EventBus.Abstractions; using HelloShop.EventBus.Dapr; +using HelloShop.EventBus.Logging; using HelloShop.ProductService.Constants; using HelloShop.ProductService.Infrastructure; using HelloShop.ProductService.Services; @@ -47,6 +48,7 @@ builder.Services.AddAuthorization().AddRemotePermissionChecker().AddCustomAuthor builder.AddDaprEventBus().AddSubscriptionFromAssembly(); builder.Services.AddDaprDistributedLock(); builder.Services.AddSingleton(TimeProvider.System); +builder.Services.AddDistributedEventLogs().AddTransient(); // End addd extensions services to the container. var app = builder.Build(); diff --git a/src/HelloShop.ProductService/Services/DistributedEventService.cs b/src/HelloShop.ProductService/Services/DistributedEventService.cs new file mode 100644 index 0000000..ade77f6 --- /dev/null +++ b/src/HelloShop.ProductService/Services/DistributedEventService.cs @@ -0,0 +1,37 @@ +// Copyright (c) HelloShop Corporation. All rights reserved. +// See the license file in the project root for more information. + +using HelloShop.EventBus.Abstractions; +using HelloShop.EventBus.Logging; +using HelloShop.ProductService.Infrastructure; + +namespace HelloShop.ProductService.Services +{ + public class DistributedEventService(ILogger logger, IEventBus eventBus, ProductServiceDbContext dbContext, IDistributedEventLogService eventLogService) : IDistributedEventService + { + public async Task PublishThroughEventBusAsync(DistributedEvent @event) + { + try + { + await eventLogService.MarkEventAsInProgressAsync(@event.Id); + await eventBus.PublishAsync(@event); + await eventLogService.MarkEventAsPublishedAsync(@event.Id); + } + catch (Exception ex) + { + logger.LogError(ex, "Publish through event bus failed for {EventId}", @event.Id); + await eventLogService.MarkEventAsFailedAsync(@event.Id); + } + } + + public async Task SaveEventAndDbContextChangesAsync(DistributedEvent @event) + { + await ResilientTransaction.New(dbContext).ExecuteAsync(async () => + { + var transaction = dbContext.Database.CurrentTransaction ?? await dbContext.Database.BeginTransactionAsync(); + await dbContext.SaveChangesAsync(); + await eventLogService.SaveEventAsync(@event, transaction); + }); + } + } +} diff --git a/src/HelloShop.ProductService/Services/IDistributedEventService.cs b/src/HelloShop.ProductService/Services/IDistributedEventService.cs new file mode 100644 index 0000000..2dde5ac --- /dev/null +++ b/src/HelloShop.ProductService/Services/IDistributedEventService.cs @@ -0,0 +1,14 @@ +// Copyright (c) HelloShop Corporation. All rights reserved. +// See the license file in the project root for more information. + +using HelloShop.EventBus.Abstractions; + +namespace HelloShop.ProductService.Services +{ + public interface IDistributedEventService + { + Task SaveEventAndDbContextChangesAsync(DistributedEvent @event); + + Task PublishThroughEventBusAsync(DistributedEvent @event); + } +}