在 Nuxt.js 中集成基于 Event Sourcing 的 CQRS 读模型实现高性能搜索


一个看似简单的搜索框,背后可能隐藏着复杂的架构妥协。在处理一个内容频繁变更且需要完整审计日志的项目时,传统的 CRUD 模型很快就暴露了其局限性。每次更新,我们不仅要修改主数据表,还要同步更新一个非规范化的搜索表或 Elasticsearch 文档。这两个操作的事务一致性、性能损耗以及业务逻辑的耦合,都成了团队维护的痛点。尤其是在需要从同一份源数据构建多种不同用途的“视图”(例如,搜索视图、报表视图、分析视图)时,这种耦合的维护成本呈指数级增长。

我们决定从根源上解决这个问题:将系统的写入(Command)和读取(Query)操作彻底分离。这自然而然地导向了 CQRS (Command Query Responsibility Segregation) 架构模式。为了获得最大的灵活性和可追溯性,写入侧我们选择了 Event Sourcing。这意味着系统的唯一事实来源(Source of Truth)不再是当前状态,而是一系列不可变的领域事件(Domain Events)流。搜索功能,则作为读取侧的一个具体实现,即一个专门为查询优化的“读模型”。

这个决策引入了新的挑战:如何将事件流高效地“投影”到一个高性能的搜索引擎中?以及,作为前端的 Nuxt.js 应用,如何处理这种架构带来的“最终一致性”问题,为用户提供一个看似实时的搜索体验?

架构概览:命令、事件与投影

在我们深入代码之前,理解数据流的全貌至关重要。整个系统被划分为几个解耦的部分:

  1. Command API: 接收来自 Nuxt.js 前端的写操作请求(例如,创建文档、更新内容)。它不直接修改状态,而是验证命令并生成一个或多个事件。
  2. Event Store: 系统的核心。所有生成的事件都被持久化到这里。它是一个只允许追加的日志。在我们的实现中,为了简化,会使用一个内存中的数组,但在生产环境中这应该被替换为 Kafka、Pulsar 或 EventStoreDB。
  3. Projector (投影器): 一个独立的后台进程。它持续监听 Event Store 中的新事件,并将这些事件转化为对读模型的更新。这是连接写模型和读模型的桥梁。
  4. Read Model (Search Index): 我们的高性能搜索服务。这里选择 Meilisearch,因为它轻量、快速且易于部署。Projector 会将事件内容写入 Meilisearch。
  5. Query API: 专为 Nuxt.js 前端提供搜索服务的 API。它只与 Meilisearch 交互,完全不知道 Event Store 的存在。
  6. Nuxt.js Frontend: 用户界面。它通过 Command API 发送写命令,通过 Query API 获取搜索结果,并通过 WebSocket 接收状态更新以处理最终一致性。
sequenceDiagram
    participant Nuxt as Nuxt.js Client
    participant CmdAPI as Command API
    participant Events as Event Store
    participant Proj as Projector
    participant SearchDB as Meilisearch (Read Model)
    participant QueryAPI as Query API
    participant WS as WebSocket Server

    Nuxt->>+CmdAPI: POST /documents (CreateDocumentCommand)
    CmdAPI->>CmdAPI: 验证命令, 生成事件
    CmdAPI->>+Events: Append(DocumentCreatedEvent)
    Events-->>-CmdAPI: Ack
    CmdAPI->>+WS: Broadcast('document_processing', {id: 'doc1'})
    WS-->>Nuxt: Push('document_processing', {id: 'doc1'})
    CmdAPI-->>-Nuxt: 202 Accepted {id: 'doc1'}

    Note right of Events: Projector is listening...
    Proj->>+Events: Read new events
    Events-->>-Proj: [DocumentCreatedEvent]
    Proj->>Proj: 转换事件为 Meilisearch 文档
    Proj->>+SearchDB: Add Document {id: 'doc1', ...}
    SearchDB-->>-Proj: Ack
    Proj->>+WS: Broadcast('document_indexed', {id: 'doc1'})
    WS-->>Nuxt: Push('document_indexed', {id: 'doc1'})

    Nuxt->>+QueryAPI: GET /search?q=...
    QueryAPI->>+SearchDB: search('...')
    SearchDB-->>-QueryAPI: Search Results
    QueryAPI-->>-Nuxt: 200 OK [Results]

