构建基于 Serverless 与 LlamaIndex 的异步 RAG 索引管道的全链路可观测性


一个生产级的 RAG (Retrieval-Augmented Generation) 系统,其稳定性高度依赖于背后数据管道的健壮性。当我们将 LlamaIndex 的索引构建过程迁移到 Serverless 架构上时,问题便开始浮现。一个典型的异步索引流程——用户上传文档到 S3,触发一系列 Lambda 函数进行解析、分块、向量化并最终写入向量数据库——在逻辑上是一个整体,但在物理实现上却是一个分散的、事件驱动的黑盒。

当一份文档索引失败时,问题出在哪一步?是文档解析的 Lambda 内存溢出,是调用 Embedding 模型的 API 超时,还是写入向量数据库时发生了瞬时网络抖动?如果索引延迟变高,是哪个环节成为了瓶颈?在没有端到端可观测性的情况下,回答这些问题无异于大海捞针,这在任何严肃的生产环境中都是不可接受的。

我们的目标是为这个异步、跨多个 Lambda 函数的索引流程建立一个统一的、端到端的分布式追踪视图。从 S3 对象创建的那一刻起,直到向量数据成功入库,所有相关的计算、等待、I/O 操作都必须被捕获并关联在同一个 Trace 下。这不仅仅是为了排查故障,更是为了性能分析和成本优化。

技术选型决策相对直接:

  • 计算平台: AWS Lambda。其事件驱动模型和按需付费的特性,完美契合文档索引这种非固定频率的工作负载。
  • RAG 框架: LlamaIndex。它为文档加载、转换、向量化等步骤提供了成熟的抽象,让我们能专注于业务逻辑而非底层实现。
  • 可观测性标准: OpenTelemetry。作为 CNCF 的项目,它的厂商中立性和丰富的生态系统是关键。我们不希望被任何单一的 APM 供应商锁定。

真正的挑战在于实现细节,尤其是在 Serverless 环境中如何有效地传递追踪上下文(Trace Context)。

架构设计与上下文传播的挑战

整个异步索引管道由两个核心的 Lambda 函数构成,通过 SQS 队列解耦。

graph TD
    A[用户上传文档] -->|写入 Trace Metadata| B(S3 Bucket);
    B -->|触发 s3:ObjectCreated:*| C{Lambda: DocumentParser};
    C -->|写入 Trace Context 到 Message Attributes| D(SQS Queue);
    D -->|触发| E{Lambda: ChunkEmbedder};
    E -->|外部调用| F[Embedding Model API];
    E -->|写入数据| G[Vector Database];

    subgraph Trace Span 1
        C
    end
    subgraph Trace Span 2
        E
    end
    subgraph Trace Span 2.1
        F
    end
    subgraph Trace Span 2.2
        G
    end

    style C fill:#f9f,stroke:#333,stroke-width:2px
    style E fill:#f9f,stroke:#333,stroke-width:2px
  1. DocumentParser 函数: 由 S3 的 ObjectCreated 事件触发。它负责下载原始文档,使用 LlamaIndex 的 NodeParser 将其分割成多个文本块(Chunks),然后为每个文本块生成一条消息,发送到 SQS 队列。
  2. ChunkEmbedder 函数: 消费 SQS 队列中的消息。它获取文本块,调用外部的 Embedding 模型服务将其转换为向量,最后将文本块和对应的向量写入向量数据库。

这里的核心难题是:DocumentParserChunkEmbedder 是两个独立的调用,如何让 OpenTelemetry 知道它们属于同一个业务流程?标准的上下文传播机制依赖于 HTTP Headers (traceparent, tracestate),但这在 S3 -> Lambda -> SQS -> Lambda 这种异步链路上是行不通的。

  • S3 到 DocumentParser: S3 事件本身不携带自定义头信息。一个可行的方案是在上传对象到 S3 时,由上传方(例如一个 API 网关后的 Lambda)将 OpenTelemetry 的 Trace Context 注入到对象的元数据(x-amz-meta-*)中。我们的 DocumentParser 函数在启动时,需要从事件记录中解析出这些元数据,并以此作为父级 Span 来启动新的 Trace。
  • DocumentParserChunkEmbedder: 这一步相对简单。AWS SQS 消息体外支持附加消息属性(Message Attributes)。我们可以将当前的 Trace Context 序列化后放入消息属性中。当 ChunkEmbedder 函数被 SQS 消息触发时,它可以从消息属性中提取出 Trace Context,从而将自己的执行过程关联到已有的 Trace 上。

