基于Node.js与两阶段提交的LLM模型原子化部署架构实现


我们的移动端CI/CD平台面临一个棘手的状态一致性问题。当流水线触发一个新的、经过微调的LLM模型上线时,这个动作并非单一操作,而是必须原子化地完成三个后台服务的状态变更:

  1. Model Registry Service: 注册新模型的版本、元数据和访问路径。
  2. GPU Resource Service: 从集群中分配并持久化绑定一组GPU资源给这个新模型。
  3. Billing Service: 激活与该模型相关的计费策略。

任何一步失败,整个系统都会进入危险的中间状态。例如,模型在注册表中可见,但没有分配到任何GPU资源,这将导致所有调用失败。或者,资源已分配,但计费未启动,导致公司资产流失。在真实项目中,这种不一致性引发的故障排查和手动数据修复成本是无法接受的。我们需要一个跨服务的原子性保证。

方案A:基于事件的Saga模式及其局限性

微服务架构中,处理此类问题的标准答案通常是Saga模式。通过一系列的本地事务,如果某个步骤失败,则执行对应的补偿事务来回滚之前的操作。

  • 优势: 服务间解耦,异步执行,天然的高可用性。每个服务只关心自己的事务和补偿逻辑。
  • 劣势:
    1. 补偿逻辑的复杂性: 编写健壮的补偿事务极具挑战。取消GPU资源分配很简单,但如果计费服务已经向第三方支付网关发送了指令,补偿操作可能无法执行或代价高昂。
    2. 最终一致性的窗口: 在补偿完成之前,系统会处于事实上的不一致状态。对于模型部署这种关键操作,我们无法容忍哪怕是几秒钟的“模型已注册但不可用”的状态。
    3. 调试困难: 跨越多个异步消息的事务链,一旦出现问题,追踪根本原因非常痛苦。

在我们的场景下,模型上线是一个低频、高重要性的操作。对该操作的可用性要求(可以容忍几秒的延迟)低于对数据一致性的要求。因此,Saga模式的最终一致性模型,在这里成为了一个无法接受的缺点。

方案B:两阶段提交(2PC)的审慎选择

两阶段提交是一个经典的强一致性分布式事务协议。它通过引入一个“协调者”(Transaction Coordinator)来保证所有“参与者”(Participants)要么全部提交,要么全部回滚。

  • 阶段一:准备(Prepare): 协调者向所有参与者发送“准备”请求。参与者执行本地事务到可以提交的状态,锁定所需资源,并向协调者回应“可以提交”或“不能提交”。
  • 阶段二:提交(Commit):
    • 如果所有参与者都回应“可以提交”,协调者向所有参与者发送“提交”请求。
    • 如果任何一个参与者回应“不能提交”或超时,协调者向所有参与者发送“回滚”请求。

这个方案的缺点同样显著:

  • 同步阻塞: 在整个事务期间(从发送prepare到收到所有commit/abort确认),所有参与者锁定的资源都处于阻塞状态。
  • 协调者单点故障: 如果协调者在第二阶段发送部分commit请求后宕机,一部分参与者提交了事务,另一部分则永远等待指令,导致数据不一致。
  • 网络分区问题: 极端情况下的网络问题可能导致“脑裂”。

尽管有这些众所周知的缺陷,我们最终还是选择了2PC。理由如下:模型部署的事务执行时间很短(通常在秒级),短暂的资源锁定是可接受的。协调者的单点问题可以通过引入高可用机制(例如基于Raft协议的集群)来缓解,但在我们当前实现的v1版本中,我们选择通过完备的监控和告警(Prometheus)以及明确的手动干预预案来暂时规避。最重要的是,2PC提供了我们这个场景下最需要的东西:原子性

核心实现概览

我们的整体架构围绕一个用Node.js实现的中央事务协调器(TC)展开。CI/CD系统通过API触发TC,TC则负责与三个参与者服务进行2PC协议交互。