第一步:构建写入侧 - 命令与事件存储

我们的起点是构建一个简单的 Node.js 服务器来处理命令并存储事件。我们将使用 Nitro 作为底层服务器,因为它与 Nuxt 3 天然集成。

首先定义我们的命令和事件结构。这是一个关键的设计步骤,事件应该捕获业务意图,而不是简单的数据变更。

server/domain/types.ts

// 统一的事件结构
export interface DomainEvent {
  streamId: string; // 聚合根 ID,这里是文档 ID
  version: number;
  type: string;
  timestamp: number;
  payload: Record<string, any>;
}

// 命令定义
export interface CreateDocumentCommand {
  type: 'CREATE_DOCUMENT';
  payload: {
    documentId: string;

    title: string;
    author: string;
  };
}

export interface UpdateDocumentContentCommand {
  type: 'UPDATE_DOCUMENT_CONTENT';
  payload: {
    documentId: string;
    content: string;
  };
}

export type Command = CreateDocumentCommand | UpdateDocumentContentCommand;

// 事件定义
export interface DocumentCreatedEventPayload {
  title: string;
  author: string;
  createdAt: number;
}

export interface DocumentContentUpdatedEventPayload {
  content: string;
  updatedAt: number;
}

接下来是 Event Store 的一个极简内存实现。在真实项目中,这里的并发控制(通过检查 version)和持久化是核心难点。

server/services/eventStore.ts

import { DomainEvent } from '~/server/domain/types';

// 这是用于演示的非生产级内存事件存储
// 生产环境应使用专用数据库或消息队列
const streams: Map<string, DomainEvent[]> = new Map();

export const appendToStream = async (streamId: string, events: Omit<DomainEvent, 'version' | 'streamId'>[]): Promise<DomainEvent[]> => {
  if (!streams.has(streamId)) {
    streams.set(streamId, []);
  }

  const stream = streams.get(streamId)!;
  const currentVersion = stream.length;
  let nextVersion = currentVersion + 1;

  const newDomainEvents: DomainEvent[] = events.map(event => ({
    ...event,
    streamId,
    version: nextVersion++,
  }));

  // 模拟持久化
  stream.push(...newDomainEvents);
  
  // 在真实系统中,这里会触发一个钩子或消息,通知投影器
  // 为简单起见,我们稍后将直接调用投影器
  console.log(`[EventStore] Appended ${newDomainEvents.length} events to stream ${streamId}`);

  return newDomainEvents;
};

export const getStream = async (streamId: string): Promise<DomainEvent[]> => {
  return streams.get(streamId) || [];
};

// 用于投影器读取所有事件
export const readAllEvents = async (fromOffset: number = 0): Promise<DomainEvent[]> => {
    const allEvents: DomainEvent[] = Array.from(streams.values()).flat();
    allEvents.sort((a, b) => a.timestamp - b.timestamp); // 按时间戳排序
    return allEvents.slice(fromOffset);
}

最后,是处理 HTTP 请求的 Command API 端点。

server/api/commands.post.ts

import { appendToStream, getStream } from '~/server/services/eventStore';
import { 
  Command,
  DomainEvent,
  DocumentContentUpdatedEventPayload,
  DocumentCreatedEventPayload
} from '~/server/domain/types';
import { v4 as uuidv4 } from 'uuid';

