使用C#在ASP.NET Core中为NLP工作流构建具备ACID特性的Saga模式协调器


一个看似简单的需求摆在面前:处理用户上传的文档,执行一系列NLP操作——文本提取、情感分析、命名实体识别,最后将结构化结果写入分析数据库并更新索引。这个流程必须是原子性的,要么全部成功,要么就像什么都没发生过一样。在单体应用和单一数据库中,一个TransactionScope就能解决问题。但现实是,文本提取、情感分析、实体识别是三个独立的、可能部署在不同位置的服务,它们各自有自己的数据库或状态存储。传统的分布式事务(2PC)由于其同步阻塞和对特定基础设施的依赖,在现代微服务架构中几乎被弃用。

问题由此转化为:如何在不使用2-PC的情况下,为跨多个服务的业务流程提供类似ACID中的原子性(Atomicity)保证?这直接将我们引向了Saga模式。然而,引入一个成熟的消息队列(如RabbitMQ)和重量级的Saga框架(如MassTransit或NServiceBus)对于我们当前这个内聚性很高的模块来说,显得过于笨重,运维成本也陡增。我们需要的是一个轻量级、高内聚、易于理解和维护的解决方案。

最终的决策是:在ASP.NET Core应用内部,使用C#从头构建一个基于“编排式Saga”(Orchestration-based Saga)的协调器。它将业务流程的持久化状态存储在应用主数据库(PostgreSQL)中,从而在不引入过多外部依赖的情况下,实现长流程的可靠性和补偿能力。

定义Saga的核心契约

设计的起点是定义清晰的契约。一个Saga由多个步骤组成,每个步骤都包含一个正向操作(Action)和一个补偿操作(Compensation)。

// ISagaStep.cs

/// <summary>
/// 定义Saga流程中的一个独立步骤。
/// 每个步骤必须是幂等的,并且提供一个补偿操作。
/// </summary>
/// <typeparam name="TContext">Saga执行的上下文对象类型</typeparam>
public interface ISagaStep<TContext> where TContext : class
{
    /// <summary>
    /// 步骤名称,用于日志记录和状态跟踪。
    /// </summary>
    string Name { get; }

    /// <summary>
    /// 正向操作:执行该步骤的业务逻辑。
    /// </summary>
    /// <param name="context">Saga的共享上下文,用于传递数据。</param>
    /// <returns>一个Task,表示异步操作。</returns>
    Task ExecuteAsync(TContext context);

    /// <summary>
    /// 补偿操作:当Saga后续步骤失败时,回滚当前步骤所做的更改。
    /// </summary>
    /// <param name="context">Saga的共享上下文。</param>
    /// <returns>一个Task,表示异步补偿操作。</returns>
    Task CompensateAsync(TContext context);
}

这个接口是整个模式的基石。TContext是关键,它作为一个状态载体,在Saga的整个生命周期中传递,串联起所有步骤。

接下来是Saga本身的状态持久化模型。我们需要一个实体来记录每个Saga实例的执行进度。这对于系统崩溃后的恢复至关重要。

// SagaState.cs

using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;

public class SagaState
{
    [Key]
    public Guid Id { get; set; }

    [Required]
    public string SagaType { get; set; }

    [Required]
    public int CurrentStep { get; set; }

    [Required]
    public SagaStatus Status { get; set; }
    
    // 使用JSONB/JSON列类型来存储上下文,灵活性高
    [Required]
    [Column(TypeName = "jsonb")]
    public string ContextData { get; set; }

    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
    
    public DateTime UpdatedAt { get; set; } = DateTime.UtcNow;
}

public enum SagaStatus
{
    Pending,
    Executing,
    Compensating,
    Succeeded,
    Failed
}

使用Entity Framework Core,这个SagaState类会被映射到数据库中的一张表。ContextData字段使用jsonb类型(在PostgreSQL中)或nvarchar(max)(在SQL Server中)来序列化我们的TContext对象,这提供了极大的灵活性,无需为每种Saga流程都设计一张新的状态表。

构建Saga协调器

