基于 Paxos 算法构建与 Jotai 集成的 Clojure 分布式状态机


同步多个客户端之间复杂的、相互依赖的 UI 状态,是一项艰巨的挑战。当下的主流方案,如 Operational Transformation (OT) 或 Conflict-free Replicated Data Types (CRDT),虽然在协同编辑等场景下表现出色,但它们要么实现逻辑异常复杂,要么在处理非交换性操作时需要精心设计数据结构,无法提供强一致性保证。一个更根本的问题是,如果业务逻辑要求所有参与者在任何时刻都必须观察到完全一致的状态,一个中心化的服务器就成了单点故障和性能瓶颈。

我们的痛点在于需要一种机制,它既能为前端提供响应式的状态管理,又能保证后端状态变更的绝对顺序和一致性,即使在部分节点发生故障时也是如此。

初步的构想是,将整个共享应用状态抽象为一个确定性的状态机。任何用户的操作都不直接修改状态,而是被封装成一个“提议”(Proposal),提交给一个分布式系统。这个系统的唯一职责就是就这些提议的顺序达成共识。一旦顺序确定,每个节点都可以独立地、按部就班地将这些操作应用到本地的状态机副本上,从而达到最终状态的一致。这个过程的核心就是解决分布式共识问题,Paxos 算法是绕不开的经典解决方案。

技术选型决策

  1. 共识算法:Paxos
    虽然 Raft 算法因其更好的可理解性而备受推崇,但选择 Paxos 是为了回归本源。理解 Paxos 的两阶段提交(Prepare-Accept)能让我们更深刻地把握分布式共识的本质。在我们的场景中,我们不需要一个完整的 Multi-Paxos 实现,一个能就单个值达成共识的 Basic Paxos 核心就足以验证整个架构的可行性。

  2. 后端语言:Clojure
    这是最关键的决策。Clojure 对不可变数据结构(Immutable Data Structures)的执着,使其成为实现状态机和共识算法的绝佳工具。状态机的每一次变迁,都是一个纯函数 (apply-fn current-state operation),返回一个全新的状态,没有任何副作用。这极大地简化了并发环境下的推理和调试。同时,Clojure 强大的 core.async 库为我们提供了一种优雅的方式来模拟或实现节点间的异步消息传递。JVM 的稳定性和生态也是重要的加分项。

  3. 前端状态管理:Jotai
    前端需要一个能够精细化、原子化管理状态的库。Jotai 的原子模型(Atoms)与我们的后端架构不谋而合。我们可以创建一个主状态原子,它只被动地接收来自后端共识层的权威状态更新。UI 的其他部分则通过派生原子(Derived Atoms)来响应式地展示数据。用户的操作则通过一个独立的、只写的原子(Write-only Atom)发送给后端,形成清晰的单向数据流。这种架构避免了前端状态的随意突变,所有状态变更的来源都唯一且可追溯。

步骤化实现

我们的目标是构建一个系统,其中多个 Web 客户端(使用 Jotai)通过 WebSocket 连接到一个 Clojure 应用集群。这个集群在内部运行 Paxos 算法来对客户端提交的操作进行排序。

1. Clojure Paxos 核心实现

首先,我们来构建 Basic Paxos 的核心逻辑。我们将角色(Proposer, Acceptor, Learner)和消息都建模为 Clojure 的 defrecord,这提供了结构化数据和一定的类型清晰度。为了简化,我们先在一个进程内用 core.async 的 channel 模拟节点间的网络通信。

(ns paxos-jotai-server.core
  (:require [clojure.core.async :as async :refer [>! <! go-loop chan timeout]]))

;;; ----------------------------------------------------------------------------
;;; 数据结构定义 (Data Structures)
;;; ----------------------------------------------------------------------------

(defrecord Prepare [proposal-id])
(defrecord Promise [proposal-id accepted-proposal-id accepted-value])
(defrecord Accept [proposal-id value])
(defrecord Accepted [proposal-id])
(defrecord Nack [proposal-id]) ; Nack: Negative Acknowledgment

;;; ----------------------------------------------------------------------------
;;; Acceptor 节点逻辑 (Acceptor Node Logic)
;;; 这里的坑在于,Acceptor 必须持久化它承诺的最高 proposal-id 和已接受的值。
;;; 为简化,我们用一个 atom 来模拟持久化状态。
;;; ----------------------------------------------------------------------------

