技术痛点:一个割裂的用户画像
我们团队维护着一个运行了近十年的核心用户系统,Java + Spring Boot + MyBatis 的技术栈,稳定得像一块磐石。它负责处理所有用户的核心身份信息(ID、用户名、注册信息等),数据存储在分片的 MySQL 集群中。问题在于,随着业务发展,我们需要为用户引入更丰富的、非结构化的画像数据,比如用户的兴趣标签、最近的活跃度评分、个性化配置等等。这些数据变化快、结构多变,硬塞进关系型数据库里不仅设计上别扭,还会给这个本已不堪重负的遗留系统带来风险。
最初的方案简单粗暴:前端需要完整的用户画像时,需要并行调用两个接口:
-
GET /api/v1/users/{userId}
(Java/MyBatis 老服务) -
GET /api/v2/profiles/{userId}
(新的 Python/FastAPI 服务)
前端团队怨声载道。他们不仅要处理两次网络请求的复杂性、合并数据,还要处理其中任意一个接口失败或超时的状态,这让前端的状态管理逻辑变得一团糟。我们需要一个聚合层,一个单一的入口点,为客户端提供一个统一、完整的用户画像视图。这个担子,自然就落到了负责新服务的 Python 团队身上。
初步构想:一个看似合理的聚合接口
我的第一版设计非常直接:在 FastAPI 服务中创建一个新的端点 GET /api/v3/composite-profile/{userId}
,它来扮演这个聚合者的角色。其内部逻辑如下:
- 接收到请求,获取
userId
。 - 通过 HTTP 调用 Java/MyBatis 遗留服务的用户接口,获取核心数据。
- 查询自身的 MongoDB,获取扩展画像数据。
- 将两份数据合并,返回给客户端。
下面是这个构想的第一版代码实现。
FastAPI (main.py - v1)
import httpx
from fastapi import FastAPI, HTTPException
from pymongo import MongoClient
from pydantic import BaseModel, Field
import os
import logging
# --- 日志配置 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- 配置 ---
# 在生产环境中,这些应该来自环境变量或配置服务
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/")
LEGACY_USER_SERVICE_URL = os.getenv("LEGACY_USER_SERVICE_URL", "http://localhost:8080/api/v1/users")
# --- 应用实例与数据库连接 ---
app = FastAPI()
mongo_client = MongoClient(MONGO_URI)
db = mongo_client.user_profiles
# --- Pydantic 模型定义 ---
class LegacyUser(BaseModel):
userId: int
username: str
email: str
createdAt: str
class ExtendedProfile(BaseModel):
userId: int
tags: list[str] = []
lastActivityScore: float = 0.0
customSettings: dict = Field(default_factory=dict)
class CompositeProfile(BaseModel):
core: LegacyUser
extension: ExtendedProfile
# --- 核心路由 ---
@app.get("/api/v3/composite-profile/{user_id}", response_model=CompositeProfile)
def get_composite_profile(user_id: int):
"""
一个简单的、同步阻塞的聚合实现
"""
legacy_user_data = None
extended_profile_data = None
# 1. 请求遗留服务
try:
logging.info(f"Requesting legacy user data for user_id: {user_id}")
with httpx.Client() as client:
response = client.get(f"{LEGACY_USER_SERVICE_URL}/{user_id}", timeout=5.0)
response.raise_for_status() # 如果状态码是 4xx 或 5xx,则抛出异常
legacy_user_data = response.json()
except httpx.RequestError as exc:
logging.error(f"HTTP request to legacy service failed for user_id: {user_id}. Error: {exc}")
raise HTTPException(status_code=503, detail="Legacy user service is unavailable.")
except Exception as e:
logging.error(f"An unexpected error occurred while fetching legacy data for {user_id}. Error: {e}")
raise HTTPException(status_code=500, detail="Internal server error.")
# 2. 查询 NoSQL 数据库
logging.info(f"Querying extended profile for user_id: {user_id}")
extended_profile_doc = db.profiles.find_one({"userId": user_id})
if not extended_profile_doc:
# 如果没有扩展信息,我们创建一个默认的,保证数据结构的完整性
extended_profile_data = {"userId": user_id, "tags": [], "lastActivityScore": 0.0, "customSettings": {}}
else:
# Pymongo 返回的 _id 是 ObjectId 类型, Pydantic 无法直接处理,需要移除
extended_profile_doc.pop("_id", None)
extended_profile_data = extended_profile_doc
# 3. 组合数据并返回
return CompositeProfile(
core=LegacyUser(**legacy_user_data),
extension=ExtendedProfile(**extended_profile_data)
)
这段代码能跑,但当我提交 Code Review 请求时,我的同事,一位经验丰富的架构师,留下了几条尖锐但一针见血的评论:
- 性能瓶颈: “
get_composite_profile
是一个def
函数,意味着它运行在 FastAPI 的主线程池中。内部的httpx.Client()
是同步阻塞调用。如果遗留服务响应慢(比如慢 200ms),这个 FastAPI worker 在这 200ms 内就完全被占用了,无法处理任何其他请求。在高并发下,线程池会迅速耗尽,整个服务吞吐量会急剧下降。” - 可用性陷阱: “
try...except
块的设计是‘一荣俱荣,一损俱损’。只要遗留服务超时或返回 500,整个聚合接口就会返回 503。但在很多场景下,即使用户核心信息获取失败,我们仍然希望返回可用的扩展信息,或者反之。当前的实现过于脆弱。” - 资源浪费: “为每个请求创建一个
httpx.Client
实例是低效的。正确的模式是为整个应用生命周期维护一个共享的AsyncClient
实例。”
这次 Code Review 让我意识到,我仅仅是“实现”了功能,却完全没有从生产环境的角度去思考性能和韧性。
迭代与重构:拥抱异步与韧性
基于 Code Review 的反馈,我开始了对这个接口的彻底重构。目标很明确:异步化、并发化、提升容错能力。
1. 架构调整:从串行到并行
重构的核心是利用 Python 的 asyncio
和 FastAPI 的原生异步支持。我们将两个独立的数据获取操作(调用遗留服务、查询MongoDB)视为可以并发执行的任务。
sequenceDiagram participant C as Client participant F as FastAPI Aggregator participant L as Legacy Service (Java/MyBatis) participant M as MongoDB C->>+F: GET /api/v3/composite-profile/{id} Note right of F: Naive Implementation (Sequential) F->>+L: GET /api/v1/users/{id} L-->>-F: Core User Data F->>+M: findOne({userId: id}) M-->>-F: Extended Profile Data F-->>-C: Composite Profile %% Refactored Implementation (Concurrent) C->>+F: GET /api/v3/composite-profile/{id} Note right of F: Refactored Implementation (Concurrent) par F->>+L: GET /api/v1/users/{id} and F->>+M: findOne({userId: id}) (async) end L-->>-F: Core User Data M-->>-F: Extended Profile Data F-->>-C: Composite Profile (handles partial failures)
2. 关键代码实现
FastAPI (main.py - v2, Production-Ready)
import asyncio
import httpx
from fastapi import FastAPI, HTTPException, Request
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel, Field
import os
import logging
from contextlib import asynccontextmanager
# --- 日志配置 ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# --- 配置 ---
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017/")
LEGACY_USER_SERVICE_URL = os.getenv("LEGACY_USER_SERVICE_URL", "http://localhost:8080/api/v1/users")
# 为外部调用设置一个合理的超时
HTTP_CLIENT_TIMEOUT = 3.0
# --- Pydantic 模型定义 (增加可选字段以支持部分失败) ---
class LegacyUser(BaseModel):
userId: int
username: str
email: str
createdAt: str
class ExtendedProfile(BaseModel):
userId: int
tags: list[str] = []
lastActivityScore: float = 0.0
customSettings: dict = Field(default_factory=dict)
# 新模型,允许部分数据为空
class CompositeProfile(BaseModel):
core: LegacyUser | None = None
extension: ExtendedProfile | None = None
errors: list[str] = []
# --- 应用生命周期管理:共享资源 ---
# 使用 lifespan 事件来管理 httpx client 和 motor client 的生命周期
# 这是 FastAPI 0.93.0+ 的推荐做法
@asynccontextmanager
async def lifespan(app: FastAPI):
# 应用启动时
app.state.http_client = httpx.AsyncClient(timeout=HTTP_CLIENT_TIMEOUT)
app.state.mongo_client = AsyncIOMotorClient(MONGO_URI)
app.state.db = app.state.mongo_client.user_profiles
logging.info("Resources initialized.")
yield
# 应用关闭时
await app.state.http_client.aclose()
app.state.mongo_client.close()
logging.info("Resources cleaned up.")
app = FastAPI(lifespan=lifespan)
# --- 异步数据获取函数 (带错误处理) ---
async def fetch_legacy_user(client: httpx.AsyncClient, user_id: int) -> LegacyUser | None:
"""
异步获取遗留用户数据。
如果失败,记录日志并返回 None,而不是抛出异常。
"""
try:
response = await client.get(f"{LEGACY_USER_SERVICE_URL}/{user_id}")
response.raise_for_status()
return LegacyUser(**response.json())
except httpx.RequestError as exc:
logging.error(f"Request to legacy service failed for user_id: {user_id}. Error: {exc}")
return None
except Exception as e:
# Pydantic validation error or other unexpected issues
logging.error(f"Error processing legacy data for {user_id}. Error: {e}")
return None
async def fetch_extended_profile(db, user_id: int) -> ExtendedProfile | None:
"""
异步获取扩展画像数据。
"""
try:
doc = await db.profiles.find_one({"userId": user_id})
if doc:
doc.pop("_id", None)
return ExtendedProfile(**doc)
# 如果找不到,返回一个默认空对象,这通常不被视为一个“错误”
return ExtendedProfile(userId=user_id)
except Exception as e:
logging.error(f"Error fetching extended profile for {user_id} from MongoDB. Error: {e}")
return None
# --- 重构后的核心路由 ---
@app.get("/api/v3/composite-profile/{user_id}", response_model=CompositeProfile)
async def get_composite_profile_resilient(request: Request, user_id: int):
"""
一个高性能、高可用的异步聚合实现
"""
# 从应用状态中获取共享的 clients
http_client = request.app.state.http_client
db = request.app.state.db
# 使用 asyncio.gather 并发执行两个任务
# return_exceptions=True 使得一个任务失败不会取消另一个
results = await asyncio.gather(
fetch_legacy_user(http_client, user_id),
fetch_extended_profile(db, user_id),
return_exceptions=True # 这是一个保险措施,我们内部函数已经处理了异常
)
legacy_data, extended_data = results
errors = []
# 检查每个任务的结果
if legacy_data is None or isinstance(legacy_data, Exception):
errors.append("Failed to fetch core user profile.")
if isinstance(legacy_data, Exception):
logging.error(f"Uncaught exception from fetch_legacy_user: {legacy_data}")
legacy_data = None # 确保为 None
if extended_data is None or isinstance(extended_data, Exception):
errors.append("Failed to fetch extended user profile.")
if isinstance(extended_data, Exception):
logging.error(f"Uncaught exception from fetch_extended_profile: {extended_data}")
extended_data = None # 确保为 None
# 业务决策:如果核心数据和扩展数据都失败了,才认为是服务端错误
if not legacy_data and not extended_data:
raise HTTPException(status_code=503, detail="All upstream services are unavailable.")
return CompositeProfile(core=legacy_data, extension=extended_data, errors=errors)
这个版本解决了 Code Review 中提出的所有问题:
- 异步化:
async def
和await
使得 I/O 操作不再阻塞 worker 线程。 - 并发化:
asyncio.gather
同时发起对遗留服务和 MongoDB 的请求,总耗时取决于最慢的那个请求,而不是两者之和。 - 韧性:
fetch_*
函数内部消化了异常,返回None
。聚合接口可以返回部分成功的数据,并通过errors
字段明确告知客户端哪些数据获取失败。 - 资源管理: 使用
lifespan
事件来管理共享的AsyncClient
和数据库连接,避免了不必要的开销。
遗留系统端:一个稳定的 MyBatis 服务
为了让整个场景完整,这里也展示一下被调用的 Java/MyBatis 服务的核心部分。它非常传统,但坚如磐石。
UserMapper.xml
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.legacy.mapper.UserMapper">
<select id="findById" resultType="com.example.legacy.model.User" parameterType="int">
SELECT
user_id as userId,
username,
email,
created_at as createdAt
FROM
users
WHERE
user_id = #{userId}
</select>
</mapper>
UserController.java
@RestController
@RequestMapping("/api/v1/users")
public class UserController {
private static final Logger logger = LoggerFactory.getLogger(UserController.class);
@Autowired
private UserMapper userMapper;
@GetMapping("/{userId}")
public ResponseEntity<User> getUserById(@PathVariable("userId") int userId) {
// 在真实项目中,这里应该有更复杂的业务逻辑和缓存策略
logger.info("Fetching user with ID: {}", userId);
User user = userMapper.findById(userId);
if (user == null) {
logger.warn("User not found with ID: {}", userId);
return ResponseEntity.notFound().build();
}
return ResponseEntity.ok(user);
}
}
这个服务很简单,但它代表了成千上万个企业内部正在运行的、难以改动的关键系统。我们的 FastAPI 聚合层正是为了在不触碰它的前提下,赋予业务新的活力。
前端状态同步:Zustand 的优雅实践
现在,前端面对的是一个统一、健壮的 API。即使后端某个数据源失败,他们也能拿到部分数据和一个明确的错误列表。使用 Zustand 这种轻量级状态管理库,处理这种复杂的服务器状态变得异常简单。
store.ts
import { create } from 'zustand';
interface LegacyUser {
userId: number;
username: string;
email: string;
createdAt: string;
}
interface ExtendedProfile {
userId: number;
tags: string[];
lastActivityScore: number;
customSettings: Record<string, any>;
}
// 定义 Store 的 state 结构
interface ProfileState {
core: LegacyUser | null;
extension: ExtendedProfile | null;
errors: string[];
isLoading: boolean;
// 精细化控制加载和错误状态
isCoreProfileError: boolean;
isExtensionProfileError: boolean;
fetchCompositeProfile: (userId: number) => Promise<void>;
}
export const useProfileStore = create<ProfileState>((set) => ({
core: null,
extension: null,
errors: [],
isLoading: false,
isCoreProfileError: false,
isExtensionProfileError: false,
fetchCompositeProfile: async (userId: number) => {
set({ isLoading: true, errors: [], isCoreProfileError: false, isExtensionProfileError: false });
try {
const response = await fetch(`/api/v3/composite-profile/${userId}`);
if (!response.ok) {
// 处理整个请求都失败的情况,比如 503
throw new Error('Failed to fetch user profile from the server.');
}
const data: {
core: LegacyUser | null;
extension: ExtendedProfile | null;
errors: string[];
} = await response.json();
set({
core: data.core,
extension: data.extension,
errors: data.errors,
// 根据返回的 errors 字段来设置精细化的错误状态
isCoreProfileError: !data.core,
isExtensionProfileError: !data.extension,
isLoading: false,
});
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'An unknown error occurred';
set({
isLoading: false,
errors: [errorMessage],
isCoreProfileError: true, // 整个请求失败,我们认为所有部分都出错了
isExtensionProfileError: true,
});
}
},
}));
React Component (Profile.tsx)
import React, { useEffect } from 'react';
import { useProfileStore } from './store';
const UserProfile = ({ userId }) => {
const {
core,
extension,
isLoading,
isCoreProfileError,
fetchCompositeProfile
} = useProfileStore();
useEffect(() => {
fetchCompositeProfile(userId);
}, [userId, fetchCompositeProfile]);
if (isLoading) {
return <div>Loading profile...</div>;
}
// 即使核心信息失败,只要有扩展信息,我们也能展示一部分
if (!core && !extension) {
return <div>Could not load any profile information. Please try again later.</div>
}
return (
<div>
<h1>User Profile</h1>
{isCoreProfileError ? (
<div className="error-banner">Warning: Could not load core user information.</div>
) : (
core && (
<section>
<h2>Core Info</h2>
<p>Username: {core.username}</p>
<p>Email: {core.email}</p>
</section>
)
)}
{/* 同样地,优雅处理扩展信息的缺失 */}
{extension && (
<section>
<h2>Extended Profile</h2>
<p>Activity Score: {extension.lastActivityScore}</p>
<p>Tags: {extension.tags.join(', ')}</p>
</section>
)}
</div>
);
};
export default UserProfile;
通过这种方式,后端聚合层的韧性设计被无缝地传递到了前端。前端UI可以根据 isCoreProfileError
等精细化的状态,优雅地降级,而不是显示一个白屏或一个通用的“错误”信息。Zustand 的简洁 API 让这一切的实现非常直观。
局限性与未来展望
这套基于 FastAPI 的聚合层架构解决了眼下的痛点,但它并非银弹。当前的方案中,FastAPI 服务本身成了一个新的关键节点,如果它自身出现故障,整个用户画像功能就会瘫痪。此外,对于数据一致性要求极高的场景,这种运行时的同步调用聚合可能并不适合,或许需要引入基于消息队列的事件驱动机制,实现最终一致性。
未来的迭代方向可能包括:
- 分布式缓存: 在聚合层前引入 Redis 或类似工具,缓存从遗留服务获取的数据,降低对老系统的压力和依赖。
- 服务网格: 引入 Linkerd 或 Istio 等服务网格,将超时、重试、熔断等韧性策略从应用代码中下沉到基础设施层,让业务代码更纯粹。
- GraphQL 探索: 对于更复杂的聚合需求,探索使用 GraphQL 替代 RESTful API 聚合层,让客户端能按需请求数据,进一步提升灵活性和效率。