在 NestJS 中集成 XState 实现可恢复的 Saga 事务模式


处理跨多个服务或模块的长时间运行业务流程,数据一致性是一个绕不开的难题。一个典型的场景是订单创建:你需要锁定库存、处理支付、创建订单记录,可能还需要通知物流。这些步骤中任何一步失败,整个系统都必须回退到一致的状态。如果服务在执行到一半时崩溃重启,又该如何从中断点安全地恢复?

在真实项目中,我们经常看到这样的实现:一个庞大的try...catch块,里面串联着多个await调用。这种方式脆弱且难以维护。当补偿逻辑(例如,支付成功但库存锁定失败后需要退款)变得复杂时,代码会迅速退化成难以理解的嵌套if-else。更严重的是,它无法处理进程崩溃的场景,一次意外的重启就可能导致数据永久不一致——钱扣了,但订单没创建,库存也没解锁。

另一种方案是引入数据库状态字段,例如order_status,通过PENDING, PAID, INVENTORY_LOCKED等状态来追踪进度。这种方法比纯粹的过程式代码要好,但它将复杂的业务流程逻辑与数据模型紧密耦合。状态转换的逻辑散布在各个业务方法中,缺乏一个集中的、可被验证的定义,很快就会变成另一个维护噩emon。

我们需要一个更健壮的方案来对整个流程进行建模和编排。这个方案必须能明确地定义流程中的每一步、每种可能的成功或失败路径,并且支持持久化,以便在服务中断后能够精确恢复。这就是有限状态机(Finite State Machine)及其在Saga事务模式中应用的价值所在。本文将探讨如何在NestJS后端应用中,结合TypeScript的类型系统、XState的状态机库以及SQL数据库的持久化能力,构建一个可恢复、可观测且高度可维护的Saga工作流。

方案对比与技术选型

方案A:传统的Service层编排

这是最常见的实现方式。一个OrderService可能包含如下方法:

// a-service-orchestration.ts

@Injectable()
export class OrderService {
  constructor(
    private readonly inventoryService: InventoryService,
    private readonly paymentService: PaymentService,
    // ... other services
  ) {}

  async createOrder(orderData: CreateOrderDto): Promise<Order> {
    let inventoryLockId: string | null = null;
    let paymentTransactionId: string | null = null;

    try {
      // Step 1: Lock inventory
      const lock = await this.inventoryService.lock(orderData.items);
      inventoryLockId = lock.id;

      // Step 2: Process payment
      const payment = await this.paymentService.process(orderData.paymentDetails);
      paymentTransactionId = payment.id;
      
      // Imagine the server crashes here.
      // Inventory is locked, payment is processed, but no order is created.
      // This results in data inconsistency.

      // Step 3: Create order record
      const order = await this.orderRepository.create({
        ...orderData,
        status: 'COMPLETED',
        inventoryLockId,
        paymentTransactionId,
      });
      
      return order;

    } catch (error) {
      // Compensation logic
      if (paymentTransactionId) {
        await this.paymentService.refund(paymentTransactionId);
      }
      if (inventoryLockId) {
        await this.inventoryService.release(inventoryLockId);
      }
      
      // This compensation logic itself can fail.
      // What if refund succeeds but inventory release fails?

      throw new Error(`Order creation failed: ${error.message}`);
    }
  }
}

劣势分析:

  1. 脆弱性: 进程崩溃会导致状态不一致,且没有内建的恢复机制。
  2. 补偿逻辑复杂: catch块中的补偿逻辑会随着步骤增多而变得异常复杂,容易出错。
  3. 状态不明确: 流程的当前状态是隐式的,由代码执行到哪一行决定,难以追踪和调试。
  4. 职责不清: createOrder方法承担了过多的编排职责,违反了单一职责原则。

方案B:使用XState进行状态机建模

我们将整个订单创建流程抽象为一个状态机。每个步骤(锁定库存、支付)都是一个状态,步骤的成功或失败是触发状态转换的事件。

