处理跨多个服务或模块的长时间运行业务流程,数据一致性是一个绕不开的难题。一个典型的场景是订单创建:你需要锁定库存、处理支付、创建订单记录,可能还需要通知物流。这些步骤中任何一步失败,整个系统都必须回退到一致的状态。如果服务在执行到一半时崩溃重启,又该如何从中断点安全地恢复?
在真实项目中,我们经常看到这样的实现:一个庞大的try...catch
块,里面串联着多个await
调用。这种方式脆弱且难以维护。当补偿逻辑(例如,支付成功但库存锁定失败后需要退款)变得复杂时,代码会迅速退化成难以理解的嵌套if-else
。更严重的是,它无法处理进程崩溃的场景,一次意外的重启就可能导致数据永久不一致——钱扣了,但订单没创建,库存也没解锁。
另一种方案是引入数据库状态字段,例如order_status
,通过PENDING
, PAID
, INVENTORY_LOCKED
等状态来追踪进度。这种方法比纯粹的过程式代码要好,但它将复杂的业务流程逻辑与数据模型紧密耦合。状态转换的逻辑散布在各个业务方法中,缺乏一个集中的、可被验证的定义,很快就会变成另一个维护噩emon。
我们需要一个更健壮的方案来对整个流程进行建模和编排。这个方案必须能明确地定义流程中的每一步、每种可能的成功或失败路径,并且支持持久化,以便在服务中断后能够精确恢复。这就是有限状态机(Finite State Machine)及其在Saga事务模式中应用的价值所在。本文将探讨如何在NestJS后端应用中,结合TypeScript的类型系统、XState的状态机库以及SQL数据库的持久化能力,构建一个可恢复、可观测且高度可维护的Saga工作流。
方案对比与技术选型
方案A:传统的Service层编排
这是最常见的实现方式。一个OrderService
可能包含如下方法:
// a-service-orchestration.ts
@Injectable()
export class OrderService {
constructor(
private readonly inventoryService: InventoryService,
private readonly paymentService: PaymentService,
// ... other services
) {}
async createOrder(orderData: CreateOrderDto): Promise<Order> {
let inventoryLockId: string | null = null;
let paymentTransactionId: string | null = null;
try {
// Step 1: Lock inventory
const lock = await this.inventoryService.lock(orderData.items);
inventoryLockId = lock.id;
// Step 2: Process payment
const payment = await this.paymentService.process(orderData.paymentDetails);
paymentTransactionId = payment.id;
// Imagine the server crashes here.
// Inventory is locked, payment is processed, but no order is created.
// This results in data inconsistency.
// Step 3: Create order record
const order = await this.orderRepository.create({
...orderData,
status: 'COMPLETED',
inventoryLockId,
paymentTransactionId,
});
return order;
} catch (error) {
// Compensation logic
if (paymentTransactionId) {
await this.paymentService.refund(paymentTransactionId);
}
if (inventoryLockId) {
await this.inventoryService.release(inventoryLockId);
}
// This compensation logic itself can fail.
// What if refund succeeds but inventory release fails?
throw new Error(`Order creation failed: ${error.message}`);
}
}
}
劣势分析:
- 脆弱性: 进程崩溃会导致状态不一致,且没有内建的恢复机制。
- 补偿逻辑复杂:
catch
块中的补偿逻辑会随着步骤增多而变得异常复杂,容易出错。 - 状态不明确: 流程的当前状态是隐式的,由代码执行到哪一行决定,难以追踪和调试。
- 职责不清:
createOrder
方法承担了过多的编排职责,违反了单一职责原则。
方案B:使用XState进行状态机建模
我们将整个订单创建流程抽象为一个状态机。每个步骤(锁定库存、支付)都是一个状态,步骤的成功或失败是触发状态转换的事件。
// b-state-machine-concept.ts
import { createMachine } from 'xstate';
const orderCreationMachine = createMachine({
id: 'orderCreation',
initial: 'lockingInventory',
context: {
// ... order data
},
states: {
lockingInventory: {
invoke: {
src: 'lockInventoryService',
onDone: 'processingPayment',
onError: 'compensatingInventory',
},
},
processingPayment: {
invoke: {
src: 'processPaymentService',
onDone: 'creatingOrderRecord',
onError: 'compensatingPayment',
},
},
creatingOrderRecord: {
invoke: {
src: 'createOrderRecordService',
onDone: 'completed',
onError: 'compensatingOrderCreation',
}
},
// ... compensation states ...
completed: {
type: 'final'
},
failed: {
type: 'final'
}
}
});
优势分析:
- 明确性与可视化: 状态和转换是显式定义的。整个业务流程可以被清晰地可视化(XState提供了可视化工具),降低了理解成本。
- 关注点分离: 状态机定义了流程的“what”和“when”,而具体的实现(异步服务调用)则被分离出去,由状态机在进入特定状态时
invoke
。 - 内建的错误处理:
onError
转换专门用于处理失败路径,使得补偿逻辑的定义与主流程一样清晰。 - 可持久化与可恢复: XState的状态对象是可序列化的JSON。我们可以将当前状态(包括上下文数据)存入SQL数据库。当服务重启后,可以从数据库加载状态,恢复状态机并从中断的地方继续执行。
最终决策:
对于这种涉及多个步骤、需要补偿逻辑且对一致性要求高的业务流程,方案B(XState)是明显更优的选择。它牺牲了少量的 первоначальную简单性,换来了极高的可维护性、健壮性和可观测性。在真实项目中,这种前期投入能够避免后期无尽的线上问题排查和数据修复。
核心实现概览
我们将使用NestJS作为应用框架,TypeORM作为SQL数据库的ORM,以及@xstate/fsm
(一个轻量级的XState核心)来实现这个模式。
1. 数据库实体设计
我们需要一个实体来存储订单信息,并且关键是,要有一个字段来存储状态机的序列化状态。
// src/order/entities/order.entity.ts
import { Entity, PrimaryGeneratedColumn, Column, CreateDateColumn, UpdateDateColumn } from 'typeorm';
export enum OrderStatus {
PENDING = 'PENDING',
COMPLETED = 'COMPLETED',
FAILED = 'FAILED',
}
@Entity('orders')
export class Order {
@PrimaryGeneratedColumn('uuid')
id: string;
@Column({ type: 'jsonb' })
productInfo: { productId: string; quantity: number }[];
@Column({ type: 'decimal', precision: 10, scale: 2 })
amount: number;
@Column({
type: 'enum',
enum: OrderStatus,
default: OrderStatus.PENDING,
})
status: OrderStatus;
// 这是关键字段,用于存储XState状态机的快照
@Column({ type: 'jsonb', nullable: true })
machineState: any;
@Column({ nullable: true })
failureReason?: string;
@CreateDateColumn()
createdAt: Date;
@UpdateDateColumn()
updatedAt: Date;
}
2. 定义状态机
这是整个架构的核心。我们定义一个包含主流程和完整补偿逻辑的状态机。
// src/order/machines/order.machine.ts
import { createMachine, assign } from 'xstate';
// 定义状态机上下文的类型
export interface OrderMachineContext {
orderId: string;
productInfo: { productId: string; quantity: number }[];
amount: number;
inventoryLockId?: string;
paymentTransactionId?: string;
error?: string;
}
// 定义状态机可以接收的事件
export type OrderMachineEvent =
| { type: 'CREATE'; data: { orderId: string; productInfo: any; amount: number } }
| { type: 'RETRY' }
| { type: 'FAIL'; error: string }
| { type: 'SUCCESS' };
// 定义状态机的状态
export interface OrderMachineState {
states: {
pending: {
states: {
lockingInventory: {};
processingPayment: {};
creatingOrderRecord: {};
success: {};
};
};
compensating: {
states: {
releasingInventory: {};
refundingPayment: {};
failure: {};
};
};
completed: {};
failed: {};
};
}
export const orderMachine = createMachine<OrderMachineContext, OrderMachineEvent, OrderMachineState>({
id: 'order',
initial: 'pending',
context: {
orderId: '',
productInfo: [],
amount: 0,
},
states: {
pending: {
on: {
// 允许在pending状态下接收FAIL事件,直接进入补偿流程
FAIL: {
target: 'compensating',
actions: assign({ error: (_, event) => event.error }),
},
},
initial: 'lockingInventory',
states: {
lockingInventory: {
// 'invoke' 定义了进入该状态时要执行的异步操作
invoke: {
id: 'lockInventory',
src: 'lockInventory', // 这个字符串将映射到NestJS Service中的一个方法
onDone: {
target: 'processingPayment',
// 'assign' action 用于更新状态机的 context
actions: assign({
inventoryLockId: (_, event) => event.data.inventoryLockId,
}),
},
onError: {
target: '#order.compensating', // 绝对ID跳转
actions: assign({ error: (_, event) => event.data.message }),
},
},
},
processingPayment: {
invoke: {
id: 'processPayment',
src: 'processPayment',
onDone: {
target: 'creatingOrderRecord',
actions: assign({
paymentTransactionId: (_, event) => event.data.paymentTransactionId,
}),
},
onError: {
target: '#order.compensating.releasingInventory',
actions: assign({ error: (_, event) => event.data.message }),
},
},
},
creatingOrderRecord: {
invoke: {
id: 'updateOrderToComplete',
src: 'updateOrderToComplete',
onDone: { target: '#order.completed' },
onError: {
target: '#order.compensating.refundingPayment',
actions: assign({ error: (_, event) => event.data.message }),
},
},
},
success: {
type: 'final',
},
},
onDone: 'completed',
},
compensating: {
initial: 'releasingInventory',
states: {
releasingInventory: {
// 只有在存在 inventoryLockId 时才执行补偿
always: [{ target: 'refundingPayment', cond: (context) => !context.inventoryLockId }],
invoke: {
id: 'releaseInventory',
src: 'releaseInventory',
onDone: 'refundingPayment',
onError: {
target: 'failure',
// 这里的错误是补偿逻辑失败,需要记录严重错误
actions: assign({
error: (context, event) =>
`CRITICAL: Failed to release inventory ${context.inventoryLockId}. Reason: ${event.data.message}`,
}),
},
},
},
refundingPayment: {
always: [{ target: 'failure', cond: (context) => !context.paymentTransactionId }],
invoke: {
id: 'refundPayment',
src: 'refundPayment',
onDone: 'failure',
onError: {
target: 'failure',
actions: assign({
error: (context, event) =>
`CRITICAL: Failed to refund payment ${context.paymentTransactionId}. Reason: ${event.data.message}`,
}),
},
},
},
failure: {
invoke: {
id: 'updateOrderToFailed',
src: 'updateOrderToFailed',
onDone: '#order.failed',
},
},
},
},
completed: {
type: 'final',
},
failed: {
type: 'final',
},
},
});
这个状态机定义非常详尽,它清晰地描述了:
- 主流程:
lockingInventory
->processingPayment
->creatingOrderRecord
. - 补偿流程:
releasingInventory
->refundingPayment
. - 条件判断: 使用
cond
来决定是否需要执行某一步补偿。 - 上下文管理: 使用
assign
来将异步操作的结果保存到context
中。
3. 在NestJS Service中集成并执行
OrderSagaService
将负责创建、解释和持久化状态机实例。
// src/order/services/order-saga.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { interpret, StateMachine } from 'xstate';
import { Order, OrderStatus } from '../entities/order.entity';
import { orderMachine, OrderMachineContext, OrderMachineEvent } from '../machines/order.machine';
import { InventoryService } from './inventory.service'; // Mock service
import { PaymentService } from './payment.service'; // Mock service
@Injectable()
export class OrderSagaService {
private readonly logger = new Logger(OrderSagaService.name);
constructor(
@InjectRepository(Order)
private readonly orderRepository: Repository<Order>,
private readonly inventoryService: InventoryService,
private readonly paymentService: PaymentService,
) {}
/**
* 单元测试思路:
* 1. Mock InventoryService 和 PaymentService, 分别测试它们成功和失败的场景。
* 2. 使用 XState 的 test 工具来验证在不同事件下,状态机是否转换到预期状态。
* 3. 验证每次状态转换后,orderRepository.save 是否被正确调用,并且 machineState 字段被更新。
*/
public async createOrder(productInfo: any, amount: number) {
// 1. 创建一个初始的订单记录,状态为 PENDING
const order = this.orderRepository.create({
productInfo,
amount,
status: OrderStatus.PENDING,
});
await this.orderRepository.save(order);
this.logger.log(`Order ${order.id} created, starting saga...`);
// 2. 启动状态机
this.startMachine(order);
return order;
}
// 用于服务重启后恢复中断的流程
public async resumePendingOrders() {
const pendingOrders = await this.orderRepository.find({
where: { status: OrderStatus.PENDING }
});
this.logger.log(`Found ${pendingOrders.length} pending orders to resume.`);
for (const order of pendingOrders) {
this.startMachine(order, order.machineState);
}
}
private startMachine(order: Order, existingState?: any) {
const machine = this.createMachineInstance(order.id);
const service = interpret(machine);
// 监听状态转换,并将新状态持久化到数据库
service.onTransition(async (state) => {
if (state.changed) {
this.logger.log(`Order ${order.id} transitioned to ${JSON.stringify(state.value)}`);
// 核心:持久化状态机快照
order.machineState = JSON.parse(JSON.stringify(state)); // deep clone
if (state.matches('completed')) {
order.status = OrderStatus.COMPLETED;
} else if (state.matches('failed')) {
order.status = OrderStatus.FAILED;
order.failureReason = state.context.error;
}
await this.orderRepository.save(order);
}
});
service.start(existingState); // 如果有旧状态,则从旧状态恢复
}
private createMachineInstance(orderId: string): StateMachine<OrderMachineContext, any, OrderMachineEvent> {
const order = await this.orderRepository.findOneByOrFail({ id: orderId });
// 为状态机提供具体的服务实现
return orderMachine.withConfig({
services: {
lockInventory: async (context) => {
this.logger.log(`[Order: ${context.orderId}] Locking inventory...`);
return this.inventoryService.lock(context.productInfo);
},
processPayment: async (context) => {
this.logger.log(`[Order: ${context.orderId}] Processing payment...`);
return this.paymentService.process(context.amount);
},
updateOrderToComplete: async (context) => {
this.logger.log(`[Order: ${context.orderId}] Updating order to complete.`);
// 最终状态的更新在 onTransition 中处理,这里仅表示成功
return Promise.resolve();
},
releaseInventory: async (context) => {
this.logger.warn(`[Order: ${context.orderId}] Compensating: releasing inventory...`);
return this.inventoryService.release(context.inventoryLockId!);
},
refundPayment: async (context) => {
this.logger.warn(`[Order: ${context.orderId}] Compensating: refunding payment...`);
return this.paymentService.refund(context.paymentTransactionId!);
},
updateOrderToFailed: async (context) => {
this.logger.error(`[Order: ${context.orderId}] Updating order to failed.`);
// 最终状态的更新在 onTransition 中处理
return Promise.resolve();
}
},
}).withContext({
...orderMachine.context,
orderId: order.id,
productInfo: order.productInfo,
amount: order.amount
});
}
}
这段代码展示了集成的关键:
- 创建与启动:
createOrder
方法在数据库中创建一条初始记录,然后调用startMachine
。 - 动态服务实现:
createMachineInstance
使用withConfig
将状态机定义中的抽象服务(如'lockInventory'
)映射到OrderSagaService
中具体的、依赖注入的服务方法。 - 持久化:
onTransition
回调是实现可恢复性的核心。每次状态机状态发生变化,都会将其最新的状态序列化并存入订单实体的machineState
字段。 - 恢复机制:
resumePendingOrders
方法可以在应用启动时调用。它查询所有处于PENDING
状态的订单,并使用它们存储的machineState
来恢复状态机,使其从上次中断的地方继续执行。
4. 可视化状态机流程
使用Mermaid.js可以直观地展示我们设计的复杂流程。
stateDiagram-v2 direction LR [*] --> pending state pending { direction LR [*] --> lockingInventory lockingInventory --> processingPayment: onDone lockingInventory --> compensating: onError processingPayment --> creatingOrderRecord: onDone processingPayment --> compensating: onError creatingOrderRecord --> completed: onDone creatingOrderRecord --> compensating: onError } state compensating { direction LR [*] --> releasingInventory releasingInventory --> refundingPayment: onDone / cond releasingInventory --> failure: onError refundingPayment --> failure: onDone / cond refundingPayment --> failure: onError failure --> failed } pending --> completed compensating --> failed completed --> [*] failed --> [*]
这张图清晰地展示了主路径和所有补偿路径,这正是状态机带来的巨大优势:让复杂的流程逻辑变得一目了然。
架构的局限性与未来展望
此方案通过在单体应用内部利用状态机和数据库实现了Saga模式,极大地提高了业务流程的健壮性。然而,它并非没有局限性。
首先,状态机的执行被绑定在单个NestJS实例上。如果应用是多实例部署,必须确保一个Saga实例(一个订单流程)始终由同一个应用实例处理,或者引入分布式锁来避免并发修改状态机。一种更云原生的做法是将状态转换的触发从直接的invoke
调用改为通过消息队列(如RabbitMQ或Kafka)发布和订阅事件。状态机在接收到外部事件后进行转换,这使其能更好地适应分布式和Serverless环境。
其次,状态机定义的版本控制是一个挑战。如果业务流程需要变更(例如增加一个新的“发货通知”步骤),就需要策略来处理正在运行中的旧版本状态机实例。这可能需要实现版本化状态机,或者设计能平滑迁移的转换逻辑。
最后,尽管此模式解决了业务流程的可靠性,但它引入了新的复杂性。对于简单的、少于三个步骤且不需要补偿的流程,使用这种模式可能是一种过度设计。技术选型的关键在于权衡,必须根据业务场景的复杂性和对数据一致性的要求来做出判断。未来的探索方向可能包括将这种Saga执行器抽象成一个通用的、与业务逻辑解耦的平台级服务。