FastAPI 聚合 MyBatis 遗留服务与 NoSQL 数据的架构实践


技术痛点:一个割裂的用户画像

我们团队维护着一个运行了近十年的核心用户系统,Java + Spring Boot + MyBatis 的技术栈,稳定得像一块磐石。它负责处理所有用户的核心身份信息(ID、用户名、注册信息等),数据存储在分片的 MySQL 集群中。问题在于,随着业务发展,我们需要为用户引入更丰富的、非结构化的画像数据,比如用户的兴趣标签、最近的活跃度评分、个性化配置等等。这些数据变化快、结构多变,硬塞进关系型数据库里不仅设计上别扭,还会给这个本已不堪重负的遗留系统带来风险。

最初的方案简单粗暴:前端需要完整的用户画像时,需要并行调用两个接口:

  1. GET /api/v1/users/{userId} (Java/MyBatis 老服务)
  2. GET /api/v2/profiles/{userId} (新的 Python/FastAPI 服务)

前端团队怨声载道。他们不仅要处理两次网络请求的复杂性、合并数据,还要处理其中任意一个接口失败或超时的状态,这让前端的状态管理逻辑变得一团糟。我们需要一个聚合层,一个单一的入口点,为客户端提供一个统一、完整的用户画像视图。这个担子,自然就落到了负责新服务的 Python 团队身上。

初步构想:一个看似合理的聚合接口

我的第一版设计非常直接:在 FastAPI 服务中创建一个新的端点 GET /api/v3/composite-profile/{userId},它来扮演这个聚合者的角色。其内部逻辑如下:

  1. 接收到请求,获取 userId
  2. 通过 HTTP 调用 Java/MyBatis 遗留服务的用户接口,获取核心数据。
  3. 查询自身的 MongoDB,获取扩展画像数据。
  4. 将两份数据合并,返回给客户端。

下面是这个构想的第一版代码实现。

FastAPI (main.py - v1)

import httpx
from fastapi import FastAPI, HTTPException
from pymongo import MongoClient
from pydantic import BaseModel, Field
import os
import logging

# --- 日志配置 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- 配置 ---
# 在生产环境中,这些应该来自环境变量或配置服务
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/")
LEGACY_USER_SERVICE_URL = os.getenv("LEGACY_USER_SERVICE_URL", "http://localhost:8080/api/v1/users")

# --- 应用实例与数据库连接 ---
app = FastAPI()
mongo_client = MongoClient(MONGO_URI)
db = mongo_client.user_profiles

# --- Pydantic 模型定义 ---
class LegacyUser(BaseModel):
    userId: int
    username: str
    email: str
    createdAt: str

class ExtendedProfile(BaseModel):
    userId: int
    tags: list[str] = []
    lastActivityScore: float = 0.0
    customSettings: dict = Field(default_factory=dict)

class CompositeProfile(BaseModel):
    core: LegacyUser
    extension: ExtendedProfile

# --- 核心路由 ---
@app.get("/api/v3/composite-profile/{user_id}", response_model=CompositeProfile)
def get_composite_profile(user_id: int):
    """
    一个简单的、同步阻塞的聚合实现
    """
    legacy_user_data = None
    extended_profile_data = None

    # 1. 请求遗留服务
    try:
        logging.info(f"Requesting legacy user data for user_id: {user_id}")
        with httpx.Client() as client:
            response = client.get(f"{LEGACY_USER_SERVICE_URL}/{user_id}", timeout=5.0)
            response.raise_for_status() # 如果状态码是 4xx 或 5xx,则抛出异常
            legacy_user_data = response.json()
    except httpx.RequestError as exc:
        logging.error(f"HTTP request to legacy service failed for user_id: {user_id}. Error: {exc}")
        raise HTTPException(status_code=503, detail="Legacy user service is unavailable.")
    except Exception as e:
        logging.error(f"An unexpected error occurred while fetching legacy data for {user_id}. Error: {e}")
        raise HTTPException(status_code=500, detail="Internal server error.")

    # 2. 查询 NoSQL 数据库
    logging.info(f"Querying extended profile for user_id: {user_id}")
    extended_profile_doc = db.profiles.find_one({"userId": user_id})
    if not extended_profile_doc:
        # 如果没有扩展信息,我们创建一个默认的,保证数据结构的完整性
        extended_profile_data = {"userId": user_id, "tags": [], "lastActivityScore": 0.0, "customSettings": {}}
    else:
        # Pymongo 返回的 _id 是 ObjectId 类型, Pydantic 无法直接处理,需要移除
        extended_profile_doc.pop("_id", None)
        extended_profile_data = extended_profile_doc

    # 3. 组合数据并返回
    return CompositeProfile(
        core=LegacyUser(**legacy_user_data),
        extension=ExtendedProfile(**extended_profile_data)
    )

