事务性发件箱模式

This commit is contained in:
hello 2025-03-28 22:27:12 +08:00
parent 8da853a351
commit f007572e7f
30 changed files with 978 additions and 135 deletions

View File

@ -29,9 +29,11 @@
<PackageVersion Include="Microsoft.EntityFrameworkCore" Version="9.0.3" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Design" Version="9.0.3" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.InMemory" Version="9.0.3" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Relational" Version="9.0.3" />
<PackageVersion Include="Microsoft.EntityFrameworkCore.Sqlite" Version="9.0.3" />
<PackageVersion Include="Microsoft.Extensions.Caching.StackExchangeRedis" Version="9.0.2" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.3" />
<PackageVersion Include="Microsoft.Extensions.Hosting.Abstractions" Version="9.0.3" />
<PackageVersion Include="Microsoft.Extensions.Http.Resilience" Version="9.3.0" />
<PackageVersion Include="Microsoft.Extensions.Logging.Debug" Version="9.0.3" />
<PackageVersion Include="Microsoft.Extensions.Options" Version="9.0.3" />

View File

@ -45,6 +45,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HelloShop.DistributedLock",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HelloShop.DistributedLock.Dapr", "libraries\HelloShop.DistributedLock.Dapr\HelloShop.DistributedLock.Dapr.csproj", "{37F01A0B-50D6-48BA-8A20-FBADB5524CD7}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HelloShop.EventBus.Logging", "libraries\HelloShop.EventBus.Logging\HelloShop.EventBus.Logging.csproj", "{ACBA2FA9-ADBB-4FCD-A303-6D8D4B2131AA}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -125,6 +127,10 @@ Global
{37F01A0B-50D6-48BA-8A20-FBADB5524CD7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{37F01A0B-50D6-48BA-8A20-FBADB5524CD7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{37F01A0B-50D6-48BA-8A20-FBADB5524CD7}.Release|Any CPU.Build.0 = Release|Any CPU
{ACBA2FA9-ADBB-4FCD-A303-6D8D4B2131AA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{ACBA2FA9-ADBB-4FCD-A303-6D8D4B2131AA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{ACBA2FA9-ADBB-4FCD-A303-6D8D4B2131AA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{ACBA2FA9-ADBB-4FCD-A303-6D8D4B2131AA}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -148,6 +154,7 @@ Global
{5D403BF6-9267-4B0F-AEB3-2143C8BBA8CD} = {02EA681E-C7D8-13C7-8484-4AC65E1B71E8}
{86729635-8E31-4C53-81AE-7C410C848219} = {02EA681E-C7D8-13C7-8484-4AC65E1B71E8}
{37F01A0B-50D6-48BA-8A20-FBADB5524CD7} = {02EA681E-C7D8-13C7-8484-4AC65E1B71E8}
{ACBA2FA9-ADBB-4FCD-A303-6D8D4B2131AA} = {02EA681E-C7D8-13C7-8484-4AC65E1B71E8}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {845545A8-2006-46C3-ABD7-5BDF63F3858C}

View File

@ -4,7 +4,7 @@
using HelloShop.EventBus.Abstractions;
using System.Diagnostics.CodeAnalysis;
namespace HelloShop.OrderingService.Entities.EventLogs
namespace HelloShop.EventBus.Logging
{
public enum DistributedEventStatus { NotPublished, InProgress, Published, PublishedFailed }
@ -20,7 +20,7 @@ namespace HelloShop.OrderingService.Entities.EventLogs
public int TimesSent { get; set; }
public DateTimeOffset CreationTime { get; set; } = DateTimeOffset.UtcNow;
public DateTimeOffset CreationTime { get; init; } = TimeProvider.System.GetUtcNow();
public required Guid TransactionId { get; set; }

View File

@ -0,0 +1,20 @@
// Copyright (c) HelloShop Corporation. All rights reserved.
// See the license file in the project root for more information.
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using System.Diagnostics.CodeAnalysis;
namespace HelloShop.EventBus.Logging
{
public static class DistributedEventLogExtensions
{
public static void UseDistributedEventLogs(this ModelBuilder builder) => builder.ApplyConfiguration(new EventLogEntityTypeConfiguration());
public static IServiceCollection AddDistributedEventLogs<TContext>([NotNull] this IServiceCollection services) where TContext : DbContext
{
services.AddTransient<IDistributedEventLogService, DistributedEventLogService<TContext>>().AddHostedService<DistributedEventWorker>();
return services;
}
}
}

View File

@ -0,0 +1,51 @@
// Copyright (c) HelloShop Corporation. All rights reserved.
// See the license file in the project root for more information.
using HelloShop.EventBus.Abstractions;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
namespace HelloShop.EventBus.Logging
{
public class DistributedEventLogService<TContext>(TContext dbContext) : IDistributedEventLogService where TContext : DbContext
{
public async Task<IEnumerable<DistributedEventLog>> RetrieveEventLogsPendingToPublishAsync(Guid transactionId, CancellationToken cancellationToken = default)
{
return await dbContext.Set<DistributedEventLog>().Where(e => e.TransactionId == transactionId && e.Status == DistributedEventStatus.NotPublished).ToListAsync(cancellationToken: cancellationToken);
}
public async Task<IEnumerable<DistributedEventLog>> RetrieveEventLogsFailedToPublishAsync(CancellationToken cancellationToken = default)
{
return await dbContext.Set<DistributedEventLog>().Where(e => e.Status == DistributedEventStatus.PublishedFailed).ToListAsync(cancellationToken);
}
public async Task SaveEventAsync(DistributedEvent @event, IDbContextTransaction transaction, CancellationToken cancellationToken = default)
{
var eventLog = new DistributedEventLog(@event, transaction.TransactionId);
dbContext.Database.UseTransaction(transaction.GetDbTransaction());
await dbContext.Set<DistributedEventLog>().AddAsync(eventLog, cancellationToken);
await dbContext.SaveChangesAsync(cancellationToken);
}
public async Task UpdateEventStatusAsync(Guid eventId, DistributedEventStatus status, CancellationToken cancellationToken = default)
{
var eventLogEntry = dbContext.Set<DistributedEventLog>().Single(ie => ie.EventId == eventId);
eventLogEntry.Status = status;
if (status == DistributedEventStatus.InProgress)
{
eventLogEntry.TimesSent++;
}
await dbContext.SaveChangesAsync(cancellationToken);
}
public Task MarkEventAsPublishedAsync(Guid eventId, CancellationToken cancellationToken = default) => UpdateEventStatusAsync(eventId, DistributedEventStatus.Published, cancellationToken);
public Task MarkEventAsInProgressAsync(Guid eventId, CancellationToken cancellationToken = default) => UpdateEventStatusAsync(eventId, DistributedEventStatus.InProgress, cancellationToken);
public Task MarkEventAsFailedAsync(Guid eventId, CancellationToken cancellationToken = default) => UpdateEventStatusAsync(eventId, DistributedEventStatus.PublishedFailed, cancellationToken);
}
}

View File

@ -0,0 +1,54 @@
// Copyright (c) HelloShop Corporation. All rights reserved.
// See the license file in the project root for more information.
using HelloShop.EventBus.Abstractions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
namespace HelloShop.EventBus.Logging
{
public class DistributedEventWorker(IServiceScopeFactory serviceScopeFactory) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
using var scope = serviceScopeFactory.CreateScope();
var eventBus = scope.ServiceProvider.GetRequiredService<IEventBus>();
var eventLogService = scope.ServiceProvider.GetRequiredService<IDistributedEventLogService>();
var logger = scope.ServiceProvider.GetRequiredService<ILogger<DistributedEventWorker>>();
try
{
var failedEventLogs = await eventLogService.RetrieveEventLogsFailedToPublishAsync(stoppingToken);
foreach (var eventLog in failedEventLogs)
{
DistributedEvent @event = eventLog.DistributedEvent;
try
{
await eventLogService.MarkEventAsInProgressAsync(@event.Id, stoppingToken);
await eventBus.PublishAsync(@event, stoppingToken);
await eventLogService.MarkEventAsPublishedAsync(@event.Id, stoppingToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Publish through event bus failed for {EventId}.", @event.Id);
await eventLogService.MarkEventAsFailedAsync(@event.Id, stoppingToken);
}
}
}
catch (Exception ex)
{
logger.LogError(ex, "An error occurred while retrieving failed event logs.");
}
await Task.Delay(TimeSpan.FromSeconds(30), stoppingToken);
}
}
}
}

View File

@ -2,24 +2,23 @@
// See the license file in the project root for more information.
using HelloShop.EventBus.Abstractions;
using HelloShop.OrderingService.Entities.EventLogs;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
using System.Text.Json;
namespace HelloShop.OrderingService.Infrastructure.EntityConfigurations.EventLogs
namespace HelloShop.EventBus.Logging
{
public class DistributedEventLogEntityTypeConfiguration : IEntityTypeConfiguration<DistributedEventLog>
public class EventLogEntityTypeConfiguration : IEntityTypeConfiguration<DistributedEventLog>
{
private static readonly JsonSerializerOptions s_jsonSerializerOptions = new(JsonSerializerDefaults.Web);
public void Configure(EntityTypeBuilder<DistributedEventLog> builder)
{
builder.HasKey(x => x.EventId);
builder.Property(x => x.EventTypeName).HasMaxLength(32);
builder.Property(x => x.EventTypeName).HasMaxLength(64);
builder.Property(x => x.Status).HasConversion<string>();
builder.Property(x => x.DistributedEvent).HasConversion(v => JsonSerializer.Serialize(v, v.GetType(), s_jsonSerializerOptions), v => JsonSerializer.Deserialize<DistributedEvent>(v, s_jsonSerializerOptions)!);
}
}
}
}

View File

@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net9.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\HelloShop.EventBus.Abstractions\HelloShop.EventBus.Abstractions.csproj" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,23 @@
// Copyright (c) HelloShop Corporation. All rights reserved.
// See the license file in the project root for more information.
using HelloShop.EventBus.Abstractions;
using Microsoft.EntityFrameworkCore.Storage;
namespace HelloShop.EventBus.Logging
{
public interface IDistributedEventLogService
{
Task<IEnumerable<DistributedEventLog>> RetrieveEventLogsPendingToPublishAsync(Guid transactionId, CancellationToken cancellationToken = default);
Task<IEnumerable<DistributedEventLog>> RetrieveEventLogsFailedToPublishAsync(CancellationToken cancellationToken = default);
Task SaveEventAsync(DistributedEvent @event, IDbContextTransaction transaction, CancellationToken cancellationToken = default);
Task MarkEventAsPublishedAsync(Guid eventId, CancellationToken cancellationToken = default);
Task MarkEventAsInProgressAsync(Guid eventId, CancellationToken cancellationToken = default);
Task MarkEventAsFailedAsync(Guid eventId, CancellationToken cancellationToken = default);
}
}

View File

@ -0,0 +1,23 @@
// Copyright (c) HelloShop Corporation. All rights reserved.
// See the license file in the project root for more information.
using Microsoft.EntityFrameworkCore;
namespace HelloShop.EventBus.Logging
{
public class ResilientTransaction(DbContext dbContext)
{
public static ResilientTransaction New(DbContext context) => new(context);
public async Task ExecuteAsync(Func<Task> action)
{
var strategy = dbContext.Database.CreateExecutionStrategy();
await strategy.ExecuteAsync(async () =>
{
await using var transaction = await dbContext.Database.BeginTransactionAsync();
await action();
await transaction.CommitAsync();
});
}
}
}

View File

@ -7,7 +7,7 @@ namespace HelloShop.EventBus.RabbitMQ
{
public string ExchangeName { get; set; } = "event-bus-exchange";
public required string QueueName { get; set; }
public required string QueueName { get; set; } = "event-bus-queue";
public int RetryCount { get; set; } = 10;
}

View File

@ -41,7 +41,7 @@ namespace HelloShop.OrderingService.Behaviors
_logger.LogInformation("Commit transaction {TransactionId} for {CommandName}", transaction.TransactionId, typeName);
}
await dbContext.SaveChangesAsync(cancellationToken);
await transaction.CommitAsync();
await eventService.PublishEventsThroughEventBusAsync(transaction.TransactionId, cancellationToken);

View File

@ -3,6 +3,7 @@
using HelloShop.EventBus.Abstractions;
using HelloShop.EventBus.Dapr;
using HelloShop.EventBus.Logging;
using HelloShop.OrderingService.Behaviors;
using HelloShop.OrderingService.Constants;
using HelloShop.OrderingService.Infrastructure;
@ -67,7 +68,7 @@ namespace HelloShop.OrderingService.Extensions
builder.Services.AddHostedService<GracePeriodWorker>();
builder.Services.AddHostedService<PaymentWorker>();
builder.Services.AddTransient<IDistributedEventService, DistributedEventService<OrderingServiceDbContext>>();
builder.Services.AddDistributedEventLogs<OrderingServiceDbContext>().AddTransient<IDistributedEventService, DistributedEventService>();
builder.Services.AddOpenApi();

View File

@ -6,6 +6,7 @@
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\libraries\HelloShop.EventBus.Dapr\HelloShop.EventBus.Dapr.csproj" />
<ProjectReference Include="..\..\libraries\HelloShop.EventBus.Logging\HelloShop.EventBus.Logging.csproj" />
<ProjectReference Include="..\HelloShop.ServiceDefaults\HelloShop.ServiceDefaults.csproj" />
</ItemGroup>
<ItemGroup>

View File

@ -1,9 +1,12 @@
@HelloShop.OrderingService_HostAddress = https://localhost:8105
@HelloWorld.OrderingService_HostAddress = https://localhost:8105
@AccessToken = eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1laWQiOiIxIiwidW5pcXVlX25hbWUiOiJhZG1pbiIsInJvbGVpZCI6IjEiLCJuYmYiOjE3NDI4Njg1ODAsImV4cCI6MTc2NDkwMDU4MCwiaWF0IjoxNzQyODY4NTgwfQ.6Wm6S4CNtKi9lGqxam4_ZnDebnTXVxycDubbv0DLy2c
POST {{HelloShop.OrderingService_HostAddress}}/api/orders
###
POST {{HelloWorld.OrderingService_HostAddress}}/api/orders
Content-Type: application/json
x-request-id: 8ddc1737-6a31-4b4f-bccd-63d0966348a1
Authorization : Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1laWQiOiIxIiwidW5pcXVlX25hbWUiOiJhZG1pbiIsInJvbGVpZCI6IjEiLCJuYmYiOjE3MjA1NzY3NDYsImV4cCI6MTc0MjYwODc0NiwiaWF0IjoxNzIwNTc2NzQ2fQ.ju_D3zeGLKqJYVckbb8Y3yNkp40nOqRAJrdOsISs4d4
x-request-id: 1ddc1737-6a32-4b4f-bccd-63d0966348a3
Authorization : Bearer {{AccessToken}}
{
"country": "USA",
@ -36,12 +39,20 @@ Authorization : Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1laWQiOiIxIiwi
###
GET {{HelloShop.OrderingService_HostAddress}}/api/orders
GET {{HelloWorld.OrderingService_HostAddress}}/api/orders
Content-Type: application/json
Authorization : Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1laWQiOiIxIiwidW5pcXVlX25hbWUiOiJhZG1pbiIsInJvbGVpZCI6IjEiLCJuYmYiOjE3MjA1NzY3NDYsImV4cCI6MTc0MjYwODc0NiwiaWF0IjoxNzIwNTc2NzQ2fQ.ju_D3zeGLKqJYVckbb8Y3yNkp40nOqRAJrdOsISs4d4
Authorization : Bearer {{AccessToken}}
###
GET {{HelloShop.OrderingService_HostAddress}}/api/orders/1
GET {{HelloWorld.OrderingService_HostAddress}}/api/orders/1
Content-Type: application/json
Authorization : Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJuYW1laWQiOiIxIiwidW5pcXVlX25hbWUiOiJhZG1pbiIsInJvbGVpZCI6IjEiLCJuYmYiOjE3MjA1NzY3NDYsImV4cCI6MTc0MjYwODc0NiwiaWF0IjoxNzIwNTc2NzQ2fQ.ju_D3zeGLKqJYVckbb8Y3yNkp40nOqRAJrdOsISs4d4
Authorization : Bearer {{AccessToken}}
###
PUT {{HelloWorld.OrderingService_HostAddress}}/api/orders/ship/1
Content-Type: application/json
x-request-id: 1ddc1737-6a32-4b4f-bccd-63d0966348a1
Authorization : Bearer {{AccessToken}}

View File

@ -0,0 +1,347 @@
// <auto-generated />
using System;
using HelloShop.OrderingService.Infrastructure;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
#nullable disable
namespace HelloShop.OrderingService.Infrastructure.Migrations
{
[DbContext(typeof(OrderingServiceDbContext))]
[Migration("20250328130944_AddEventLogTable")]
partial class AddEventLogTable
{
/// <inheritdoc />
protected override void BuildTargetModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "9.0.3")
.HasAnnotation("Relational:MaxIdentifierLength", 63);
NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
modelBuilder.Entity("HelloShop.EventBus.Logging.DistributedEventLog", b =>
{
b.Property<Guid>("EventId")
.ValueGeneratedOnAdd()
.HasColumnType("uuid")
.HasColumnName("event_id");
b.Property<DateTimeOffset>("CreationTime")
.HasColumnType("timestamp with time zone")
.HasColumnName("creation_time");
b.Property<string>("DistributedEvent")
.IsRequired()
.HasColumnType("text")
.HasColumnName("distributed_event");
b.Property<string>("EventTypeName")
.IsRequired()
.HasMaxLength(64)
.HasColumnType("character varying(64)")
.HasColumnName("event_type_name");
b.Property<string>("Status")
.IsRequired()
.HasColumnType("text")
.HasColumnName("status");
b.Property<int>("TimesSent")
.HasColumnType("integer")
.HasColumnName("times_sent");
b.Property<Guid>("TransactionId")
.HasColumnType("uuid")
.HasColumnName("transaction_id");
b.HasKey("EventId")
.HasName("pk_distributed_event_log");
b.ToTable("distributed_event_log", (string)null);
});
modelBuilder.Entity("HelloShop.OrderingService.Entities.Buyers.Buyer", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("integer")
.HasColumnName("id");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<int>("Id"));
b.Property<string>("Name")
.IsRequired()
.HasMaxLength(16)
.HasColumnType("character varying(16)")
.HasColumnName("name");
b.HasKey("Id")
.HasName("pk_buyer");
b.ToTable("buyer", (string)null);
});
modelBuilder.Entity("HelloShop.OrderingService.Entities.Buyers.PaymentMethod", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("integer")
.HasColumnName("id");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<int>("Id"));
b.Property<string>("Alias")
.IsRequired()
.HasMaxLength(16)
.HasColumnType("character varying(16)")
.HasColumnName("alias");
b.Property<int>("BuyerId")
.HasColumnType("integer")
.HasColumnName("buyer_id");
b.Property<string>("CardHolderName")
.IsRequired()
.HasMaxLength(16)
.HasColumnType("character varying(16)")
.HasColumnName("card_holder_name");
b.Property<string>("CardNumber")
.IsRequired()
.HasMaxLength(16)
.HasColumnType("character varying(16)")
.HasColumnName("card_number");
b.Property<DateTimeOffset?>("Expiration")
.HasColumnType("timestamp with time zone")
.HasColumnName("expiration");
b.Property<string>("SecurityNumber")
.HasMaxLength(6)
.HasColumnType("character varying(6)")
.HasColumnName("security_number");
b.HasKey("Id")
.HasName("pk_payment_method");
b.HasIndex("BuyerId")
.HasDatabaseName("ix_payment_method_buyer_id");
b.ToTable("payment_method", (string)null);
});
modelBuilder.Entity("HelloShop.OrderingService.Entities.Idempotency.ClientRequest", b =>
{
b.Property<Guid>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("uuid")
.HasColumnName("id");
b.Property<string>("Name")
.IsRequired()
.HasMaxLength(64)
.HasColumnType("character varying(64)")
.HasColumnName("name");
b.Property<DateTimeOffset>("Time")
.HasColumnType("timestamp with time zone")
.HasColumnName("time");
b.HasKey("Id")
.HasName("pk_client_request");
b.ToTable("client_request", (string)null);
});
modelBuilder.Entity("HelloShop.OrderingService.Entities.Orders.Order", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("integer")
.HasColumnName("id");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<int>("Id"));
b.Property<int>("BuyerId")
.HasColumnType("integer")
.HasColumnName("buyer_id");
b.Property<string>("Description")
.HasMaxLength(64)
.HasColumnType("character varying(64)")
.HasColumnName("description");
b.Property<DateTimeOffset>("OrderDate")
.HasColumnType("timestamp with time zone")
.HasColumnName("order_date");
b.Property<string>("OrderStatus")
.IsRequired()
.HasColumnType("text")
.HasColumnName("order_status");
b.Property<int?>("PaymentMethodId")
.HasColumnType("integer")
.HasColumnName("payment_method_id");
b.HasKey("Id")
.HasName("pk_order");
b.HasIndex("BuyerId")
.HasDatabaseName("ix_order_buyer_id");
b.HasIndex("PaymentMethodId")
.HasDatabaseName("ix_order_payment_method_id");
b.ToTable("order", (string)null);
});
modelBuilder.Entity("HelloShop.OrderingService.Entities.Orders.OrderItem", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("integer")
.HasColumnName("id");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<int>("Id"));
b.Property<int>("OrderId")
.HasColumnType("integer")
.HasColumnName("order_id");
b.Property<string>("PictureUrl")
.IsRequired()
.HasMaxLength(256)
.HasColumnType("character varying(256)")
.HasColumnName("picture_url");
b.Property<int>("ProductId")
.HasColumnType("integer")
.HasColumnName("product_id");
b.Property<string>("ProductName")
.IsRequired()
.HasMaxLength(16)
.HasColumnType("character varying(16)")
.HasColumnName("product_name");
b.Property<decimal>("UnitPrice")
.HasColumnType("numeric")
.HasColumnName("unit_price");
b.Property<int>("Units")
.HasColumnType("integer")
.HasColumnName("units");
b.HasKey("Id")
.HasName("pk_order_item");
b.HasIndex("OrderId")
.HasDatabaseName("ix_order_item_order_id");
b.ToTable("order_item", (string)null);
});
modelBuilder.Entity("HelloShop.OrderingService.Entities.Buyers.PaymentMethod", b =>
{
b.HasOne("HelloShop.OrderingService.Entities.Buyers.Buyer", null)
.WithMany("PaymentMethods")
.HasForeignKey("BuyerId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired()
.HasConstraintName("fk_payment_method_buyer_buyer_id");
});
modelBuilder.Entity("HelloShop.OrderingService.Entities.Orders.Order", b =>
{
b.HasOne("HelloShop.OrderingService.Entities.Buyers.Buyer", null)
.WithMany()
.HasForeignKey("BuyerId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired()
.HasConstraintName("fk_order_buyer_buyer_id");
b.HasOne("HelloShop.OrderingService.Entities.Buyers.PaymentMethod", null)
.WithMany()
.HasForeignKey("PaymentMethodId")
.OnDelete(DeleteBehavior.Restrict)
.HasConstraintName("fk_order_payment_method_payment_method_id");
b.OwnsOne("HelloShop.OrderingService.Entities.Orders.Address", "Address", b1 =>
{
b1.Property<int>("OrderId")
.HasColumnType("integer")
.HasColumnName("id");
b1.Property<string>("City")
.IsRequired()
.HasMaxLength(16)
.HasColumnType("character varying(16)")
.HasColumnName("City");
b1.Property<string>("Country")
.IsRequired()
.HasMaxLength(8)
.HasColumnType("character varying(8)")
.HasColumnName("Country");
b1.Property<string>("State")
.IsRequired()
.HasMaxLength(16)
.HasColumnType("character varying(16)")
.HasColumnName("State");
b1.Property<string>("Street")
.IsRequired()
.HasMaxLength(32)
.HasColumnType("character varying(32)")
.HasColumnName("Street");
b1.Property<string>("ZipCode")
.IsRequired()
.HasMaxLength(6)
.HasColumnType("character varying(6)")
.HasColumnName("ZipCode");
b1.HasKey("OrderId");
b1.ToTable("order");
b1.WithOwner()
.HasForeignKey("OrderId")
.HasConstraintName("fk_order_order_id");
});
b.Navigation("Address")
.IsRequired();
});
modelBuilder.Entity("HelloShop.OrderingService.Entities.Orders.OrderItem", b =>
{
b.HasOne("HelloShop.OrderingService.Entities.Orders.Order", null)
.WithMany("OrderItems")
.HasForeignKey("OrderId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired()
.HasConstraintName("fk_order_item_order_order_id");
});
modelBuilder.Entity("HelloShop.OrderingService.Entities.Buyers.Buyer", b =>
{
b.Navigation("PaymentMethods");
});
modelBuilder.Entity("HelloShop.OrderingService.Entities.Orders.Order", b =>
{
b.Navigation("OrderItems");
});
#pragma warning restore 612, 618
}
}
}