协调器(Orchestrator)是Saga模式的核心。它负责按顺序执行每个步骤,记录进度,并在失败时触发补偿流程。

// SagaOrchestrator.cs

using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using System.Text.Json;

public class SagaOrchestrator<TContext> where TContext : class
{
    private readonly ILogger<SagaOrchestrator<TContext>> _logger;
    private readonly YourDbContext _dbContext;
    private readonly IReadOnlyList<ISagaStep<TContext>> _steps;
    private readonly string _sagaType;

    public SagaOrchestrator(
        ILogger<SagaOrchestrator<TContext>> logger,
        YourDbContext dbContext,
        IEnumerable<ISagaStep<TContext>> steps)
    {
        _logger = logger;
        _dbContext = dbContext;
        _steps = steps.ToList();
        _sagaType = typeof(TContext).Name; // 使用上下文类型作为Saga类型标识
    }

    public async Task<Guid> ExecuteAsync(TContext context)
    {
        var state = new SagaState
        {
            Id = Guid.NewGuid(),
            SagaType = _sagaType,
            CurrentStep = 0,
            Status = SagaStatus.Executing,
            ContextData = JsonSerializer.Serialize(context)
        };

        // 关键:Saga状态的创建本身就是一个事务
        _dbContext.SagaStates.Add(state);
        await _dbContext.SaveChangesAsync();

        _logger.LogInformation("Saga {SagaId} of type {SagaType} started.", state.Id, _sagaType);

        try
        {
            for (int i = 0; i < _steps.Count; i++)
            {
                state.CurrentStep = i;
                
                // 在执行业务逻辑前,先持久化当前进度
                // 这是保证可恢复性的核心。如果服务器在ExecuteAsync期间崩溃,
                // 重启后我们知道是从第`i`步开始重试还是补偿。
                await UpdateStateAsync(state);

                var currentStep = _steps[i];
                _logger.LogInformation("Saga {SagaId}, executing step: {StepName}", state.Id, currentStep.Name);
                
                await currentStep.ExecuteAsync(context);
                
                // 执行成功后更新上下文,以便下一步骤使用
                state.ContextData = JsonSerializer.Serialize(context);
            }

            state.Status = SagaStatus.Succeeded;
            _logger.LogInformation("Saga {SagaId} succeeded.", state.Id);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Saga {SagaId} failed at step {StepIndex}. Starting compensation.", state.Id, state.CurrentStep);
            state.Status = SagaStatus.Failed; // 先标记为失败
            await UpdateStateAsync(state);   // 持久化失败状态

            await CompensateAsync(state, context);
            
            // 抛出原始异常,让调用方知道操作失败
            throw; 
        }
        finally
        {
            // 最终状态更新
            await UpdateStateAsync(state);
        }
        
        return state.Id;
    }

    private async Task CompensateAsync(SagaState state, TContext context)
    {
        state.Status = SagaStatus.Compensating;
        await UpdateStateAsync(state);

        _logger.LogWarning("Saga {SagaId} entering compensation mode.", state.Id);

        // 从失败步骤的前一步开始,反向执行补偿操作
        for (int i = state.CurrentStep; i >= 0; i--)
        {
            // 确保上下文是补偿时的状态
            // 真实项目中,可能需要从 state.ContextData 反序列化历史版本
            var stepToCompensate = _steps[i];
            try
            {
                _logger.LogWarning("Saga {SagaId}, compensating step: {StepName}", state.Id, stepToCompensate.Name);
                await stepToCompensate.CompensateAsync(context);
                
                // 更新进度,表示该步骤已成功补偿
                state.CurrentStep = i - 1; 
                await UpdateStateAsync(state);
            }
            catch (Exception compEx)
            {
                // 补偿失败是严重问题,需要人工介入。
                _logger.LogCritical(compEx, "Saga {SagaId} CRITICAL FAILURE: Compensation for step {StepName} failed.", state.Id, stepToCompensate.Name);
                // 此时Saga处于“脏”状态,不能继续自动补偿。
                // 状态会停留在Compensating,CurrentStep指向失败的补偿步骤。
                // 需要告警并由运维处理。
                return;
            }
        }
        
        _logger.LogWarning("Saga {SagaId} compensation completed.", state.Id);
    }
    