这段代码能跑,但当我提交 Code Review 请求时,我的同事,一位经验丰富的架构师,留下了几条尖锐但一针见血的评论:

  1. 性能瓶颈: “get_composite_profile 是一个 def 函数,意味着它运行在 FastAPI 的主线程池中。内部的 httpx.Client() 是同步阻塞调用。如果遗留服务响应慢(比如慢 200ms),这个 FastAPI worker 在这 200ms 内就完全被占用了,无法处理任何其他请求。在高并发下,线程池会迅速耗尽,整个服务吞吐量会急剧下降。”
  2. 可用性陷阱: “try...except 块的设计是‘一荣俱荣,一损俱损’。只要遗留服务超时或返回 500,整个聚合接口就会返回 503。但在很多场景下,即使用户核心信息获取失败,我们仍然希望返回可用的扩展信息,或者反之。当前的实现过于脆弱。”
  3. 资源浪费: “为每个请求创建一个 httpx.Client 实例是低效的。正确的模式是为整个应用生命周期维护一个共享的 AsyncClient 实例。”

这次 Code Review 让我意识到,我仅仅是“实现”了功能,却完全没有从生产环境的角度去思考性能和韧性。

迭代与重构:拥抱异步与韧性

基于 Code Review 的反馈,我开始了对这个接口的彻底重构。目标很明确:异步化、并发化、提升容错能力。

1. 架构调整:从串行到并行

重构的核心是利用 Python 的 asyncio 和 FastAPI 的原生异步支持。我们将两个独立的数据获取操作(调用遗留服务、查询MongoDB)视为可以并发执行的任务。

sequenceDiagram
    participant C as Client
    participant F as FastAPI Aggregator
    participant L as Legacy Service (Java/MyBatis)
    participant M as MongoDB

    C->>+F: GET /api/v3/composite-profile/{id}
    Note right of F: Naive Implementation (Sequential)
    F->>+L: GET /api/v1/users/{id}
    L-->>-F: Core User Data
    F->>+M: findOne({userId: id})
    M-->>-F: Extended Profile Data
    F-->>-C: Composite Profile

    %% Refactored Implementation (Concurrent)
    C->>+F: GET /api/v3/composite-profile/{id}
    Note right of F: Refactored Implementation (Concurrent)
    par
        F->>+L: GET /api/v1/users/{id}
    and
        F->>+M: findOne({userId: id}) (async)
    end
    L-->>-F: Core User Data
    M-->>-F: Extended Profile Data
    F-->>-C: Composite Profile (handles partial failures)

2. 关键代码实现

FastAPI (main.py - v2, Production-Ready)

import asyncio
import httpx
from fastapi import FastAPI, HTTPException, Request
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel, Field
import os
import logging
from contextlib import asynccontextmanager

# --- 日志配置 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- 配置 ---
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/")
LEGACY_USER_SERVICE_URL = os.getenv("LEGACY_USER_SERVICE_URL", "http://localhost:8080/api/v1/users")
# 为外部调用设置一个合理的超时
HTTP_CLIENT_TIMEOUT = 3.0

# --- Pydantic 模型定义 (增加可选字段以支持部分失败) ---
class LegacyUser(BaseModel):
    userId: int
    username: str
    email: str
    createdAt: str

class ExtendedProfile(BaseModel):
    userId: int
    tags: list[str] = []
    lastActivityScore: float = 0.0
    customSettings: dict = Field(default_factory=dict)

# 新模型,允许部分数据为空
class CompositeProfile(BaseModel):
    core: LegacyUser | None = None
    extension: ExtendedProfile | None = None
    errors: list[str] = []

# --- 应用生命周期管理:共享资源 ---
# 使用 lifespan 事件来管理 httpx client 和 motor client 的生命周期
# 这是 FastAPI 0.93.0+ 的推荐做法
@asynccontextmanager
async def lifespan(app: FastAPI):
    # 应用启动时
    app.state.http_client = httpx.AsyncClient(timeout=HTTP_CLIENT_TIMEOUT)
    app.state.mongo_client = AsyncIOMotorClient(MONGO_URI)
    app.state.db = app.state.mongo_client.user_profiles
    logging.info("Resources initialized.")
    yield
    # 应用关闭时
    await app.state.http_client.aclose()
    app.state.mongo_client.close()
    logging.info("Resources cleaned up.")

app = FastAPI(lifespan=lifespan)

