diff --git a/Directory.Packages.props b/Directory.Packages.props
index adc1b31..0cd8288 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -29,9 +29,11 @@
+
+
diff --git a/HelloShop.sln b/HelloShop.sln
index 31a6e5c..4790144 100644
--- a/HelloShop.sln
+++ b/HelloShop.sln
@@ -45,6 +45,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HelloShop.DistributedLock",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HelloShop.DistributedLock.Dapr", "libraries\HelloShop.DistributedLock.Dapr\HelloShop.DistributedLock.Dapr.csproj", "{37F01A0B-50D6-48BA-8A20-FBADB5524CD7}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HelloShop.EventBus.Logging", "libraries\HelloShop.EventBus.Logging\HelloShop.EventBus.Logging.csproj", "{ACBA2FA9-ADBB-4FCD-A303-6D8D4B2131AA}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -125,6 +127,10 @@ Global
{37F01A0B-50D6-48BA-8A20-FBADB5524CD7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{37F01A0B-50D6-48BA-8A20-FBADB5524CD7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{37F01A0B-50D6-48BA-8A20-FBADB5524CD7}.Release|Any CPU.Build.0 = Release|Any CPU
+ {ACBA2FA9-ADBB-4FCD-A303-6D8D4B2131AA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {ACBA2FA9-ADBB-4FCD-A303-6D8D4B2131AA}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {ACBA2FA9-ADBB-4FCD-A303-6D8D4B2131AA}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {ACBA2FA9-ADBB-4FCD-A303-6D8D4B2131AA}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -148,6 +154,7 @@ Global
{5D403BF6-9267-4B0F-AEB3-2143C8BBA8CD} = {02EA681E-C7D8-13C7-8484-4AC65E1B71E8}
{86729635-8E31-4C53-81AE-7C410C848219} = {02EA681E-C7D8-13C7-8484-4AC65E1B71E8}
{37F01A0B-50D6-48BA-8A20-FBADB5524CD7} = {02EA681E-C7D8-13C7-8484-4AC65E1B71E8}
+ {ACBA2FA9-ADBB-4FCD-A303-6D8D4B2131AA} = {02EA681E-C7D8-13C7-8484-4AC65E1B71E8}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {845545A8-2006-46C3-ABD7-5BDF63F3858C}
diff --git a/src/HelloShop.OrderingService/Entities/EventLogs/DistributedEventLog.cs b/libraries/HelloShop.EventBus.Logging/DistributedEventLog.cs
similarity index 90%
rename from src/HelloShop.OrderingService/Entities/EventLogs/DistributedEventLog.cs
rename to libraries/HelloShop.EventBus.Logging/DistributedEventLog.cs
index d6d11a7..f3b8f3c 100644
--- a/src/HelloShop.OrderingService/Entities/EventLogs/DistributedEventLog.cs
+++ b/libraries/HelloShop.EventBus.Logging/DistributedEventLog.cs
@@ -4,7 +4,7 @@
using HelloShop.EventBus.Abstractions;
using System.Diagnostics.CodeAnalysis;
-namespace HelloShop.OrderingService.Entities.EventLogs
+namespace HelloShop.EventBus.Logging
{
public enum DistributedEventStatus { NotPublished, InProgress, Published, PublishedFailed }
@@ -20,7 +20,7 @@ namespace HelloShop.OrderingService.Entities.EventLogs
public int TimesSent { get; set; }
- public DateTimeOffset CreationTime { get; set; } = DateTimeOffset.UtcNow;
+ public DateTimeOffset CreationTime { get; init; } = TimeProvider.System.GetUtcNow();
public required Guid TransactionId { get; set; }
diff --git a/libraries/HelloShop.EventBus.Logging/DistributedEventLogExtensions.cs b/libraries/HelloShop.EventBus.Logging/DistributedEventLogExtensions.cs
new file mode 100644
index 0000000..10def70
--- /dev/null
+++ b/libraries/HelloShop.EventBus.Logging/DistributedEventLogExtensions.cs
@@ -0,0 +1,20 @@
+// Copyright (c) HelloShop Corporation. All rights reserved.
+// See the license file in the project root for more information.
+
+using Microsoft.EntityFrameworkCore;
+using Microsoft.Extensions.DependencyInjection;
+using System.Diagnostics.CodeAnalysis;
+
+namespace HelloShop.EventBus.Logging
+{
+ public static class DistributedEventLogExtensions
+ {
+ public static void UseDistributedEventLogs(this ModelBuilder builder) => builder.ApplyConfiguration(new EventLogEntityTypeConfiguration());
+
+ public static IServiceCollection AddDistributedEventLogs([NotNull] this IServiceCollection services) where TContext : DbContext
+ {
+ services.AddTransient>().AddHostedService();
+ return services;
+ }
+ }
+}
diff --git a/libraries/HelloShop.EventBus.Logging/DistributedEventLogService.cs b/libraries/HelloShop.EventBus.Logging/DistributedEventLogService.cs
new file mode 100644
index 0000000..b8b36e7
--- /dev/null
+++ b/libraries/HelloShop.EventBus.Logging/DistributedEventLogService.cs
@@ -0,0 +1,51 @@
+// Copyright (c) HelloShop Corporation. All rights reserved.
+// See the license file in the project root for more information.
+
+using HelloShop.EventBus.Abstractions;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Storage;
+
+namespace HelloShop.EventBus.Logging
+{
+ public class DistributedEventLogService(TContext dbContext) : IDistributedEventLogService where TContext : DbContext
+ {
+ public async Task> RetrieveEventLogsPendingToPublishAsync(Guid transactionId, CancellationToken cancellationToken = default)
+ {
+ return await dbContext.Set().Where(e => e.TransactionId == transactionId && e.Status == DistributedEventStatus.NotPublished).ToListAsync(cancellationToken: cancellationToken);
+ }
+
+ public async Task> RetrieveEventLogsFailedToPublishAsync(CancellationToken cancellationToken = default)
+ {
+ return await dbContext.Set().Where(e => e.Status == DistributedEventStatus.PublishedFailed).ToListAsync(cancellationToken);
+ }
+
+ public async Task SaveEventAsync(DistributedEvent @event, IDbContextTransaction transaction, CancellationToken cancellationToken = default)
+ {
+ var eventLog = new DistributedEventLog(@event, transaction.TransactionId);
+ dbContext.Database.UseTransaction(transaction.GetDbTransaction());
+ await dbContext.Set().AddAsync(eventLog, cancellationToken);
+
+ await dbContext.SaveChangesAsync(cancellationToken);
+ }
+
+ public async Task UpdateEventStatusAsync(Guid eventId, DistributedEventStatus status, CancellationToken cancellationToken = default)
+ {
+ var eventLogEntry = dbContext.Set().Single(ie => ie.EventId == eventId);
+
+ eventLogEntry.Status = status;
+
+ if (status == DistributedEventStatus.InProgress)
+ {
+ eventLogEntry.TimesSent++;
+ }
+
+ await dbContext.SaveChangesAsync(cancellationToken);
+ }
+
+ public Task MarkEventAsPublishedAsync(Guid eventId, CancellationToken cancellationToken = default) => UpdateEventStatusAsync(eventId, DistributedEventStatus.Published, cancellationToken);
+
+ public Task MarkEventAsInProgressAsync(Guid eventId, CancellationToken cancellationToken = default) => UpdateEventStatusAsync(eventId, DistributedEventStatus.InProgress, cancellationToken);
+
+ public Task MarkEventAsFailedAsync(Guid eventId, CancellationToken cancellationToken = default) => UpdateEventStatusAsync(eventId, DistributedEventStatus.PublishedFailed, cancellationToken);
+ }
+}
diff --git a/libraries/HelloShop.EventBus.Logging/DistributedEventWorker.cs b/libraries/HelloShop.EventBus.Logging/DistributedEventWorker.cs
new file mode 100644
index 0000000..2a04dfb
--- /dev/null
+++ b/libraries/HelloShop.EventBus.Logging/DistributedEventWorker.cs
@@ -0,0 +1,54 @@
+// Copyright (c) HelloShop Corporation. All rights reserved.
+// See the license file in the project root for more information.
+
+
+using HelloShop.EventBus.Abstractions;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+
+namespace HelloShop.EventBus.Logging
+{
+ public class DistributedEventWorker(IServiceScopeFactory serviceScopeFactory) : BackgroundService
+ {
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ while (!stoppingToken.IsCancellationRequested)
+ {
+ using var scope = serviceScopeFactory.CreateScope();
+
+ var eventBus = scope.ServiceProvider.GetRequiredService();
+ var eventLogService = scope.ServiceProvider.GetRequiredService();
+ var logger = scope.ServiceProvider.GetRequiredService>();
+
+ try
+ {
+ var failedEventLogs = await eventLogService.RetrieveEventLogsFailedToPublishAsync(stoppingToken);
+
+ foreach (var eventLog in failedEventLogs)
+ {
+ DistributedEvent @event = eventLog.DistributedEvent;
+
+ try
+ {
+ await eventLogService.MarkEventAsInProgressAsync(@event.Id, stoppingToken);
+ await eventBus.PublishAsync(@event, stoppingToken);
+ await eventLogService.MarkEventAsPublishedAsync(@event.Id, stoppingToken);
+ }
+ catch (Exception ex)
+ {
+ logger.LogError(ex, "Publish through event bus failed for {EventId}.", @event.Id);
+ await eventLogService.MarkEventAsFailedAsync(@event.Id, stoppingToken);
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ logger.LogError(ex, "An error occurred while retrieving failed event logs.");
+ }
+
+ await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
+ }
+ }
+ }
+}
diff --git a/src/HelloShop.OrderingService/Infrastructure/EntityConfigurations/EventLogs/DistributedEventLogEntityTypeConfiguration.cs b/libraries/HelloShop.EventBus.Logging/EventLogEntityTypeConfiguration.cs
similarity index 78%
rename from src/HelloShop.OrderingService/Infrastructure/EntityConfigurations/EventLogs/DistributedEventLogEntityTypeConfiguration.cs
rename to libraries/HelloShop.EventBus.Logging/EventLogEntityTypeConfiguration.cs
index 35b00a5..0cbe1f6 100644
--- a/src/HelloShop.OrderingService/Infrastructure/EntityConfigurations/EventLogs/DistributedEventLogEntityTypeConfiguration.cs
+++ b/libraries/HelloShop.EventBus.Logging/EventLogEntityTypeConfiguration.cs
@@ -2,24 +2,23 @@
// See the license file in the project root for more information.
using HelloShop.EventBus.Abstractions;
-using HelloShop.OrderingService.Entities.EventLogs;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
using System.Text.Json;
-namespace HelloShop.OrderingService.Infrastructure.EntityConfigurations.EventLogs
+namespace HelloShop.EventBus.Logging
{
- public class DistributedEventLogEntityTypeConfiguration : IEntityTypeConfiguration
+ public class EventLogEntityTypeConfiguration : IEntityTypeConfiguration
{
private static readonly JsonSerializerOptions s_jsonSerializerOptions = new(JsonSerializerDefaults.Web);
public void Configure(EntityTypeBuilder builder)
{
builder.HasKey(x => x.EventId);
- builder.Property(x => x.EventTypeName).HasMaxLength(32);
+ builder.Property(x => x.EventTypeName).HasMaxLength(64);
builder.Property(x => x.Status).HasConversion();
builder.Property(x => x.DistributedEvent).HasConversion(v => JsonSerializer.Serialize(v, v.GetType(), s_jsonSerializerOptions), v => JsonSerializer.Deserialize(v, s_jsonSerializerOptions)!);
}
}
-}
+}
\ No newline at end of file
diff --git a/libraries/HelloShop.EventBus.Logging/HelloShop.EventBus.Logging.csproj b/libraries/HelloShop.EventBus.Logging/HelloShop.EventBus.Logging.csproj
new file mode 100644
index 0000000..8794144
--- /dev/null
+++ b/libraries/HelloShop.EventBus.Logging/HelloShop.EventBus.Logging.csproj
@@ -0,0 +1,17 @@
+
+
+
+ net9.0
+ enable
+ enable
+
+
+
+
+
+
+
+
+
+
+
diff --git a/libraries/HelloShop.EventBus.Logging/IDistributedEventLogService.cs b/libraries/HelloShop.EventBus.Logging/IDistributedEventLogService.cs
new file mode 100644
index 0000000..50afb91
--- /dev/null
+++ b/libraries/HelloShop.EventBus.Logging/IDistributedEventLogService.cs
@@ -0,0 +1,23 @@
+// Copyright (c) HelloShop Corporation. All rights reserved.
+// See the license file in the project root for more information.
+
+using HelloShop.EventBus.Abstractions;
+using Microsoft.EntityFrameworkCore.Storage;
+
+namespace HelloShop.EventBus.Logging
+{
+ public interface IDistributedEventLogService
+ {
+ Task> RetrieveEventLogsPendingToPublishAsync(Guid transactionId, CancellationToken cancellationToken = default);
+
+ Task> RetrieveEventLogsFailedToPublishAsync(CancellationToken cancellationToken = default);
+
+ Task SaveEventAsync(DistributedEvent @event, IDbContextTransaction transaction, CancellationToken cancellationToken = default);
+
+ Task MarkEventAsPublishedAsync(Guid eventId, CancellationToken cancellationToken = default);
+
+ Task MarkEventAsInProgressAsync(Guid eventId, CancellationToken cancellationToken = default);
+
+ Task MarkEventAsFailedAsync(Guid eventId, CancellationToken cancellationToken = default);
+ }
+}
\ No newline at end of file
diff --git a/libraries/HelloShop.EventBus.Logging/ResilientTransaction.cs b/libraries/HelloShop.EventBus.Logging/ResilientTransaction.cs
new file mode 100644
index 0000000..7ad0c29
--- /dev/null
+++ b/libraries/HelloShop.EventBus.Logging/ResilientTransaction.cs
@@ -0,0 +1,23 @@
+// Copyright (c) HelloShop Corporation. All rights reserved.
+// See the license file in the project root for more information.
+
+using Microsoft.EntityFrameworkCore;
+
+namespace HelloShop.EventBus.Logging
+{
+ public class ResilientTransaction(DbContext dbContext)
+ {
+ public static ResilientTransaction New(DbContext context) => new(context);
+
+ public async Task ExecuteAsync(Func action)
+ {
+ var strategy = dbContext.Database.CreateExecutionStrategy();
+ await strategy.ExecuteAsync(async () =>
+ {
+ await using var transaction = await dbContext.Database.BeginTransactionAsync();
+ await action();
+ await transaction.CommitAsync();
+ });
+ }
+ }
+}
diff --git a/libraries/HelloShop.EventBus.RabbitMQ/RabbitMQEventBusOptions.cs b/libraries/HelloShop.EventBus.RabbitMQ/RabbitMQEventBusOptions.cs
index e679898..16d5d50 100644
--- a/libraries/HelloShop.EventBus.RabbitMQ/RabbitMQEventBusOptions.cs
+++ b/libraries/HelloShop.EventBus.RabbitMQ/RabbitMQEventBusOptions.cs
@@ -7,7 +7,7 @@ namespace HelloShop.EventBus.RabbitMQ
{
public string ExchangeName { get; set; } = "event-bus-exchange";
- public required string QueueName { get; set; }
+ public required string QueueName { get; set; } = "event-bus-queue";
public int RetryCount { get; set; } = 10;
}
diff --git a/src/HelloShop.OrderingService/Behaviors/TransactionBehavior.cs b/src/HelloShop.OrderingService/Behaviors/TransactionBehavior.cs
index 568e4ac..dfecb59 100644
--- a/src/HelloShop.OrderingService/Behaviors/TransactionBehavior.cs
+++ b/src/HelloShop.OrderingService/Behaviors/TransactionBehavior.cs
@@ -41,7 +41,7 @@ namespace HelloShop.OrderingService.Behaviors
_logger.LogInformation("Commit transaction {TransactionId} for {CommandName}", transaction.TransactionId, typeName);
}
-
+ await dbContext.SaveChangesAsync(cancellationToken);
await transaction.CommitAsync();
await eventService.PublishEventsThroughEventBusAsync(transaction.TransactionId, cancellationToken);
diff --git a/src/HelloShop.OrderingService/Extensions/Extensions.cs b/src/HelloShop.OrderingService/Extensions/Extensions.cs
index f81a088..d564458 100644
--- a/src/HelloShop.OrderingService/Extensions/Extensions.cs
+++ b/src/HelloShop.OrderingService/Extensions/Extensions.cs
@@ -3,6 +3,7 @@
using HelloShop.EventBus.Abstractions;
using HelloShop.EventBus.Dapr;
+using HelloShop.EventBus.Logging;
using HelloShop.OrderingService.Behaviors;
using HelloShop.OrderingService.Constants;
using HelloShop.OrderingService.Infrastructure;
@@ -67,7 +68,7 @@ namespace HelloShop.OrderingService.Extensions
builder.Services.AddHostedService();
builder.Services.AddHostedService();
- builder.Services.AddTransient>();
+ builder.Services.AddDistributedEventLogs().AddTransient();
builder.Services.AddOpenApi();
diff --git a/src/HelloShop.OrderingService/HelloShop.OrderingService.csproj b/src/HelloShop.OrderingService/HelloShop.OrderingService.csproj
index 5ad96cf..e0ddc97 100644
--- a/src/HelloShop.OrderingService/HelloShop.OrderingService.csproj
+++ b/src/HelloShop.OrderingService/HelloShop.OrderingService.csproj
@@ -6,6 +6,7 @@
+
diff --git a/src/HelloShop.OrderingService/HelloShop.OrderingService.http b/src/HelloShop.OrderingService/HelloShop.OrderingService.http
index 900eb89..79147f5 100644
--- a/src/HelloShop.OrderingService/HelloShop.OrderingService.http
+++ b/src/HelloShop.OrderingService/HelloShop.OrderingService.http
@@ -1,9 +1,12 @@
-@HelloShop.OrderingService_HostAddress = https://localhost:8105
+@HelloWorld.OrderingService_HostAddress = https://localhost:8105
+@AccessToken = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1laWQiOiIxIiwidW5pcXVlX25hbWUiOiJhZG1pbiIsInJvbGVpZCI6IjEiLCJuYmYiOjE3NDI4Njg1ODAsImV4cCI6MTc2NDkwMDU4MCwiaWF0IjoxNzQyODY4NTgwfQ.6Wm6S4CNtKi9lGqxam4_ZnDebnTXVxycDubbv0DLy2c
-POST {{HelloShop.OrderingService_HostAddress}}/api/orders
+###
+
+POST {{HelloWorld.OrderingService_HostAddress}}/api/orders
Content-Type: application/json
-x-request-id: 8ddc1737-6a31-4b4f-bccd-63d0966348a1
-Authorization : Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1laWQiOiIxIiwidW5pcXVlX25hbWUiOiJhZG1pbiIsInJvbGVpZCI6IjEiLCJuYmYiOjE3MjA1NzY3NDYsImV4cCI6MTc0MjYwODc0NiwiaWF0IjoxNzIwNTc2NzQ2fQ.ju_D3zeGLKqJYVckbb8Y3yNkp40nOqRAJrdOsISs4d4
+x-request-id: 1ddc1737-6a32-4b4f-bccd-63d0966348a3
+Authorization : Bearer {{AccessToken}}
{
"country": "USA",
@@ -36,12 +39,20 @@ Authorization : Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1laWQiOiIxIiwi
###
-GET {{HelloShop.OrderingService_HostAddress}}/api/orders
+GET {{HelloWorld.OrderingService_HostAddress}}/api/orders
Content-Type: application/json
-Authorization : Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1laWQiOiIxIiwidW5pcXVlX25hbWUiOiJhZG1pbiIsInJvbGVpZCI6IjEiLCJuYmYiOjE3MjA1NzY3NDYsImV4cCI6MTc0MjYwODc0NiwiaWF0IjoxNzIwNTc2NzQ2fQ.ju_D3zeGLKqJYVckbb8Y3yNkp40nOqRAJrdOsISs4d4
+Authorization : Bearer {{AccessToken}}
###
-GET {{HelloShop.OrderingService_HostAddress}}/api/orders/1
+GET {{HelloWorld.OrderingService_HostAddress}}/api/orders/1
Content-Type: application/json
-Authorization : Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1laWQiOiIxIiwidW5pcXVlX25hbWUiOiJhZG1pbiIsInJvbGVpZCI6IjEiLCJuYmYiOjE3MjA1NzY3NDYsImV4cCI6MTc0MjYwODc0NiwiaWF0IjoxNzIwNTc2NzQ2fQ.ju_D3zeGLKqJYVckbb8Y3yNkp40nOqRAJrdOsISs4d4
+Authorization : Bearer {{AccessToken}}
+
+###
+
+
+PUT {{HelloWorld.OrderingService_HostAddress}}/api/orders/ship/1
+Content-Type: application/json
+x-request-id: 1ddc1737-6a32-4b4f-bccd-63d0966348a1
+Authorization : Bearer {{AccessToken}}
diff --git a/src/HelloShop.OrderingService/Infrastructure/Migrations/20250328130944_AddEventLogTable.Designer.cs b/src/HelloShop.OrderingService/Infrastructure/Migrations/20250328130944_AddEventLogTable.Designer.cs
new file mode 100644
index 0000000..9f3f429
--- /dev/null
+++ b/src/HelloShop.OrderingService/Infrastructure/Migrations/20250328130944_AddEventLogTable.Designer.cs
@@ -0,0 +1,347 @@
+//
+using System;
+using HelloShop.OrderingService.Infrastructure;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Infrastructure;
+using Microsoft.EntityFrameworkCore.Migrations;
+using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
+using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
+
+#nullable disable
+
+namespace HelloShop.OrderingService.Infrastructure.Migrations
+{
+ [DbContext(typeof(OrderingServiceDbContext))]
+ [Migration("20250328130944_AddEventLogTable")]
+ partial class AddEventLogTable
+ {
+ ///
+ protected override void BuildTargetModel(ModelBuilder modelBuilder)
+ {
+#pragma warning disable 612, 618
+ modelBuilder
+ .HasAnnotation("ProductVersion", "9.0.3")
+ .HasAnnotation("Relational:MaxIdentifierLength", 63);
+
+ NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
+
+ modelBuilder.Entity("HelloShop.EventBus.Logging.DistributedEventLog", b =>
+ {
+ b.Property("EventId")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("uuid")
+ .HasColumnName("event_id");
+
+ b.Property("CreationTime")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("creation_time");
+
+ b.Property("DistributedEvent")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("distributed_event");
+
+ b.Property("EventTypeName")
+ .IsRequired()
+ .HasMaxLength(64)
+ .HasColumnType("character varying(64)")
+ .HasColumnName("event_type_name");
+
+ b.Property("Status")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("status");
+
+ b.Property("TimesSent")
+ .HasColumnType("integer")
+ .HasColumnName("times_sent");
+
+ b.Property("TransactionId")
+ .HasColumnType("uuid")
+ .HasColumnName("transaction_id");
+
+ b.HasKey("EventId")
+ .HasName("pk_distributed_event_log");
+
+ b.ToTable("distributed_event_log", (string)null);
+ });
+
+ modelBuilder.Entity("HelloShop.OrderingService.Entities.Buyers.Buyer", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("integer")
+ .HasColumnName("id");
+
+ NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id"));
+
+ b.Property("Name")
+ .IsRequired()
+ .HasMaxLength(16)
+ .HasColumnType("character varying(16)")
+ .HasColumnName("name");
+
+ b.HasKey("Id")
+ .HasName("pk_buyer");
+
+ b.ToTable("buyer", (string)null);
+ });
+
+ modelBuilder.Entity("HelloShop.OrderingService.Entities.Buyers.PaymentMethod", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("integer")
+ .HasColumnName("id");
+
+ NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id"));
+
+ b.Property("Alias")
+ .IsRequired()
+ .HasMaxLength(16)
+ .HasColumnType("character varying(16)")
+ .HasColumnName("alias");
+
+ b.Property("BuyerId")
+ .HasColumnType("integer")
+ .HasColumnName("buyer_id");
+
+ b.Property("CardHolderName")
+ .IsRequired()
+ .HasMaxLength(16)
+ .HasColumnType("character varying(16)")
+ .HasColumnName("card_holder_name");
+
+ b.Property("CardNumber")
+ .IsRequired()
+ .HasMaxLength(16)
+ .HasColumnType("character varying(16)")
+ .HasColumnName("card_number");
+
+ b.Property("Expiration")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("expiration");
+
+ b.Property("SecurityNumber")
+ .HasMaxLength(6)
+ .HasColumnType("character varying(6)")
+ .HasColumnName("security_number");
+
+ b.HasKey("Id")
+ .HasName("pk_payment_method");
+
+ b.HasIndex("BuyerId")
+ .HasDatabaseName("ix_payment_method_buyer_id");
+
+ b.ToTable("payment_method", (string)null);
+ });
+
+ modelBuilder.Entity("HelloShop.OrderingService.Entities.Idempotency.ClientRequest", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("uuid")
+ .HasColumnName("id");
+
+ b.Property("Name")
+ .IsRequired()
+ .HasMaxLength(64)
+ .HasColumnType("character varying(64)")
+ .HasColumnName("name");
+
+ b.Property("Time")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("time");
+
+ b.HasKey("Id")
+ .HasName("pk_client_request");
+
+ b.ToTable("client_request", (string)null);
+ });
+
+ modelBuilder.Entity("HelloShop.OrderingService.Entities.Orders.Order", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("integer")
+ .HasColumnName("id");
+
+ NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id"));
+
+ b.Property("BuyerId")
+ .HasColumnType("integer")
+ .HasColumnName("buyer_id");
+
+ b.Property("Description")
+ .HasMaxLength(64)
+ .HasColumnType("character varying(64)")
+ .HasColumnName("description");
+
+ b.Property("OrderDate")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("order_date");
+
+ b.Property("OrderStatus")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("order_status");
+
+ b.Property("PaymentMethodId")
+ .HasColumnType("integer")
+ .HasColumnName("payment_method_id");
+
+ b.HasKey("Id")
+ .HasName("pk_order");
+
+ b.HasIndex("BuyerId")
+ .HasDatabaseName("ix_order_buyer_id");
+
+ b.HasIndex("PaymentMethodId")
+ .HasDatabaseName("ix_order_payment_method_id");
+
+ b.ToTable("order", (string)null);
+ });
+
+ modelBuilder.Entity("HelloShop.OrderingService.Entities.Orders.OrderItem", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("integer")
+ .HasColumnName("id");
+
+ NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id"));
+
+ b.Property("OrderId")
+ .HasColumnType("integer")
+ .HasColumnName("order_id");
+
+ b.Property("PictureUrl")
+ .IsRequired()
+ .HasMaxLength(256)
+ .HasColumnType("character varying(256)")
+ .HasColumnName("picture_url");
+
+ b.Property("ProductId")
+ .HasColumnType("integer")
+ .HasColumnName("product_id");
+
+ b.Property("ProductName")
+ .IsRequired()
+ .HasMaxLength(16)
+ .HasColumnType("character varying(16)")
+ .HasColumnName("product_name");
+
+ b.Property("UnitPrice")
+ .HasColumnType("numeric")
+ .HasColumnName("unit_price");
+
+ b.Property("Units")
+ .HasColumnType("integer")
+ .HasColumnName("units");
+
+ b.HasKey("Id")
+ .HasName("pk_order_item");
+
+ b.HasIndex("OrderId")
+ .HasDatabaseName("ix_order_item_order_id");
+
+ b.ToTable("order_item", (string)null);
+ });
+
+ modelBuilder.Entity("HelloShop.OrderingService.Entities.Buyers.PaymentMethod", b =>
+ {
+ b.HasOne("HelloShop.OrderingService.Entities.Buyers.Buyer", null)
+ .WithMany("PaymentMethods")
+ .HasForeignKey("BuyerId")
+ .OnDelete(DeleteBehavior.Cascade)
+ .IsRequired()
+ .HasConstraintName("fk_payment_method_buyer_buyer_id");
+ });
+
+ modelBuilder.Entity("HelloShop.OrderingService.Entities.Orders.Order", b =>
+ {
+ b.HasOne("HelloShop.OrderingService.Entities.Buyers.Buyer", null)
+ .WithMany()
+ .HasForeignKey("BuyerId")
+ .OnDelete(DeleteBehavior.Cascade)
+ .IsRequired()
+ .HasConstraintName("fk_order_buyer_buyer_id");
+
+ b.HasOne("HelloShop.OrderingService.Entities.Buyers.PaymentMethod", null)
+ .WithMany()
+ .HasForeignKey("PaymentMethodId")
+ .OnDelete(DeleteBehavior.Restrict)
+ .HasConstraintName("fk_order_payment_method_payment_method_id");
+
+ b.OwnsOne("HelloShop.OrderingService.Entities.Orders.Address", "Address", b1 =>
+ {
+ b1.Property("OrderId")
+ .HasColumnType("integer")
+ .HasColumnName("id");
+
+ b1.Property("City")
+ .IsRequired()
+ .HasMaxLength(16)
+ .HasColumnType("character varying(16)")
+ .HasColumnName("City");
+
+ b1.Property("Country")
+ .IsRequired()
+ .HasMaxLength(8)
+ .HasColumnType("character varying(8)")
+ .HasColumnName("Country");
+
+ b1.Property("State")
+ .IsRequired()
+ .HasMaxLength(16)
+ .HasColumnType("character varying(16)")
+ .HasColumnName("State");
+
+ b1.Property("Street")
+ .IsRequired()
+ .HasMaxLength(32)
+ .HasColumnType("character varying(32)")
+ .HasColumnName("Street");
+
+ b1.Property("ZipCode")
+ .IsRequired()
+ .HasMaxLength(6)
+ .HasColumnType("character varying(6)")
+ .HasColumnName("ZipCode");
+
+ b1.HasKey("OrderId");
+
+ b1.ToTable("order");
+
+ b1.WithOwner()
+ .HasForeignKey("OrderId")
+ .HasConstraintName("fk_order_order_id");
+ });
+
+ b.Navigation("Address")
+ .IsRequired();
+ });
+
+ modelBuilder.Entity("HelloShop.OrderingService.Entities.Orders.OrderItem", b =>
+ {
+ b.HasOne("HelloShop.OrderingService.Entities.Orders.Order", null)
+ .WithMany("OrderItems")
+ .HasForeignKey("OrderId")
+ .OnDelete(DeleteBehavior.Cascade)
+ .IsRequired()
+ .HasConstraintName("fk_order_item_order_order_id");
+ });
+
+ modelBuilder.Entity("HelloShop.OrderingService.Entities.Buyers.Buyer", b =>
+ {
+ b.Navigation("PaymentMethods");
+ });
+
+ modelBuilder.Entity("HelloShop.OrderingService.Entities.Orders.Order", b =>
+ {
+ b.Navigation("OrderItems");
+ });
+#pragma warning restore 612, 618
+ }
+ }
+}
diff --git a/src/HelloShop.OrderingService/Infrastructure/Migrations/20250328130944_AddEventLogTable.cs b/src/HelloShop.OrderingService/Infrastructure/Migrations/20250328130944_AddEventLogTable.cs
new file mode 100644
index 0000000..44e1344
--- /dev/null
+++ b/src/HelloShop.OrderingService/Infrastructure/Migrations/20250328130944_AddEventLogTable.cs
@@ -0,0 +1,41 @@
+// Copyright (c) HelloShop Corporation. All rights reserved.
+// See the license file in the project root for more information.
+
+using Microsoft.EntityFrameworkCore.Migrations;
+
+#nullable disable
+
+namespace HelloShop.OrderingService.Infrastructure.Migrations
+{
+ ///
+ public partial class AddEventLogTable : Migration
+ {
+ ///
+ protected override void Up(MigrationBuilder migrationBuilder)
+ {
+ migrationBuilder.AlterColumn(
+ name: "event_type_name",
+ table: "distributed_event_log",
+ type: "character varying(64)",
+ maxLength: 64,
+ nullable: false,
+ oldClrType: typeof(string),
+ oldType: "character varying(32)",
+ oldMaxLength: 32);
+ }
+
+ ///
+ protected override void Down(MigrationBuilder migrationBuilder)
+ {
+ migrationBuilder.AlterColumn(
+ name: "event_type_name",
+ table: "distributed_event_log",
+ type: "character varying(32)",
+ maxLength: 32,
+ nullable: false,
+ oldClrType: typeof(string),
+ oldType: "character varying(64)",
+ oldMaxLength: 64);
+ }
+ }
+}
diff --git a/src/HelloShop.OrderingService/Infrastructure/Migrations/OrderingServiceDbContextModelSnapshot.cs b/src/HelloShop.OrderingService/Infrastructure/Migrations/OrderingServiceDbContextModelSnapshot.cs
index ac2a830..958bf51 100644
--- a/src/HelloShop.OrderingService/Infrastructure/Migrations/OrderingServiceDbContextModelSnapshot.cs
+++ b/src/HelloShop.OrderingService/Infrastructure/Migrations/OrderingServiceDbContextModelSnapshot.cs
@@ -22,6 +22,47 @@ namespace HelloShop.OrderingService.Infrastructure.Migrations
NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
+ modelBuilder.Entity("HelloShop.EventBus.Logging.DistributedEventLog", b =>
+ {
+ b.Property("EventId")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("uuid")
+ .HasColumnName("event_id");
+
+ b.Property("CreationTime")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("creation_time");
+
+ b.Property("DistributedEvent")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("distributed_event");
+
+ b.Property("EventTypeName")
+ .IsRequired()
+ .HasMaxLength(64)
+ .HasColumnType("character varying(64)")
+ .HasColumnName("event_type_name");
+
+ b.Property("Status")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("status");
+
+ b.Property("TimesSent")
+ .HasColumnType("integer")
+ .HasColumnName("times_sent");
+
+ b.Property("TransactionId")
+ .HasColumnType("uuid")
+ .HasColumnName("transaction_id");
+
+ b.HasKey("EventId")
+ .HasName("pk_distributed_event_log");
+
+ b.ToTable("distributed_event_log", (string)null);
+ });
+
modelBuilder.Entity("HelloShop.OrderingService.Entities.Buyers.Buyer", b =>
{
b.Property("Id")
@@ -92,47 +133,6 @@ namespace HelloShop.OrderingService.Infrastructure.Migrations
b.ToTable("payment_method", (string)null);
});
- modelBuilder.Entity("HelloShop.OrderingService.Entities.EventLogs.DistributedEventLog", b =>
- {
- b.Property("EventId")
- .ValueGeneratedOnAdd()
- .HasColumnType("uuid")
- .HasColumnName("event_id");
-
- b.Property("CreationTime")
- .HasColumnType("timestamp with time zone")
- .HasColumnName("creation_time");
-
- b.Property("DistributedEvent")
- .IsRequired()
- .HasColumnType("text")
- .HasColumnName("distributed_event");
-
- b.Property("EventTypeName")
- .IsRequired()
- .HasMaxLength(32)
- .HasColumnType("character varying(32)")
- .HasColumnName("event_type_name");
-
- b.Property("Status")
- .IsRequired()
- .HasColumnType("text")
- .HasColumnName("status");
-
- b.Property("TimesSent")
- .HasColumnType("integer")
- .HasColumnName("times_sent");
-
- b.Property("TransactionId")
- .HasColumnType("uuid")
- .HasColumnName("transaction_id");
-
- b.HasKey("EventId")
- .HasName("pk_distributed_event_log");
-
- b.ToTable("distributed_event_log", (string)null);
- });
-
modelBuilder.Entity("HelloShop.OrderingService.Entities.Idempotency.ClientRequest", b =>
{
b.Property("Id")
diff --git a/src/HelloShop.OrderingService/Infrastructure/OrderingServiceDbContext.cs b/src/HelloShop.OrderingService/Infrastructure/OrderingServiceDbContext.cs
index 16b3ed6..aad3043 100644
--- a/src/HelloShop.OrderingService/Infrastructure/OrderingServiceDbContext.cs
+++ b/src/HelloShop.OrderingService/Infrastructure/OrderingServiceDbContext.cs
@@ -1,6 +1,7 @@
// Copyright (c) HelloShop Corporation. All rights reserved.
// See the license file in the project root for more information.
+using HelloShop.EventBus.Logging;
using Microsoft.EntityFrameworkCore;
using System.Reflection;
@@ -13,6 +14,7 @@ namespace HelloShop.OrderingService.Infrastructure
base.OnModelCreating(builder);
builder.ApplyConfigurationsFromAssembly(Assembly.GetExecutingAssembly());
+ builder.UseDistributedEventLogs();
}
}
}
diff --git a/src/HelloShop.OrderingService/Services/DistributedEventService.cs b/src/HelloShop.OrderingService/Services/DistributedEventService.cs
index 5d6ce0e..99c4aa3 100644
--- a/src/HelloShop.OrderingService/Services/DistributedEventService.cs
+++ b/src/HelloShop.OrderingService/Services/DistributedEventService.cs
@@ -2,88 +2,42 @@
// See the license file in the project root for more information.
using HelloShop.EventBus.Abstractions;
-using HelloShop.OrderingService.Entities.EventLogs;
-using Microsoft.EntityFrameworkCore;
+using HelloShop.EventBus.Logging;
+using HelloShop.OrderingService.Infrastructure;
namespace HelloShop.OrderingService.Services
{
- public class DistributedEventService(TContext dbContext, IEventBus distributedEventBus, ILogger> logger) : IDistributedEventService, IDisposable where TContext : DbContext
+ public class DistributedEventService(ILogger logger, IEventBus eventBus, OrderingServiceDbContext dbContext, IDistributedEventLogService eventLogService) : IDistributedEventService
{
- private volatile bool _disposedValue;
-
- public async Task UpdateEventStatusAsync(Guid eventId, DistributedEventStatus status, CancellationToken cancellationToken = default)
+ public async Task PublishEventsThroughEventBusAsync(Guid transactionId, CancellationToken cancellationToken = default)
{
- var eventLogEntry = dbContext.Set().Single(ie => ie.EventId == eventId);
+ var pendingLogEvents = await eventLogService.RetrieveEventLogsPendingToPublishAsync(transactionId, cancellationToken);
- eventLogEntry.Status = status;
-
- if (status == DistributedEventStatus.InProgress)
+ foreach (var eventLog in pendingLogEvents)
{
- eventLogEntry.TimesSent++;
+ logger.LogInformation("Publishing distributed event: {EventId} from {EventType}.", eventLog.EventId, eventLog.EventTypeName);
+
+ try
+ {
+ await eventLogService.MarkEventAsInProgressAsync(eventLog.EventId, cancellationToken);
+ await eventBus.PublishAsync(eventLog.DistributedEvent, cancellationToken);
+ await eventLogService.MarkEventAsPublishedAsync(eventLog.EventId, cancellationToken);
+ }
+ catch (Exception ex)
+ {
+ logger.LogError(ex, "Error publishing distributed event: {EventId}.", eventLog.EventId);
+ await eventLogService.MarkEventAsFailedAsync(eventLog.EventId, cancellationToken);
+ }
}
-
- await dbContext.SaveChangesAsync(cancellationToken);
- }
-
- public async Task> RetrieveEventLogsPendingToPublishAsync(Guid transactionId, CancellationToken cancellationToken = default)
- {
- var result = await dbContext.Set().Where(e => e.TransactionId == transactionId && e.Status == DistributedEventStatus.NotPublished).ToListAsync(cancellationToken: cancellationToken);
-
- return result.Count != 0 ? result.OrderBy(o => o.CreationTime) : [];
}
public async Task AddAndSaveEventAsync(DistributedEvent @event, CancellationToken cancellationToken = default)
{
- var transaction = dbContext.Database.CurrentTransaction ?? throw new InvalidOperationException("This method must be called within a transaction scope.");
+ logger.LogInformation("Creating and saving distributed event: {EventId} from {EventType}.", @event.Id, @event.GetType().Name);
- var eventLog = new DistributedEventLog(@event, transaction.TransactionId);
+ var transaction = dbContext.Database.CurrentTransaction ?? await dbContext.Database.BeginTransactionAsync(cancellationToken);
- await dbContext.AddAsync(eventLog, cancellationToken);
-
- await dbContext.SaveChangesAsync(cancellationToken);
- }
-
- public async Task PublishEventsThroughEventBusAsync(Guid transactionId, CancellationToken cancellationToken = default)
- {
- var pendingEventLogs = await RetrieveEventLogsPendingToPublishAsync(transactionId, cancellationToken);
-
- foreach (var eventLog in pendingEventLogs)
- {
- logger.LogInformation("Publishing integration event {EventId} {DistributedEvent}", eventLog.EventId, eventLog.DistributedEvent);
-
- try
- {
- await UpdateEventStatusAsync(eventLog.EventId, DistributedEventStatus.InProgress, cancellationToken);
- await distributedEventBus.PublishAsync(eventLog.DistributedEvent, cancellationToken);
- await UpdateEventStatusAsync(eventLog.EventId, DistributedEventStatus.Published, cancellationToken);
- }
- catch (Exception ex)
- {
- logger.LogError(ex, "Error publishing distributed event {EventId}", eventLog.EventId);
-
- await UpdateEventStatusAsync(eventLog.EventId, DistributedEventStatus.PublishedFailed, cancellationToken);
- }
- }
- }
-
- protected virtual void Dispose(bool disposing)
- {
- if (!_disposedValue)
- {
- if (disposing)
- {
- dbContext.Dispose();
- }
-
- _disposedValue = true;
- }
- }
-
- public void Dispose()
- {
- Dispose(disposing: true);
- GC.SuppressFinalize(this);
+ await eventLogService.SaveEventAsync(@event, transaction, cancellationToken);
}
}
-
}
diff --git a/src/HelloShop.OrderingService/Services/IDistributedEventService.cs b/src/HelloShop.OrderingService/Services/IDistributedEventService.cs
index 40f70b8..3e7038d 100644
--- a/src/HelloShop.OrderingService/Services/IDistributedEventService.cs
+++ b/src/HelloShop.OrderingService/Services/IDistributedEventService.cs
@@ -2,18 +2,13 @@
// See the license file in the project root for more information.
using HelloShop.EventBus.Abstractions;
-using HelloShop.OrderingService.Entities.EventLogs;
namespace HelloShop.OrderingService.Services
{
public interface IDistributedEventService
{
- Task> RetrieveEventLogsPendingToPublishAsync(Guid transactionId, CancellationToken cancellationToken = default);
+ Task PublishEventsThroughEventBusAsync(Guid transactionId, CancellationToken cancellationToken = default);
Task AddAndSaveEventAsync(DistributedEvent @event, CancellationToken cancellationToken = default);
-
- Task UpdateEventStatusAsync(Guid eventId, DistributedEventStatus status, CancellationToken cancellationToken = default);
-
- Task PublishEventsThroughEventBusAsync(Guid transactionId, CancellationToken cancellationToken = default);
}
}
diff --git a/src/HelloShop.ProductService/DistributedEvents/EventHandling/OrderAwaitingValidationDistributedEventHandler.cs b/src/HelloShop.ProductService/DistributedEvents/EventHandling/OrderAwaitingValidationDistributedEventHandler.cs
index 46e6cc6..a34a217 100644
--- a/src/HelloShop.ProductService/DistributedEvents/EventHandling/OrderAwaitingValidationDistributedEventHandler.cs
+++ b/src/HelloShop.ProductService/DistributedEvents/EventHandling/OrderAwaitingValidationDistributedEventHandler.cs
@@ -6,10 +6,11 @@ using HelloShop.EventBus.Abstractions;
using HelloShop.ProductService.DistributedEvents.Events;
using HelloShop.ProductService.Entities.Products;
using HelloShop.ProductService.Infrastructure;
+using HelloShop.ProductService.Services;
namespace HelloShop.ProductService.DistributedEvents.EventHandling
{
- public class OrderAwaitingValidationDistributedEventHandler(ProductServiceDbContext dbContext, IEventBus distributedEventBus, IDistributedLock distributedLock, ILogger logger) : IDistributedEventHandler
+ public class OrderAwaitingValidationDistributedEventHandler(ProductServiceDbContext dbContext, IDistributedEventService distributedEventService, IDistributedLock distributedLock, ILogger logger) : IDistributedEventHandler
{
public async Task HandleAsync(OrderAwaitingValidationDistributedEvent @event)
{
@@ -30,7 +31,8 @@ namespace HelloShop.ProductService.DistributedEvents.EventHandling
DistributedEvent confirmedEvent = confirmedOrderStockItems.All(c => c.Value) ? new OrderStockConfirmedDistributedEvent(@event.OrderId) : new OrderStockRejectedDistributedEvent(@event.OrderId);
- await distributedEventBus.PublishAsync(confirmedEvent);
+ await distributedEventService.SaveEventAndDbContextChangesAsync(confirmedEvent);
+ await distributedEventService.PublishThroughEventBusAsync(confirmedEvent);
}
}
}
diff --git a/src/HelloShop.ProductService/HelloShop.ProductService.csproj b/src/HelloShop.ProductService/HelloShop.ProductService.csproj
index 9795c77..c10b914 100644
--- a/src/HelloShop.ProductService/HelloShop.ProductService.csproj
+++ b/src/HelloShop.ProductService/HelloShop.ProductService.csproj
@@ -7,6 +7,7 @@
+
diff --git a/src/HelloShop.ProductService/Infrastructure/Migrations/20250328131028_AddEventLogTable.Designer.cs b/src/HelloShop.ProductService/Infrastructure/Migrations/20250328131028_AddEventLogTable.Designer.cs
new file mode 100644
index 0000000..5d52a36
--- /dev/null
+++ b/src/HelloShop.ProductService/Infrastructure/Migrations/20250328131028_AddEventLogTable.Designer.cs
@@ -0,0 +1,155 @@
+//
+using System;
+using HelloShop.ProductService.Infrastructure;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore.Infrastructure;
+using Microsoft.EntityFrameworkCore.Migrations;
+using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
+using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
+
+#nullable disable
+
+namespace HelloShop.ProductService.Infrastructure.Migrations
+{
+ [DbContext(typeof(ProductServiceDbContext))]
+ [Migration("20250328131028_AddEventLogTable")]
+ partial class AddEventLogTable
+ {
+ ///
+ protected override void BuildTargetModel(ModelBuilder modelBuilder)
+ {
+#pragma warning disable 612, 618
+ modelBuilder
+ .HasAnnotation("ProductVersion", "9.0.3")
+ .HasAnnotation("Relational:MaxIdentifierLength", 63);
+
+ NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
+
+ modelBuilder.Entity("HelloShop.EventBus.Logging.DistributedEventLog", b =>
+ {
+ b.Property("EventId")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("uuid")
+ .HasColumnName("event_id");
+
+ b.Property("CreationTime")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("creation_time");
+
+ b.Property("DistributedEvent")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("distributed_event");
+
+ b.Property("EventTypeName")
+ .IsRequired()
+ .HasMaxLength(64)
+ .HasColumnType("character varying(64)")
+ .HasColumnName("event_type_name");
+
+ b.Property("Status")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("status");
+
+ b.Property("TimesSent")
+ .HasColumnType("integer")
+ .HasColumnName("times_sent");
+
+ b.Property("TransactionId")
+ .HasColumnType("uuid")
+ .HasColumnName("transaction_id");
+
+ b.HasKey("EventId")
+ .HasName("pk_distributed_event_log");
+
+ b.ToTable("distributed_event_log", (string)null);
+ });
+
+ modelBuilder.Entity("HelloShop.ProductService.Entities.Products.Brand", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("integer")
+ .HasColumnName("id");
+
+ NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id"));
+
+ b.Property("Name")
+ .IsRequired()
+ .HasMaxLength(32)
+ .HasColumnType("character varying(32)")
+ .HasColumnName("name");
+
+ b.HasKey("Id")
+ .HasName("pk_brand");
+
+ b.ToTable("brand", (string)null);
+ });
+
+ modelBuilder.Entity("HelloShop.ProductService.Entities.Products.Product", b =>
+ {
+ b.Property("Id")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("integer")
+ .HasColumnName("id");
+
+ NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property("Id"));
+
+ b.Property("AvailableStock")
+ .HasColumnType("integer")
+ .HasColumnName("available_stock");
+
+ b.Property("BrandId")
+ .HasColumnType("integer")
+ .HasColumnName("brand_id");
+
+ b.Property("CreationTime")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("creation_time");
+
+ b.Property("Description")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("description");
+
+ b.Property("ImageUrl")
+ .IsRequired()
+ .HasMaxLength(256)
+ .HasColumnType("character varying(256)")
+ .HasColumnName("image_url");
+
+ b.Property("Name")
+ .IsRequired()
+ .HasMaxLength(64)
+ .HasColumnType("character varying(64)")
+ .HasColumnName("name");
+
+ b.Property("Price")
+ .HasColumnType("numeric")
+ .HasColumnName("price");
+
+ b.HasKey("Id")
+ .HasName("pk_product");
+
+ b.HasIndex("BrandId")
+ .HasDatabaseName("ix_product_brand_id");
+
+ b.ToTable("product", (string)null);
+ });
+
+ modelBuilder.Entity("HelloShop.ProductService.Entities.Products.Product", b =>
+ {
+ b.HasOne("HelloShop.ProductService.Entities.Products.Brand", "Brand")
+ .WithMany()
+ .HasForeignKey("BrandId")
+ .OnDelete(DeleteBehavior.Cascade)
+ .IsRequired()
+ .HasConstraintName("fk_product_brand_brand_id");
+
+ b.Navigation("Brand");
+ });
+#pragma warning restore 612, 618
+ }
+ }
+}
diff --git a/src/HelloShop.ProductService/Infrastructure/Migrations/20250328131028_AddEventLogTable.cs b/src/HelloShop.ProductService/Infrastructure/Migrations/20250328131028_AddEventLogTable.cs
new file mode 100644
index 0000000..cd80bed
--- /dev/null
+++ b/src/HelloShop.ProductService/Infrastructure/Migrations/20250328131028_AddEventLogTable.cs
@@ -0,0 +1,41 @@
+// Copyright (c) HelloShop Corporation. All rights reserved.
+// See the license file in the project root for more information.
+
+using Microsoft.EntityFrameworkCore.Migrations;
+
+#nullable disable
+
+namespace HelloShop.ProductService.Infrastructure.Migrations
+{
+ ///
+ public partial class AddEventLogTable : Migration
+ {
+ ///
+ protected override void Up(MigrationBuilder migrationBuilder)
+ {
+ migrationBuilder.CreateTable(
+ name: "distributed_event_log",
+ columns: table => new
+ {
+ event_id = table.Column(type: "uuid", nullable: false),
+ event_type_name = table.Column(type: "character varying(64)", maxLength: 64, nullable: false),
+ distributed_event = table.Column(type: "text", nullable: false),
+ status = table.Column(type: "text", nullable: false),
+ times_sent = table.Column(type: "integer", nullable: false),
+ creation_time = table.Column(type: "timestamp with time zone", nullable: false),
+ transaction_id = table.Column(type: "uuid", nullable: false)
+ },
+ constraints: table =>
+ {
+ table.PrimaryKey("pk_distributed_event_log", x => x.event_id);
+ });
+ }
+
+ ///
+ protected override void Down(MigrationBuilder migrationBuilder)
+ {
+ migrationBuilder.DropTable(
+ name: "distributed_event_log");
+ }
+ }
+}
diff --git a/src/HelloShop.ProductService/Infrastructure/Migrations/ProductServiceDbContextModelSnapshot.cs b/src/HelloShop.ProductService/Infrastructure/Migrations/ProductServiceDbContextModelSnapshot.cs
index a7aae9b..3fd003d 100644
--- a/src/HelloShop.ProductService/Infrastructure/Migrations/ProductServiceDbContextModelSnapshot.cs
+++ b/src/HelloShop.ProductService/Infrastructure/Migrations/ProductServiceDbContextModelSnapshot.cs
@@ -22,6 +22,47 @@ namespace HelloShop.ProductService.Infrastructure.Migrations
NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
+ modelBuilder.Entity("HelloShop.EventBus.Logging.DistributedEventLog", b =>
+ {
+ b.Property("EventId")
+ .ValueGeneratedOnAdd()
+ .HasColumnType("uuid")
+ .HasColumnName("event_id");
+
+ b.Property("CreationTime")
+ .HasColumnType("timestamp with time zone")
+ .HasColumnName("creation_time");
+
+ b.Property("DistributedEvent")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("distributed_event");
+
+ b.Property("EventTypeName")
+ .IsRequired()
+ .HasMaxLength(64)
+ .HasColumnType("character varying(64)")
+ .HasColumnName("event_type_name");
+
+ b.Property("Status")
+ .IsRequired()
+ .HasColumnType("text")
+ .HasColumnName("status");
+
+ b.Property("TimesSent")
+ .HasColumnType("integer")
+ .HasColumnName("times_sent");
+
+ b.Property("TransactionId")
+ .HasColumnType("uuid")
+ .HasColumnName("transaction_id");
+
+ b.HasKey("EventId")
+ .HasName("pk_distributed_event_log");
+
+ b.ToTable("distributed_event_log", (string)null);
+ });
+
modelBuilder.Entity("HelloShop.ProductService.Entities.Products.Brand", b =>
{
b.Property("Id")
diff --git a/src/HelloShop.ProductService/Infrastructure/ProductServiceDbContext.cs b/src/HelloShop.ProductService/Infrastructure/ProductServiceDbContext.cs
index 69cfff7..cc04d29 100644
--- a/src/HelloShop.ProductService/Infrastructure/ProductServiceDbContext.cs
+++ b/src/HelloShop.ProductService/Infrastructure/ProductServiceDbContext.cs
@@ -1,6 +1,7 @@
// Copyright (c) HelloShop Corporation. All rights reserved.
// See the license file in the project root for more information.
+using HelloShop.EventBus.Logging;
using Microsoft.EntityFrameworkCore;
using System.Reflection;
@@ -13,5 +14,6 @@ public class ProductServiceDbContext(DbContextOptions o
base.OnModelCreating(builder);
builder.ApplyConfigurationsFromAssembly(Assembly.GetExecutingAssembly());
+ builder.UseDistributedEventLogs();
}
}
diff --git a/src/HelloShop.ProductService/Program.cs b/src/HelloShop.ProductService/Program.cs
index 2792c3f..ac1ee6a 100644
--- a/src/HelloShop.ProductService/Program.cs
+++ b/src/HelloShop.ProductService/Program.cs
@@ -4,6 +4,7 @@
using HelloShop.DistributedLock.Dapr;
using HelloShop.EventBus.Abstractions;
using HelloShop.EventBus.Dapr;
+using HelloShop.EventBus.Logging;
using HelloShop.ProductService.Constants;
using HelloShop.ProductService.Infrastructure;
using HelloShop.ProductService.Services;
@@ -47,6 +48,7 @@ builder.Services.AddAuthorization().AddRemotePermissionChecker().AddCustomAuthor
builder.AddDaprEventBus().AddSubscriptionFromAssembly();
builder.Services.AddDaprDistributedLock();
builder.Services.AddSingleton(TimeProvider.System);
+builder.Services.AddDistributedEventLogs().AddTransient();
// End addd extensions services to the container.
var app = builder.Build();
diff --git a/src/HelloShop.ProductService/Services/DistributedEventService.cs b/src/HelloShop.ProductService/Services/DistributedEventService.cs
new file mode 100644
index 0000000..ade77f6
--- /dev/null
+++ b/src/HelloShop.ProductService/Services/DistributedEventService.cs
@@ -0,0 +1,37 @@
+// Copyright (c) HelloShop Corporation. All rights reserved.
+// See the license file in the project root for more information.
+
+using HelloShop.EventBus.Abstractions;
+using HelloShop.EventBus.Logging;
+using HelloShop.ProductService.Infrastructure;
+
+namespace HelloShop.ProductService.Services
+{
+ public class DistributedEventService(ILogger logger, IEventBus eventBus, ProductServiceDbContext dbContext, IDistributedEventLogService eventLogService) : IDistributedEventService
+ {
+ public async Task PublishThroughEventBusAsync(DistributedEvent @event)
+ {
+ try
+ {
+ await eventLogService.MarkEventAsInProgressAsync(@event.Id);
+ await eventBus.PublishAsync(@event);
+ await eventLogService.MarkEventAsPublishedAsync(@event.Id);
+ }
+ catch (Exception ex)
+ {
+ logger.LogError(ex, "Publish through event bus failed for {EventId}", @event.Id);
+ await eventLogService.MarkEventAsFailedAsync(@event.Id);
+ }
+ }
+
+ public async Task SaveEventAndDbContextChangesAsync(DistributedEvent @event)
+ {
+ await ResilientTransaction.New(dbContext).ExecuteAsync(async () =>
+ {
+ var transaction = dbContext.Database.CurrentTransaction ?? await dbContext.Database.BeginTransactionAsync();
+ await dbContext.SaveChangesAsync();
+ await eventLogService.SaveEventAsync(@event, transaction);
+ });
+ }
+ }
+}
diff --git a/src/HelloShop.ProductService/Services/IDistributedEventService.cs b/src/HelloShop.ProductService/Services/IDistributedEventService.cs
new file mode 100644
index 0000000..2dde5ac
--- /dev/null
+++ b/src/HelloShop.ProductService/Services/IDistributedEventService.cs
@@ -0,0 +1,14 @@
+// Copyright (c) HelloShop Corporation. All rights reserved.
+// See the license file in the project root for more information.
+
+using HelloShop.EventBus.Abstractions;
+
+namespace HelloShop.ProductService.Services
+{
+ public interface IDistributedEventService
+ {
+ Task SaveEventAndDbContextChangesAsync(DistributedEvent @event);
+
+ Task PublishThroughEventBusAsync(DistributedEvent @event);
+ }
+}