    private async Task UpdateStateAsync(SagaState state)
    {
        state.UpdatedAt = DateTime.UtcNow;
        _dbContext.SagaStates.Update(state);
        // 使用独立的事务来保存Saga状态,确保与业务逻辑的事务分离
        await _dbContext.SaveChangesAsync();
    }
}

这个协调器的实现有几个关键的设计考量:

  1. 持久化先行:在执行任何一个步骤的ExecuteAsync之前,协调器会先更新数据库中的SagaState,将CurrentStep设置为当前步骤的索引。如果系统在执行步骤期间崩溃,恢复程序只需读取SagaState表,就能知道哪些Saga实例中断了,以及它们中断在哪一步,从而决定是重试还是补偿。
  2. 补偿流程:当任何ExecuteAsync抛出异常时,CompensateAsync会被触发。它会从失败的步骤开始,反向调用所有已成功执行步骤的CompensateAsync方法。
  3. 补偿失败处理:补偿操作本身也可能失败。这是一个严重问题,代码中用LogCritical记录并停止了进一步的自动补偿。这种“脏”状态的Saga需要人工干预。在生产环境中,这会触发高优先级警报。
  4. 状态与业务逻辑分离:Saga状态的SaveChangesAsync和业务步骤内部的数据操作应该在不同的事务中。协调器只负责流程状态的ACID,而每个ISagaStep负责其自身业务范畴内的ACID。

应用到NLP工作流

现在,我们将这个通用协调器应用到具体的NLP文档处理流程中。

首先,定义我们的上下文DocumentProcessingContext

// DocumentProcessingContext.cs
public class DocumentProcessingContext
{
    public Guid DocumentId { get; set; }
    public string RawContent { get; set; }
    public string ExtractedText { get; set; }
    public float? SentimentScore { get; set; }
    public List<string> Entities { get; set; } = new();

    // 追踪每个步骤创建的资源ID,用于补偿
    public Guid? TextResourceId { get; set; }
    public Guid? SentimentAnalysisId { get; set; }
    public Guid? EntityRecognitionId { get; set; }
}

上下文对象不仅传递数据,还记录了每个步骤创建的资源的ID,这对于精确补偿至关重要。

然后,为NLP流程的每一步实现ISagaStep

// Steps/ExtractTextStep.cs
public class ExtractTextStep : ISagaStep<DocumentProcessingContext>
{
    private readonly ITextExtractionService _textExtractionService;
    public string Name => "TextExtraction";

    public ExtractTextStep(ITextExtractionService textExtractionService)
    {
        _textExtractionService = textExtractionService;
    }

    public async Task ExecuteAsync(DocumentProcessingContext context)
    {
        var result = await _textExtractionService.ExtractAsync(context.DocumentId, context.RawContent);
        context.ExtractedText = result.Text;
        context.TextResourceId = result.ResourceId; // 记录创建的资源ID
    }

    public async Task CompensateAsync(DocumentProcessingContext context)
    {
        if (context.TextResourceId.HasValue)
        {
            // 补偿操作:删除已提取和存储的文本资源
            await _textExtractionService.DeleteExtractedTextAsync(context.TextResourceId.Value);
        }
    }
}

// Steps/AnalyzeSentimentStep.cs
public class AnalyzeSentimentStep : ISagaStep<DocumentProcessingContext>
{
    private readonly ISentimentService _sentimentService;
    public string Name => "SentimentAnalysis";

    public AnalyzeSentimentStep(ISentimentService sentimentService)
    {
        _sentimentService = sentimentService;
    }

    public async Task ExecuteAsync(DocumentProcessingContext context)
    {
        // 模拟一个可能失败的操作
        if (context.ExtractedText.Contains("FAIL_SENTIMENT"))
        {
            throw new InvalidOperationException("Sentiment analysis failed due to specific content.");
        }
        var result = await _sentimentService.AnalyzeAsync(context.ExtractedText);
        context.SentimentScore = result.Score;
        context.SentimentAnalysisId = result.AnalysisId;
    }