sequenceDiagram
    participant CI/CD Pipeline
    participant Transaction Coordinator (Node.js)
    participant Model Registry Svc
    participant GPU Resource Svc
    participant Billing Svc

    CI/CD Pipeline->>+Transaction Coordinator (Node.js): POST /v1/transactions (modelInfo)
    Note over Transaction Coordinator (Node.js): 创建事务, 状态: PENDING

    par Phase 1: Prepare
        Transaction Coordinator (Node.js)->>+Model Registry Svc: POST /prepare (txId, modelInfo)
        Model Registry Svc-->>-Transaction Coordinator (Node.js): 200 OK { voted: 'yes' }
    and
        Transaction Coordinator (Node.js)->>+GPU Resource Svc: POST /prepare (txId, modelInfo)
        GPU Resource Svc-->>-Transaction Coordinator (Node.js): 200 OK { voted: 'yes' }
    and
        Transaction Coordinator (Node.js)->>+Billing Svc: POST /prepare (txId, modelInfo)
        Billing Svc-->>-Transaction Coordinator (Node.js): 200 OK { voted: 'yes' }
    end

    Note over Transaction Coordinator (Node.js): 收到所有'yes', 状态: PREPARED
    Note over Transaction Coordinator (Node.js): 持久化事务状态, 准备进入Commit阶段

    par Phase 2: Commit
        Transaction Coordinator (Node.js)->>+Model Registry Svc: POST /commit (txId)
        Model Registry Svc-->>-Transaction Coordinator (Node.js): 200 OK { status: 'committed' }
    and
        Transaction Coordinator (Node.js)->>+GPU Resource Svc: POST /commit (txId)
        GPU Resource Svc-->>-Transaction Coordinator (Node.js): 200 OK { status: 'committed' }
    and
        Transaction Coordinator (Node.js)->>+Billing Svc: POST /commit (txId)
        Billing Svc-->>-Transaction Coordinator (Node.js): 200 OK { status: 'committed' }
    end
    
    Note over Transaction Coordinator (Node.js): 收到所有commit确认, 状态: COMMITTED
    Transaction Coordinator (Node.js)-->>-CI/CD Pipeline: 200 OK { transactionId, status: 'committed' }

事务协调器(Transaction Coordinator)的Node.js实现

协调器的实现是整个架构的核心。它需要管理事务状态、与参与者通信、处理超时和失败,并将关键指标暴露给Prometheus。我们使用TypeScript来增强代码的健壮性。

// src/coordinator/TransactionCoordinator.ts
import { v4 as uuidv4 } from 'uuid';
import axios, { AxiosInstance } from 'axios';
import { Transaction, TransactionState, Participant } from './types';
import { TransactionLogger } from './TransactionLogger';
import { MetricsCollector } from './MetricsCollector';

// 一个生产级的实现需要一个更可靠的持久化层, 例如写入WAL或数据库
// 这里为了演示, 我们使用一个简单的文件日志
const txLogger = new TransactionLogger('./transactions.log');

export class TransactionCoordinator {
    private transactions: Map<string, Transaction> = new Map();
    private httpClient: AxiosInstance;

    constructor(private participants: Participant[]) {
        this.httpClient = axios.create({ timeout: 5000 }); // 设定一个合理的超时
        this.loadTransactionsFromLog();
    }
    
    // 系统启动时从日志恢复未完成的事务
    private async loadTransactionsFromLog() {
        const unresolved = await txLogger.loadUnresolved();
        unresolved.forEach(tx => {
            this.transactions.set(tx.id, tx);
            // 对于处于 PREPARED 状态的事务, 需要重试 commit 或 abort
            if (tx.state === 'PREPARED') {
                // 这是一个简化处理, 实际中需要根据日志判断是commit还是abort
                // 假设日志中记录了最终决策
                console.log(`[Recovery] Retrying commit for transaction ${tx.id}`);
                this.executePhase2(tx, 'COMMIT');
            }
        });
    }