export default defineEventHandler(async (event) => {
  const command = await readBody<Command>(event);

  try {
    switch (command.type) {
      case 'CREATE_DOCUMENT': {
        const { documentId, title, author } = command.payload;
        
        // 幂等性检查:确保文档不存在
        const existingStream = await getStream(documentId);
        if (existingStream.length > 0) {
          throw createError({ statusCode: 409, statusMessage: 'Document already exists' });
        }
        
        const newEvent = {
          type: 'DOCUMENT_CREATED',
          timestamp: Date.now(),
          payload: {
            title,
            author,
            createdAt: Date.now(),
          } as DocumentCreatedEventPayload,
        };
        
        await appendToStream(documentId, [newEvent]);
        // 生产环境中,这里会触发 WebSocket 通知
        // broadcastToClients('document_processing', { documentId });
        
        setResponseStatus(event, 202); // Accepted
        return { documentId };
      }

      case 'UPDATE_DOCUMENT_CONTENT': {
        const { documentId, content } = command.payload;
        
        // 验证文档必须存在
        const existingStream = await getStream(documentId);
        if (existingStream.length === 0) {
          throw createError({ statusCode: 404, statusMessage: 'Document not found' });
        }
        
        const newEvent = {
          type: 'DOCUMENT_CONTENT_UPDATED',
          timestamp: Date.now(),
          payload: {
            content,
            updatedAt: Date.now(),
          } as DocumentContentUpdatedEventPayload,
        };

        await appendToStream(documentId, [newEvent]);
        // broadcastToClients('document_processing', { documentId });
        
        setResponseStatus(event, 202);
        return { documentId };
      }

      default:
        throw createError({ statusCode: 400, statusMessage: 'Invalid command type' });
    }
  } catch (error: any) {
    console.error('[CommandHandler] Error processing command:', error);
    // 向上抛出 Nitro 错误
    throw error;
  }
});

第二步:建立投影,连接写模型与读模型

Projector 是整个架构的粘合剂。它负责消费事件并更新 Meilisearch 索引。在一个真实的项目中,这会是一个独立运行的 Node.js 进程或一个 Serverless Function。为了演示,我们将在服务器启动时运行一个简单的轮询循环。

server/services/meilisearch.ts

import { MeiliSearch } from 'meilisearch';

// 确保在 .env 文件中配置了这些变量
const client = new MeiliSearch({
  host: process.env.MEILISEARCH_HOST!,
  apiKey: process.env.MEILISEARCH_API_KEY!,
});

export const getMeilisearchClient = () => client;

export async function initializeSearchIndex() {
  const indexName = 'documents';
  try {
    await client.getIndex(indexName);
  } catch (error: any) {
    if (error.code === 'index_not_found') {
      console.log(`[Meilisearch] Index '${indexName}' not found. Creating...`);
      await client.createIndex(indexName, { primaryKey: 'id' });
    } else {
      throw error;
    }
  }
  // 配置可搜索和可过滤的字段
  await client.index(indexName).updateFilterableAttributes(['author', 'createdAt']);
  await client.index(indexName).updateSortableAttributes(['createdAt']);
  console.log(`[Meilisearch] Index '${indexName}' is ready.`);
}

现在是 Projector 的核心逻辑。它需要维护自己的状态(最后处理的事件偏移量),以确保事件不会被重复处理或遗漏。

server/services/projector.ts

import { getMeilisearchClient } from './meilisearch';
import { readAllEvents } from './eventStore';
import { DomainEvent } from '~/server/domain/types';

let lastProcessedOffset = 0;
let isRunning = false;
const POLL_INTERVAL_MS = 2000; // 轮询间隔

// 这是一个简化的投影逻辑
// 生产系统需要更复杂的聚合逻辑
const projectEvent = async (event: DomainEvent): Promise<void> => {
  const client = getMeilisearchClient();
  const index = client.index('documents');
  const documentId = event.streamId;

  try {
    switch (event.type) {
      case 'DOCUMENT_CREATED': {
        const { title, author, createdAt } = event.payload;
        await index.addDocuments([{
          id: documentId,
          title,
          author,
          content: '', // 初始内容为空
          createdAt,
          updatedAt: createdAt,
        }]);
        console.log(`[Projector] Indexed new document: ${documentId}`);
        break;
      }
      case 'DOCUMENT_CONTENT_UPDATED': {
        const { content, updatedAt } = event.payload;
        await index.updateDocuments([{
          id: documentId,
          content,
          updatedAt,
        }]);
        console.log(`[Projector] Updated content for document: ${documentId}`);
        break;
      }
      default:
        // 忽略不关心的事件
        break;
    }
  } catch (error) {
    console.error(`[Projector] Failed to project event ${event.type} for stream ${event.streamId}`, error);
    // 在这里添加重试和死信队列逻辑
  }
};