    public async Task CompensateAsync(DocumentProcessingContext context)
    {
        if (context.SentimentAnalysisId.HasValue)
        {
            await _sentimentService.DeleteAnalysisResultAsync(context.SentimentAnalysisId.Value);
        }
    }
}

// Steps/RecognizeEntitiesStep.cs
public class RecognizeEntitiesStep : ISagaStep<DocumentProcessingContext>
{
    private readonly IEntityRecognitionService _entityService;
    public string Name => "EntityRecognition";

    public RecognizeEntitiesStep(IEntityRecognitionService entityService)
    {
        _entityService = entityService;
    }

    public async Task ExecuteAsync(DocumentProcessingContext context)
    {
        var result = await _entityService.RecognizeAsync(context.ExtractedText);
        context.Entities.AddRange(result.Entities);
        context.EntityRecognitionId = result.RecognitionId;
    }

    public async Task CompensateAsync(DocumentProcessingContext context)
    {
        if (context.EntityRecognitionId.HasValue)
        {
            await _entityService.DeleteRecognitionResultAsync(context.EntityRecognitionId.Value);
        }
    }
}

每个步骤都封装了单一职责:调用一个外部或内部服务,并知道如何撤销自己所做的操作。

在ASP.NET Core中组装和使用

最后,在Program.csStartup.cs中将所有组件注册到依赖注入容器。

// Program.cs (Minimal API example)

// 1. 注册DbContext
builder.Services.AddDbContext<YourDbContext>(options =>
    options.UseNpgsql(builder.Configuration.GetConnectionString("DefaultConnection")));

// 2. 注册NLP服务(模拟实现)
builder.Services.AddScoped<ITextExtractionService, MockTextExtractionService>();
builder.Services.AddScoped<ISentimentService, MockSentimentService>();
builder.Services.AddScoped<IEntityRecognitionService, MockEntityRecognitionService>();

// 3. 注册Saga步骤
// 注册顺序很重要,它定义了Saga的执行顺序
builder.Services.AddScoped<ISagaStep<DocumentProcessingContext>, ExtractTextStep>();
builder.Services.AddScoped<ISagaStep<DocumentProcessingContext>, AnalyzeSentimentStep>();
builder.Services.AddScoped<ISagaStep<DocumentProcessingContext>, RecognizeEntitiesStep>();

// 4. 注册Saga协调器
builder.Services.AddScoped<SagaOrchestrator<DocumentProcessingContext>>(provider =>
{
    var logger = provider.GetRequiredService<ILogger<SagaOrchestrator<DocumentProcessingContext>>>();
    var dbContext = provider.GetRequiredService<YourDbContext>();
    // 从DI容器中解析所有已注册的该类型Saga的步骤
    var steps = provider.GetServices<ISagaStep<DocumentProcessingContext>>();
    return new SagaOrchestrator<DocumentProcessingContext>(logger, dbContext, steps);
});

var app = builder.Build();

// 5. 创建API端点来触发Saga
app.MapPost("/process-document", async (string rawContent, SagaOrchestrator<DocumentProcessingContext> orchestrator) =>
{
    var context = new DocumentProcessingContext
    {
        DocumentId = Guid.NewGuid(),
        RawContent = rawContent
    };

    try
    {
        var sagaId = await orchestrator.ExecuteAsync(context);
        return Results.Ok(new { SagaId = sagaId, Message = "Document processing started and completed successfully." });
    }
    catch (Exception ex)
    {
        // 协调器会重新抛出异常,以便API层返回适当的错误响应
        return Results.Problem(
            detail: $"Document processing failed and was rolled back. Error: {ex.Message}",
            statusCode: 500
        );
    }
});

app.Run();

通过依赖注入,协调器在创建时会自动获得所有已注册的ISagaStep<DocumentProcessingContext>实例,并且顺序与注册顺序一致。这使得添加、删除或重排步骤变得非常简单,只需要修改Program.cs中的注册代码即可。