生产级代码实现

我们使用 Serverless Framework 来定义和部署整个应用。代码结构如下:

.
├── otel_lib/              # OpenTelemetry 初始化和封装
│   └── __init__.py
├── src/
│   ├── document_parser/
│   │   ├── app.py         # Lambda handler
│   │   └── requirements.txt
│   └── chunk_embedder/
│       ├── app.py         # Lambda handler
│       └── requirements.txt
├── serverless.yml         # Serverless Framework 配置文件
└── package.json

OpenTelemetry 初始化封装 (otel_lib/__init__.py)

为了避免在每个 Lambda 函数中重复编写初始化代码,我们创建一个通用的工具库。这在真实项目中至关重要,它保证了所有服务使用一致的配置。

# otel_lib/__init__.py
import os
import logging
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.propagators.aws import AwsXRayPropagator
from opentelemetry.propagators.tracecontext import TraceContextTextMapPropagator
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.instrumentation.aws_lambda import AwsLambdaInstrumentor
from opentelemetry.instrumentation.boto3sqs import Boto3SQSInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor

# 配置日志
logger = logging.getLogger()
logger.setLevel(logging.INFO)

_tracer_provider = None

def get_tracer_provider():
    """
    获取全局 TracerProvider 的单例。
    避免在每次 Lambda 调用时重复初始化,这对冷启动性能至关重要。
    """
    global _tracer_provider
    if _tracer_provider:
        return _tracer_provider

    # 从环境变量读取配置
    otel_exporter_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317")
    service_name = os.environ.get("OTEL_SERVICE_NAME", "rag-indexing-pipeline")
    
    # 1. 定义资源属性,用于标识此遥测数据的来源
    resource = Resource.create({
        ResourceAttributes.SERVICE_NAME: service_name,
        ResourceAttributes.CLOUD_PROVIDER: "aws",
        ResourceAttributes.CLOUD_PLATFORM: "aws_lambda",
        # 函数名和版本会自动由 AwsLambdaInstrumentor 添加
    })

    # 2. 初始化 TracerProvider
    provider = TracerProvider(resource=resource)
    
    # 3. 配置 OTLP Exporter
    # 在生产环境中,这通常指向 OpenTelemetry Collector 或 APM 后端
    otlp_exporter = OTLPSpanExporter(endpoint=otel_exporter_endpoint, insecure=True)
    
    # 4. 使用 BatchSpanProcessor 异步导出 Span
    span_processor = BatchSpanProcessor(otlp_exporter)
    provider.add_span_processor(span_processor)

    # 5. 设置全局 TracerProvider
    trace.set_tracer_provider(provider)
    _tracer_provider = provider
    
    logger.info(f"OpenTelemetry TracerProvider initialized for service: {service_name}")
    return _tracer_provider

def instrument():
    """
    应用所有需要的自动埋点库。
    只需在 Lambda handler 加载前调用一次。
    """
    get_tracer_provider()

    # 自动为 AWS Lambda handler 创建 Span,并处理事件上下文
    AwsLambdaInstrumentor.instrument()
    
    # 自动为 boto3 SQS 客户端注入和提取 Trace Context
    # 这是实现 SQS 上下文传播的关键
    Boto3SQSInstrumentor.instrument()

    # 自动为 requests 库创建 Span,用于追踪对 Embedding API 的调用
    RequestsInstrumentor.instrument()
    
    logger.info("All instrumentations applied.")

# 在模块加载时就执行埋点,确保 handler 被调用前已生效
instrument()

