一个看似简单的需求摆在面前:处理用户上传的文档,执行一系列NLP操作——文本提取、情感分析、命名实体识别,最后将结构化结果写入分析数据库并更新索引。这个流程必须是原子性的,要么全部成功,要么就像什么都没发生过一样。在单体应用和单一数据库中,一个TransactionScope就能解决问题。但现实是,文本提取、情感分析、实体识别是三个独立的、可能部署在不同位置的服务,它们各自有自己的数据库或状态存储。传统的分布式事务(2PC)由于其同步阻塞和对特定基础设施的依赖,在现代微服务架构中几乎被弃用。
问题由此转化为:如何在不使用2-PC的情况下,为跨多个服务的业务流程提供类似ACID中的原子性(Atomicity)保证?这直接将我们引向了Saga模式。然而,引入一个成熟的消息队列(如RabbitMQ)和重量级的Saga框架(如MassTransit或NServiceBus)对于我们当前这个内聚性很高的模块来说,显得过于笨重,运维成本也陡增。我们需要的是一个轻量级、高内聚、易于理解和维护的解决方案。
最终的决策是:在ASP.NET Core应用内部,使用C#从头构建一个基于“编排式Saga”(Orchestration-based Saga)的协调器。它将业务流程的持久化状态存储在应用主数据库(PostgreSQL)中,从而在不引入过多外部依赖的情况下,实现长流程的可靠性和补偿能力。
定义Saga的核心契约
设计的起点是定义清晰的契约。一个Saga由多个步骤组成,每个步骤都包含一个正向操作(Action)和一个补偿操作(Compensation)。
// ISagaStep.cs
/// <summary>
/// 定义Saga流程中的一个独立步骤。
/// 每个步骤必须是幂等的,并且提供一个补偿操作。
/// </summary>
/// <typeparam name="TContext">Saga执行的上下文对象类型</typeparam>
public interface ISagaStep<TContext> where TContext : class
{
/// <summary>
/// 步骤名称,用于日志记录和状态跟踪。
/// </summary>
string Name { get; }
/// <summary>
/// 正向操作:执行该步骤的业务逻辑。
/// </summary>
/// <param name="context">Saga的共享上下文,用于传递数据。</param>
/// <returns>一个Task,表示异步操作。</returns>
Task ExecuteAsync(TContext context);
/// <summary>
/// 补偿操作:当Saga后续步骤失败时,回滚当前步骤所做的更改。
/// </summary>
/// <param name="context">Saga的共享上下文。</param>
/// <returns>一个Task,表示异步补偿操作。</returns>
Task CompensateAsync(TContext context);
}
这个接口是整个模式的基石。TContext是关键,它作为一个状态载体,在Saga的整个生命周期中传递,串联起所有步骤。
接下来是Saga本身的状态持久化模型。我们需要一个实体来记录每个Saga实例的执行进度。这对于系统崩溃后的恢复至关重要。
// SagaState.cs
using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;
public class SagaState
{
[Key]
public Guid Id { get; set; }
[Required]
public string SagaType { get; set; }
[Required]
public int CurrentStep { get; set; }
[Required]
public SagaStatus Status { get; set; }
// 使用JSONB/JSON列类型来存储上下文,灵活性高
[Required]
[Column(TypeName = "jsonb")]
public string ContextData { get; set; }
public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
public DateTime UpdatedAt { get; set; } = DateTime.UtcNow;
}
public enum SagaStatus
{
Pending,
Executing,
Compensating,
Succeeded,
Failed
}
使用Entity Framework Core,这个SagaState类会被映射到数据库中的一张表。ContextData字段使用jsonb类型(在PostgreSQL中)或nvarchar(max)(在SQL Server中)来序列化我们的TContext对象,这提供了极大的灵活性,无需为每种Saga流程都设计一张新的状态表。
构建Saga协调器
协调器(Orchestrator)是Saga模式的核心。它负责按顺序执行每个步骤,记录进度,并在失败时触发补偿流程。
// SagaOrchestrator.cs
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using System.Text.Json;
public class SagaOrchestrator<TContext> where TContext : class
{
private readonly ILogger<SagaOrchestrator<TContext>> _logger;
private readonly YourDbContext _dbContext;
private readonly IReadOnlyList<ISagaStep<TContext>> _steps;
private readonly string _sagaType;
public SagaOrchestrator(
ILogger<SagaOrchestrator<TContext>> logger,
YourDbContext dbContext,
IEnumerable<ISagaStep<TContext>> steps)
{
_logger = logger;
_dbContext = dbContext;
_steps = steps.ToList();
_sagaType = typeof(TContext).Name; // 使用上下文类型作为Saga类型标识
}
public async Task<Guid> ExecuteAsync(TContext context)
{
var state = new SagaState
{
Id = Guid.NewGuid(),
SagaType = _sagaType,
CurrentStep = 0,
Status = SagaStatus.Executing,
ContextData = JsonSerializer.Serialize(context)
};
// 关键:Saga状态的创建本身就是一个事务
_dbContext.SagaStates.Add(state);
await _dbContext.SaveChangesAsync();
_logger.LogInformation("Saga {SagaId} of type {SagaType} started.", state.Id, _sagaType);
try
{
for (int i = 0; i < _steps.Count; i++)
{
state.CurrentStep = i;
// 在执行业务逻辑前,先持久化当前进度
// 这是保证可恢复性的核心。如果服务器在ExecuteAsync期间崩溃,
// 重启后我们知道是从第`i`步开始重试还是补偿。
await UpdateStateAsync(state);
var currentStep = _steps[i];
_logger.LogInformation("Saga {SagaId}, executing step: {StepName}", state.Id, currentStep.Name);
await currentStep.ExecuteAsync(context);
// 执行成功后更新上下文,以便下一步骤使用
state.ContextData = JsonSerializer.Serialize(context);
}
state.Status = SagaStatus.Succeeded;
_logger.LogInformation("Saga {SagaId} succeeded.", state.Id);
}
catch (Exception ex)
{
_logger.LogError(ex, "Saga {SagaId} failed at step {StepIndex}. Starting compensation.", state.Id, state.CurrentStep);
state.Status = SagaStatus.Failed; // 先标记为失败
await UpdateStateAsync(state); // 持久化失败状态
await CompensateAsync(state, context);
// 抛出原始异常,让调用方知道操作失败
throw;
}
finally
{
// 最终状态更新
await UpdateStateAsync(state);
}
return state.Id;
}
private async Task CompensateAsync(SagaState state, TContext context)
{
state.Status = SagaStatus.Compensating;
await UpdateStateAsync(state);
_logger.LogWarning("Saga {SagaId} entering compensation mode.", state.Id);
// 从失败步骤的前一步开始,反向执行补偿操作
for (int i = state.CurrentStep; i >= 0; i--)
{
// 确保上下文是补偿时的状态
// 真实项目中,可能需要从 state.ContextData 反序列化历史版本
var stepToCompensate = _steps[i];
try
{
_logger.LogWarning("Saga {SagaId}, compensating step: {StepName}", state.Id, stepToCompensate.Name);
await stepToCompensate.CompensateAsync(context);
// 更新进度,表示该步骤已成功补偿
state.CurrentStep = i - 1;
await UpdateStateAsync(state);
}
catch (Exception compEx)
{
// 补偿失败是严重问题,需要人工介入。
_logger.LogCritical(compEx, "Saga {SagaId} CRITICAL FAILURE: Compensation for step {StepName} failed.", state.Id, stepToCompensate.Name);
// 此时Saga处于“脏”状态,不能继续自动补偿。
// 状态会停留在Compensating,CurrentStep指向失败的补偿步骤。
// 需要告警并由运维处理。
return;
}
}
_logger.LogWarning("Saga {SagaId} compensation completed.", state.Id);
}
private async Task UpdateStateAsync(SagaState state)
{
state.UpdatedAt = DateTime.UtcNow;
_dbContext.SagaStates.Update(state);
// 使用独立的事务来保存Saga状态,确保与业务逻辑的事务分离
await _dbContext.SaveChangesAsync();
}
}
这个协调器的实现有几个关键的设计考量:
- 持久化先行:在执行任何一个步骤的
ExecuteAsync之前,协调器会先更新数据库中的SagaState,将CurrentStep设置为当前步骤的索引。如果系统在执行步骤期间崩溃,恢复程序只需读取SagaState表,就能知道哪些Saga实例中断了,以及它们中断在哪一步,从而决定是重试还是补偿。 - 补偿流程:当任何
ExecuteAsync抛出异常时,CompensateAsync会被触发。它会从失败的步骤开始,反向调用所有已成功执行步骤的CompensateAsync方法。 - 补偿失败处理:补偿操作本身也可能失败。这是一个严重问题,代码中用
LogCritical记录并停止了进一步的自动补偿。这种“脏”状态的Saga需要人工干预。在生产环境中,这会触发高优先级警报。 - 状态与业务逻辑分离:Saga状态的
SaveChangesAsync和业务步骤内部的数据操作应该在不同的事务中。协调器只负责流程状态的ACID,而每个ISagaStep负责其自身业务范畴内的ACID。
应用到NLP工作流
现在,我们将这个通用协调器应用到具体的NLP文档处理流程中。
首先,定义我们的上下文DocumentProcessingContext。
// DocumentProcessingContext.cs
public class DocumentProcessingContext
{
public Guid DocumentId { get; set; }
public string RawContent { get; set; }
public string ExtractedText { get; set; }
public float? SentimentScore { get; set; }
public List<string> Entities { get; set; } = new();
// 追踪每个步骤创建的资源ID,用于补偿
public Guid? TextResourceId { get; set; }
public Guid? SentimentAnalysisId { get; set; }
public Guid? EntityRecognitionId { get; set; }
}
上下文对象不仅传递数据,还记录了每个步骤创建的资源的ID,这对于精确补偿至关重要。
然后,为NLP流程的每一步实现ISagaStep。
// Steps/ExtractTextStep.cs
public class ExtractTextStep : ISagaStep<DocumentProcessingContext>
{
private readonly ITextExtractionService _textExtractionService;
public string Name => "TextExtraction";
public ExtractTextStep(ITextExtractionService textExtractionService)
{
_textExtractionService = textExtractionService;
}
public async Task ExecuteAsync(DocumentProcessingContext context)
{
var result = await _textExtractionService.ExtractAsync(context.DocumentId, context.RawContent);
context.ExtractedText = result.Text;
context.TextResourceId = result.ResourceId; // 记录创建的资源ID
}
public async Task CompensateAsync(DocumentProcessingContext context)
{
if (context.TextResourceId.HasValue)
{
// 补偿操作:删除已提取和存储的文本资源
await _textExtractionService.DeleteExtractedTextAsync(context.TextResourceId.Value);
}
}
}
// Steps/AnalyzeSentimentStep.cs
public class AnalyzeSentimentStep : ISagaStep<DocumentProcessingContext>
{
private readonly ISentimentService _sentimentService;
public string Name => "SentimentAnalysis";
public AnalyzeSentimentStep(ISentimentService sentimentService)
{
_sentimentService = sentimentService;
}
public async Task ExecuteAsync(DocumentProcessingContext context)
{
// 模拟一个可能失败的操作
if (context.ExtractedText.Contains("FAIL_SENTIMENT"))
{
throw new InvalidOperationException("Sentiment analysis failed due to specific content.");
}
var result = await _sentimentService.AnalyzeAsync(context.ExtractedText);
context.SentimentScore = result.Score;
context.SentimentAnalysisId = result.AnalysisId;
}
public async Task CompensateAsync(DocumentProcessingContext context)
{
if (context.SentimentAnalysisId.HasValue)
{
await _sentimentService.DeleteAnalysisResultAsync(context.SentimentAnalysisId.Value);
}
}
}
// Steps/RecognizeEntitiesStep.cs
public class RecognizeEntitiesStep : ISagaStep<DocumentProcessingContext>
{
private readonly IEntityRecognitionService _entityService;
public string Name => "EntityRecognition";
public RecognizeEntitiesStep(IEntityRecognitionService entityService)
{
_entityService = entityService;
}
public async Task ExecuteAsync(DocumentProcessingContext context)
{
var result = await _entityService.RecognizeAsync(context.ExtractedText);
context.Entities.AddRange(result.Entities);
context.EntityRecognitionId = result.RecognitionId;
}
public async Task CompensateAsync(DocumentProcessingContext context)
{
if (context.EntityRecognitionId.HasValue)
{
await _entityService.DeleteRecognitionResultAsync(context.EntityRecognitionId.Value);
}
}
}
每个步骤都封装了单一职责:调用一个外部或内部服务,并知道如何撤销自己所做的操作。
在ASP.NET Core中组装和使用
最后,在Program.cs或Startup.cs中将所有组件注册到依赖注入容器。
// Program.cs (Minimal API example)
// 1. 注册DbContext
builder.Services.AddDbContext<YourDbContext>(options =>
options.UseNpgsql(builder.Configuration.GetConnectionString("DefaultConnection")));
// 2. 注册NLP服务(模拟实现)
builder.Services.AddScoped<ITextExtractionService, MockTextExtractionService>();
builder.Services.AddScoped<ISentimentService, MockSentimentService>();
builder.Services.AddScoped<IEntityRecognitionService, MockEntityRecognitionService>();
// 3. 注册Saga步骤
// 注册顺序很重要,它定义了Saga的执行顺序
builder.Services.AddScoped<ISagaStep<DocumentProcessingContext>, ExtractTextStep>();
builder.Services.AddScoped<ISagaStep<DocumentProcessingContext>, AnalyzeSentimentStep>();
builder.Services.AddScoped<ISagaStep<DocumentProcessingContext>, RecognizeEntitiesStep>();
// 4. 注册Saga协调器
builder.Services.AddScoped<SagaOrchestrator<DocumentProcessingContext>>(provider =>
{
var logger = provider.GetRequiredService<ILogger<SagaOrchestrator<DocumentProcessingContext>>>();
var dbContext = provider.GetRequiredService<YourDbContext>();
// 从DI容器中解析所有已注册的该类型Saga的步骤
var steps = provider.GetServices<ISagaStep<DocumentProcessingContext>>();
return new SagaOrchestrator<DocumentProcessingContext>(logger, dbContext, steps);
});
var app = builder.Build();
// 5. 创建API端点来触发Saga
app.MapPost("/process-document", async (string rawContent, SagaOrchestrator<DocumentProcessingContext> orchestrator) =>
{
var context = new DocumentProcessingContext
{
DocumentId = Guid.NewGuid(),
RawContent = rawContent
};
try
{
var sagaId = await orchestrator.ExecuteAsync(context);
return Results.Ok(new { SagaId = sagaId, Message = "Document processing started and completed successfully." });
}
catch (Exception ex)
{
// 协调器会重新抛出异常,以便API层返回适当的错误响应
return Results.Problem(
detail: $"Document processing failed and was rolled back. Error: {ex.Message}",
statusCode: 500
);
}
});
app.Run();
通过依赖注入,协调器在创建时会自动获得所有已注册的ISagaStep<DocumentProcessingContext>实例,并且顺序与注册顺序一致。这使得添加、删除或重排步骤变得非常简单,只需要修改Program.cs中的注册代码即可。
流程可视化
使用Mermaid可以清晰地展示成功和失败两种场景下的流程。
成功流程:
sequenceDiagram
participant Client
participant API
participant Orchestrator
participant Step1 as ExtractText
participant Step2 as AnalyzeSentiment
participant Step3 as RecognizeEntities
participant DB as SagaStateDB
Client->>+API: POST /process-document
API->>+Orchestrator: ExecuteAsync(context)
Orchestrator->>+DB: Create SagaState (Status: Executing)
DB-->>-Orchestrator: SagaId
Orchestrator->>+DB: Update State (CurrentStep=0)
DB-->>-Orchestrator: OK
Orchestrator->>+Step1: ExecuteAsync()
Step1-->>-Orchestrator: Success
Orchestrator->>+DB: Update State (CurrentStep=1)
DB-->>-Orchestrator: OK
Orchestrator->>+Step2: ExecuteAsync()
Step2-->>-Orchestrator: Success
Orchestrator->>+DB: Update State (CurrentStep=2)
DB-->>-Orchestrator: OK
Orchestrator->>+Step3: ExecuteAsync()
Step3-->>-Orchestrator: Success
Orchestrator->>+DB: Update State (Status: Succeeded)
DB-->>-Orchestrator: OK
Orchestrator-->>-API: Returns SagaId
API-->>-Client: 200 OK
失败与补偿流程:
sequenceDiagram
participant Client
participant API
participant Orchestrator
participant Step1 as ExtractText
participant Step2 as AnalyzeSentiment
participant Step3 as RecognizeEntities
participant DB as SagaStateDB
Client->>+API: POST /process-document
API->>+Orchestrator: ExecuteAsync(context)
Note over Orchestrator, Step1: Step 1 (ExtractText) executes successfully.
Orchestrator->>+Step1: ExecuteAsync()
Step1-->>-Orchestrator: Success
Orchestrator->>+DB: Update State (CurrentStep=1)
DB-->>-Orchestrator: OK
Note over Orchestrator, Step2: Step 2 (AnalyzeSentiment) fails.
Orchestrator->>+Step2: ExecuteAsync()
Step2-->>-Orchestrator: Throws Exception
Orchestrator->>+DB: Update State (Status: Failed)
DB-->>-Orchestrator: OK
Orchestrator->>Orchestrator: Start Compensation
Orchestrator->>+DB: Update State (Status: Compensating)
DB-->>-Orchestrator: OK
Note right of Orchestrator: Compensating from failed step's index (1), which is Step2. It was never successful, so its compensation is skipped. Now compensating step 0.
Orchestrator->>+Step1: CompensateAsync()
Step1-->>-Orchestrator: Compensation Success
Orchestrator->>+DB: Update State (CurrentStep=-1)
DB-->>-Orchestrator: OK
Orchestrator-->>-API: Throws Exception
API-->>-Client: 500 Internal Server Error
当前方案的局限性与展望
这个轻量级的编排式Saga协调器,在为单个服务内的复杂业务流程提供原子性保证方面表现出色,它避免了引入外部消息中间件的复杂性。但它的边界也十分清晰。首先,它是一个“进程内”的协调器,不适用于跨多个独立部署微服务的场景。其次,Saga的恢复机制是被动的。如果应用在Saga执行中途崩溃,需要一个外部的、定时的恢复作业(Recovery Job)来扫描SagaState表,找出那些长时间处于Executing或Compensating状态的实例,并尝试重新触发它们的执行或补偿逻辑。
未来的迭代可以探索将Saga状态的变更发布为领域事件,由一个独立的、高可用的Saga工作流引擎来订阅和驱动,从而将流程协调的职责从主应用中剥离出来,使其成为一个更通用的平台能力。此外,可以为ISagaStep增加超时和重试策略,以应对依赖服务的瞬时故障,进一步增强整个工作流的韧性。