// b-state-machine-concept.ts
import { createMachine } from 'xstate';

const orderCreationMachine = createMachine({
  id: 'orderCreation',
  initial: 'lockingInventory',
  context: {
    // ... order data
  },
  states: {
    lockingInventory: {
      invoke: {
        src: 'lockInventoryService',
        onDone: 'processingPayment',
        onError: 'compensatingInventory',
      },
    },
    processingPayment: {
      invoke: {
        src: 'processPaymentService',
        onDone: 'creatingOrderRecord',
        onError: 'compensatingPayment',
      },
    },
    creatingOrderRecord: {
      invoke: {
        src: 'createOrderRecordService',
        onDone: 'completed',
        onError: 'compensatingOrderCreation',
      }
    },
    // ... compensation states ...
    completed: {
      type: 'final'
    },
    failed: {
      type: 'final'
    }
  }
});

优势分析:

  1. 明确性与可视化: 状态和转换是显式定义的。整个业务流程可以被清晰地可视化(XState提供了可视化工具),降低了理解成本。
  2. 关注点分离: 状态机定义了流程的“what”和“when”,而具体的实现(异步服务调用)则被分离出去,由状态机在进入特定状态时invoke
  3. 内建的错误处理: onError转换专门用于处理失败路径,使得补偿逻辑的定义与主流程一样清晰。
  4. 可持久化与可恢复: XState的状态对象是可序列化的JSON。我们可以将当前状态(包括上下文数据)存入SQL数据库。当服务重启后,可以从数据库加载状态,恢复状态机并从中断的地方继续执行。

最终决策:

对于这种涉及多个步骤、需要补偿逻辑且对一致性要求高的业务流程,方案B(XState)是明显更优的选择。它牺牲了少量的 первоначальную简单性,换来了极高的可维护性、健壮性和可观测性。在真实项目中,这种前期投入能够避免后期无尽的线上问题排查和数据修复。

核心实现概览

我们将使用NestJS作为应用框架,TypeORM作为SQL数据库的ORM,以及@xstate/fsm(一个轻量级的XState核心)来实现这个模式。

1. 数据库实体设计

我们需要一个实体来存储订单信息,并且关键是,要有一个字段来存储状态机的序列化状态。

// src/order/entities/order.entity.ts
import { Entity, PrimaryGeneratedColumn, Column, CreateDateColumn, UpdateDateColumn } from 'typeorm';

export enum OrderStatus {
  PENDING = 'PENDING',
  COMPLETED = 'COMPLETED',
  FAILED = 'FAILED',
}

@Entity('orders')
export class Order {
  @PrimaryGeneratedColumn('uuid')
  id: string;

  @Column({ type: 'jsonb' })
  productInfo: { productId: string; quantity: number }[];

  @Column({ type: 'decimal', precision: 10, scale: 2 })
  amount: number;

  @Column({
    type: 'enum',
    enum: OrderStatus,
    default: OrderStatus.PENDING,
  })
  status: OrderStatus;

  // 这是关键字段,用于存储XState状态机的快照
  @Column({ type: 'jsonb', nullable: true })
  machineState: any;

  @Column({ nullable: true })
  failureReason?: string;

  @CreateDateColumn()
  createdAt: Date;

  @UpdateDateColumn()
  updatedAt: Date;
}

2. 定义状态机

这是整个架构的核心。我们定义一个包含主流程和完整补偿逻辑的状态机。

// src/order/machines/order.machine.ts
import { createMachine, assign } from 'xstate';

// 定义状态机上下文的类型
export interface OrderMachineContext {
  orderId: string;
  productInfo: { productId: string; quantity: number }[];
  amount: number;
  inventoryLockId?: string;
  paymentTransactionId?: string;
  error?: string;
}