这个封装做了几件关键的事情:

  1. 单例模式: _tracer_provider 是全局的,确保在 Lambda 的生命周期内只初始化一次。
  2. 配置化: OTLP endpoint 和服务名通过环境变量配置,符合十二要素应用原则。
  3. 自动埋点: AwsLambdaInstrumentor 会自动包裹 handler,处理 Lambda 的事件源。Boto3SQSInstrumentor 是连接两个函数 trace 的关键,它会自动处理 SQS Message Attributes。RequestsInstrumentor 则用于追踪外部 HTTP 调用。

serverless.yml 配置文件

这是整个 Serverless 应用的骨架,定义了函数、触发器、权限和环境变量。

# serverless.yml
service: rag-indexing-pipeline
frameworkVersion: '3'

provider:
  name: aws
  runtime: python3.10
  region: us-east-1
  stage: dev
  # 统一为所有函数配置环境变量
  environment:
    OTEL_EXPORTER_OTLP_ENDPOINT: ${param:OTEL_COLLECTOR_ENDPOINT, "otel-collector.observability:4317"}
    OTEL_SERVICE_NAME: ${self:service}
    POWERTOOLS_SERVICE_NAME: ${self:service} # For AWS Lambda Powertools
    LOG_LEVEL: INFO
    SQS_QUEUE_URL: !Ref ChunksQueue
  
  # 定义所有函数需要的最小权限集
  iam:
    role:
      statements:
        - Effect: Allow
          Action:
            - s3:GetObject
          Resource: "arn:aws:s3:::${param:DOCUMENTS_BUCKET}/*"
        - Effect: Allow
          Action:
            - sqs:SendMessage
          Resource: !GetAtt ChunksQueue.Arn
        # 允许 Lambda 将遥测数据发送到 Collector
        # 此处假设 Collector 部署在 VPC 内
        - Effect: "Allow"
          Action:
            - "ec2:CreateNetworkInterface"
            - "ec2:DescribeNetworkInterfaces"
            - "ec2:DeleteNetworkInterface"
          Resource: "*"

# 打包时包含 otel_lib
package:
  patterns:
    - '!node_modules/**'
    - 'otel_lib/**'

functions:
  document-parser:
    handler: src/document_parser/app.handler
    # 让 Python 解释器能找到我们的共享库
    environment:
      PYTHONPATH: /var/task/otel_lib:/var/runtime
    layers:
      # 在生产中,应该将依赖项打包成 Lambda Layer
      - arn:aws:lambda:us-east-1:xxxx:layer:my-dependencies:1
    events:
      - s3:
          bucket: ${param:DOCUMENTS_BUCKET}
          event: s3:ObjectCreated:*
          rules:
            - suffix: .txt
          existing: true
    # 根据文档大小和解析复杂度配置
    memorySize: 512
    timeout: 60

  chunk-embedder:
    handler: src/chunk_embedder/app.handler
    environment:
      PYTHONPATH: /var/task/otel_lib:/var/runtime
      EMBEDDING_API_URL: ${param:EMBEDDING_API_URL}
      VECTOR_DB_URL: ${param:VECTOR_DB_URL}
    layers:
      - arn:aws:lambda:us-east-1:xxxx:layer:my-dependencies:1
    events:
      - sqs:
          arn: !GetAtt ChunksQueue.Arn
          batchSize: 5 # 批量处理以降低成本和提高吞吐
    # Embedding 操作可能需要更多内存和时间
    memorySize: 1024
    timeout: 120

resources:
  Resources:
    ChunksQueue:
      Type: AWS::SQS::Queue
      Properties:
        QueueName: ${self:service}-chunks-queue-${self:provider.stage}

DocumentParser 函数实现 (src/document_parser/app.py)

# src/document_parser/app.py
import os
import boto3
import json
import urllib.parse
from uuid import uuid4

# 关键:导入 otel_lib 会自动执行 instrument()
import otel_lib 

from opentelemetry import trace
from opentelemetry.propagate import extract
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

from llama_index.core.node_parser import SentenceSplitter