    public async executeTransaction(payload: any): Promise<Transaction> {
        const transactionId = uuidv4();
        const transaction: Transaction = {
            id: transactionId,
            state: 'PENDING',
            payload,
            participants: this.participants.map(p => ({ ...p, vote: null, state: 'PENDING' })),
            startTime: Date.now(),
        };
        this.transactions.set(transactionId, transaction);
        await txLogger.log(transaction);

        // --- Phase 1: Prepare ---
        const prepareStartTime = Date.now();
        const preparePromises = transaction.participants.map(p => this.sendPrepare(transaction, p));
        const results = await Promise.allSettled(preparePromises);
        MetricsCollector.observeTransactionPhaseDuration('prepare', (Date.now() - prepareStartTime) / 1000);

        const votes = results.map((res, index) => {
            const participant = transaction.participants[index];
            if (res.status === 'fulfilled' && res.value) {
                participant.vote = 'YES';
                return 'YES';
            } else {
                participant.vote = 'NO';
                MetricsCollector.incrementParticipantFailures(participant.name, 'prepare');
                return 'NO';
            }
        });

        // --- Decision ---
        const decision: 'COMMIT' | 'ABORT' = votes.every(v => v === 'YES') ? 'COMMIT' : 'ABORT';
        
        transaction.state = 'PREPARED';
        await txLogger.log(transaction, decision); // 持久化最终决定, 这是关键

        // --- Phase 2: Commit/Abort ---
        const phase2StartTime = Date.now();
        await this.executePhase2(transaction, decision);
        MetricsCollector.observeTransactionPhaseDuration(decision.toLowerCase(), (Date.now() - phase2StartTime) / 1000);

        return transaction;
    }

    private async sendPrepare(transaction: Transaction, participant: Participant): Promise<boolean> {
        try {
            console.log(`[${transaction.id}] Preparing participant: ${participant.name}`);
            const response = await this.httpClient.post(`${participant.url}/prepare`, {
                transactionId: transaction.id,
                payload: transaction.payload,
            });
            return response.status === 200;
        } catch (error) {
            console.error(`[${transaction.id}] Participant ${participant.name} failed to prepare:`, error.message);
            return false;
        }
    }

    private async executePhase2(transaction: Transaction, decision: 'COMMIT' | 'ABORT') {
        const action = decision === 'COMMIT' ? 'commit' : 'abort';
        console.log(`[${transaction.id}] Executing phase 2: ${action.toUpperCase()}`);

        const promises = transaction.participants.map(async (p) => {
            try {
                // 这里需要重试机制
                await this.httpClient.post(`${p.url}/${action}`, { transactionId: transaction.id });
                p.state = decision === 'COMMIT' ? 'COMMITTED' : 'ABORTED';
            } catch (error) {
                console.error(`[${transaction.id}] Participant ${p.name} failed to ${action}:`, error.message);
                MetricsCollector.incrementParticipantFailures(p.name, action);
                p.state = 'UNKNOWN'; // 标记为未知状态, 需要人工干预
            }
        });
        
        await Promise.all(promises);
        
        const finalState = transaction.participants.every(p => p.state === 'COMMITTED') ? 'COMMITTED' : 'ABORTED';
        transaction.state = finalState;
        transaction.endTime = Date.now();
        
        MetricsCollector.incrementTransactionTotal(finalState.toLowerCase());
        MetricsCollector.observeTransactionDuration((transaction.endTime - transaction.startTime) / 1000);
        
        await txLogger.log(transaction);
        this.transactions.delete(transaction.id); // 清理已完成的事务
    }
}

这段代码展示了协调器的核心逻辑。关键点在于:

  • 状态持久化: 在做出COMMITABORT决定后,必须先将这个决定写入持久化日志(txLogger.log(transaction, decision)),然后再向参与者发送指令。这样即使协调者崩溃重启,也能从日志中恢复事务的最终状态,并继续完成第二阶段。
  • 超时与重试: 在生产环境中,httpClient的调用需要包含更复杂的重试逻辑(例如指数退避),尤其是在第二阶段,因为指令必须送达。
  • 可观测性: MetricsCollector在关键路径上记录了事务计数、延迟和失败信息。

