业务需求很直接:将 Apache Pulsar 中高吞吐量的实时数据流,毫秒级延迟地推送到数千个并发的 Web 客户端。初版方案是一个单体 Java WebSocket 服务,它消费 Pulsar 的 topic,然后广播给所有连接的客户端。这个方案在几十个连接时工作良好,但随着负载增加,瓶颈和单点故障问题变得不可接受。当该实例崩溃时,所有客户端连接丢失,恢复期间的数据也全部丢失。我们需要一个能横向扩展且具备容错能力的新架构。
这里的核心挑战在于状态管理。在一个分布式的 WebSocket 网关集群中,一个客户端可能连接到实例A,而它订阅的 Pulsar topic 数据可能被实例B的 consumer 消费。如何将数据从实例B路由到实例A,再推送给正确的客户端?传统的解决方案,如使用 Redis 进行会话管理和消息路由,会引入额外的基础设施依赖和网络延迟,增加了系统的复杂性和维护成本。
我们的目标是设计一个无状态的网关层,它能够水平扩展,并且尽可能地减少外部依赖。Pulsar 自身强大的订阅模型似乎为我们提供了一条出路。
架构决策:利用Pulsar的独占订阅模型管理会话
我们决定放弃在网关实例之间共享状态或进行消息路由的复杂模式。取而代之的,是将每一个 WebSocket 连接映射为 Pulsar 中的一个独立的、临时的 Exclusive
订阅。
架构逻辑如下:
- 客户端通过 WebSocket 连接到负载均衡器,并被路由到任意一个健康的 Micronaut 网关实例。
- 在 WebSocket 连接建立时,客户端发送一条初始消息,声明其希望订阅的 Pulsar topic(例如
persistent://public/default/stock-ticks
)。 - 网关实例收到订阅请求后,为这个 WebSocket 会话创建一个全新的 Pulsar Consumer。关键在于,这个 Consumer 使用
Exclusive
订阅类型,并生成一个与该 WebSocket 会话绑定的唯一订阅名。 - 该 Consumer 的生命周期与 WebSocket 会话完全绑定。当 WebSocket 连接断开时(
@OnClose
),对应的 Pulsar Consumer 必须被立即关闭和清理。 - 由于每个会话都有自己独立的 Consumer 和
Exclusive
订阅,因此 Pulsar broker 会将指定 topic 的消息副本分发给每一个订阅者。网关实例本身不持有任何路由信息,只负责在 WebSocket 会话和其专属的 Pulsar Consumer 之间建立一个管道。
graph TD subgraph "用户浏览器" A[Solid.js UI] -->|WebSocket Conn| B(Load Balancer) end B --> C1(Micronaut GW 1) B --> C2(Micronaut GW 2) B --> C3(Micronaut GW N) subgraph "Pulsar集群" P(Broker) -- Topic: stock-ticks --> S1(Sub-ws-session-1) P -- Topic: stock-ticks --> S2(Sub-ws-session-2) P -- Topic: stock-ticks --> S3(Sub-ws-session-N) end C1 -- Creates/Manages --> S1 C2 -- Creates/Manages --> S2 C3 -- Creates/Manages --> S3 S1 -- Pushes Msg --> C1 -- Pushes Msg --> A S2 -- Pushes Msg --> C2 -- Pushes Msg --> A S3 -- Pushes Msg --> C3 -- Pushes Msg --> A style C1 fill:#f9f,stroke:#333,stroke-width:2px style C2 fill:#f9f,stroke:#333,stroke-width:2px style C3 fill:#f9f,stroke:#333,stroke-width:2px
这个设计的优势显而易见:
- 无状态网关: 网关实例是完全无状态的,可以任意增删,极易于水平扩展。
- 无外部依赖: 不需要 Redis 或其他外部组件来管理会话状态。
- 故障隔离: 一个网关实例的故障只会影响连接到该实例的客户端,并且由于 Pulsar 的游标机制,客户端重新连接到新实例后,可以从上次中断的地方继续消费,不会丢失消息。
当然,这种方法的代价是可能会在 Pulsar 中创建大量的 Consumer。但 Pulsar 的设计就是为了支持海量 topic 和 consumer,只要 broker 资源充足,这在实践中是完全可行的。
Micronaut后端实现
我们选择 Micronaut 是因为它基于AOT编译,提供了极低的内存占用和飞快的启动速度,非常适合作为云原生环境下的微服务。
首先是 build.gradle.kts
的依赖配置:
// build.gradle.kts
dependencies {
implementation("io.micronaut:micronaut-http-client")
implementation("io.micronaut:micronaut-http-server-netty")
implementation("io.micronaut:micronaut-jackson-databind")
implementation("io.micronaut.java:micronaut-java-runtime")
implementation("io.micronaut.kotlin:micronaut-kotlin-runtime")
implementation("io.micronaut.reactor:micronaut-reactor")
implementation("io.micronaut.websocket:micronaut-websocket")
implementation("jakarta.annotation:jakarta.annotation-api")
implementation("org.apache.pulsar:pulsar-client:2.11.1") // Pulsar Client
runtimeOnly("ch.qos.logback:logback-classic")
}
接下来是核心的 WebSocket Server 实现。我们用一个 ConcurrentHashMap
来管理每个 WebSocket Session 对应的 Pulsar Consumer。
// src/main/java/com/example/gateway/PulsarWebSocketServer.java
import io.micronaut.http.MediaType;
import io.micronaut.websocket.WebSocketSession;
import io.micronaut.websocket.annotation.OnClose;
import io.micronaut.websocket.annotation.OnMessage;
import io.micronaut.websocket.annotation.OnOpen;
import io.micronaut.websocket.annotation.ServerWebSocket;
import jakarta.inject.Inject;
import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ServerWebSocket("/ws/pulsar/{tenant}/{namespace}/{topic}")
public class PulsarWebSocketServer {
private static final Logger LOG = LoggerFactory.getLogger(PulsarWebSocketServer.class);
private static final Pattern TOPIC_PATTERN = Pattern.compile("^subscribe:(.+)$");
private final PulsarClient pulsarClient;
private final ConcurrentHashMap<String, Consumer<byte[]>> consumers = new ConcurrentHashMap<>();
// 使用一个专用的线程池来处理Pulsar消息接收,避免阻塞Netty的IO线程
private final ExecutorService pulsarListenerExecutor = Executors.newCachedThreadPool();
@Inject
public PulsarWebSocketServer(PulsarClient pulsarClient) {
this.pulsarClient = pulsarClient;
}
@OnOpen
public void onOpen(String tenant, String namespace, String topic, WebSocketSession session) {
LOG.info("WebSocket connection opened: [{}]. Waiting for subscription message.", session.getId());
// 初始连接时不立即创建Consumer,而是等待客户端发送明确的订阅指令
// 这增加了灵活性,比如可以在一个WS连接上切换topic
}
@OnClose
public void onClose(String tenant, String namespace, String topic, WebSocketSession session) {
LOG.info("WebSocket connection closed: [{}].", session.getId());
closeAndRemoveConsumer(session.getId());
}
@OnMessage
public void onMessage(String message, String tenant, String namespace, String topic, WebSocketSession session) {
Matcher matcher = TOPIC_PATTERN.matcher(message);
if (matcher.matches()) {
String requestedTopic = matcher.group(1);
String fullTopicName = String.format("persistent://%s/%s/%s", tenant, namespace, requestedTopic);
LOG.info("Session [{}] requested subscription to topic: {}", session.getId(), fullTopicName);
// 如果会话已经有Consumer,先关闭旧的
closeAndRemoveConsumer(session.getId());
try {
// 为每个会话创建独立的Consumer
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(fullTopicName)
// 使用UUID确保订阅名唯一,类型为独占
.subscriptionName("ws-gateway-sub-" + session.getId() + "-" + UUID.randomUUID())
.subscriptionType(SubscriptionType.Exclusive)
// 从最早的位置开始消费,确保不会错过消息。对于某些场景,可能需要Latest
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.messageListener((c, msg) -> {
try {
// 在这里将Pulsar Message推送给WebSocket Client
String payload = new String(msg.getData(), StandardCharsets.UTF_8);
// broadcast方法是线程安全的
session.getAsyncRemote().sendText(payload, result -> {
if(result.isSuccess()) {
// 消息成功发送后才ack
c.acknowledgeAsync(msg);
} else {
LOG.error("Failed to send message to WebSocket client [{}]. Cause: {}", session.getId(), result.getCause().getMessage());
// 发送失败,不ack,Pulsar会自动重传
}
});
} catch (Exception e) {
LOG.error("Error processing message for session [{}].", session.getId(), e);
// 发生异常,不ack,让Pulsar重传
c.negativeAcknowledge(msg);
}
})
// 将监听器操作提交到我们自己的线程池
.listenerThreads(1) // pulsar client内部线程数
.subscribe();
consumers.put(session.getId(), consumer);
LOG.info("Successfully created Pulsar consumer for session [{}] on topic [{}].", session.getId(), fullTopicName);
session.sendSync("subscribed:" + fullTopicName);
} catch (PulsarClientException e) {
LOG.error("Failed to create Pulsar consumer for session [{}].", session.getId(), e);
session.sendSync("error:Failed to subscribe. Reason: " + e.getMessage());
session.close();
}
} else {
LOG.warn("Received invalid message from session [{}]: {}", session.getId(), message);
session.sendSync("error:Invalid command. Use 'subscribe:<topic-name>'.");
}
}
private void closeAndRemoveConsumer(String sessionId) {
Consumer<byte[]> consumer = consumers.remove(sessionId);
if (consumer != null && consumer.isConnected()) {
try {
// 使用异步关闭,避免阻塞OnClose回调
consumer.closeAsync().thenAccept(v ->
LOG.info("Pulsar consumer for session [{}] closed successfully.", sessionId)
).exceptionally(ex -> {
LOG.warn("Failed to close Pulsar consumer for session [{}].", sessionId, ex);
return null;
});
} catch (Exception e) {
LOG.warn("Exception while closing consumer for session [{}].", sessionId, e);
}
}
}
}
Pulsar Client的配置通过 application.yml
和一个 Factory Bean 来注入。
# src/main/resources/application.yml
micronaut:
application:
name: pulsar-websocket-gateway
server:
port: 8080
pulsar:
service-url: "pulsar://localhost:6650"
# 在生产环境中,需要配置TLS和认证
# tls-trust-certs-file-path: "/path/to/ca.cert.pem"
# authentication:
# plugin-class-name: "org.apache.pulsar.client.impl.auth.AuthenticationToken"
# params: "token:eyJ..."
// src/main/java/com/example/gateway/config/PulsarClientFactory.java
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Value;
import jakarta.inject.Singleton;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Factory
public class PulsarClientFactory {
private static final Logger LOG = LoggerFactory.getLogger(PulsarClientFactory.class);
@Value("${pulsar.service-url}")
private String serviceUrl;
// 可选的认证配置,生产环境必备
// @Value("${pulsar.authentication.plugin-class-name:}")
// private String authPluginClassName;
// @Value("${pulsar.authentication.params:}")
// private String authParams;
@Singleton
public PulsarClient pulsarClient() throws PulsarClientException {
LOG.info("Initializing Pulsar client with service URL: {}", serviceUrl);
return PulsarClient.builder()
.serviceUrl(serviceUrl)
// .authentication(authPluginClassName, authParams) // 启用认证
.build();
}
}
这段代码有几个关键的生产级考量:
- 线程模型: Pulsar的
messageListener
是在Pulsar客户端的内部线程池中执行的。为了不阻塞这些线程,并且将消息安全地推送到Netty管理的WebSocket Session,我们使用了session.getAsyncRemote().sendText()
。 - 优雅关闭: 在
@OnClose
事件中,我们必须确保关联的Pulsar Consumer被正确关闭。这会释放Pulsar broker上的资源,并删除该独占订阅。我们使用closeAsync
来避免阻塞Netty的IO线程。 - 错误处理与ACK: 只有当消息成功推送到WebSocket客户端后,我们才对Pulsar消息进行
acknowledgeAsync
。如果推送失败(例如,客户端网络缓慢,缓冲区满),我们不ACK,Pulsar会在配置的超时后自动重传该消息,保证了至少一次的交付语义。对于处理逻辑本身的异常,使用negativeAcknowledge
让消息能更快被重传。 - 动态订阅: 客户端可以通过发送
subscribe:<new-topic>
消息来切换其订阅的topic,增强了灵活性。
Solid.js 前端实现
前端我们选择Solid.js,因为它惊人的性能和细粒度的响应式系统。在处理高频数据流时,它的无虚拟DOM架构可以避免不必要的重渲染开销,只更新真正改变的DOM节点。
这是一个简单的前端页面,用于连接WebSocket并实时显示接收到的数据。
// src/App.jsx
import { createSignal, onCleanup, createEffect } from "solid-js";
import { createStore } from "solid-js/store";
function App() {
const WS_URL = "ws://localhost:8080/ws/pulsar/public/default/ticks";
const [connectionStatus, setConnectionStatus] = createSignal("Disconnected");
const [messages, setMessages] = createStore([]);
let socket;
const connect = () => {
if (socket && socket.readyState === WebSocket.OPEN) {
console.log("Already connected.");
return;
}
setConnectionStatus("Connecting...");
socket = new WebSocket(WS_URL);
socket.onopen = () => {
setConnectionStatus("Connected");
console.log("WebSocket connection established.");
// 连接成功后,发送订阅指令
socket.send("subscribe:stock-ticks");
};
socket.onmessage = (event) => {
// 假设收到的数据是JSON字符串
try {
const data = JSON.parse(event.data);
// 使用非响应式更新来优化性能,批量添加
setMessages(prev => [data, ...prev.slice(0, 99)]); // 只保留最近100条
} catch (e) {
// 处理非JSON消息,比如服务端的控制消息
if (event.data.startsWith("subscribed:")) {
console.log("Subscription confirmed by server:", event.data);
} else {
console.warn("Received non-JSON message:", event.data);
}
}
};
socket.onclose = (event) => {
setConnectionStatus(`Disconnected (Code: ${event.code})`);
console.log("WebSocket connection closed. Attempting to reconnect in 3 seconds...");
// 实现简单的自动重连逻辑
setTimeout(connect, 3000);
};
socket.onerror = (error) => {
setConnectionStatus("Error");
console.error("WebSocket Error:", error);
// onerror会紧接着触发onclose,所以重连逻辑放在onclose里
};
};
// 组件加载时自动连接
createEffect(() => connect());
// 组件卸载时清理连接
onCleanup(() => {
if (socket) {
socket.onclose = null; // 阻止重连逻辑
socket.close();
}
});
return (
<div style={{ padding: "20px", "font-family": "monospace" }}>
<h1>Pulsar Real-time Data Stream</h1>
<p>Connection Status: <b>{connectionStatus()}</b></p>
<div style={{
"border": "1px solid #ccc",
"height": "500px",
"overflow-y": "scroll",
"background-color": "#f5f5f5",
"padding": "10px"
}}>
<For each={messages}>
{(msg, i) =>
<pre style={{ margin: "0", "border-bottom": "1px solid #eee" }}>
{JSON.stringify(msg)}
</pre>
}
</For>
</div>
</div>
);
}
export default App;
Solid.js的代码亮点:
-
createStore
: 用于管理一个对象或数组的集合,当新消息到来时,只有数组本身被修改,For
组件能够高效地只添加新的DOM节点,而不是重新渲染整个列表。 -
createEffect
/onCleanup
: 确保WebSocket连接的生命周期与组件的生命周期绑定,实现了自动连接和在组件销毁时安全断开。 - 自动重连:
onclose
事件中包含了简单的setTimeout
重连逻辑,这是生产环境中保证可用性的基本要求。当后端某个网关实例重启或部署时,客户端可以无缝地重新连接到集群中的另一个实例。
局限性与未来展望
这个架构虽然解决了横向扩展和容错的核心问题,但并非没有权衡。
最主要的考虑是Pulsar的Exclusive
订阅模型对Broker造成的压力。每个WebSocket连接都会创建一个独立的Consumer和订阅。对于数万甚至数十万的连接,这将对Pulsar的元数据管理和消息分发调度构成挑战。在实践中,需要对Pulsar集群进行充分的压力测试和容量规划。
其次,当前的实现没有处理客户端反压(backpressure)。如果Pulsar的数据生产速率远超客户端的处理能力或网络带宽,网关实例的内存可能会因为缓冲待发送的消息而膨胀。一个可行的优化路径是在WebSocket协议之上实现一套简单的流量控制机制,例如,当客户端处理不过来时,发送一个pause
指令,网关收到后可以调用Pulsar Consumer的pause()
方法暂停接收消息。
最后,安全是生产环境中必须解决的问题。当前的实现是匿名的。一个完整的系统需要集成认证和授权机制,例如通过WebSocket的子协议或初始连接时的token,来验证用户身份,并据此在后端判断该用户是否有权限订阅请求的Pulsar topic。这可以与Micronaut Security等框架结合实现。