Micronaut 与 Pulsar 构建横向扩展的 WebSocket 推送网关及 Solid.js 响应式前端实现


业务需求很直接:将 Apache Pulsar 中高吞吐量的实时数据流,毫秒级延迟地推送到数千个并发的 Web 客户端。初版方案是一个单体 Java WebSocket 服务,它消费 Pulsar 的 topic,然后广播给所有连接的客户端。这个方案在几十个连接时工作良好,但随着负载增加,瓶颈和单点故障问题变得不可接受。当该实例崩溃时,所有客户端连接丢失,恢复期间的数据也全部丢失。我们需要一个能横向扩展且具备容错能力的新架构。

这里的核心挑战在于状态管理。在一个分布式的 WebSocket 网关集群中,一个客户端可能连接到实例A,而它订阅的 Pulsar topic 数据可能被实例B的 consumer 消费。如何将数据从实例B路由到实例A,再推送给正确的客户端?传统的解决方案,如使用 Redis 进行会话管理和消息路由,会引入额外的基础设施依赖和网络延迟,增加了系统的复杂性和维护成本。

我们的目标是设计一个无状态的网关层,它能够水平扩展,并且尽可能地减少外部依赖。Pulsar 自身强大的订阅模型似乎为我们提供了一条出路。

架构决策:利用Pulsar的独占订阅模型管理会话

我们决定放弃在网关实例之间共享状态或进行消息路由的复杂模式。取而代之的,是将每一个 WebSocket 连接映射为 Pulsar 中的一个独立的、临时的 Exclusive 订阅。