参与者(Participant)的实现

每个参与者服务都需要实现/prepare/commit/abort三个接口。下面是一个GPU资源服务的简化示例。

// src/participants/GpuResourceService.ts
import express from 'express';
import { Database } from 'sqlite3'; // 伪代码, 假设使用数据库

const app = express();
app.use(express.json());

const db = new Database(':memory:'); // 生产环境使用真实数据库
db.serialize(() => {
    db.run("CREATE TABLE gpus (id TEXT PRIMARY KEY, status TEXT, reserved_by TEXT)");
    db.run("INSERT INTO gpus VALUES ('gpu-001', 'available', NULL)");
    db.run("INSERT INTO gpus VALUES ('gpu-002', 'available', NULL)");
});

// 保存处于 prepare 状态的事务信息
const preparedTransactions = new Map<string, { gpuId: string }>();

// Phase 1: Prepare
app.post('/prepare', (req, res) => {
    const { transactionId, payload } = req.body;
    
    // 1. 检查是否有可用资源
    db.get("SELECT id FROM gpus WHERE status = 'available' LIMIT 1", (err, row) => {
        if (err || !row) {
            console.error(`[${transactionId}] No available GPUs for reservation.`);
            return res.status(500).send('No available resources');
        }
        
        const gpuId = row.id;

        // 2. 锁定资源 (核心)
        // 将状态更新为 'reserved' 并记录事务ID
        db.run("UPDATE gpus SET status = 'reserved', reserved_by = ? WHERE id = ?", [transactionId, gpuId], (updateErr) => {
            if (updateErr) {
                return res.status(500).send('Failed to lock resource');
            }
            
            // 3. 记录预备状态, 以便 commit/abort 时使用
            preparedTransactions.set(transactionId, { gpuId });
            console.log(`[${transactionId}] Reserved GPU ${gpuId}. Voted YES.`);
            res.status(200).send('Prepared');
        });
    });
});

// Phase 2: Commit
app.post('/commit', (req, res) => {
    const { transactionId } = req.body;
    const txInfo = preparedTransactions.get(transactionId);

    if (!txInfo) {
        // 如果找不到事务信息, 可能是已经提交或协调者重试
        // 幂等性处理: 检查资源是否已是 committed 状态
        return res.status(200).send('Already committed or unknown transaction');
    }
    
    // 将资源状态从 'reserved' 正式变更为 'in_use'
    db.run("UPDATE gpus SET status = 'in_use' WHERE id = ? AND reserved_by = ?", [txInfo.gpuId, transactionId], (err) => {
        if (err) {
            // 严重错误, 需要告警
            return res.status(500).send('Failed to commit resource');
        }
        preparedTransactions.delete(transactionId);
        console.log(`[${transactionId}] Committed GPU ${txInfo.gpuId}.`);
        res.status(200).send('Committed');
    });
});

// Phase 2: Abort
app.post('/abort', (req, res) => {
    const { transactionId } = req.body;
    const txInfo = preparedTransactions.get(transactionId);
    
    if (!txInfo) {
        return res.status(200).send('Already aborted or unknown transaction');
    }

    // 释放锁定的资源
    db.run("UPDATE gpus SET status = 'available', reserved_by = NULL WHERE id = ? AND reserved_by = ?", [txInfo.gpuId, transactionId], (err) => {
        if (err) {
             // 严重错误, 需要告警
            return res.status(500).send('Failed to abort resource');
        }
        preparedTransactions.delete(transactionId);
        console.log(`[${transactionId}] Aborted reservation for GPU ${txInfo.gpuId}.`);
        res.status(200).send('Aborted');
    });
});

app.listen(3002, () => console.log('GPU Resource Service listening on port 3002'));

参与者的关键在于/prepare接口。它必须执行所有可能失败的操作(如检查库存、权限),然后将资源锁定在一个中间状态(reserved),并确保这个状态能持久化。/commit/abort接口的逻辑则相对简单,通常是不可失败的,并且需要保证幂等性,因为协调者可能会重试。