# --- 异步数据获取函数 (带错误处理) ---
async def fetch_legacy_user(client: httpx.AsyncClient, user_id: int) -> LegacyUser | None:
    """
    异步获取遗留用户数据。
    如果失败,记录日志并返回 None,而不是抛出异常。
    """
    try:
        response = await client.get(f"{LEGACY_USER_SERVICE_URL}/{user_id}")
        response.raise_for_status()
        return LegacyUser(**response.json())
    except httpx.RequestError as exc:
        logging.error(f"Request to legacy service failed for user_id: {user_id}. Error: {exc}")
        return None
    except Exception as e:
        # Pydantic validation error or other unexpected issues
        logging.error(f"Error processing legacy data for {user_id}. Error: {e}")
        return None

async def fetch_extended_profile(db, user_id: int) -> ExtendedProfile | None:
    """
    异步获取扩展画像数据。
    """
    try:
        doc = await db.profiles.find_one({"userId": user_id})
        if doc:
            doc.pop("_id", None)
            return ExtendedProfile(**doc)
        # 如果找不到,返回一个默认空对象,这通常不被视为一个“错误”
        return ExtendedProfile(userId=user_id)
    except Exception as e:
        logging.error(f"Error fetching extended profile for {user_id} from MongoDB. Error: {e}")
        return None

# --- 重构后的核心路由 ---
@app.get("/api/v3/composite-profile/{user_id}", response_model=CompositeProfile)
async def get_composite_profile_resilient(request: Request, user_id: int):
    """
    一个高性能、高可用的异步聚合实现
    """
    # 从应用状态中获取共享的 clients
    http_client = request.app.state.http_client
    db = request.app.state.db

    # 使用 asyncio.gather 并发执行两个任务
    # return_exceptions=True 使得一个任务失败不会取消另一个
    results = await asyncio.gather(
        fetch_legacy_user(http_client, user_id),
        fetch_extended_profile(db, user_id),
        return_exceptions=True # 这是一个保险措施,我们内部函数已经处理了异常
    )
    
    legacy_data, extended_data = results
    errors = []

    # 检查每个任务的结果
    if legacy_data is None or isinstance(legacy_data, Exception):
        errors.append("Failed to fetch core user profile.")
        if isinstance(legacy_data, Exception):
            logging.error(f"Uncaught exception from fetch_legacy_user: {legacy_data}")
        legacy_data = None # 确保为 None

    if extended_data is None or isinstance(extended_data, Exception):
        errors.append("Failed to fetch extended user profile.")
        if isinstance(extended_data, Exception):
            logging.error(f"Uncaught exception from fetch_extended_profile: {extended_data}")
        extended_data = None # 确保为 None

    # 业务决策:如果核心数据和扩展数据都失败了,才认为是服务端错误
    if not legacy_data and not extended_data:
        raise HTTPException(status_code=503, detail="All upstream services are unavailable.")
    
    return CompositeProfile(core=legacy_data, extension=extended_data, errors=errors)

这个版本解决了 Code Review 中提出的所有问题:

  • 异步化: async defawait 使得 I/O 操作不再阻塞 worker 线程。
  • 并发化: asyncio.gather 同时发起对遗留服务和 MongoDB 的请求,总耗时取决于最慢的那个请求,而不是两者之和。
  • 韧性: fetch_* 函数内部消化了异常,返回 None。聚合接口可以返回部分成功的数据,并通过 errors 字段明确告知客户端哪些数据获取失败。
  • 资源管理: 使用 lifespan 事件来管理共享的 AsyncClient 和数据库连接,避免了不必要的开销。

遗留系统端:一个稳定的 MyBatis 服务

为了让整个场景完整,这里也展示一下被调用的 Java/MyBatis 服务的核心部分。它非常传统,但坚如磐石。

UserMapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.legacy.mapper.UserMapper">
    <select id="findById" resultType="com.example.legacy.model.User" parameterType="int">
        SELECT
            user_id as userId,
            username,
            email,
            created_at as createdAt
        FROM
            users
        WHERE
            user_id = #{userId}
    </select>
</mapper>

UserController.java

@RestController
@RequestMapping("/api/v1/users")
public class UserController {

    private static final Logger logger = LoggerFactory.getLogger(UserController.class);
    
    @Autowired
    private UserMapper userMapper;

    @GetMapping("/{userId}")
    public ResponseEntity<User> getUserById(@PathVariable("userId") int userId) {
        // 在真实项目中,这里应该有更复杂的业务逻辑和缓存策略
        logger.info("Fetching user with ID: {}", userId);
        User user = userMapper.findById(userId);
        if (user == null) {
            logger.warn("User not found with ID: {}", userId);
            return ResponseEntity.notFound().build();
        }
        return ResponseEntity.ok(user);
    }
}

这个服务很简单,但它代表了成千上万个企业内部正在运行的、难以改动的关键系统。我们的 FastAPI 聚合层正是为了在不触碰它的前提下,赋予业务新的活力。

前端状态同步:Zustand 的优雅实践

