构建基于Pulsar与容器化Playwright的弹性分布式浏览器任务执行器


在处理独立的、小批量的浏览器自动化任务时,一个简单的 Playwright 脚本足矣。但当需求演变为一个需要为多个业务方提供中心化、高可用的浏览器自动化服务时,挑战便会陡然升级。最初的尝试,无非是将 Playwright 脚本封装成一个简单的 Flask 或 FastAPI 应用,直接同步处理请求。这种架构的脆弱性在真实负载下暴露无遗:前端请求被长时间阻塞、浏览器进程资源竞争导致相互干扰、单点故障、无法水平扩展。

痛点很明确:我们需要一个解耦的、可扩展的、具备容错能力的架构。核心构想是转向事件驱动模型,将任务的提交与执行彻底分离。任务提交方(Producer)只需将一个定义好的任务描述(例如:目标URL、要执行的操作、参数)作为一个事件发送到消息队列中。执行方(Worker)则是一组无状态的、可任意伸缩的消费者,它们从队列中获取任务、执行、并将结果写回。

技术选型上,消息队列是核心。我们评估了 Kafka 和 RabbitMQ,但最终选择了 Apache Pulsar。原因有几点:首先,Pulsar 内置的 Dead Letter Queue (DLQ) 和 Retry Letter Topic 机制为我们处理任务失败与重试提供了开箱即用的支持,无需在应用层实现复杂的重试逻辑。其次,Pulsar 的延迟消息投递功能,让我们能以极低的成本实现定时任务。最后,其分层存储特性(BookKeeper + Tiered Storage)对于可能产生大量结果数据(如截图、PDF)的场景,长期来看更具成本优势。

执行环境则毫无悬念地选择了容器化。每个 Playwright worker 都在一个独立的 Docker 容器中运行。这不仅解决了依赖隔离问题,还为我们利用 Kubernetes 等编排工具实现自动伸缩和资源管理铺平了道路。

架构概览与消息流设计

整个系统的核心由四个部分组成:任务生产者、Pulsar 服务、容器化的 Playwright Worker 池,以及结果消费者。