# 获取当前 Span 的 tracer
tracer = trace.get_tracer(__name__)

s3_client = boto3.client("s3")
sqs_client = boto3.client("sqs")
SQS_QUEUE_URL = os.environ["SQS_QUEUE_URL"]

def handler(event, context):
    """
    S3 触发的 Lambda,负责解析文档并发送到 SQS
    """
    # AwsLambdaInstrumentor 会自动处理大部分事件,但S3元数据需要手动提取
    # 这是解决 S3 -> Lambda 上下文传播挑战的核心
    carrier = {}
    s3_meta = event['Records'][0].get('s3', {}).get('object', {}).get('userMetadata', {})
    if s3_meta:
        # S3 元数据 key 会被转为小写
        for k, v in s3_meta.items():
            carrier[k.lower()] = v

    # 从 carrier 中提取父级上下文
    parent_context = TraceContextTextMapPropagator().extract(carrier=carrier)
    
    # 手动创建一个以提取的上下文为父级的 Span
    with tracer.start_as_current_span("process-s3-document", context=parent_context) as span:
        bucket_name = event['Records'][0]['s3']['bucket']['name']
        object_key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
        
        span.set_attribute("s3.bucket.name", bucket_name)
        span.set_attribute("s3.object.key", object_key)
        
        try:
            # 1. 下载文档
            with tracer.start_as_current_span("download-from-s3") as download_span:
                response = s3_client.get_object(Bucket=bucket_name, Key=object_key)
                document_text = response['Body'].read().decode('utf-8')
                download_span.set_attribute("s3.object.size", len(document_text))
            
            # 2. 使用 LlamaIndex 进行分块
            with tracer.start_as_current_span("chunk-document") as chunk_span:
                parser = SentenceSplitter(chunk_size=512, chunk_overlap=50)
                nodes = parser.get_nodes_from_documents([document_text])
                chunk_span.set_attribute("document.chunk_count", len(nodes))
            
            # 3. 发送消息到 SQS
            with tracer.start_as_current_span("send-chunks-to-sqs") as sqs_span:
                entries = []
                for i, node in enumerate(nodes):
                    message_body = json.dumps({
                        "document_key": object_key,
                        "chunk_id": i,
                        "chunk_text": node.get_content()
                    })
                    entries.append({
                        "Id": str(uuid4()),
                        "MessageBody": message_body
                    })
                
                # Boto3SQSInstrumentor 会自动将当前 Span 的上下文注入到消息属性中
                sqs_client.send_message_batch(
                    QueueUrl=SQS_QUEUE_URL,
                    Entries=entries
                )
                sqs_span.set_attribute("sqs.message.count", len(entries))

            span.set_attribute("app.status", "success")
            return {"statusCode": 200, "body": json.dumps("Processing started")}

        except Exception as e:
            # 在 Span 中记录异常信息
            span.record_exception(e)
            span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
            span.set_attribute("app.status", "failed")
            # 在生产环境中,这里应该将失败的消息发送到死信队列
            raise e

ChunkEmbedder 函数实现 (src/chunk_embedder/app.py)

# src/chunk_embedder/app.py
import os
import json
import requests

import otel_lib

from opentelemetry import trace

tracer = trace.get_tracer(__name__)

EMBEDDING_API_URL = os.environ["EMBEDDING_API_URL"]
VECTOR_DB_URL = os.environ["VECTOR_DB_URL"]