// 定义状态机可以接收的事件
export type OrderMachineEvent =
  | { type: 'CREATE'; data: { orderId: string; productInfo: any; amount: number } }
  | { type: 'RETRY' }
  | { type: 'FAIL'; error: string }
  | { type: 'SUCCESS' };

// 定义状态机的状态
export interface OrderMachineState {
  states: {
    pending: {
      states: {
        lockingInventory: {};
        processingPayment: {};
        creatingOrderRecord: {};
        success: {};
      };
    };
    compensating: {
      states: {
        releasingInventory: {};
        refundingPayment: {};
        failure: {};
      };
    };
    completed: {};
    failed: {};
  };
}

export const orderMachine = createMachine<OrderMachineContext, OrderMachineEvent, OrderMachineState>({
  id: 'order',
  initial: 'pending',
  context: {
    orderId: '',
    productInfo: [],
    amount: 0,
  },
  states: {
    pending: {
      on: {
        // 允许在pending状态下接收FAIL事件,直接进入补偿流程
        FAIL: {
          target: 'compensating',
          actions: assign({ error: (_, event) => event.error }),
        },
      },
      initial: 'lockingInventory',
      states: {
        lockingInventory: {
          // 'invoke' 定义了进入该状态时要执行的异步操作
          invoke: {
            id: 'lockInventory',
            src: 'lockInventory', // 这个字符串将映射到NestJS Service中的一个方法
            onDone: {
              target: 'processingPayment',
              // 'assign' action 用于更新状态机的 context
              actions: assign({
                inventoryLockId: (_, event) => event.data.inventoryLockId,
              }),
            },
            onError: {
              target: '#order.compensating', // 绝对ID跳转
              actions: assign({ error: (_, event) => event.data.message }),
            },
          },
        },
        processingPayment: {
          invoke: {
            id: 'processPayment',
            src: 'processPayment',
            onDone: {
              target: 'creatingOrderRecord',
              actions: assign({
                paymentTransactionId: (_, event) => event.data.paymentTransactionId,
              }),
            },
            onError: {
              target: '#order.compensating.releasingInventory',
              actions: assign({ error: (_, event) => event.data.message }),
            },
          },
        },
        creatingOrderRecord: {
          invoke: {
            id: 'updateOrderToComplete',
            src: 'updateOrderToComplete',
            onDone: { target: '#order.completed' },
            onError: {
              target: '#order.compensating.refundingPayment',
              actions: assign({ error: (_, event) => event.data.message }),
            },
          },
        },
        success: {
          type: 'final',
        },
      },
      onDone: 'completed',
    },
    compensating: {
      initial: 'releasingInventory',
      states: {
        releasingInventory: {
          // 只有在存在 inventoryLockId 时才执行补偿
          always: [{ target: 'refundingPayment', cond: (context) => !context.inventoryLockId }],
          invoke: {
            id: 'releaseInventory',
            src: 'releaseInventory',
            onDone: 'refundingPayment',
            onError: {
              target: 'failure',
              // 这里的错误是补偿逻辑失败,需要记录严重错误
              actions: assign({
                error: (context, event) =>
                  `CRITICAL: Failed to release inventory ${context.inventoryLockId}. Reason: ${event.data.message}`,
              }),
            },
          },
        },
        refundingPayment: {
          always: [{ target: 'failure', cond: (context) => !context.paymentTransactionId }],
          invoke: {
            id: 'refundPayment',
            src: 'refundPayment',
            onDone: 'failure',
            onError: {
              target: 'failure',
              actions: assign({
                error: (context, event) =>
                  `CRITICAL: Failed to refund payment ${context.paymentTransactionId}. Reason: ${event.data.message}`,
              }),
            },
          },
        },
        failure: {
          invoke: {
            id: 'updateOrderToFailed',
            src: 'updateOrderToFailed',
            onDone: '#order.failed',
          },
        },
      },
    },
    completed: {
      type: 'final',
    },
    failed: {
      type: 'final',
    },
  },
});