(defn create-acceptor
  "创建一个 Acceptor。它通过 input-ch 接收消息,并通过 output-ch 发送响应。"
  [id input-ch output-ch]
  (let [state (atom {:max-proposal-id 0
                     :accepted-proposal-id nil
                     :accepted-value nil})]
    (go-loop []
      (when-let [msg (<! input-ch)]
        (let [{:keys [max-proposal-id accepted-proposal-id accepted-value]} @state]
          (cond
            ;; --- Phase 1a: Prepare ---
            (instance? Prepare msg)
            (let [proposal-id (:proposal-id msg)]
              (if (> proposal-id max-proposal-id)
                (do
                  (swap! state assoc :max-proposal-id proposal-id)
                  (>! output-ch (->Promise proposal-id accepted-proposal-id accepted-value)))
                (do
                  ;; 拒绝旧的 prepare 请求
                  (>! output-ch (->Nack proposal-id)))))

            ;; --- Phase 2a: Accept ---
            (instance? Accept msg)
            (let [proposal-id (:proposal-id msg)
                  value (:value msg)]
              (if (>= proposal-id max-proposal-id)
                (do
                  (swap! state assoc
                         :max-proposal-id proposal-id
                         :accepted-proposal-id proposal-id
                         :accepted-value value)
                  (>! output-ch (->Accepted proposal-id)))
                (do
                  ;; 拒绝旧的 accept 请求
                  (>! output-ch (->Nack proposal-id))))))
        (recur)))))

;;; ----------------------------------------------------------------------------
;;; Proposer 节点逻辑 (Proposer Node Logic)
;;; Proposer 的复杂性在于处理各种响应、超时和重试。
;;; ----------------------------------------------------------------------------

(defn run-proposer
  "运行一个 Proposer 实例来提议一个值。"
  [initial-proposal-id value acceptor-channels learner-channel]
  (go
    (let [num-acceptors (count acceptor-channels)
          quorum (inc (quot num-acceptors 2))
          proposal-id (atom initial-proposal-id)]

      ;; Proposer 的主循环,会因被拒绝而重试
      (loop []
        (let [response-ch (chan num-acceptors)
              current-pid @proposal-id]

          ;; --- Phase 1a: 发送 Prepare 请求 ---
          (doseq [ch acceptor-channels]
            (>! ch (->Prepare current-pid)))

          ;; --- Phase 1b: 等待 Promises ---
          (let [promises (async/reduce
                           (fn [acc item] (conj acc item))
                           []
                           (async/take num-acceptors
                                       (async/merge (map (fn [ch]
                                                           (let [resp-ch (chan)]
                                                             (go (>! resp-ch (<! ch)))
                                                             resp-ch))
                                                         acceptor-channels)
                                                    num-acceptors)))]
            (println (format "Proposer [%d]: Received promises/nacks: %s" current-pid promises))

            (let [promise-count (count (filter #(instance? Promise %) promises))
                  nack-count (count (filter #(instance? Nack %) promises))]

              (cond
                ;; 多数派同意,进入 Phase 2
                (>= promise-count quorum)
                (let [highest-accepted-promise (->> promises
                                                    (filter #(instance? Promise %))
                                                    (filter :accepted-proposal-id)
                                                    (sort-by :accepted-proposal-id >)
                                                    first)
                      value-to-propose (if highest-accepted-promise
                                         (:accepted-value highest-accepted-promise)
                                         value)]
                  ;; --- Phase 2a: 发送 Accept 请求 ---
                  (doseq [ch acceptor-channels]
                    (>! ch (->Accept current-pid value-to-propose)))

                  ;; --- Phase 2b: 等待 Accepted ---
                  (let [accepts (async/reduce (fn [acc item] (conj acc item)) []
                                             (async/take num-acceptors
                                                         (async/merge (map (fn [ch]
                                                                             (let [resp-ch (chan)]
                                                                               (go (>! resp-ch (<! ch)))
                                                                               resp-ch))
                                                                           acceptor-channels)
                                                                      num-acceptors)))]
                    (println (format "Proposer [%d]: Received accepted/nacks: %s" current-pid accepts))

                    (if (>= (count (filter #(instance? Accepted %) accepts)) quorum)
                      ;; --- 成功!通知 Learner ---
                      (do
                        (println (format "SUCCESS: Value '%s' chosen with proposal ID %d" value-to-propose current-pid))
                        (>! learner-channel {:value value-to-propose}))
                      ;; Phase 2 失败,需要增加 proposal ID 重试
                      (do
                        (println "Phase 2 failed, retrying...")
                        (swap! proposal-id + 10) ; 增加 proposal ID 来避免冲突
                        (recur)))))

                ;; 多数派拒绝,增加 proposal ID 重试
                :else
                (do
                  (println "Phase 1 failed, retrying...")
                  (swap! proposal-id + 10)
                  (async/timeout 500) ; 等待一会再重试
                  (recur)))))))))))

这段代码模拟了 Proposer 和 Acceptor 的核心交互。在真实项目中,acceptor-channels 会是网络连接,并且需要健壮的超时和重传机制。

2. 状态机与 Learner

Learner 的角色相对简单:它接收被大多数 Acceptor 接受的值,并将其应用到状态机上。一旦值被“学习”到,它就成为日志中不可更改的一部分。

;;; ----------------------------------------------------------------------------
;;; Learner 与 State Machine
;;; ----------------------------------------------------------------------------

(def state-machine (atom {:value 0}))
(def consensus-log (atom []))

;; 定义一个简单的状态机转换函数
(defn apply-operation [current-state operation]
  (let [op-type (:op operation)
        op-payload (:payload operation)]
    (case op-type
      :increment (update current-state :value + op-payload)
      :decrement (update current-state :value - op-payload)
      :set (assoc current-state :value op-payload)
      current-state))) ; 默认返回原状态

(defn create-learner [learner-ch broadcast-fn]
  "Learner 监听 channel,一旦有值被确定,就更新状态机并广播"
  (go-loop []
    (when-let [learned-msg (<! learner-ch)]
      (let [operation (:value learned-msg)] ; Paxos 达成共识的是操作,而不是状态
        (println (format "Learner: Learned new operation: %s" operation))
        (let [new-state (swap! state-machine apply-operation operation)]
          (swap! consensus-log conj operation)
          ;; 广播最新的权威状态给所有客户端
          (broadcast-fn new-state))))
    (recur)))

这里的关键点是:Paxos 算法本身不关心它在为什么值达成共识。我们将客户端的操作(例如 {:op :increment, :payload 1})作为 Paxos 要达成共识的 value。Learner 负责按顺序执行这些操作。

3. WebSocket 桥接层

我们需要一个 WebSocket 服务器来连接前端和后端。这里使用 http-kit

(ns paxos-jotai-server.server
  (:require [org.httpkit.server :as http-kit]
            [cheshire.core :as json]
            [paxos-jotai-server.core :as paxos]))

(defonce channels (atom #{})) ; 存储所有 WebSocket 连接

(defn broadcast-state [new-state]
  (let [state-json (json/generate-string {:type "state-update" :payload new-state})]
    (doseq [ch @channels]
      (http-kit/send! ch state-json))))

(defn app [req]
  (http-kit/as-channel req
    {:on-open    (fn [ch]
                   (println "Client connected")
                   (swap! channels conj ch)
                   ;; 新客户端连接时,发送当前权威状态
                   (http-kit/send! ch (json/generate-string {:type "initial-state"
                                                             :payload @paxos/state-machine})))
     :on-close   (fn [ch status]
                   (println "Client disconnected, status:" status)
                   (swap! channels disj ch))
     :on-receive (fn [ch raw-msg]
                   (let [msg (json/parse-string raw-msg true)
                         operation (:payload msg)]
                     (println "Received operation from client:" operation)
                     ;; 当收到客户端操作时,启动一个 Proposer 实例
                     ;; 这里的 proposal-id 应该由一个中心化的序列生成器或基于时间戳+节点ID生成
                     ;; 为了演示,我们使用一个简单的原子计数器
                     (let [proposal-id (+ (System/currentTimeMillis) (rand-int 100))]
                       (paxos/run-proposer proposal-id operation @paxos-acceptor-channels paxos-learner-channel))))}))

(defn -main []
  ;; 初始化 Paxos 集群 (模拟)
  (let [num-acceptors 3
        acceptor-ins (repeatedly num-acceptors #(chan 10))
        acceptor-outs (repeatedly num-acceptors #(chan 10))
        learner-ch (chan 10)]

    ;; 保存 acceptor channels 以便 proposer 使用
    (alter-var-root #'paxos-acceptor-channels (constantly acceptor-ins))
    (alter-var-root #'paxos-learner-channel (constantly learner-ch))

    ;; 启动 Acceptors 和 Learner
    (dotimes [i num-acceptors]
      (paxos/create-acceptor (str "acceptor-" i) (nth acceptor-ins i) (nth acceptor-outs i)))
    (paxos/create-learner learner-ch broadcast-state)

    ;; 启动 WebSocket 服务器
    (http-kit/run-server app {:port 8080})
    (println "Server started on port 8080")))

这个 WebSocket 服务器将客户端的操作分发给 Paxos Proposer,并在 Learner 确认新状态后,将最终状态广播给所有连接的客户端。

sequenceDiagram
    participant ClientA as Client A (Jotai)
    participant ClientB as Client B (Jotai)
    participant WSServer as Clojure WebSocket Server
    participant Proposer as Paxos Proposer
    participant Acceptors as Paxos Acceptors (Quorum)
    participant Learner as Paxos Learner & State Machine

    ClientA ->> WSServer: Send Operation {:op: :inc, :val: 1}
    WSServer ->> Proposer: Initiate Proposal(op)
    Proposer ->> Acceptors: Phase 1: Prepare(pid=10)
    Acceptors -->> Proposer: Phase 1: Promise(pid=10)
    Proposer ->> Acceptors: Phase 2: Accept(pid=10, op)
    Acceptors -->> Proposer: Phase 2: Accepted(pid=10)
    Proposer ->> Learner: Value 'op' is chosen!
    Learner ->> Learner: Apply 'op' to State Machine
    Learner ->> WSServer: New State is ready
    WSServer -->> ClientA: Broadcast New State
    WSServer -->> ClientB: Broadcast New State
    ClientA ->> ClientA: Update Jotai Atom
    ClientB ->> ClientB: Update Jotai Atom

4. Jotai 前端实现

前端代码力求简洁,严格遵循单向数据流。

// store.ts
import { atom } from 'jotai';
import { wsend, useWebSocket } from './websocket'; // 假设的 WebSocket hook

// 这个原子持有从服务器接收到的权威状态
export const serverStateAtom = atom<{ value: number }>({ value: 0 });

// UI 组件只应该读取这个派生原子,确保数据来源唯一
export const valueAtom = atom((get) => get(serverStateAtom).value);

// 这是一个只写的原子,用于向服务器发送操作
export const operationAtom = atom(
  null,
  (_get, _set, operation: { op: string; payload: any }) => {
    // 这里的 wsend 是我们封装的 WebSocket 发送函数
    wsend({ type: 'operation', payload: operation });
  }
);

// App.tsx
import React, { useEffect } from 'react';
import { useAtom, useSetAtom } from 'jotai';
import { serverStateAtom, valueAtom, operationAtom } from './store';

// 假设的 WebSocket hook
const useWebSocketConnection = (onMessage: (data: any) => void) => {
  useEffect(() => {
    const ws = new WebSocket('ws://localhost:8080');
    ws.onmessage = (event) => {
      const data = JSON.parse(event.data);
      onMessage(data);
    };
    // 错误处理和重连逻辑应在此处实现
    return () => ws.close();
  }, [onMessage]);
};


export const App = () => {
  const setServerState = useSetAtom(serverStateAtom);
  const value = useAtom(valueAtom)[0];
  const sendOperation = useSetAtom(operationAtom);

  useWebSocketConnection((data) => {
    if (data.type === 'initial-state' || data.type === 'state-update') {
      // 关键点:前端从不自己计算状态,完全信任服务器推送的状态
      setServerState(data.payload);
    }
  });

  const handleIncrement = () => {
    sendOperation({ op: 'increment', payload: 1 });
  };

  const handleDecrement = () => {
    sendOperation({ op: 'decrement', payload: 1 });
  };

  return (
    <div>
      <h1>Distributed Counter</h1>
      <h2>Value: {value}</h2>
      <button onClick={handleIncrement}>Increment</button>
      <button onClick={handleDecrement}>Decrement</button>
    </div>
  );
};

在这个前端实现中,React 组件是“哑”的。它不知道 Paxos,也不知道状态机。它只知道从 valueAtom 读取值,并通过 operationAtom 发送意图。所有状态同步的复杂性都被封装在了后端和 serverStateAtom 的更新逻辑中。

局限性与未来迭代

这个实现成功地验证了核心架构:Jotai + WebSocket + Clojure + Paxos 可以构建一个强一致性的实时协作应用。然而,一个生产级的系统还需要解决许多问题:

  1. Multi-Paxos/Raft: Basic Paxos 只能就单个值达成共识,效率低下。实际应用需要 Multi-Paxos 或 Raft 这样的协议来高效地提交一个连续的操作日志。这通常涉及到领导者选举(Leader Election)来避免提议者冲突(dueling proposers)并提高吞吐量。

  2. 网络与持久化: 当前的实现是内存中的模拟。一个真实的系统需要将 Acceptor 的状态(max-proposal-id, accepted-value)持久化到磁盘,以防节点崩溃。节点间的通信需要替换为真正的网络协议(如 TCP 或 gRPC),并处理网络分区、消息丢失和重传。

  3. 客户端体验: 网络延迟会导致客户端操作后,状态更新有一段可见的延迟。可以引入乐观更新(Optimistic UI Updates),即在本地立即应用操作,如果后续收到来自服务器的冲突状态,再进行回滚。Jotai 的原子模型也使得实现这种逻辑相对容易。

  4. 可扩展性: 当前所有客户端都连接到单个 WebSocket 节点。为了水平扩展,需要一个路由层,或者让 Paxos 集群的每个节点都能处理客户端连接,但这会增加 Proposer 选择的复杂性。

尽管存在这些局限,这个架构的核心思想——将前端状态管理与后端的分布式共识直接关联——为构建一类新型的、需要强一致性保证的协作工具提供了一条清晰且强大的路径。它避开了 CRDT 的数据结构设计难题和 OT 的算法复杂性,回归到了一致性问题的本源。


  目录