def handler(event, context):
    """
    SQS 触发的 Lambda,负责向量化并存储文本块
    """
    # AwsLambdaInstrumentor 会自动从 SQS 事件中提取上下文,并为每条消息创建 Span
    for record in event['Records']:
        try:
            # 获取当前消息对应的 Span
            # OTel 的 SQS 埋点库通常会这样做,如果没有,则需要手动从 record['messageAttributes'] 提取
            current_span = trace.get_current_span()
            
            payload = json.loads(record['body'])
            document_key = payload['document_key']
            chunk_id = payload['chunk_id']
            chunk_text = payload['chunk_text']

            current_span.set_attribute("document.key", document_key)
            current_span.set_attribute("document.chunk_id", chunk_id)
            
            # 1. 调用 Embedding API
            vector = get_embedding(chunk_text)

            # 2. 写入向量数据库
            write_to_vector_db(document_key, chunk_id, chunk_text, vector)

            current_span.set_attribute("app.status", "success")

        except Exception as e:
            current_span.record_exception(e)
            current_span.set_status(trace.Status(trace.StatusCode.ERROR, str(e)))
            current_span.set_attribute("app.status", "failed")
            # 触发 Lambda 的重试机制
            raise e

    return {"statusCode": 200}

@tracer.start_as_current_span("get-embedding-from-api")
def get_embedding(text: str) -> list[float]:
    """
    调用外部 API 获取向量。
    RequestsInstrumentor 会自动创建子 Span。
    """
    current_span = trace.get_current_span()
    try:
        response = requests.post(
            EMBEDDING_API_URL, 
            json={"input": text},
            timeout=10 # 必须设置超时
        )
        response.raise_for_status()
        embedding = response.json()['data'][0]['embedding']
        current_span.set_attribute("embedding.dimensions", len(embedding))
        return embedding
    except requests.RequestException as e:
        current_span.record_exception(e)
        current_span.set_status(trace.Status(trace.StatusCode.ERROR, "Embedding API call failed"))
        raise

@tracer.start_as_current_span("write-to-vector-db")
def write_to_vector_db(doc_key, chunk_id, text, vector):
    """
    将数据写入向量数据库。
    这里用 requests 模拟,实际项目中应使用对应的数据库客户端。
    """
    # 此处应有真实数据库客户端的调用代码
    # 如果客户端库没有 OTel 埋点,就需要像这样手动创建 Span
    current_span = trace.get_current_span()
    current_span.set_attribute("db.system", "pinecone_or_weaviate") # 示例
    current_span.set_attribute("db.operation", "upsert")
    
    # 模拟写入延迟
    import time
    time.sleep(0.05) 

最终成果与局限性

部署完成后,当一个 .txt 文件被上传到指定的 S3 桶时,我们可以在可观测性后端(如 Jaeger 或 Honeycomb)看到一条完整的分布式追踪链路。这条链路清晰地展示了:

  1. process-s3-document 作为根 Span,包含了整个文档处理流程。
  2. 其下有 download-from-s3, chunk-document, send-chunks-to-sqs 等子 Span,精确度量了 DocumentParser 函数内部各阶段的耗时。
  3. 对于每个发送到 SQS 的文本块,ChunkEmbedder 函数的调用会作为 send-chunks-to-sqs Span 的子 Span(遵循 Follows From 关系)出现,完美地将异步调用关联起来。
  4. ChunkEmbedder 的 Span 内部,我们又能看到对 Embedding API (get-embedding-from-api) 和向量数据库 (write-to-vector-db) 的调用耗时。

这种粒度的可见性,使得定位性能瓶颈(例如,是 Embedding API 慢还是数据库写入慢?)或排查特定文档索引失败(例如,某个文本块导致 Embedding API 返回错误)变得轻而易举。

然而,当前方案并非没有局限。首先,在 DocumentParser 中手动处理 S3 元数据的上下文提取,是一种侵入性较强的“hack”。如果上传 S3 的客户端不受我们控制,无法注入 traceparent 元数据,那么追踪链路就会在这里断裂。AWS X-Ray 在某些服务间能自动传播上下文,但 OpenTelemetry 的原生支持仍有待完善。其次,我们只关注了 Traces。一个完整的可观测性方案还需要关联 Metrics(如 Lambda 的执行时间、内存使用率)和 Logs(包含 trace_idspan_id 的结构化日志),以便进行更深层次的分析。最后,对于大规模、高吞吐的索引管道,全量采样(tracing every request)的成本会非常高,引入基于 OpenTelemetry Collector 的尾部采样(Tail-based Sampling)策略将是必要的下一步优化。


  目录