View File

@ -0,0 +1,41 @@
// Copyright (c) HelloShop Corporation. All rights reserved.
// See the license file in the project root for more information.
using Microsoft.EntityFrameworkCore.Migrations;
#nullable disable
namespace HelloShop.OrderingService.Infrastructure.Migrations
{
/// <inheritdoc />
public partial class AddEventLogTable : Migration
{
/// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.AlterColumn<string>(
name: "event_type_name",
table: "distributed_event_log",
type: "character varying(64)",
maxLength: 64,
nullable: false,
oldClrType: typeof(string),
oldType: "character varying(32)",
oldMaxLength: 32);
}
/// <inheritdoc />
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.AlterColumn<string>(
name: "event_type_name",
table: "distributed_event_log",
type: "character varying(32)",
maxLength: 32,
nullable: false,
oldClrType: typeof(string),
oldType: "character varying(64)",
oldMaxLength: 64);
}
}
}

View File

@ -22,6 +22,47 @@ namespace HelloShop.OrderingService.Infrastructure.Migrations
NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
modelBuilder.Entity("HelloShop.EventBus.Logging.DistributedEventLog", b =>
{
b.Property<Guid>("EventId")
.ValueGeneratedOnAdd()
.HasColumnType("uuid")
.HasColumnName("event_id");
b.Property<DateTimeOffset>("CreationTime")
.HasColumnType("timestamp with time zone")
.HasColumnName("creation_time");
b.Property<string>("DistributedEvent")
.IsRequired()
.HasColumnType("text")
.HasColumnName("distributed_event");
b.Property<string>("EventTypeName")
.IsRequired()
.HasMaxLength(64)
.HasColumnType("character varying(64)")
.HasColumnName("event_type_name");
b.Property<string>("Status")
.IsRequired()
.HasColumnType("text")
.HasColumnName("status");
b.Property<int>("TimesSent")
.HasColumnType("integer")
.HasColumnName("times_sent");
b.Property<Guid>("TransactionId")
.HasColumnType("uuid")
.HasColumnName("transaction_id");
b.HasKey("EventId")
.HasName("pk_distributed_event_log");
b.ToTable("distributed_event_log", (string)null);
});
modelBuilder.Entity("HelloShop.OrderingService.Entities.Buyers.Buyer", b =>
{
b.Property<int>("Id")
@ -92,47 +133,6 @@ namespace HelloShop.OrderingService.Infrastructure.Migrations
b.ToTable("payment_method", (string)null);
});
modelBuilder.Entity("HelloShop.OrderingService.Entities.EventLogs.DistributedEventLog", b =>
{
b.Property<Guid>("EventId")
.ValueGeneratedOnAdd()
.HasColumnType("uuid")
.HasColumnName("event_id");
b.Property<DateTimeOffset>("CreationTime")
.HasColumnType("timestamp with time zone")
.HasColumnName("creation_time");
b.Property<string>("DistributedEvent")
.IsRequired()
.HasColumnType("text")
.HasColumnName("distributed_event");
b.Property<string>("EventTypeName")
.IsRequired()
.HasMaxLength(32)
.HasColumnType("character varying(32)")
.HasColumnName("event_type_name");
b.Property<string>("Status")
.IsRequired()
.HasColumnType("text")
.HasColumnName("status");
b.Property<int>("TimesSent")
.HasColumnType("integer")
.HasColumnName("times_sent");
b.Property<Guid>("TransactionId")
.HasColumnType("uuid")
.HasColumnName("transaction_id");
b.HasKey("EventId")
.HasName("pk_distributed_event_log");
b.ToTable("distributed_event_log", (string)null);
});
modelBuilder.Entity("HelloShop.OrderingService.Entities.Idempotency.ClientRequest", b =>
{
b.Property<Guid>("Id")

View File

@ -1,6 +1,7 @@
// Copyright (c) HelloShop Corporation. All rights reserved.
// See the license file in the project root for more information.
using HelloShop.EventBus.Logging;
using Microsoft.EntityFrameworkCore;
using System.Reflection;
@ -13,6 +14,7 @@ namespace HelloShop.OrderingService.Infrastructure
base.OnModelCreating(builder);
builder.ApplyConfigurationsFromAssembly(Assembly.GetExecutingAssembly());
builder.UseDistributedEventLogs();
}
}
}