这个状态机定义非常详尽,它清晰地描述了:

  • 主流程: lockingInventory -> processingPayment -> creatingOrderRecord.
  • 补偿流程: releasingInventory -> refundingPayment.
  • 条件判断: 使用cond来决定是否需要执行某一步补偿。
  • 上下文管理: 使用assign来将异步操作的结果保存到context中。

3. 在NestJS Service中集成并执行

OrderSagaService将负责创建、解释和持久化状态机实例。

// src/order/services/order-saga.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { interpret, StateMachine } from 'xstate';
import { Order, OrderStatus } from '../entities/order.entity';
import { orderMachine, OrderMachineContext, OrderMachineEvent } from '../machines/order.machine';
import { InventoryService } from './inventory.service'; // Mock service
import { PaymentService } from './payment.service';   // Mock service

@Injectable()
export class OrderSagaService {
  private readonly logger = new Logger(OrderSagaService.name);

  constructor(
    @InjectRepository(Order)
    private readonly orderRepository: Repository<Order>,
    private readonly inventoryService: InventoryService,
    private readonly paymentService: PaymentService,
  ) {}
  
  /**
   * 单元测试思路:
   * 1. Mock InventoryService 和 PaymentService, 分别测试它们成功和失败的场景。
   * 2. 使用 XState 的 test 工具来验证在不同事件下,状态机是否转换到预期状态。
   * 3. 验证每次状态转换后,orderRepository.save 是否被正确调用,并且 machineState 字段被更新。
   */
  public async createOrder(productInfo: any, amount: number) {
    // 1. 创建一个初始的订单记录,状态为 PENDING
    const order = this.orderRepository.create({
      productInfo,
      amount,
      status: OrderStatus.PENDING,
    });
    await this.orderRepository.save(order);
    this.logger.log(`Order ${order.id} created, starting saga...`);

    // 2. 启动状态机
    this.startMachine(order);

    return order;
  }
  
  // 用于服务重启后恢复中断的流程
  public async resumePendingOrders() {
    const pendingOrders = await this.orderRepository.find({
        where: { status: OrderStatus.PENDING }
    });
    this.logger.log(`Found ${pendingOrders.length} pending orders to resume.`);
    for (const order of pendingOrders) {
        this.startMachine(order, order.machineState);
    }
  }

  private startMachine(order: Order, existingState?: any) {
    const machine = this.createMachineInstance(order.id);

    const service = interpret(machine);

    // 监听状态转换,并将新状态持久化到数据库
    service.onTransition(async (state) => {
      if (state.changed) {
        this.logger.log(`Order ${order.id} transitioned to ${JSON.stringify(state.value)}`);
        
        // 核心:持久化状态机快照
        order.machineState = JSON.parse(JSON.stringify(state)); // deep clone
        
        if (state.matches('completed')) {
            order.status = OrderStatus.COMPLETED;
        } else if (state.matches('failed')) {
            order.status = OrderStatus.FAILED;
            order.failureReason = state.context.error;
        }

        await this.orderRepository.save(order);
      }
    });

    service.start(existingState); // 如果有旧状态,则从旧状态恢复
  }
  