graph TD
    subgraph "任务提交端 (Producers)"
        A[API Service] --> B{Task Payload};
        C[Scheduled Job] --> B;
    end

    subgraph "Apache Pulsar"
        P_TASK[persistent://public/default/browser-tasks];
        P_RETRY[persistent://public/default/browser-tasks-retry];
        P_DLQ[persistent://public/default/browser-tasks-dlq];
        P_RESULT[persistent://public/default/browser-results];
    end

    subgraph "容器化 Worker 池 (Consumers)"
        W1[Container: Worker 1];
        W2[Container: Worker 2];
        W3[Container: Worker N];
    end
    
    subgraph "结果处理端"
        RA[Result Aggregator / DB Writer];
    end

    B -- "Produce" --> P_TASK;
    
    P_TASK -- "Consume" --> W1;
    P_TASK -- "Consume" --> W2;
    P_TASK -- "Consume" --> W3;
    
    W1 -- "Task Success, Ack" --> P_TASK;
    W1 -- "Task Result" --> P_RESULT;
    
    W2 -- "Task Failed, Nack" --> P_TASK;
    P_TASK -- "Auto-Retry via Subscription" --> P_RETRY;
    P_RETRY -- "After Delay, Resend" --> P_TASK;
    
    W3 -- "Max Retries Exceeded" --> P_DLQ;

    P_RESULT -- "Consume" --> RA;

    style W1 fill:#d5f5e3,stroke:#27ae60
    style W2 fill:#fadedb,stroke:#c0392b
    style W3 fill:#fadedb,stroke:#c0392b

消息流如下:

  1. 任务投递: 外部服务通过 API 将任务载荷(一个 JSON 对象)发送到 browser-tasks 主题。
  2. 任务消费: Worker 池中的某个空闲 Worker 会消费一条消息。
  3. 成功处理: 如果 Playwright 操作成功,Worker 会向 Pulsar 发送 ack 确认消息,并将执行结果(如截图的Base64编码、页面文本等)发送到 browser-results 主题。
  4. 失败与重试: 如果任务执行失败(例如,页面超时、元素未找到),Worker 会发送 nack。Pulsar 的共享订阅配置了重试机制,会将该消息重新投递到 browser-tasks-retry 主题。一段时间后,Pulsar 会自动将它再次投递回 browser-tasks 主题供其他 Worker 消费。
  5. 死信处理: 当一条消息的重试次数达到阈值后,Pulsar 会自动将其路由到 browser-tasks-dlq 主题。这可以触发告警,由人工介入分析失败原因。

容器化 Playwright Worker 的实现

Worker 是系统的核心,其健壮性至关重要。它不仅要能正确执行 Playwright 任务,还必须能优雅地处理生命周期事件(如被容器编排系统终止)和任务失败。

Dockerfile:构建一个可靠的执行环境

构建 Playwright 镜像是第一个关键步骤。一个常见的错误是忘记安装浏览器依赖,或是在运行时才去下载浏览器。生产级的镜像应该包含所有必要的依赖和预先下载好的浏览器二进制文件。

# 使用官方的 Playwright 镜像作为基础,它已经包含了所有依赖
FROM mcr.microsoft.com/playwright/python:v1.40.0-jammy

WORKDIR /app

# 复制项目依赖文件
COPY requirements.txt .

# 安装 Python 依赖
# --no-cache-dir 减小镜像体积
RUN pip install --no-cache-dir -r requirements.txt

# 复制所有业务代码
COPY . .

# 定义容器启动命令
CMD ["python", "worker.py"]

这里的 requirements.txt 内容很简单:

pulsar-client==3.4.0

使用官方基础镜像 mcr.microsoft.com/playwright/python 是一个最佳实践,它已经处理了所有复杂的系统级依赖,避免了我们自己从头踩坑。

Worker 核心代码:worker.py

这个文件是 Worker 容器的入口点,负责连接 Pulsar、消费消息、调用 Playwright 执行任务,并处理各种边界情况。

import os
import signal
import json
import logging
import traceback
import base64
from typing import Dict, Any

import pulsar
from playwright.sync_api import sync_playwright, Page, Error as PlaywrightError

# --- 配置 ---
# 从环境变量获取配置,这是容器化应用的最佳实践
PULSAR_SERVICE_URL = os.getenv("PULSAR_URL", "pulsar://localhost:6650")
TASKS_TOPIC = os.getenv("TASKS_TOPIC", "persistent://public/default/browser-tasks")
RESULTS_TOPIC = os.getenv("RESULTS_TOPIC", "persistent://public/default/browser-results")
SUBSCRIPTION_NAME = os.getenv("SUBSCRIPTION_NAME", "browser-worker-subscription")
MAX_REDELIVER_COUNT = int(os.getenv("MAX_REDELIVER_COUNT", "5"))

# --- 日志配置 ---
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

# --- 全局变量,用于信号处理 ---
# 这里的全局变量设计是为了让信号处理函数能访问到它们,从而实现优雅关闭
pulsar_client = None
consumer = None
producer = None
shutdown_flag = False

# --- 信号处理函数 ---
def handle_shutdown_signal(signum, frame):
    """
    处理 SIGINT 和 SIGTERM 信号,用于优雅关闭。
    这是在容器环境中至关重要的部分,确保在K8s等系统发出终止信号时,
    我们能完成当前任务再退出,而不是被强制杀死。
    """
    global shutdown_flag
    if not shutdown_flag:
        logging.info(f"Received shutdown signal {signum}. Shutting down gracefully...")
        shutdown_flag = True
        # 如果consumer存在,可以尝试关闭它来中断循环
        if consumer:
            consumer.close()
        if pulsar_client:
            pulsar_client.close()


class BrowserTaskExecutor:
    """封装 Playwright 任务执行逻辑"""

    def __init__(self, page: Page):
        self.page = page

    def execute(self, task: Dict[str, Any]) -> Dict[str, Any]:
        """
        根据任务类型分发到不同的处理函数。
        这种模式易于扩展新的任务类型。
        """
        task_type = task.get("type")
        params = task.get("params", {})
        
        logging.info(f"Executing task type: {task_type} with params: {params}")

        if task_type == "screenshot":
            return self._task_screenshot(**params)
        elif task_type == "get_content":
            return self._task_get_content(**params)
        else:
            raise ValueError(f"Unknown task type: {task_type}")

    def _task_screenshot(self, url: str, full_page: bool = True) -> Dict[str, Any]:
        """执行截图任务"""
        self.page.goto(url, wait_until="networkidle", timeout=30000)
        screenshot_bytes = self.page.screenshot(full_page=full_page)
        
        # 在真实项目中,结果可能直接上传到 S3 等对象存储,
        # 然后在结果消息中只包含 URL。这里为演示方便,使用 base64 编码。
        return {"screenshot_base64": base64.b64encode(screenshot_bytes).decode('utf-8')}

    def _task_get_content(self, url: str) -> Dict[str, Any]:
        """获取页面内容任务"""
        self.page.goto(url, wait_until="domcontentloaded", timeout=20000)
        content = self.page.content()
        return {"content": content}


def main():
    global pulsar_client, consumer, producer
    
    # 注册信号处理器
    signal.signal(signal.SIGINT, handle_shutdown_signal)
    signal.signal(signal.SIGTERM, handle_shutdown_signal)

    try:
        # --- 初始化 Pulsar 客户端、消费者和生产者 ---
        pulsar_client = pulsar.Client(PULSAR_SERVICE_URL)
        
        # 结果生产者
        producer = pulsar_client.create_producer(RESULTS_TOPIC)

        # 任务消费者
        # 这里的配置是容错的关键
        consumer = pulsar_client.subscribe(
            TASKS_TOPIC,
            subscription_name=SUBSCRIPTION_NAME,
            consumer_type=pulsar.ConsumerType.Shared, # 共享模式允许多个worker消费
            dead_letter_policy=pulsar.DeadLetterPolicy(
                max_redeliver_count=MAX_REDELIVER_COUNT,
                dead_letter_topic='persistent://public/default/browser-tasks-dlq'
            ),
            # 也可以配置 retry_enable=True 和一个 retry_letter_topic
        )

        logging.info("Worker started. Waiting for tasks...")
        
        # --- 主循环 ---
        # 启动 Playwright 上下文管理器
        with sync_playwright() as p:
            # 在循环外启动浏览器,可以复用浏览器实例,显著提升处理效率
            # 但要注意内存泄漏风险,对于长期运行的服务,可能需要定期重启浏览器实例
            browser = p.chromium.launch(headless=True)
            page = browser.new_page()
            executor = BrowserTaskExecutor(page)

            while not shutdown_flag:
                try:
                    # receive() 是一个阻塞操作,可以设置超时
                    msg = consumer.receive(timeout_millis=1000)
                except Exception as e:
                    # 超时会抛出异常,这是正常的,我们直接 continue
                    if "timed out" in str(e).lower():
                        continue
                    logging.error(f"Error receiving message from Pulsar: {e}")
                    break # 如果是其他Pulsar连接错误,则退出循环

                # 如果 shutdown_flag 被设置,即使收到了消息也选择不再处理
                if shutdown_flag:
                    # 收到消息但需要关闭,nack让其他worker处理
                    consumer.negative_acknowledge(msg)
                    break
                    
                task_id = msg.message_id()
                try:
                    task_payload = json.loads(msg.data().decode('utf-8'))
                    logging.info(f"Received task {task_id}. Payload: {task_payload}")

                    # --- 执行核心任务 ---
                    result_data = executor.execute(task_payload)
                    
                    # --- 任务成功 ---
                    final_result = {
                        "status": "success",
                        "task_id": str(task_id),
                        "original_payload": task_payload,
                        "result": result_data
                    }
                    
                    # 发送结果到结果主题
                    producer.send(json.dumps(final_result).encode('utf-8'))
                    
                    # 确认消息,Pulsar 将不再投递此消息
                    consumer.acknowledge(msg)
                    logging.info(f"Task {task_id} completed successfully.")

                except (PlaywrightError, ValueError, json.JSONDecodeError) as e:
                    # --- 任务失败 ---
                    logging.error(f"Task {task_id} failed. Error: {e}")
                    traceback.print_exc()

                    # 发送失败结果
                    failure_result = {
                        "status": "failed",
                        "task_id": str(task_id),
                        "original_payload": task_payload if 'task_payload' in locals() else None,
                        "error": str(e)
                    }
                    producer.send(json.dumps(failure_result).encode('utf-8'))

                    # 否认消息,Pulsar 会根据策略重试或发送到DLQ
                    consumer.negative_acknowledge(msg)
                    logging.warning(f"Task {task_id} negatively acknowledged.")
                except Exception as e:
                    # 捕获所有其他未知异常
                    logging.error(f"An unexpected error occurred processing task {task_id}: {e}")
                    traceback.print_exc()
                    consumer.negative_acknowledge(msg)
            
            # 清理 Playwright 资源
            browser.close()

    except Exception as e:
        logging.error(f"A critical error occurred in the main loop: {e}")
        traceback.print_exc()
    finally:
        # --- 资源清理 ---
        logging.info("Cleaning up resources...")
        if consumer:
            consumer.close()
        if producer:
            producer.close()
        if pulsar_client:
            pulsar_client.close()
        logging.info("Worker has shut down.")

if __name__ == "__main__":
    main()

任务生产者与系统部署

生产者可以是一个简单的脚本或集成在现有服务中。

producer.py:

import pulsar
import json
import uuid

PULSAR_SERVICE_URL = "pulsar://localhost:6650"
TASKS_TOPIC = "persistent://public/default/browser-tasks"

client = pulsar.Client(PULSAR_SERVICE_URL)
producer = client.create_producer(TASKS_TOPIC)

tasks_to_send = [
    {"type": "screenshot", "params": {"url": "https://www.google.com"}},
    {"type": "get_content", "params": {"url": "https://news.ycombinator.com"}},
    {"type": "screenshot", "params": {"url": "https://github.com", "full_page": False}},
    {"type": "invalid_task", "params": {}}, # 模拟一个失败的任务
    {"type": "screenshot", "params": {"url": "http://non-existent-url.xyz"}}, # 模拟网络错误
]

for task in tasks_to_send:
    # 使用 properties 可以添加元数据,用于路由或过滤
    producer.send(
        json.dumps(task).encode('utf-8'),
        properties={"task_uuid": str(uuid.uuid4())}
    )
    print(f"Sent task: {task}")

producer.flush()
producer.close()
client.close()

为了在本地运行和测试整个系统,我们可以使用 Docker Compose。

docker-compose.yml:

version: '3.8'

services:
  pulsar:
    image: apachepulsar/pulsar:3.2.0
    container_name: pulsar
    ports:
      - "6650:6650"
      - "8080:8080"
    command: >
      /bin/bash -c
      "bin/apply-config-from-env.py conf/standalone.conf &&
       exec bin/pulsar standalone"
    environment:
      # 在 standalone 模式下,Pulsar 会自动创建不存在的 topic,方便开发
      # 生产环境建议禁用,并使用 pulsar-admin 手动创建
      PULSAR_PREFIX_allowAutoTopicCreation: "true"
      
  browser-worker:
    build: .
    # 使用 'depends_on' 确保 Pulsar 先启动
    depends_on:
      - pulsar
    # 覆盖环境变量
    environment:
      PULSAR_URL: "pulsar://pulsar:6650"
    # 使用 'scale' 可以轻松地启动多个 worker 实例
    # docker-compose up --scale browser-worker=3
    deploy:
      replicas: 2

要运行此系统:

  1. 保存 Dockerfile, worker.py, requirements.txt, producer.py, docker-compose.yml 到同一目录。
  2. 构建并启动服务: docker-compose up --build --scale browser-worker=3
  3. 在另一个终端,运行生产者: python producer.py
  4. 观察 docker-compose 的日志,可以看到3个 worker 如何并发地接收和处理任务。

方案的局限性与未来迭代方向

尽管此架构解决了最初的可扩展性和容错性问题,但它并非银弹。在真实生产环境中,仍然存在一些需要考量的局限和优化点。

首先,资源隔离粒度较粗。每个 Worker 容器完整地运行一个浏览器实例,这对于执行轻量级任务来说是一种资源浪费。一个潜在的优化方向是,让单个 Worker 进程管理一个浏览器实例池(Browser Context Pool),用不同的 Browser Context 来处理不同的任务,从而在进程内部实现隔离,大幅降低资源开销。但这会增加 Worker 内部的复杂性,需要仔细管理 Context 的生命周期。

其次,动态伸缩目前依赖手动操作。当前架构非常适合与 KEDA (Kubernetes Event-driven Autoscaling) 集成。通过配置 KEDA ScaledObject 监控 Pulsar 主题的积压消息数 (backlog),可以实现 Worker Deployment 的自动水平伸缩,在任务高峰期自动增加 Worker 数量,在空闲时自动缩减,最大化资源利用率和成本效益。

最后,对于需要执行复杂用户交互(如登录、表单填写)的任务,当前的任务载荷定义过于简单。需要设计一套更通用的任务描述语言(DSL),或许是一个 JSON 定义的指令序列,让 Worker 能够解析并按顺序执行一系列 Playwright 操作。这 фактически 是在构建一个专用的浏览器自动化平台即服务(PaaS)的雏形。


  目录