View File

@ -2,88 +2,42 @@
// See the license file in the project root for more information.
using HelloShop.EventBus.Abstractions;
using HelloShop.OrderingService.Entities.EventLogs;
using Microsoft.EntityFrameworkCore;
using HelloShop.EventBus.Logging;
using HelloShop.OrderingService.Infrastructure;
namespace HelloShop.OrderingService.Services
{
public class DistributedEventService<TContext>(TContext dbContext, IEventBus distributedEventBus, ILogger<DistributedEventService<TContext>> logger) : IDistributedEventService, IDisposable where TContext : DbContext
public class DistributedEventService(ILogger<DistributedEventService> logger, IEventBus eventBus, OrderingServiceDbContext dbContext, IDistributedEventLogService eventLogService) : IDistributedEventService
{
private volatile bool _disposedValue;
public async Task UpdateEventStatusAsync(Guid eventId, DistributedEventStatus status, CancellationToken cancellationToken = default)
public async Task PublishEventsThroughEventBusAsync(Guid transactionId, CancellationToken cancellationToken = default)
{
var eventLogEntry = dbContext.Set<DistributedEventLog>().Single(ie => ie.EventId == eventId);
var pendingLogEvents = await eventLogService.RetrieveEventLogsPendingToPublishAsync(transactionId, cancellationToken);
eventLogEntry.Status = status;
if (status == DistributedEventStatus.InProgress)
foreach (var eventLog in pendingLogEvents)
{
eventLogEntry.TimesSent++;
logger.LogInformation("Publishing distributed event: {EventId} from {EventType}.", eventLog.EventId, eventLog.EventTypeName);
try
{
await eventLogService.MarkEventAsInProgressAsync(eventLog.EventId, cancellationToken);
await eventBus.PublishAsync(eventLog.DistributedEvent, cancellationToken);
await eventLogService.MarkEventAsPublishedAsync(eventLog.EventId, cancellationToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Error publishing distributed event: {EventId}.", eventLog.EventId);
await eventLogService.MarkEventAsFailedAsync(eventLog.EventId, cancellationToken);
}
}
await dbContext.SaveChangesAsync(cancellationToken);
}
public async Task<IEnumerable<DistributedEventLog>> RetrieveEventLogsPendingToPublishAsync(Guid transactionId, CancellationToken cancellationToken = default)
{
var result = await dbContext.Set<DistributedEventLog>().Where(e => e.TransactionId == transactionId && e.Status == DistributedEventStatus.NotPublished).ToListAsync(cancellationToken: cancellationToken);
return result.Count != 0 ? result.OrderBy(o => o.CreationTime) : [];
}
public async Task AddAndSaveEventAsync(DistributedEvent @event, CancellationToken cancellationToken = default)
{
var transaction = dbContext.Database.CurrentTransaction ?? throw new InvalidOperationException("This method must be called within a transaction scope.");
logger.LogInformation("Creating and saving distributed event: {EventId} from {EventType}.", @event.Id, @event.GetType().Name);
var eventLog = new DistributedEventLog(@event, transaction.TransactionId);
var transaction = dbContext.Database.CurrentTransaction ?? await dbContext.Database.BeginTransactionAsync(cancellationToken);
await dbContext.AddAsync(eventLog, cancellationToken);
await dbContext.SaveChangesAsync(cancellationToken);
}
public async Task PublishEventsThroughEventBusAsync(Guid transactionId, CancellationToken cancellationToken = default)
{
var pendingEventLogs = await RetrieveEventLogsPendingToPublishAsync(transactionId, cancellationToken);
foreach (var eventLog in pendingEventLogs)
{
logger.LogInformation("Publishing integration event {EventId} {DistributedEvent}", eventLog.EventId, eventLog.DistributedEvent);
try
{
await UpdateEventStatusAsync(eventLog.EventId, DistributedEventStatus.InProgress, cancellationToken);
await distributedEventBus.PublishAsync(eventLog.DistributedEvent, cancellationToken);
await UpdateEventStatusAsync(eventLog.EventId, DistributedEventStatus.Published, cancellationToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Error publishing distributed event {EventId}", eventLog.EventId);
await UpdateEventStatusAsync(eventLog.EventId, DistributedEventStatus.PublishedFailed, cancellationToken);
}
}
}
protected virtual void Dispose(bool disposing)
{
if (!_disposedValue)
{
if (disposing)
{
dbContext.Dispose();
}
_disposedValue = true;
}
}
public void Dispose()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
await eventLogService.SaveEventAsync(@event, transaction, cancellationToken);
}
}
}

View File

@ -2,18 +2,13 @@
// See the license file in the project root for more information.
using HelloShop.EventBus.Abstractions;
using HelloShop.OrderingService.Entities.EventLogs;
namespace HelloShop.OrderingService.Services
{
public interface IDistributedEventService
{
Task<IEnumerable<DistributedEventLog>> RetrieveEventLogsPendingToPublishAsync(Guid transactionId, CancellationToken cancellationToken = default);
Task PublishEventsThroughEventBusAsync(Guid transactionId, CancellationToken cancellationToken = default);
Task AddAndSaveEventAsync(DistributedEvent @event, CancellationToken cancellationToken = default);
Task UpdateEventStatusAsync(Guid eventId, DistributedEventStatus status, CancellationToken cancellationToken = default);
Task PublishEventsThroughEventBusAsync(Guid transactionId, CancellationToken cancellationToken = default);
}
}

View File

@ -6,10 +6,11 @@ using HelloShop.EventBus.Abstractions;
using HelloShop.ProductService.DistributedEvents.Events;
using HelloShop.ProductService.Entities.Products;
using HelloShop.ProductService.Infrastructure;
using HelloShop.ProductService.Services;
namespace HelloShop.ProductService.DistributedEvents.EventHandling
{
public class OrderAwaitingValidationDistributedEventHandler(ProductServiceDbContext dbContext, IEventBus distributedEventBus, IDistributedLock distributedLock, ILogger<OrderAwaitingValidationDistributedEventHandler> logger) : IDistributedEventHandler<OrderAwaitingValidationDistributedEvent>
public class OrderAwaitingValidationDistributedEventHandler(ProductServiceDbContext dbContext, IDistributedEventService distributedEventService, IDistributedLock distributedLock, ILogger<OrderAwaitingValidationDistributedEventHandler> logger) : IDistributedEventHandler<OrderAwaitingValidationDistributedEvent>
{
public async Task HandleAsync(OrderAwaitingValidationDistributedEvent @event)
{
@ -30,7 +31,8 @@ namespace HelloShop.ProductService.DistributedEvents.EventHandling
DistributedEvent confirmedEvent = confirmedOrderStockItems.All(c => c.Value) ? new OrderStockConfirmedDistributedEvent(@event.OrderId) : new OrderStockRejectedDistributedEvent(@event.OrderId);
await distributedEventBus.PublishAsync(confirmedEvent);
await distributedEventService.SaveEventAndDbContextChangesAsync(confirmedEvent);
await distributedEventService.PublishThroughEventBusAsync(confirmedEvent);
}
}
}

View File

@ -7,6 +7,7 @@
<ItemGroup>
<ProjectReference Include="..\..\libraries\HelloShop.DistributedLock.Dapr\HelloShop.DistributedLock.Dapr.csproj" />
<ProjectReference Include="..\..\libraries\HelloShop.EventBus.Dapr\HelloShop.EventBus.Dapr.csproj" />
<ProjectReference Include="..\..\libraries\HelloShop.EventBus.Logging\HelloShop.EventBus.Logging.csproj" />
<ProjectReference Include="..\HelloShop.ServiceDefaults\HelloShop.ServiceDefaults.csproj" />
</ItemGroup>
<ItemGroup>

View File

@ -0,0 +1,155 @@
// <auto-generated />
using System;
using HelloShop.ProductService.Infrastructure;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Infrastructure;
using Microsoft.EntityFrameworkCore.Migrations;
using Microsoft.EntityFrameworkCore.Storage.ValueConversion;
using Npgsql.EntityFrameworkCore.PostgreSQL.Metadata;
#nullable disable
namespace HelloShop.ProductService.Infrastructure.Migrations
{
[DbContext(typeof(ProductServiceDbContext))]
[Migration("20250328131028_AddEventLogTable")]
partial class AddEventLogTable
{
/// <inheritdoc />
protected override void BuildTargetModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "9.0.3")
.HasAnnotation("Relational:MaxIdentifierLength", 63);
NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
modelBuilder.Entity("HelloShop.EventBus.Logging.DistributedEventLog", b =>
{
b.Property<Guid>("EventId")
.ValueGeneratedOnAdd()
.HasColumnType("uuid")
.HasColumnName("event_id");
b.Property<DateTimeOffset>("CreationTime")
.HasColumnType("timestamp with time zone")
.HasColumnName("creation_time");
b.Property<string>("DistributedEvent")
.IsRequired()
.HasColumnType("text")
.HasColumnName("distributed_event");
b.Property<string>("EventTypeName")
.IsRequired()
.HasMaxLength(64)
.HasColumnType("character varying(64)")
.HasColumnName("event_type_name");
b.Property<string>("Status")
.IsRequired()
.HasColumnType("text")
.HasColumnName("status");
b.Property<int>("TimesSent")
.HasColumnType("integer")
.HasColumnName("times_sent");
b.Property<Guid>("TransactionId")
.HasColumnType("uuid")
.HasColumnName("transaction_id");
b.HasKey("EventId")
.HasName("pk_distributed_event_log");
b.ToTable("distributed_event_log", (string)null);
});
modelBuilder.Entity("HelloShop.ProductService.Entities.Products.Brand", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("integer")
.HasColumnName("id");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<int>("Id"));
b.Property<string>("Name")
.IsRequired()
.HasMaxLength(32)
.HasColumnType("character varying(32)")
.HasColumnName("name");
b.HasKey("Id")
.HasName("pk_brand");
b.ToTable("brand", (string)null);
});
modelBuilder.Entity("HelloShop.ProductService.Entities.Products.Product", b =>
{
b.Property<int>("Id")
.ValueGeneratedOnAdd()
.HasColumnType("integer")
.HasColumnName("id");
NpgsqlPropertyBuilderExtensions.UseIdentityByDefaultColumn(b.Property<int>("Id"));
b.Property<int>("AvailableStock")
.HasColumnType("integer")
.HasColumnName("available_stock");
b.Property<int>("BrandId")
.HasColumnType("integer")
.HasColumnName("brand_id");
b.Property<DateTimeOffset>("CreationTime")
.HasColumnType("timestamp with time zone")
.HasColumnName("creation_time");
b.Property<string>("Description")
.IsRequired()
.HasColumnType("text")
.HasColumnName("description");
b.Property<string>("ImageUrl")
.IsRequired()
.HasMaxLength(256)
.HasColumnType("character varying(256)")
.HasColumnName("image_url");
b.Property<string>("Name")
.IsRequired()
.HasMaxLength(64)
.HasColumnType("character varying(64)")
.HasColumnName("name");
b.Property<decimal>("Price")
.HasColumnType("numeric")
.HasColumnName("price");
b.HasKey("Id")
.HasName("pk_product");
b.HasIndex("BrandId")
.HasDatabaseName("ix_product_brand_id");
b.ToTable("product", (string)null);
});
modelBuilder.Entity("HelloShop.ProductService.Entities.Products.Product", b =>
{
b.HasOne("HelloShop.ProductService.Entities.Products.Brand", "Brand")
.WithMany()
.HasForeignKey("BrandId")
.OnDelete(DeleteBehavior.Cascade)
.IsRequired()
.HasConstraintName("fk_product_brand_brand_id");
b.Navigation("Brand");
});
#pragma warning restore 612, 618
}
}
}