  private createMachineInstance(orderId: string): StateMachine<OrderMachineContext, any, OrderMachineEvent> {
    const order = await this.orderRepository.findOneByOrFail({ id: orderId });
    // 为状态机提供具体的服务实现
    return orderMachine.withConfig({
      services: {
        lockInventory: async (context) => {
          this.logger.log(`[Order: ${context.orderId}] Locking inventory...`);
          return this.inventoryService.lock(context.productInfo);
        },
        processPayment: async (context) => {
          this.logger.log(`[Order: ${context.orderId}] Processing payment...`);
          return this.paymentService.process(context.amount);
        },
        updateOrderToComplete: async (context) => {
          this.logger.log(`[Order: ${context.orderId}] Updating order to complete.`);
          // 最终状态的更新在 onTransition 中处理,这里仅表示成功
          return Promise.resolve();
        },
        releaseInventory: async (context) => {
          this.logger.warn(`[Order: ${context.orderId}] Compensating: releasing inventory...`);
          return this.inventoryService.release(context.inventoryLockId!);
        },
        refundPayment: async (context) => {
          this.logger.warn(`[Order: ${context.orderId}] Compensating: refunding payment...`);
          return this.paymentService.refund(context.paymentTransactionId!);
        },
        updateOrderToFailed: async (context) => {
            this.logger.error(`[Order: ${context.orderId}] Updating order to failed.`);
             // 最终状态的更新在 onTransition 中处理
            return Promise.resolve();
        }
      },
    }).withContext({
        ...orderMachine.context,
        orderId: order.id,
        productInfo: order.productInfo,
        amount: order.amount
    });
  }
}

这段代码展示了集成的关键:

  1. 创建与启动: createOrder方法在数据库中创建一条初始记录,然后调用startMachine
  2. 动态服务实现: createMachineInstance使用withConfig将状态机定义中的抽象服务(如'lockInventory')映射到OrderSagaService中具体的、依赖注入的服务方法。
  3. 持久化: onTransition回调是实现可恢复性的核心。每次状态机状态发生变化,都会将其最新的状态序列化并存入订单实体的machineState字段。
  4. 恢复机制: resumePendingOrders方法可以在应用启动时调用。它查询所有处于PENDING状态的订单,并使用它们存储的machineState来恢复状态机,使其从上次中断的地方继续执行。

4. 可视化状态机流程

使用Mermaid.js可以直观地展示我们设计的复杂流程。

stateDiagram-v2
    direction LR
    
    [*] --> pending

    state pending {
        direction LR
        [*] --> lockingInventory
        lockingInventory --> processingPayment: onDone
        lockingInventory --> compensating: onError
        
        processingPayment --> creatingOrderRecord: onDone
        processingPayment --> compensating: onError
        
        creatingOrderRecord --> completed: onDone
        creatingOrderRecord --> compensating: onError
    }
    
    state compensating {
        direction LR
        [*] --> releasingInventory
        releasingInventory --> refundingPayment: onDone / cond
        releasingInventory --> failure: onError
        
        refundingPayment --> failure: onDone / cond
        refundingPayment --> failure: onError
        
        failure --> failed
    }

    pending --> completed
    compensating --> failed
    completed --> [*]
    failed --> [*]

这张图清晰地展示了主路径和所有补偿路径,这正是状态机带来的巨大优势:让复杂的流程逻辑变得一目了然。

架构的局限性与未来展望

此方案通过在单体应用内部利用状态机和数据库实现了Saga模式,极大地提高了业务流程的健壮性。然而,它并非没有局限性。

首先,状态机的执行被绑定在单个NestJS实例上。如果应用是多实例部署,必须确保一个Saga实例(一个订单流程)始终由同一个应用实例处理,或者引入分布式锁来避免并发修改状态机。一种更云原生的做法是将状态转换的触发从直接的invoke调用改为通过消息队列(如RabbitMQ或Kafka)发布和订阅事件。状态机在接收到外部事件后进行转换,这使其能更好地适应分布式和Serverless环境。

其次,状态机定义的版本控制是一个挑战。如果业务流程需要变更(例如增加一个新的“发货通知”步骤),就需要策略来处理正在运行中的旧版本状态机实例。这可能需要实现版本化状态机,或者设计能平滑迁移的转换逻辑。

最后,尽管此模式解决了业务流程的可靠性,但它引入了新的复杂性。对于简单的、少于三个步骤且不需要补偿的流程,使用这种模式可能是一种过度设计。技术选型的关键在于权衡,必须根据业务场景的复杂性和对数据一致性的要求来做出判断。未来的探索方向可能包括将这种Saga执行器抽象成一个通用的、与业务逻辑解耦的平台级服务。


  目录