我们的移动端CI/CD平台面临一个棘手的状态一致性问题。当流水线触发一个新的、经过微调的LLM模型上线时,这个动作并非单一操作,而是必须原子化地完成三个后台服务的状态变更:
- Model Registry Service: 注册新模型的版本、元数据和访问路径。
- GPU Resource Service: 从集群中分配并持久化绑定一组GPU资源给这个新模型。
- Billing Service: 激活与该模型相关的计费策略。
任何一步失败,整个系统都会进入危险的中间状态。例如,模型在注册表中可见,但没有分配到任何GPU资源,这将导致所有调用失败。或者,资源已分配,但计费未启动,导致公司资产流失。在真实项目中,这种不一致性引发的故障排查和手动数据修复成本是无法接受的。我们需要一个跨服务的原子性保证。
方案A:基于事件的Saga模式及其局限性
微服务架构中,处理此类问题的标准答案通常是Saga模式。通过一系列的本地事务,如果某个步骤失败,则执行对应的补偿事务来回滚之前的操作。
- 优势: 服务间解耦,异步执行,天然的高可用性。每个服务只关心自己的事务和补偿逻辑。
- 劣势:
- 补偿逻辑的复杂性: 编写健壮的补偿事务极具挑战。取消GPU资源分配很简单,但如果计费服务已经向第三方支付网关发送了指令,补偿操作可能无法执行或代价高昂。
- 最终一致性的窗口: 在补偿完成之前,系统会处于事实上的不一致状态。对于模型部署这种关键操作,我们无法容忍哪怕是几秒钟的“模型已注册但不可用”的状态。
- 调试困难: 跨越多个异步消息的事务链,一旦出现问题,追踪根本原因非常痛苦。
在我们的场景下,模型上线是一个低频、高重要性的操作。对该操作的可用性要求(可以容忍几秒的延迟)低于对数据一致性的要求。因此,Saga模式的最终一致性模型,在这里成为了一个无法接受的缺点。
方案B:两阶段提交(2PC)的审慎选择
两阶段提交是一个经典的强一致性分布式事务协议。它通过引入一个“协调者”(Transaction Coordinator)来保证所有“参与者”(Participants)要么全部提交,要么全部回滚。
- 阶段一:准备(Prepare): 协调者向所有参与者发送“准备”请求。参与者执行本地事务到可以提交的状态,锁定所需资源,并向协调者回应“可以提交”或“不能提交”。
- 阶段二:提交(Commit):
- 如果所有参与者都回应“可以提交”,协调者向所有参与者发送“提交”请求。
- 如果任何一个参与者回应“不能提交”或超时,协调者向所有参与者发送“回滚”请求。
这个方案的缺点同样显著:
- 同步阻塞: 在整个事务期间(从发送prepare到收到所有commit/abort确认),所有参与者锁定的资源都处于阻塞状态。
- 协调者单点故障: 如果协调者在第二阶段发送部分commit请求后宕机,一部分参与者提交了事务,另一部分则永远等待指令,导致数据不一致。
- 网络分区问题: 极端情况下的网络问题可能导致“脑裂”。
尽管有这些众所周知的缺陷,我们最终还是选择了2PC。理由如下:模型部署的事务执行时间很短(通常在秒级),短暂的资源锁定是可接受的。协调者的单点问题可以通过引入高可用机制(例如基于Raft协议的集群)来缓解,但在我们当前实现的v1版本中,我们选择通过完备的监控和告警(Prometheus)以及明确的手动干预预案来暂时规避。最重要的是,2PC提供了我们这个场景下最需要的东西:原子性。
核心实现概览
我们的整体架构围绕一个用Node.js实现的中央事务协调器(TC)展开。CI/CD系统通过API触发TC,TC则负责与三个参与者服务进行2PC协议交互。
sequenceDiagram
participant CI/CD Pipeline
participant Transaction Coordinator (Node.js)
participant Model Registry Svc
participant GPU Resource Svc
participant Billing Svc
CI/CD Pipeline->>+Transaction Coordinator (Node.js): POST /v1/transactions (modelInfo)
Note over Transaction Coordinator (Node.js): 创建事务, 状态: PENDING
par Phase 1: Prepare
Transaction Coordinator (Node.js)->>+Model Registry Svc: POST /prepare (txId, modelInfo)
Model Registry Svc-->>-Transaction Coordinator (Node.js): 200 OK { voted: 'yes' }
and
Transaction Coordinator (Node.js)->>+GPU Resource Svc: POST /prepare (txId, modelInfo)
GPU Resource Svc-->>-Transaction Coordinator (Node.js): 200 OK { voted: 'yes' }
and
Transaction Coordinator (Node.js)->>+Billing Svc: POST /prepare (txId, modelInfo)
Billing Svc-->>-Transaction Coordinator (Node.js): 200 OK { voted: 'yes' }
end
Note over Transaction Coordinator (Node.js): 收到所有'yes', 状态: PREPARED
Note over Transaction Coordinator (Node.js): 持久化事务状态, 准备进入Commit阶段
par Phase 2: Commit
Transaction Coordinator (Node.js)->>+Model Registry Svc: POST /commit (txId)
Model Registry Svc-->>-Transaction Coordinator (Node.js): 200 OK { status: 'committed' }
and
Transaction Coordinator (Node.js)->>+GPU Resource Svc: POST /commit (txId)
GPU Resource Svc-->>-Transaction Coordinator (Node.js): 200 OK { status: 'committed' }
and
Transaction Coordinator (Node.js)->>+Billing Svc: POST /commit (txId)
Billing Svc-->>-Transaction Coordinator (Node.js): 200 OK { status: 'committed' }
end
Note over Transaction Coordinator (Node.js): 收到所有commit确认, 状态: COMMITTED
Transaction Coordinator (Node.js)-->>-CI/CD Pipeline: 200 OK { transactionId, status: 'committed' }
事务协调器(Transaction Coordinator)的Node.js实现
协调器的实现是整个架构的核心。它需要管理事务状态、与参与者通信、处理超时和失败,并将关键指标暴露给Prometheus。我们使用TypeScript来增强代码的健壮性。
// src/coordinator/TransactionCoordinator.ts
import { v4 as uuidv4 } from 'uuid';
import axios, { AxiosInstance } from 'axios';
import { Transaction, TransactionState, Participant } from './types';
import { TransactionLogger } from './TransactionLogger';
import { MetricsCollector } from './MetricsCollector';
// 一个生产级的实现需要一个更可靠的持久化层, 例如写入WAL或数据库
// 这里为了演示, 我们使用一个简单的文件日志
const txLogger = new TransactionLogger('./transactions.log');
export class TransactionCoordinator {
private transactions: Map<string, Transaction> = new Map();
private httpClient: AxiosInstance;
constructor(private participants: Participant[]) {
this.httpClient = axios.create({ timeout: 5000 }); // 设定一个合理的超时
this.loadTransactionsFromLog();
}
// 系统启动时从日志恢复未完成的事务
private async loadTransactionsFromLog() {
const unresolved = await txLogger.loadUnresolved();
unresolved.forEach(tx => {
this.transactions.set(tx.id, tx);
// 对于处于 PREPARED 状态的事务, 需要重试 commit 或 abort
if (tx.state === 'PREPARED') {
// 这是一个简化处理, 实际中需要根据日志判断是commit还是abort
// 假设日志中记录了最终决策
console.log(`[Recovery] Retrying commit for transaction ${tx.id}`);
this.executePhase2(tx, 'COMMIT');
}
});
}
public async executeTransaction(payload: any): Promise<Transaction> {
const transactionId = uuidv4();
const transaction: Transaction = {
id: transactionId,
state: 'PENDING',
payload,
participants: this.participants.map(p => ({ ...p, vote: null, state: 'PENDING' })),
startTime: Date.now(),
};
this.transactions.set(transactionId, transaction);
await txLogger.log(transaction);
// --- Phase 1: Prepare ---
const prepareStartTime = Date.now();
const preparePromises = transaction.participants.map(p => this.sendPrepare(transaction, p));
const results = await Promise.allSettled(preparePromises);
MetricsCollector.observeTransactionPhaseDuration('prepare', (Date.now() - prepareStartTime) / 1000);
const votes = results.map((res, index) => {
const participant = transaction.participants[index];
if (res.status === 'fulfilled' && res.value) {
participant.vote = 'YES';
return 'YES';
} else {
participant.vote = 'NO';
MetricsCollector.incrementParticipantFailures(participant.name, 'prepare');
return 'NO';
}
});
// --- Decision ---
const decision: 'COMMIT' | 'ABORT' = votes.every(v => v === 'YES') ? 'COMMIT' : 'ABORT';
transaction.state = 'PREPARED';
await txLogger.log(transaction, decision); // 持久化最终决定, 这是关键
// --- Phase 2: Commit/Abort ---
const phase2StartTime = Date.now();
await this.executePhase2(transaction, decision);
MetricsCollector.observeTransactionPhaseDuration(decision.toLowerCase(), (Date.now() - phase2StartTime) / 1000);
return transaction;
}
private async sendPrepare(transaction: Transaction, participant: Participant): Promise<boolean> {
try {
console.log(`[${transaction.id}] Preparing participant: ${participant.name}`);
const response = await this.httpClient.post(`${participant.url}/prepare`, {
transactionId: transaction.id,
payload: transaction.payload,
});
return response.status === 200;
} catch (error) {
console.error(`[${transaction.id}] Participant ${participant.name} failed to prepare:`, error.message);
return false;
}
}
private async executePhase2(transaction: Transaction, decision: 'COMMIT' | 'ABORT') {
const action = decision === 'COMMIT' ? 'commit' : 'abort';
console.log(`[${transaction.id}] Executing phase 2: ${action.toUpperCase()}`);
const promises = transaction.participants.map(async (p) => {
try {
// 这里需要重试机制
await this.httpClient.post(`${p.url}/${action}`, { transactionId: transaction.id });
p.state = decision === 'COMMIT' ? 'COMMITTED' : 'ABORTED';
} catch (error) {
console.error(`[${transaction.id}] Participant ${p.name} failed to ${action}:`, error.message);
MetricsCollector.incrementParticipantFailures(p.name, action);
p.state = 'UNKNOWN'; // 标记为未知状态, 需要人工干预
}
});
await Promise.all(promises);
const finalState = transaction.participants.every(p => p.state === 'COMMITTED') ? 'COMMITTED' : 'ABORTED';
transaction.state = finalState;
transaction.endTime = Date.now();
MetricsCollector.incrementTransactionTotal(finalState.toLowerCase());
MetricsCollector.observeTransactionDuration((transaction.endTime - transaction.startTime) / 1000);
await txLogger.log(transaction);
this.transactions.delete(transaction.id); // 清理已完成的事务
}
}
这段代码展示了协调器的核心逻辑。关键点在于:
- 状态持久化: 在做出
COMMIT或ABORT决定后,必须先将这个决定写入持久化日志(txLogger.log(transaction, decision)),然后再向参与者发送指令。这样即使协调者崩溃重启,也能从日志中恢复事务的最终状态,并继续完成第二阶段。 - 超时与重试: 在生产环境中,
httpClient的调用需要包含更复杂的重试逻辑(例如指数退避),尤其是在第二阶段,因为指令必须送达。 - 可观测性:
MetricsCollector在关键路径上记录了事务计数、延迟和失败信息。
参与者(Participant)的实现
每个参与者服务都需要实现/prepare、/commit和/abort三个接口。下面是一个GPU资源服务的简化示例。
// src/participants/GpuResourceService.ts
import express from 'express';
import { Database } from 'sqlite3'; // 伪代码, 假设使用数据库
const app = express();
app.use(express.json());
const db = new Database(':memory:'); // 生产环境使用真实数据库
db.serialize(() => {
db.run("CREATE TABLE gpus (id TEXT PRIMARY KEY, status TEXT, reserved_by TEXT)");
db.run("INSERT INTO gpus VALUES ('gpu-001', 'available', NULL)");
db.run("INSERT INTO gpus VALUES ('gpu-002', 'available', NULL)");
});
// 保存处于 prepare 状态的事务信息
const preparedTransactions = new Map<string, { gpuId: string }>();
// Phase 1: Prepare
app.post('/prepare', (req, res) => {
const { transactionId, payload } = req.body;
// 1. 检查是否有可用资源
db.get("SELECT id FROM gpus WHERE status = 'available' LIMIT 1", (err, row) => {
if (err || !row) {
console.error(`[${transactionId}] No available GPUs for reservation.`);
return res.status(500).send('No available resources');
}
const gpuId = row.id;
// 2. 锁定资源 (核心)
// 将状态更新为 'reserved' 并记录事务ID
db.run("UPDATE gpus SET status = 'reserved', reserved_by = ? WHERE id = ?", [transactionId, gpuId], (updateErr) => {
if (updateErr) {
return res.status(500).send('Failed to lock resource');
}
// 3. 记录预备状态, 以便 commit/abort 时使用
preparedTransactions.set(transactionId, { gpuId });
console.log(`[${transactionId}] Reserved GPU ${gpuId}. Voted YES.`);
res.status(200).send('Prepared');
});
});
});
// Phase 2: Commit
app.post('/commit', (req, res) => {
const { transactionId } = req.body;
const txInfo = preparedTransactions.get(transactionId);
if (!txInfo) {
// 如果找不到事务信息, 可能是已经提交或协调者重试
// 幂等性处理: 检查资源是否已是 committed 状态
return res.status(200).send('Already committed or unknown transaction');
}
// 将资源状态从 'reserved' 正式变更为 'in_use'
db.run("UPDATE gpus SET status = 'in_use' WHERE id = ? AND reserved_by = ?", [txInfo.gpuId, transactionId], (err) => {
if (err) {
// 严重错误, 需要告警
return res.status(500).send('Failed to commit resource');
}
preparedTransactions.delete(transactionId);
console.log(`[${transactionId}] Committed GPU ${txInfo.gpuId}.`);
res.status(200).send('Committed');
});
});
// Phase 2: Abort
app.post('/abort', (req, res) => {
const { transactionId } = req.body;
const txInfo = preparedTransactions.get(transactionId);
if (!txInfo) {
return res.status(200).send('Already aborted or unknown transaction');
}
// 释放锁定的资源
db.run("UPDATE gpus SET status = 'available', reserved_by = NULL WHERE id = ? AND reserved_by = ?", [txInfo.gpuId, transactionId], (err) => {
if (err) {
// 严重错误, 需要告警
return res.status(500).send('Failed to abort resource');
}
preparedTransactions.delete(transactionId);
console.log(`[${transactionId}] Aborted reservation for GPU ${txInfo.gpuId}.`);
res.status(200).send('Aborted');
});
});
app.listen(3002, () => console.log('GPU Resource Service listening on port 3002'));
参与者的关键在于/prepare接口。它必须执行所有可能失败的操作(如检查库存、权限),然后将资源锁定在一个中间状态(reserved),并确保这个状态能持久化。/commit和/abort接口的逻辑则相对简单,通常是不可失败的,并且需要保证幂等性,因为协调者可能会重试。
基于Prometheus的可观测性建设
仅仅实现协议是不够的。对于2PC这种脆弱的协议,没有强大的可观测性就等于在黑暗中驾驶。我们使用prom-client库将协调器的内部状态暴露为Prometheus指标。
// src/coordinator/MetricsCollector.ts
import client from 'prom-client';
const register = new client.Registry();
client.collectDefaultMetrics({ register });
const transactionTotal = new client.Counter({
name: 'llm_deploy_transactions_total',
help: 'Total number of LLM deployment transactions',
labelNames: ['status'], // 'committed', 'aborted'
});
register.registerMetric(transactionTotal);
const transactionDuration = new client.Histogram({
name: 'llm_deploy_transaction_duration_seconds',
help: 'Duration of LLM deployment transactions in seconds',
buckets: [0.1, 0.5, 1, 2, 5, 10], // 根据业务场景调整 buckets
});
register.registerMetric(transactionDuration);
const transactionPhaseDuration = new client.Histogram({
name: 'llm_deploy_transaction_phase_duration_seconds',
help: 'Duration of transaction phases',
labelNames: ['phase'], // 'prepare', 'commit', 'abort'
});
register.registerMetric(transactionPhaseDuration);
const participantFailures = new client.Counter({
name: 'llm_deploy_participant_failures_total',
help: 'Total failures per participant and phase',
labelNames: ['participant', 'phase'],
});
register.registerMetric(participantFailures);
export class MetricsCollector {
static incrementTransactionTotal(status: 'committed' | 'aborted') {
transactionTotal.labels(status).inc();
}
static observeTransactionDuration(seconds: number) {
transactionDuration.observe(seconds);
}
static observeTransactionPhaseDuration(phase: string, seconds: number) {
transactionPhaseDuration.labels(phase).observe(seconds);
}
static incrementParticipantFailures(participantName: string, phase: string) {
participantFailures.labels(participantName, phase).inc();
}
static async getMetrics() {
return register.metrics();
}
}
// 在主服务中暴露 /metrics 端点
// app.get('/metrics', async (req, res) => {
// res.set('Content-Type', register.contentType);
// res.end(await MetricsCollector.getMetrics());
// });
通过这些指标,我们可以构建Dashboard和告警规则:
- 告警规则:
rate(llm_deploy_transactions_total{status="aborted"}[5m]) > 0-> 持续有事务被中止,可能存在系统性问题。 - 告警规则:
increase(llm_deploy_participant_failures_total[10m]) > 0-> 某个参与者持续失败,需要立即介入。 - Dashboard: 监控
llm_deploy_transaction_duration_seconds的p99分位数,可以发现事务性能退化。
架构的局限性与未来路径
当前这套基于Node.js实现的2PC方案,解决了我们对LLM模型部署原子性的燃眉之急。但它远非一个完美的分布式事务解决方案。其固有的同步阻塞模型限制了它的吞吐量,使其不适用于任何高频场景。
协调者是整个系统的核心瓶颈与风险点。虽然我们通过持久化日志和重启恢复机制增强了其鲁棒性,但它依然不是一个真正的高可用组件。生产环境的下一步迭代,必然是引入基于Raft或Paxos的共识算法,将协调者本身做成一个高可用的分布式集群,但这将使系统的复杂性指数级增长。
此外,2PC协议强行将多个独立的服务在事务执行期间紧密耦合在一起,这与微服务所倡导的松耦合、独立演进的理念背道而驰。因此,这个架构选择是一个经过深思熟虑的权衡,是用架构的优雅性换取特定业务场景下不可妥协的一致性保证。它应该被视为一把锋利的手术刀,仅用于处理那些无法通过最终一致性方案解决的、至关重要且低频的核心操作。