基于Prometheus的可观测性建设

仅仅实现协议是不够的。对于2PC这种脆弱的协议,没有强大的可观测性就等于在黑暗中驾驶。我们使用prom-client库将协调器的内部状态暴露为Prometheus指标。

// src/coordinator/MetricsCollector.ts
import client from 'prom-client';

const register = new client.Registry();
client.collectDefaultMetrics({ register });

const transactionTotal = new client.Counter({
    name: 'llm_deploy_transactions_total',
    help: 'Total number of LLM deployment transactions',
    labelNames: ['status'], // 'committed', 'aborted'
});
register.registerMetric(transactionTotal);

const transactionDuration = new client.Histogram({
    name: 'llm_deploy_transaction_duration_seconds',
    help: 'Duration of LLM deployment transactions in seconds',
    buckets: [0.1, 0.5, 1, 2, 5, 10], // 根据业务场景调整 buckets
});
register.registerMetric(transactionDuration);

const transactionPhaseDuration = new client.Histogram({
    name: 'llm_deploy_transaction_phase_duration_seconds',
    help: 'Duration of transaction phases',
    labelNames: ['phase'], // 'prepare', 'commit', 'abort'
});
register.registerMetric(transactionPhaseDuration);

const participantFailures = new client.Counter({
    name: 'llm_deploy_participant_failures_total',
    help: 'Total failures per participant and phase',
    labelNames: ['participant', 'phase'],
});
register.registerMetric(participantFailures);


export class MetricsCollector {
    static incrementTransactionTotal(status: 'committed' | 'aborted') {
        transactionTotal.labels(status).inc();
    }
    
    static observeTransactionDuration(seconds: number) {
        transactionDuration.observe(seconds);
    }
    
    static observeTransactionPhaseDuration(phase: string, seconds: number) {
        transactionPhaseDuration.labels(phase).observe(seconds);
    }

    static incrementParticipantFailures(participantName: string, phase: string) {
        participantFailures.labels(participantName, phase).inc();
    }
    
    static async getMetrics() {
        return register.metrics();
    }
}

// 在主服务中暴露 /metrics 端点
// app.get('/metrics', async (req, res) => {
//     res.set('Content-Type', register.contentType);
//     res.end(await MetricsCollector.getMetrics());
// });

通过这些指标,我们可以构建Dashboard和告警规则:

  • 告警规则: rate(llm_deploy_transactions_total{status="aborted"}[5m]) > 0 -> 持续有事务被中止,可能存在系统性问题。
  • 告警规则: increase(llm_deploy_participant_failures_total[10m]) > 0 -> 某个参与者持续失败,需要立即介入。
  • Dashboard: 监控llm_deploy_transaction_duration_seconds的p99分位数,可以发现事务性能退化。

架构的局限性与未来路径

当前这套基于Node.js实现的2PC方案,解决了我们对LLM模型部署原子性的燃眉之急。但它远非一个完美的分布式事务解决方案。其固有的同步阻塞模型限制了它的吞吐量,使其不适用于任何高频场景。

协调者是整个系统的核心瓶颈与风险点。虽然我们通过持久化日志和重启恢复机制增强了其鲁棒性,但它依然不是一个真正的高可用组件。生产环境的下一步迭代,必然是引入基于Raft或Paxos的共识算法,将协调者本身做成一个高可用的分布式集群,但这将使系统的复杂性指数级增长。

此外,2PC协议强行将多个独立的服务在事务执行期间紧密耦合在一起,这与微服务所倡导的松耦合、独立演进的理念背道而驰。因此,这个架构选择是一个经过深思熟虑的权衡,是用架构的优雅性换取特定业务场景下不可妥协的一致性保证。它应该被视为一把锋利的手术刀,仅用于处理那些无法通过最终一致性方案解决的、至关重要且低频的核心操作。


  目录