现在,前端面对的是一个统一、健壮的 API。即使后端某个数据源失败,他们也能拿到部分数据和一个明确的错误列表。使用 Zustand 这种轻量级状态管理库,处理这种复杂的服务器状态变得异常简单。

store.ts

import { create } from 'zustand';

interface LegacyUser {
  userId: number;
  username: string;
  email: string;
  createdAt: string;
}

interface ExtendedProfile {
  userId: number;
  tags: string[];
  lastActivityScore: number;
  customSettings: Record<string, any>;
}

// 定义 Store 的 state 结构
interface ProfileState {
  core: LegacyUser | null;
  extension: ExtendedProfile | null;
  errors: string[];
  isLoading: boolean;
  // 精细化控制加载和错误状态
  isCoreProfileError: boolean;
  isExtensionProfileError: boolean;
  
  fetchCompositeProfile: (userId: number) => Promise<void>;
}

export const useProfileStore = create<ProfileState>((set) => ({
  core: null,
  extension: null,
  errors: [],
  isLoading: false,
  isCoreProfileError: false,
  isExtensionProfileError: false,
  
  fetchCompositeProfile: async (userId: number) => {
    set({ isLoading: true, errors: [], isCoreProfileError: false, isExtensionProfileError: false });
    
    try {
      const response = await fetch(`/api/v3/composite-profile/${userId}`);
      
      if (!response.ok) {
        // 处理整个请求都失败的情况,比如 503
        throw new Error('Failed to fetch user profile from the server.');
      }
      
      const data: {
        core: LegacyUser | null;
        extension: ExtendedProfile | null;
        errors: string[];
      } = await response.json();
      
      set({
        core: data.core,
        extension: data.extension,
        errors: data.errors,
        // 根据返回的 errors 字段来设置精细化的错误状态
        isCoreProfileError: !data.core, 
        isExtensionProfileError: !data.extension,
        isLoading: false,
      });

    } catch (error) {
      const errorMessage = error instanceof Error ? error.message : 'An unknown error occurred';
      set({
        isLoading: false,
        errors: [errorMessage],
        isCoreProfileError: true, // 整个请求失败,我们认为所有部分都出错了
        isExtensionProfileError: true,
      });
    }
  },
}));

React Component (Profile.tsx)

import React, { useEffect } from 'react';
import { useProfileStore } from './store';

const UserProfile = ({ userId }) => {
  const { 
    core, 
    extension, 
    isLoading, 
    isCoreProfileError, 
    fetchCompositeProfile 
  } = useProfileStore();

  useEffect(() => {
    fetchCompositeProfile(userId);
  }, [userId, fetchCompositeProfile]);

  if (isLoading) {
    return <div>Loading profile...</div>;
  }

  // 即使核心信息失败,只要有扩展信息,我们也能展示一部分
  if (!core && !extension) {
    return <div>Could not load any profile information. Please try again later.</div>
  }

  return (
    <div>
      <h1>User Profile</h1>
      {isCoreProfileError ? (
        <div className="error-banner">Warning: Could not load core user information.</div>
      ) : (
        core && (
          <section>
            <h2>Core Info</h2>
            <p>Username: {core.username}</p>
            <p>Email: {core.email}</p>
          </section>
        )
      )}
      
      {/* 同样地,优雅处理扩展信息的缺失 */}
      {extension && (
        <section>
          <h2>Extended Profile</h2>
          <p>Activity Score: {extension.lastActivityScore}</p>
          <p>Tags: {extension.tags.join(', ')}</p>
        </section>
      )}
    </div>
  );
};

export default UserProfile;

通过这种方式,后端聚合层的韧性设计被无缝地传递到了前端。前端UI可以根据 isCoreProfileError 等精细化的状态,优雅地降级,而不是显示一个白屏或一个通用的“错误”信息。Zustand 的简洁 API 让这一切的实现非常直观。

局限性与未来展望

这套基于 FastAPI 的聚合层架构解决了眼下的痛点,但它并非银弹。当前的方案中,FastAPI 服务本身成了一个新的关键节点,如果它自身出现故障,整个用户画像功能就会瘫痪。此外,对于数据一致性要求极高的场景,这种运行时的同步调用聚合可能并不适合,或许需要引入基于消息队列的事件驱动机制,实现最终一致性。

未来的迭代方向可能包括:

  1. 分布式缓存: 在聚合层前引入 Redis 或类似工具,缓存从遗留服务获取的数据,降低对老系统的压力和依赖。
  2. 服务网格: 引入 Linkerd 或 Istio 等服务网格,将超时、重试、熔断等韧性策略从应用代码中下沉到基础设施层,让业务代码更纯粹。
  3. GraphQL 探索: 对于更复杂的聚合需求,探索使用 GraphQL 替代 RESTful API 聚合层,让客户端能按需请求数据,进一步提升灵活性和效率。

  目录