架构逻辑如下:

  1. 客户端通过 WebSocket 连接到负载均衡器,并被路由到任意一个健康的 Micronaut 网关实例。
  2. 在 WebSocket 连接建立时,客户端发送一条初始消息,声明其希望订阅的 Pulsar topic(例如 persistent://public/default/stock-ticks)。
  3. 网关实例收到订阅请求后,为这个 WebSocket 会话创建一个全新的 Pulsar Consumer。关键在于,这个 Consumer 使用 Exclusive 订阅类型,并生成一个与该 WebSocket 会话绑定的唯一订阅名。
  4. 该 Consumer 的生命周期与 WebSocket 会话完全绑定。当 WebSocket 连接断开时(@OnClose),对应的 Pulsar Consumer 必须被立即关闭和清理。
  5. 由于每个会话都有自己独立的 Consumer 和 Exclusive 订阅,因此 Pulsar broker 会将指定 topic 的消息副本分发给每一个订阅者。网关实例本身不持有任何路由信息,只负责在 WebSocket 会话和其专属的 Pulsar Consumer 之间建立一个管道。
graph TD
    subgraph "用户浏览器"
        A[Solid.js UI] -->|WebSocket Conn| B(Load Balancer)
    end

    B --> C1(Micronaut GW 1)
    B --> C2(Micronaut GW 2)
    B --> C3(Micronaut GW N)

    subgraph "Pulsar集群"
        P(Broker) -- Topic: stock-ticks --> S1(Sub-ws-session-1)
        P -- Topic: stock-ticks --> S2(Sub-ws-session-2)
        P -- Topic: stock-ticks --> S3(Sub-ws-session-N)
    end

    C1 -- Creates/Manages --> S1
    C2 -- Creates/Manages --> S2
    C3 -- Creates/Manages --> S3

    S1 -- Pushes Msg --> C1 -- Pushes Msg --> A
    S2 -- Pushes Msg --> C2 -- Pushes Msg --> A
    S3 -- Pushes Msg --> C3 -- Pushes Msg --> A

    style C1 fill:#f9f,stroke:#333,stroke-width:2px
    style C2 fill:#f9f,stroke:#333,stroke-width:2px
    style C3 fill:#f9f,stroke:#333,stroke-width:2px

这个设计的优势显而易见:

  • 无状态网关: 网关实例是完全无状态的,可以任意增删,极易于水平扩展。
  • 无外部依赖: 不需要 Redis 或其他外部组件来管理会话状态。
  • 故障隔离: 一个网关实例的故障只会影响连接到该实例的客户端,并且由于 Pulsar 的游标机制,客户端重新连接到新实例后,可以从上次中断的地方继续消费,不会丢失消息。

当然,这种方法的代价是可能会在 Pulsar 中创建大量的 Consumer。但 Pulsar 的设计就是为了支持海量 topic 和 consumer,只要 broker 资源充足,这在实践中是完全可行的。

Micronaut后端实现

我们选择 Micronaut 是因为它基于AOT编译,提供了极低的内存占用和飞快的启动速度,非常适合作为云原生环境下的微服务。

首先是 build.gradle.kts 的依赖配置:

// build.gradle.kts
dependencies {
    implementation("io.micronaut:micronaut-http-client")
    implementation("io.micronaut:micronaut-http-server-netty")
    implementation("io.micronaut:micronaut-jackson-databind")
    implementation("io.micronaut.java:micronaut-java-runtime")
    implementation("io.micronaut.kotlin:micronaut-kotlin-runtime")
    implementation("io.micronaut.reactor:micronaut-reactor")
    implementation("io.micronaut.websocket:micronaut-websocket")
    implementation("jakarta.annotation:jakarta.annotation-api")
    implementation("org.apache.pulsar:pulsar-client:2.11.1") // Pulsar Client
    runtimeOnly("ch.qos.logback:logback-classic")
}

接下来是核心的 WebSocket Server 实现。我们用一个 ConcurrentHashMap 来管理每个 WebSocket Session 对应的 Pulsar Consumer。

// src/main/java/com/example/gateway/PulsarWebSocketServer.java

import io.micronaut.http.MediaType;
import io.micronaut.websocket.WebSocketSession;
import io.micronaut.websocket.annotation.OnClose;
import io.micronaut.websocket.annotation.OnMessage;
import io.micronaut.websocket.annotation.OnOpen;
import io.micronaut.websocket.annotation.ServerWebSocket;
import jakarta.inject.Inject;
import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

@ServerWebSocket("/ws/pulsar/{tenant}/{namespace}/{topic}")
public class PulsarWebSocketServer {

    private static final Logger LOG = LoggerFactory.getLogger(PulsarWebSocketServer.class);
    private static final Pattern TOPIC_PATTERN = Pattern.compile("^subscribe:(.+)$");

    private final PulsarClient pulsarClient;
    private final ConcurrentHashMap<String, Consumer<byte[]>> consumers = new ConcurrentHashMap<>();
    
    // 使用一个专用的线程池来处理Pulsar消息接收,避免阻塞Netty的IO线程
    private final ExecutorService pulsarListenerExecutor = Executors.newCachedThreadPool();

    @Inject
    public PulsarWebSocketServer(PulsarClient pulsarClient) {
        this.pulsarClient = pulsarClient;
    }

    @OnOpen
    public void onOpen(String tenant, String namespace, String topic, WebSocketSession session) {
        LOG.info("WebSocket connection opened: [{}]. Waiting for subscription message.", session.getId());
        // 初始连接时不立即创建Consumer,而是等待客户端发送明确的订阅指令
        // 这增加了灵活性,比如可以在一个WS连接上切换topic
    }

    @OnClose
    public void onClose(String tenant, String namespace, String topic, WebSocketSession session) {
        LOG.info("WebSocket connection closed: [{}].", session.getId());
        closeAndRemoveConsumer(session.getId());
    }

    @OnMessage
    public void onMessage(String message, String tenant, String namespace, String topic, WebSocketSession session) {
        Matcher matcher = TOPIC_PATTERN.matcher(message);
        if (matcher.matches()) {
            String requestedTopic = matcher.group(1);
            String fullTopicName = String.format("persistent://%s/%s/%s", tenant, namespace, requestedTopic);
            LOG.info("Session [{}] requested subscription to topic: {}", session.getId(), fullTopicName);

            // 如果会话已经有Consumer,先关闭旧的
            closeAndRemoveConsumer(session.getId());

            try {
                // 为每个会话创建独立的Consumer
                Consumer<byte[]> consumer = pulsarClient.newConsumer()
                    .topic(fullTopicName)
                    // 使用UUID确保订阅名唯一,类型为独占
                    .subscriptionName("ws-gateway-sub-" + session.getId() + "-" + UUID.randomUUID())
                    .subscriptionType(SubscriptionType.Exclusive)
                    // 从最早的位置开始消费,确保不会错过消息。对于某些场景,可能需要Latest
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .messageListener((c, msg) -> {
                        try {
                            // 在这里将Pulsar Message推送给WebSocket Client
                            String payload = new String(msg.getData(), StandardCharsets.UTF_8);
                            // broadcast方法是线程安全的
                            session.getAsyncRemote().sendText(payload, result -> {
                                if(result.isSuccess()) {
                                    // 消息成功发送后才ack
                                    c.acknowledgeAsync(msg);
                                } else {
                                    LOG.error("Failed to send message to WebSocket client [{}]. Cause: {}", session.getId(), result.getCause().getMessage());
                                    // 发送失败,不ack,Pulsar会自动重传
                                }
                            });
                        } catch (Exception e) {
                            LOG.error("Error processing message for session [{}].", session.getId(), e);
                            // 发生异常,不ack,让Pulsar重传
                            c.negativeAcknowledge(msg);
                        }
                    })
                    // 将监听器操作提交到我们自己的线程池
                    .listenerThreads(1) // pulsar client内部线程数
                    .subscribe();

                consumers.put(session.getId(), consumer);
                LOG.info("Successfully created Pulsar consumer for session [{}] on topic [{}].", session.getId(), fullTopicName);
                session.sendSync("subscribed:" + fullTopicName);

            } catch (PulsarClientException e) {
                LOG.error("Failed to create Pulsar consumer for session [{}].", session.getId(), e);
                session.sendSync("error:Failed to subscribe. Reason: " + e.getMessage());
                session.close();
            }
        } else {
            LOG.warn("Received invalid message from session [{}]: {}", session.getId(), message);
            session.sendSync("error:Invalid command. Use 'subscribe:<topic-name>'.");
        }
    }

    private void closeAndRemoveConsumer(String sessionId) {
        Consumer<byte[]> consumer = consumers.remove(sessionId);
        if (consumer != null && consumer.isConnected()) {
            try {
                // 使用异步关闭,避免阻塞OnClose回调
                consumer.closeAsync().thenAccept(v -> 
                    LOG.info("Pulsar consumer for session [{}] closed successfully.", sessionId)
                ).exceptionally(ex -> {
                    LOG.warn("Failed to close Pulsar consumer for session [{}].", sessionId, ex);
                    return null;
                });
            } catch (Exception e) {
                LOG.warn("Exception while closing consumer for session [{}].", sessionId, e);
            }
        }
    }
}

Pulsar Client的配置通过 application.yml 和一个 Factory Bean 来注入。

# src/main/resources/application.yml
micronaut:
  application:
    name: pulsar-websocket-gateway
  server:
    port: 8080
pulsar:
  service-url: "pulsar://localhost:6650"
  # 在生产环境中,需要配置TLS和认证
  # tls-trust-certs-file-path: "/path/to/ca.cert.pem"
  # authentication:
  #   plugin-class-name: "org.apache.pulsar.client.impl.auth.AuthenticationToken"
  #   params: "token:eyJ..."
// src/main/java/com/example/gateway/config/PulsarClientFactory.java

import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Value;
import jakarta.inject.Singleton;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Factory
public class PulsarClientFactory {

    private static final Logger LOG = LoggerFactory.getLogger(PulsarClientFactory.class);

    @Value("${pulsar.service-url}")
    private String serviceUrl;
    
    // 可选的认证配置,生产环境必备
    // @Value("${pulsar.authentication.plugin-class-name:}")
    // private String authPluginClassName;
    // @Value("${pulsar.authentication.params:}")
    // private String authParams;


    @Singleton
    public PulsarClient pulsarClient() throws PulsarClientException {
        LOG.info("Initializing Pulsar client with service URL: {}", serviceUrl);
        return PulsarClient.builder()
                .serviceUrl(serviceUrl)
                // .authentication(authPluginClassName, authParams) // 启用认证
                .build();
    }
}

这段代码有几个关键的生产级考量:

  1. 线程模型: Pulsar的 messageListener 是在Pulsar客户端的内部线程池中执行的。为了不阻塞这些线程,并且将消息安全地推送到Netty管理的WebSocket Session,我们使用了 session.getAsyncRemote().sendText()
  2. 优雅关闭: 在 @OnClose 事件中,我们必须确保关联的Pulsar Consumer被正确关闭。这会释放Pulsar broker上的资源,并删除该独占订阅。我们使用 closeAsync 来避免阻塞Netty的IO线程。
  3. 错误处理与ACK: 只有当消息成功推送到WebSocket客户端后,我们才对Pulsar消息进行 acknowledgeAsync。如果推送失败(例如,客户端网络缓慢,缓冲区满),我们不ACK,Pulsar会在配置的超时后自动重传该消息,保证了至少一次的交付语义。对于处理逻辑本身的异常,使用 negativeAcknowledge 让消息能更快被重传。
  4. 动态订阅: 客户端可以通过发送 subscribe:<new-topic> 消息来切换其订阅的topic,增强了灵活性。

Solid.js 前端实现

前端我们选择Solid.js,因为它惊人的性能和细粒度的响应式系统。在处理高频数据流时,它的无虚拟DOM架构可以避免不必要的重渲染开销,只更新真正改变的DOM节点。

这是一个简单的前端页面,用于连接WebSocket并实时显示接收到的数据。

// src/App.jsx
import { createSignal, onCleanup, createEffect } from "solid-js";
import { createStore } from "solid-js/store";

function App() {
  const WS_URL = "ws://localhost:8080/ws/pulsar/public/default/ticks";
  const [connectionStatus, setConnectionStatus] = createSignal("Disconnected");
  const [messages, setMessages] = createStore([]);
  let socket;

  const connect = () => {
    if (socket && socket.readyState === WebSocket.OPEN) {
      console.log("Already connected.");
      return;
    }

    setConnectionStatus("Connecting...");
    socket = new WebSocket(WS_URL);

    socket.onopen = () => {
      setConnectionStatus("Connected");
      console.log("WebSocket connection established.");
      // 连接成功后,发送订阅指令
      socket.send("subscribe:stock-ticks"); 
    };

    socket.onmessage = (event) => {
      // 假设收到的数据是JSON字符串
      try {
        const data = JSON.parse(event.data);
        // 使用非响应式更新来优化性能,批量添加
        setMessages(prev => [data, ...prev.slice(0, 99)]); // 只保留最近100条
      } catch (e) {
        // 处理非JSON消息,比如服务端的控制消息
        if (event.data.startsWith("subscribed:")) {
            console.log("Subscription confirmed by server:", event.data);
        } else {
            console.warn("Received non-JSON message:", event.data);
        }
      }
    };

    socket.onclose = (event) => {
      setConnectionStatus(`Disconnected (Code: ${event.code})`);
      console.log("WebSocket connection closed. Attempting to reconnect in 3 seconds...");
      // 实现简单的自动重连逻辑
      setTimeout(connect, 3000);
    };

    socket.onerror = (error) => {
      setConnectionStatus("Error");
      console.error("WebSocket Error:", error);
      // onerror会紧接着触发onclose,所以重连逻辑放在onclose里
    };
  };

  // 组件加载时自动连接
  createEffect(() => connect());

  // 组件卸载时清理连接
  onCleanup(() => {
    if (socket) {
      socket.onclose = null; // 阻止重连逻辑
      socket.close();
    }
  });

  return (
    <div style={{ padding: "20px", "font-family": "monospace" }}>
      <h1>Pulsar Real-time Data Stream</h1>
      <p>Connection Status: <b>{connectionStatus()}</b></p>
      <div style={{ 
        "border": "1px solid #ccc", 
        "height": "500px", 
        "overflow-y": "scroll",
        "background-color": "#f5f5f5",
        "padding": "10px"
      }}>
        <For each={messages}>
          {(msg, i) => 
            <pre style={{ margin: "0", "border-bottom": "1px solid #eee" }}>
              {JSON.stringify(msg)}
            </pre>
          }
        </For>
      </div>
    </div>
  );
}

export default App;

Solid.js的代码亮点:

  • createStore: 用于管理一个对象或数组的集合,当新消息到来时,只有数组本身被修改,For组件能够高效地只添加新的DOM节点,而不是重新渲染整个列表。
  • createEffect / onCleanup: 确保WebSocket连接的生命周期与组件的生命周期绑定,实现了自动连接和在组件销毁时安全断开。
  • 自动重连: onclose事件中包含了简单的setTimeout重连逻辑,这是生产环境中保证可用性的基本要求。当后端某个网关实例重启或部署时,客户端可以无缝地重新连接到集群中的另一个实例。

局限性与未来展望

这个架构虽然解决了横向扩展和容错的核心问题,但并非没有权衡。
最主要的考虑是Pulsar的Exclusive订阅模型对Broker造成的压力。每个WebSocket连接都会创建一个独立的Consumer和订阅。对于数万甚至数十万的连接,这将对Pulsar的元数据管理和消息分发调度构成挑战。在实践中,需要对Pulsar集群进行充分的压力测试和容量规划。

其次,当前的实现没有处理客户端反压(backpressure)。如果Pulsar的数据生产速率远超客户端的处理能力或网络带宽,网关实例的内存可能会因为缓冲待发送的消息而膨胀。一个可行的优化路径是在WebSocket协议之上实现一套简单的流量控制机制,例如,当客户端处理不过来时,发送一个pause指令,网关收到后可以调用Pulsar Consumer的pause()方法暂停接收消息。

最后,安全是生产环境中必须解决的问题。当前的实现是匿名的。一个完整的系统需要集成认证和授权机制,例如通过WebSocket的子协议或初始连接时的token,来验证用户身份,并据此在后端判断该用户是否有权限订阅请求的Pulsar topic。这可以与Micronaut Security等框架结合实现。


  目录