使用 Dapr 实现分布式事件总线

This commit is contained in:
hello 2024-09-19 20:57:51 +08:00
parent dbfb59f461
commit 385b09d482
5 changed files with 175 additions and 0 deletions

View File

@ -0,0 +1,29 @@
// Copyright (c) HelloShop Corporation. All rights reserved.
// See the license file in the project root for more information.
using Dapr;
using System.Text.Json.Serialization;
namespace HelloShop.ServiceDefaults.DistributedEvents.DaprBuildingBlocks
{
public class DaprCloudEvent<TData>(TData data) : CloudEvent<TData>(data)
{
/// <summary>
/// CloudEvent 'pubsubname' attribute.
/// </summary>
[JsonPropertyName("pubsubname")]
public required string PubSubName { get; init; }
/// <summary>
/// CloudEvent 'topic' attribute.
/// </summary>
[JsonPropertyName("topic")]
public required string Topic { get; init; }
/// <summary>
/// CloudEvent 'time' attribute.
/// </summary>
[JsonPropertyName("time")]
public required DateTimeOffset Time { get; init; }
}
}

View File

@ -0,0 +1,25 @@
// Copyright (c) HelloShop Corporation. All rights reserved.
// See the license file in the project root for more information.
using Dapr.Client;
using HelloShop.ServiceDefaults.DistributedEvents.Abstractions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace HelloShop.ServiceDefaults.DistributedEvents.DaprBuildingBlocks
{
public class DaprDistributedEventBus(DaprClient daprClient, IOptions<DaprDistributedEventBusOptions> options, ILogger<DaprDistributedEventBus> logger) : IDistributedEventBus
{
public async Task PublishAsync(DistributedEvent @event, CancellationToken cancellationToken = default)
{
string pubSubName = options.Value.PubSubName;
string topicName = @event.GetType().Name;
logger.LogInformation("Publishing event {@Event} to {PubsubName}.{TopicName}", @event, pubSubName, topicName);
object? data = Convert.ChangeType(@event, @event.GetType());
await daprClient.PublishEventAsync(pubSubName, topicName, data, cancellationToken);
}
}
}

View File

@ -0,0 +1,104 @@
// Copyright (c) HelloShop Corporation. All rights reserved.
// See the license file in the project root for more information.
using FluentValidation;
using FluentValidation.Results;
using HelloShop.ServiceDefaults.DistributedEvents.Abstractions;
using Microsoft.AspNetCore.Authentication;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using System.Text.Json;
namespace HelloShop.ServiceDefaults.DistributedEvents.DaprBuildingBlocks
{
public static class DaprDistributedEventBusExtensions
{
private const string DefaultSectionName = "DaprDistributedEventBus";
public static IDistributedEventBusBuilder AddDaprDistributedEventBus(this IHostApplicationBuilder builder, string sectionName = DefaultSectionName)
{
ArgumentNullException.ThrowIfNull(builder);
builder.Services.AddDaprClient();
DaprDistributedEventBusOptions daprOptions = new();
builder.Configuration.GetSection(sectionName).Bind(daprOptions);
builder.Services.AddSingleton(Options.Create(daprOptions));
builder.Services.AddSingleton<IDistributedEventBus, DaprDistributedEventBus>();
if (daprOptions.RequireAuthenticatedDaprApiToken)
{
builder.Services.AddAuthentication().AddDapr();
builder.Services.Configure<AuthorizationOptions>(options => options.AddDapr());
}
return new DistributedEventBusBuilder(builder.Services);
}
private class DistributedEventBusBuilder(IServiceCollection services) : IDistributedEventBusBuilder
{
public IServiceCollection Services => services;
}
public static WebApplication MapDaprDistributedEventBus(this WebApplication app)
{
ArgumentNullException.ThrowIfNull(app);
var eventBusOptions = app.Services.GetRequiredService<IOptions<DistributedEventBusOptions>>().Value;
var daprEventBusOptions = app.Services.GetRequiredService<IOptions<DaprDistributedEventBusOptions>>().Value;
RouteHandlerBuilder routeHandler = app.MapPost($"/api/DistributedEvents", async (DaprCloudEvent<JsonElement> cloudEvent, IHttpContextAccessor contextAccessor) =>
{
var httpContext = contextAccessor.HttpContext ?? throw new InvalidOperationException("HTTP context not available.");
if (string.IsNullOrWhiteSpace(cloudEvent.Topic) || !eventBusOptions.EventTypes.TryGetValue(cloudEvent.Topic, out Type? eventType) || eventType is null)
{
return Results.NotFound();
}
var jsonOptions = httpContext.RequestServices.GetRequiredService<IOptions<Microsoft.AspNetCore.Mvc.JsonOptions>>().Value;
if (JsonSerializer.Deserialize(cloudEvent.Data.GetRawText(), eventType, jsonOptions.JsonSerializerOptions) is not DistributedEvent @event)
{
return Results.BadRequest();
}
if (httpContext.RequestServices.GetService(typeof(IValidator<>).MakeGenericType(eventType)) is IValidator validator)
{
ValidationResult validationResult = await validator.ValidateAsync(new ValidationContext<DistributedEvent>(@event));
if (!validationResult.IsValid)
{
return Results.ValidationProblem(validationResult.ToDictionary());
}
}
foreach (var handler in httpContext.RequestServices.GetKeyedServices<IDistributedEventHandler>(eventType))
{
await handler.HandleAsync(@event);
}
return Results.Ok();
}).WithTags(nameof(DistributedEvent));
foreach (var subscription in eventBusOptions.EventTypes)
{
routeHandler.WithTopic(daprEventBusOptions.PubSubName, subscription.Key, enableRawPayload: false);
}
app.MapSubscribeHandler();
return app;
}
}
}

View File

@ -0,0 +1,16 @@
// Copyright (c) HelloShop Corporation. All rights reserved.
// See the license file in the project root for more information.
namespace HelloShop.ServiceDefaults.DistributedEvents.DaprBuildingBlocks
{
public class DaprDistributedEventBusOptions
{
public const string SectionName = "DaprDistributedEventBus";
public string PubSubName { get; set; } = "event-bus-pubsub";
public bool RequireAuthenticatedDaprApiToken { get; set; } = false;
public string? DaprApiToken { get; set; }
}
}

View File

@ -7,6 +7,7 @@
</PropertyGroup>
<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="Dapr.AspNetCore" Version="1.14.0" />
<PackageReference Include="Microsoft.Extensions.Http.Resilience" Version="8.7.0" />
<PackageReference Include="Microsoft.Extensions.ServiceDiscovery" Version="8.0.2" />
<PackageReference Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.9.0" />