View File

@ -0,0 +1,41 @@
// Copyright (c) HelloShop Corporation. All rights reserved.
// See the license file in the project root for more information.
using Microsoft.EntityFrameworkCore.Migrations;
#nullable disable
namespace HelloShop.ProductService.Infrastructure.Migrations
{
/// <inheritdoc />
public partial class AddEventLogTable : Migration
{
/// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "distributed_event_log",
columns: table => new
{
event_id = table.Column<Guid>(type: "uuid", nullable: false),
event_type_name = table.Column<string>(type: "character varying(64)", maxLength: 64, nullable: false),
distributed_event = table.Column<string>(type: "text", nullable: false),
status = table.Column<string>(type: "text", nullable: false),
times_sent = table.Column<int>(type: "integer", nullable: false),
creation_time = table.Column<DateTimeOffset>(type: "timestamp with time zone", nullable: false),
transaction_id = table.Column<Guid>(type: "uuid", nullable: false)
},
constraints: table =>
{
table.PrimaryKey("pk_distributed_event_log", x => x.event_id);
});
}
/// <inheritdoc />
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "distributed_event_log");
}
}
}

View File

@ -22,6 +22,47 @@ namespace HelloShop.ProductService.Infrastructure.Migrations
NpgsqlModelBuilderExtensions.UseIdentityByDefaultColumns(modelBuilder);
modelBuilder.Entity("HelloShop.EventBus.Logging.DistributedEventLog", b =>
{
b.Property<Guid>("EventId")
.ValueGeneratedOnAdd()
.HasColumnType("uuid")
.HasColumnName("event_id");
b.Property<DateTimeOffset>("CreationTime")
.HasColumnType("timestamp with time zone")
.HasColumnName("creation_time");
b.Property<string>("DistributedEvent")
.IsRequired()
.HasColumnType("text")
.HasColumnName("distributed_event");
b.Property<string>("EventTypeName")
.IsRequired()
.HasMaxLength(64)
.HasColumnType("character varying(64)")
.HasColumnName("event_type_name");
b.Property<string>("Status")
.IsRequired()
.HasColumnType("text")
.HasColumnName("status");
b.Property<int>("TimesSent")
.HasColumnType("integer")
.HasColumnName("times_sent");
b.Property<Guid>("TransactionId")
.HasColumnType("uuid")
.HasColumnName("transaction_id");
b.HasKey("EventId")
.HasName("pk_distributed_event_log");
b.ToTable("distributed_event_log", (string)null);
});
modelBuilder.Entity("HelloShop.ProductService.Entities.Products.Brand", b =>
{
b.Property<int>("Id")

View File

@ -1,6 +1,7 @@
// Copyright (c) HelloShop Corporation. All rights reserved.
// See the license file in the project root for more information.
using HelloShop.EventBus.Logging;
using Microsoft.EntityFrameworkCore;
using System.Reflection;
@ -13,5 +14,6 @@ public class ProductServiceDbContext(DbContextOptions<ProductServiceDbContext> o
base.OnModelCreating(builder);
builder.ApplyConfigurationsFromAssembly(Assembly.GetExecutingAssembly());
builder.UseDistributedEventLogs();
}
}

View File

@ -4,6 +4,7 @@
using HelloShop.DistributedLock.Dapr;
using HelloShop.EventBus.Abstractions;
using HelloShop.EventBus.Dapr;
using HelloShop.EventBus.Logging;
using HelloShop.ProductService.Constants;
using HelloShop.ProductService.Infrastructure;
using HelloShop.ProductService.Services;
@ -47,6 +48,7 @@ builder.Services.AddAuthorization().AddRemotePermissionChecker().AddCustomAuthor
builder.AddDaprEventBus().AddSubscriptionFromAssembly();
builder.Services.AddDaprDistributedLock();
builder.Services.AddSingleton(TimeProvider.System);
builder.Services.AddDistributedEventLogs<ProductServiceDbContext>().AddTransient<IDistributedEventService, DistributedEventService>();
// End addd extensions services to the container.
var app = builder.Build();

View File

@ -0,0 +1,37 @@
// Copyright (c) HelloShop Corporation. All rights reserved.
// See the license file in the project root for more information.
using HelloShop.EventBus.Abstractions;
using HelloShop.EventBus.Logging;
using HelloShop.ProductService.Infrastructure;
namespace HelloShop.ProductService.Services
{
public class DistributedEventService(ILogger<DistributedEventService> logger, IEventBus eventBus, ProductServiceDbContext dbContext, IDistributedEventLogService eventLogService) : IDistributedEventService
{
public async Task PublishThroughEventBusAsync(DistributedEvent @event)
{
try
{
await eventLogService.MarkEventAsInProgressAsync(@event.Id);
await eventBus.PublishAsync(@event);
await eventLogService.MarkEventAsPublishedAsync(@event.Id);
}
catch (Exception ex)
{
logger.LogError(ex, "Publish through event bus failed for {EventId}", @event.Id);
await eventLogService.MarkEventAsFailedAsync(@event.Id);
}
}
public async Task SaveEventAndDbContextChangesAsync(DistributedEvent @event)
{
await ResilientTransaction.New(dbContext).ExecuteAsync(async () =>
{
var transaction = dbContext.Database.CurrentTransaction ?? await dbContext.Database.BeginTransactionAsync();
await dbContext.SaveChangesAsync();
await eventLogService.SaveEventAsync(@event, transaction);
});
}
}
}

View File

@ -0,0 +1,14 @@
// Copyright (c) HelloShop Corporation. All rights reserved.
// See the license file in the project root for more information.
using HelloShop.EventBus.Abstractions;
namespace HelloShop.ProductService.Services
{
public interface IDistributedEventService
{
Task SaveEventAndDbContextChangesAsync(DistributedEvent @event);
Task PublishThroughEventBusAsync(DistributedEvent @event);
}
}