async function runProjectionCycle() {
  if (isRunning) return;
  isRunning = true;
  
  try {
    const events = await readAllEvents(lastProcessedOffset);
    if (events.length > 0) {
      console.log(`[Projector] Found ${events.length} new events to project.`);
      for (const event of events) {
        await projectEvent(event);
      }
      lastProcessedOffset += events.length;
      // 在生产环境中,这个偏移量需要持久化到 Redis 或数据库中
    }
  } catch (error) {
    console.error('[Projector] Error during projection cycle:', error);
  } finally {
    isRunning = false;
  }
}

export function startProjector() {
  console.log('[Projector] Starting projector...');
  // 立即运行一次,然后设置定时器
  runProjectionCycle();
  setInterval(runProjectionCycle, POLL_INTERVAL_MS);
}

我们需要在 Nitro 服务器启动时初始化 Meilisearch 并启动 Projector。

server/plugins/01.search-init.ts

import { initializeSearchIndex } from '~/server/services/meilisearch';
import { startProjector } from '~/server/services/projector';

export default defineNitroPlugin(async (nitroApp) => {
  if (process.env.NODE_ENV === 'development') {
    try {
      await initializeSearchIndex();
      startProjector();
    } catch (e) {
      console.error('Failed to initialize search services:', e);
    }
  }
});

第三步:实现查询端点和 Nuxt.js 集成

查询 API 非常简单,它只是 Meilisearch 的一个代理。

server/api/search.get.ts

import { getMeilisearchClient } from '~/server/services/meilisearch';

export default defineEventHandler(async (event) => {
  const query = getQuery(event);
  const q = query.q as string || '';

  if (!q) {
    return { hits: [] };
  }

  const client = getMeilisearchClient();
  const index = client.index('documents');

  try {
    const searchResult = await index.search(q, {
      limit: 20,
    });
    return searchResult;
  } catch (error) {
    console.error('[SearchAPI] Error querying Meilisearch:', error);
    throw createError({ statusCode: 500, statusMessage: 'Search service unavailable' });
  }
});

现在,到了 Nuxt.js 前端的关键部分。我们需要处理搜索输入、调用 API,并优雅地处理“正在索引”这一中间状态。

app.vue

<template>
  <div class="container">
    <h1>Event Sourcing Search Demo</h1>

    <!-- 创建/更新表单 -->
    <div class="form-section">
      <h2>Create or Update Document</h2>
      <input v-model="form.id" placeholder="Document ID (leave blank to create)" />
      <input v-model="form.title" placeholder="Title (only for creation)" :disabled="!!form.id" />
      <input v-model="form.author" placeholder="Author (only for creation)" :disabled="!!form.id" />
      <textarea v-model="form.content" placeholder="Content (for updates)"></textarea>
      <button @click="submitCommand" :disabled="isSubmitting">
        {{ isSubmitting ? 'Submitting...' : (form.id ? 'Update Content' : 'Create Document') }}
      </button>
      <p v-if="submissionStatus" class="status-message">{{ submissionStatus }}</p>
    </div>

    <!-- 搜索区域 -->
    <div class="search-section">
      <h2>Search Documents</h2>
      <input v-model="searchQuery" @input="debouncedSearch" placeholder="Type to search..." />
      <div v-if="pending" class="loading">Loading...</div>
      <div v-else-if="error" class="error">Error: {{ error.message }}</div>
      <ul v-else-if="results && results.hits.length > 0">
        <li v-for="hit in results.hits" :key="hit.id" :class="{ 'processing': processingIds.has(hit.id) }">
          <h3>{{ hit.title }} <span v-if="processingIds.has(hit.id)">(Indexing...)</span></h3>
          <p><em>by {{ hit.author }}</em></p>
          <p>{{ hit.content }}</p>
          <small>ID: {{ hit.id }}</small>
        </li>
      </ul>
      <p v-else>No results found.</p>
    </div>
  </div>
</template>

<script setup lang="ts">
import { ref, reactive, watch } from 'vue';
import { v4 as uuidv4 } from 'uuid';
import { debounce } from 'lodash-es';

// --- State ---
const form = reactive({
  id: '',
  title: '',
  author: '',
  content: ''
});
const isSubmitting = ref(false);
const submissionStatus = ref('');

const searchQuery = ref('');
const processingIds = ref(new Set<string>()); // 存储正在处理的文档ID