流程可视化

使用Mermaid可以清晰地展示成功和失败两种场景下的流程。

成功流程:

sequenceDiagram
    participant Client
    participant API
    participant Orchestrator
    participant Step1 as ExtractText
    participant Step2 as AnalyzeSentiment
    participant Step3 as RecognizeEntities
    participant DB as SagaStateDB

    Client->>+API: POST /process-document
    API->>+Orchestrator: ExecuteAsync(context)
    Orchestrator->>+DB: Create SagaState (Status: Executing)
    DB-->>-Orchestrator: SagaId
    Orchestrator->>+DB: Update State (CurrentStep=0)
    DB-->>-Orchestrator: OK
    Orchestrator->>+Step1: ExecuteAsync()
    Step1-->>-Orchestrator: Success
    Orchestrator->>+DB: Update State (CurrentStep=1)
    DB-->>-Orchestrator: OK
    Orchestrator->>+Step2: ExecuteAsync()
    Step2-->>-Orchestrator: Success
    Orchestrator->>+DB: Update State (CurrentStep=2)
    DB-->>-Orchestrator: OK
    Orchestrator->>+Step3: ExecuteAsync()
    Step3-->>-Orchestrator: Success
    Orchestrator->>+DB: Update State (Status: Succeeded)
    DB-->>-Orchestrator: OK
    Orchestrator-->>-API: Returns SagaId
    API-->>-Client: 200 OK

失败与补偿流程:

sequenceDiagram
    participant Client
    participant API
    participant Orchestrator
    participant Step1 as ExtractText
    participant Step2 as AnalyzeSentiment
    participant Step3 as RecognizeEntities
    participant DB as SagaStateDB

    Client->>+API: POST /process-document
    API->>+Orchestrator: ExecuteAsync(context)
    
    Note over Orchestrator, Step1: Step 1 (ExtractText) executes successfully.
    Orchestrator->>+Step1: ExecuteAsync()
    Step1-->>-Orchestrator: Success
    
    Orchestrator->>+DB: Update State (CurrentStep=1)
    DB-->>-Orchestrator: OK
    
    Note over Orchestrator, Step2: Step 2 (AnalyzeSentiment) fails.
    Orchestrator->>+Step2: ExecuteAsync()
    Step2-->>-Orchestrator: Throws Exception
    
    Orchestrator->>+DB: Update State (Status: Failed)
    DB-->>-Orchestrator: OK
    
    Orchestrator->>Orchestrator: Start Compensation
    Orchestrator->>+DB: Update State (Status: Compensating)
    DB-->>-Orchestrator: OK
    
    Note right of Orchestrator: Compensating from failed step's index (1), which is Step2. It was never successful, so its compensation is skipped. Now compensating step 0.
    
    Orchestrator->>+Step1: CompensateAsync()
    Step1-->>-Orchestrator: Compensation Success
    
    Orchestrator->>+DB: Update State (CurrentStep=-1)
    DB-->>-Orchestrator: OK

    Orchestrator-->>-API: Throws Exception
    API-->>-Client: 500 Internal Server Error

当前方案的局限性与展望

这个轻量级的编排式Saga协调器,在为单个服务内的复杂业务流程提供原子性保证方面表现出色,它避免了引入外部消息中间件的复杂性。但它的边界也十分清晰。首先,它是一个“进程内”的协调器,不适用于跨多个独立部署微服务的场景。其次,Saga的恢复机制是被动的。如果应用在Saga执行中途崩溃,需要一个外部的、定时的恢复作业(Recovery Job)来扫描SagaState表,找出那些长时间处于ExecutingCompensating状态的实例,并尝试重新触发它们的执行或补偿逻辑。

未来的迭代可以探索将Saga状态的变更发布为领域事件,由一个独立的、高可用的Saga工作流引擎来订阅和驱动,从而将流程协调的职责从主应用中剥离出来,使其成为一个更通用的平台能力。此外,可以为ISagaStep增加超时和重试策略,以应对依赖服务的瞬时故障,进一步增强整个工作流的韧性。


  目录