// --- Command Submission Logic ---
async function submitCommand() {
  isSubmitting.value = true;
  submissionStatus.value = '';
  
  let command;
  let documentId = form.id || uuidv4();

  if (form.id) { // Update
    command = {
      type: 'UPDATE_DOCUMENT_CONTENT',
      payload: { documentId, content: form.content }
    };
  } else { // Create
    command = {
      type: 'CREATE_DOCUMENT',
      payload: { documentId, title: form.title, author: form.author }
    };
  }

  try {
    const response = await $fetch('/api/commands', {
      method: 'POST',
      body: command,
    });
    
    submissionStatus.value = `Command accepted for document ${response.documentId}. It will be searchable shortly.`;
    processingIds.value.add(response.documentId);
    
    // 模拟WebSocket通知,实际项目中应替换为真实WebSocket
    setTimeout(() => {
      console.log(`Simulating 'indexed' event for ${response.documentId}`);
      processingIds.value.delete(response.documentId);
      // 触发一次新的搜索来刷新结果
      refresh();
    }, 3000); // 假设投影需要3秒

  } catch (err: any) {
    submissionStatus.value = `Error: ${err.data?.statusMessage || err.message}`;
  } finally {
    isSubmitting.value = false;
  }
}

// --- Search Logic ---
const { data: results, pending, error, refresh } = useFetch(() => `/api/search?q=${searchQuery.value}`, {
  immediate: false, // 不立即执行
  watch: false, // 手动控制刷新
});

const debouncedSearch = debounce(() => {
  if (searchQuery.value.length > 1 || searchQuery.value.length === 0) {
    refresh();
  }
}, 300);

watch(searchQuery, debouncedSearch);

</script>

<style>
/* ... some basic styling ... */
.container { max-width: 800px; margin: 0 auto; font-family: sans-serif; }
.form-section, .search-section { border: 1px solid #ccc; padding: 1rem; margin-bottom: 1rem; border-radius: 5px; }
input, textarea { display: block; width: 95%; margin-bottom: 0.5rem; padding: 0.5rem; }
button { padding: 0.5rem 1rem; }
.status-message { color: blue; }
.error { color: red; }
.processing { opacity: 0.6; background-color: #f0f0f0; }
li { list-style: none; border-bottom: 1px solid #eee; padding: 1rem 0; }
</style>

在这个前端实现中,我们用 setTimeout 模拟了 WebSocket 通知。当一个写命令被接受后,我们将对应的 documentId 添加到 processingIds 集合中。UI 会根据这个集合为正在索引的条目显示一个特殊状态。当模拟的“索引完成”通知到达时,我们将其从集合中移除,并主动刷新搜索结果。这是在前端层面处理最终一致性、提升用户体验的一种务实方法。

当前方案的局限性与未来展望

这套架构解决了我们最初的痛点:写操作和读操作解耦,拥有完整的事件日志,并且能够灵活地构建多个读模型。然而,当前这个简化的实现存在几个明显的局限性,在投入生产前必须解决:

  1. Event Store 的健壮性: 内存中的 Event Store 是脆弱的。生产环境必须使用专用的、支持持久化、并发控制和高吞吐量的解决方案,如 Kafka、Pulsar 或专门的 EventStoreDB。
  2. Projector 的可靠性: 简单的轮询投影器是单点故障。它需要被改造成一个高可用的服务。此外,必须持久化其处理进度(lastProcessedOffset),以防重启后从头开始处理或丢失事件。这通常存储在 Redis 或数据库中。
  3. 读模型重建: 业务需求变化时,我们可能需要修改搜索索引的结构。这就要求我们有能力停止旧的投影器,启动一个新的投影器,从头开始回放所有历史事件来构建一个全新的读模型,整个过程需要做到对用户无感或停机时间极短。
  4. 快照(Snapshotting): 当一个文档的事件流变得非常长时,每次加载它以验证新命令(例如,检查文档是否存在)都需要回放所有事件,性能会下降。引入快照机制,即定期将文档的当前状态持久化,可以极大地优化这个过程。

尽管存在这些挑战,但从 CQRS 和 Event Sourcing 中获得的架构清晰度和业务灵活性是巨大的。它迫使我们更深入地思考业务流程的本质,将系统行为建模为一系列有意义的事件,而不仅仅是数据的增删改查。这种模式的投入,对于需要长期演进的复杂系统而言,是一笔值得的投资。


  目录