基于 etcd Watch 与 Redis Hash 实现高可用的多租户配置中心


在构建多租户SaaS平台时,一个绕不开的挑战是租户配置的管理。这些配置包括功能开关(Feature Flags)、API速率限制、主题定制等。每个进入系统的API请求,都需要快速、准确地获取当前租户的上下文信息。直接从主数据库(如PostgreSQL)中读取这些配置,会在高并发下形成性能瓶颈,甚至拖垮整个系统。

显而易见的解决方案是引入缓存。但一个简单的TTL(Time-To-Live)缓存策略在这里并不适用。当运营人员在后台修改了某个租户的配置时,我们期望这个变更能够近乎实时地生效,而不是等待缓存过期。这种对数据一致性和低延迟读取的双重诉isqiu,要求我们设计一个更精密的架构。

我们的目标是构建一个独立的、高可用的配置同步服务。它负责监听配置源的变化,并将其推送到一个高性能的缓存层。其他业务服务只需从缓存层读取配置,从而实现与主数据源的解耦。

技术选型决策

在一个典型的生产环境中,我们需要对三个核心组件进行选型:配置的“真相源头”(Source of Truth)、高速缓存以及执行同步任务的服务本身。

  1. 真相源头:为什么是 etcd?

    • 一致性保证: etcd 基于Raft协议,为分布式环境提供了强一致性的键值存储。对于配置这类关键数据,我们不能容忍数据不一致或丢失。

    • Watch机制: 这是etcd最关键的特性之一。它允许客户端监听指定前缀(prefix)下所有键的变化。这正是我们实现“实时同步”的基础,远比轮询数据库要高效和优雅。

    • 事务操作: etcd支持简单的事务,可以确保多个配置项的原子性更新。

      相较之下,使用关系型数据库并通过某种CDC(Change Data Capture)工具虽然可行,但架构更重,延迟也更高。

  2. 高速缓存:为什么是 Redis Hash?

    • 性能: Redis的内存读写性能毋庸置疑,能够轻松应对高QPS的配置读取请求。
    • 数据结构: 对于每个租户的配置,它本质上是一个结构化的对象。使用Redis的Hash结构(HSET/HGETALL)来存储是绝佳选择。相比于将每个配置项(如tenant:123:feature_a, tenant:123:rate_limit)存为独立的key,使用Hash(HSET tenant:123:config feature_a 'true' rate_limit 100)能极大地减少key的数量,并且在逻辑上更内聚,内存使用也更高效。
  3. 同步服务:为什么是 Go?

    • 并发模型: Go的goroutine和channel天生适合处理etcd watch这类持续的、基于事件的IO任务。我们可以轻松地为每个watcher或后台任务启动一个goroutine,代码清晰且资源开销小。
    • 生态系统: Go拥有高质量的etcd和Redis官方或社区客户端库,网络编程能力强大,编译后的二进制文件部署简单,无额外运行时依赖。

整个架构的数据流如下:

graph TD
    A[管理后台/Ops] --修改配置--> B(etcd);
    B --Watch API推送变更事件--> C{Go同步服务};
    C --解析事件, 更新数据--> D(Redis);
    E[业务服务A] --读取配置--> D;
    F[业务服务B] --读取配置--> D;

步骤化实现

我们将从零开始构建这个Go同步服务。项目的核心是Synchronizer,它封装了与etcd和Redis的交互逻辑。

1. 项目结构与配置

一个清晰的项目结构是可维护性的开端。

/config-synchronizer
├── cmd
│   └── main.go
├── internal
│   ├── config
│   │   └── config.go
│   ├── service
│   │   └── synchronizer.go
│   └── storage
│       └── model.go
├── go.mod
├── go.sum
└── config.yaml

首先定义服务的配置文件config.yaml

# config.yaml
app:
  name: "config-synchronizer"
  log_level: "info"

etcd:
  endpoints: ["127.0.0.1:2379"]
  dial_timeout: 5 # in seconds
  username: ""
  password: ""
  key_prefix: "/tenants/"

redis:
  address: "127.0.0.1:6379"
  password: ""
  db: 0
  dial_timeout: 5 # in seconds

接着,在internal/config/config.go中创建对应的Go结构体来加载这些配置。

// internal/config/config.go
package config

import (
	"os"
	"time"

	"gopkg.in/yaml.v3"
)

type Config struct {
	App  AppConfig  `yaml:"app"`
	Etcd EtcdConfig `yaml:"etcd"`
	Redis RedisConfig `yaml:"redis"`
}

type AppConfig struct {
	Name     string `yaml:"name"`
	LogLevel string `yaml:"log_level"`
}

type EtcdConfig struct {
	Endpoints   []string      `yaml:"endpoints"`
	DialTimeout time.Duration `yaml:"dial_timeout"`
	Username    string        `yaml:"username"`
	Password    string        `yaml:"password"`
	KeyPrefix   string        `yaml:"key_prefix"`
}

type RedisConfig struct {
	Address     string        `yaml:"address"`
	Password    string        `yaml:"password"`
	DB          int           `yaml:"db"`
	DialTimeout time.Duration `yaml:"dial_timeout"`
}

func Load(path string) (*Config, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}

	// 自动将配置中的秒数转换为 time.Duration
	var cfg Config
	// A trick to parse duration from integer seconds in yaml
	type rawConfig struct {
		App  AppConfig  `yaml:"app"`
		Etcd rawEtcdConfig `yaml:"etcd"`
		Redis rawRedisConfig `yaml:"redis"`
	}
	var rawCfg rawConfig
	if err := yaml.Unmarshal(data, &rawCfg); err != nil {
		return nil, err
	}
	
	cfg.App = rawCfg.App
	cfg.Redis = RedisConfig{
		Address: rawCfg.Redis.Address,
		Password: rawCfg.Redis.Password,
		DB: rawCfg.Redis.DB,
		DialTimeout: time.Duration(rawCfg.Redis.DialTimeout) * time.Second,
	}
	cfg.Etcd = EtcdConfig{
		Endpoints: rawCfg.Etcd.Endpoints,
		Username: rawCfg.Etcd.Username,
		Password: rawCfg.Etcd.Password,
		KeyPrefix: rawCfg.Etcd.KeyPrefix,
		DialTimeout: time.Duration(rawCfg.Etcd.DialTimeout) * time.Second,
	}

	return &cfg, nil
}

// Intermediate structs for parsing duration
type rawEtcdConfig struct {
	Endpoints   []string `yaml:"endpoints"`
	DialTimeout int      `yaml:"dial_timeout"`
	Username    string   `yaml:"username"`
	Password    string   `yaml:"password"`
	KeyPrefix   string   `yaml:"key_prefix"`
}

type rawRedisConfig struct {
	Address     string `yaml:"address"`
	Password    string `yaml:"password"`
	DB          int    `yaml:"db"`
	DialTimeout int    `yaml:"dial_timeout"`
}

2. 定义数据模型

我们需要一个统一的结构来表示租户的配置。这个结构将被序列化为JSON存入etcd。

// internal/storage/model.go
package storage

import "encoding/json"

// TenantConfig defines the structure for a tenant's configuration.
type TenantConfig struct {
	ID          string         `json:"id"`
	Name        string         `json:"name"`
	Status      string         `json:"status"` // e.g., "active", "suspended"
	RateLimit   int            `json:"rate_limit"`
	FeatureFlags map[string]bool `json:"feature_flags"`
}

// ToMap converts the struct to a map[string]interface{} for Redis HSET.
// In a real project, this logic could be more complex, handling nested structs.
func (tc *TenantConfig) ToMap() (map[string]interface{}, error) {
    // We serialize the whole object as json and then deserialize to a map.
    // This is a generic way to handle complex structs.
    // For performance critical paths, a manual conversion might be better.
	data, err := json.Marshal(tc)
	if err != nil {
		return nil, err
	}

	var result map[string]interface{}
	if err := json.Unmarshal(data, &result); err != nil {
		return nil, err
	}
	return result, nil
}

3. 核心同步服务 Synchronizer

这是整个项目的核心。synchronizer.go 文件将包含启动、全量同步和增量同步的逻辑。

// internal/service/synchronizer.go
package service

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"github.com/redis/go-redis/v9"
	"go.etcd.io/etcd/client/v3"
	"go.uber.org/zap"

	"config-synchronizer/internal/config"
	"config-synchronizer/internal/storage"
)

// Synchronizer is responsible for syncing tenant configs from etcd to Redis.
type Synchronizer struct {
	etcdClient *clientv3.Client
	redisClient *redis.Client
	cfg        *config.Config
	logger     *zap.Logger
}

func NewSynchronizer(
	cfg *config.Config,
	etcdClient *clientv3.Client,
	redisClient *redis.Client,
	logger *zap.Logger,
) *Synchronizer {
	return &Synchronizer{
		cfg:        cfg,
		etcdClient: etcdClient,
		redisClient: redisClient,
		logger:     logger,
	}
}

// Start initiates the synchronization process.
// It first performs a full sync and then starts watching for changes.
func (s *Synchronizer) Start(ctx context.Context) error {
	s.logger.Info("starting initial full synchronization")
	rev, err := s.fullSync(ctx)
	if err != nil {
		return fmt.Errorf("initial full sync failed: %w", err)
	}
	s.logger.Info("full synchronization completed", zap.Int64("revision", rev))

	// Start watching for changes from the revision after the full sync
	go s.watchChanges(ctx, rev+1)

	return nil
}

// fullSync performs a one-time full load of all tenant configurations.
// It returns the etcd revision at the end of the operation.
func (s *Synchronizer) fullSync(ctx context.Context) (int64, error) {
	// Use WithPrefix() to get all keys under the configured prefix.
	resp, err := s.etcdClient.Get(ctx, s.cfg.Etcd.KeyPrefix, clientv3.WithPrefix())
	if err != nil {
		return 0, err
	}

	if len(resp.Kvs) == 0 {
		s.logger.Info("no existing tenant configurations found in etcd")
		return resp.Header.Revision, nil
	}

	// Use Redis pipeline for batch updates to improve performance.
	pipe := s.redisClient.Pipeline()
	for _, kv := range resp.Kvs {
		var tenantCfg storage.TenantConfig
		if err := json.Unmarshal(kv.Value, &tenantCfg); err != nil {
			s.logger.Warn("failed to unmarshal tenant config, skipping",
				zap.String("key", string(kv.Key)),
				zap.Error(err),
			)
			continue
		}
		
		redisKey := s.getRedisKey(tenantCfg.ID)
		cfgMap, err := tenantCfg.ToMap()
		if err != nil {
			s.logger.Warn("failed to convert tenant config to map, skipping",
				zap.String("key", string(kv.Key)),
				zap.Error(err),
			)
			continue
		}
		
		// HSet will create or overwrite the hash.
		pipe.HSet(ctx, redisKey, cfgMap)
	}

	if _, err := pipe.Exec(ctx); err != nil {
		return 0, fmt.Errorf("redis pipeline execution failed: %w", err)
	}

	s.logger.Info("successfully synced tenant configurations", zap.Int("count", len(resp.Kvs)))
	return resp.Header.Revision, nil
}

// watchChanges starts an etcd watcher from a given revision.
// This is a long-running goroutine.
func (s *Synchronizer) watchChanges(ctx context.Context, startRev int64) {
	watchChan := s.etcdClient.Watch(ctx, s.cfg.Etcd.KeyPrefix, clientv3.WithPrefix(), clientv3.WithRev(startRev))

	s.logger.Info("starting to watch for changes", zap.Int64("from_revision", startRev))

	for {
		select {
		case <-ctx.Done():
			s.logger.Info("watch context cancelled, stopping watcher.")
			return
		case watchResp, ok := <-watchChan:
			if !ok {
				s.logger.Error("etcd watch channel closed, attempting to reconnect...")
				// In a production system, you need robust reconnection logic.
				// For simplicity, we'll just try to re-establish the watch.
				time.Sleep(5 * time.Second)
				watchChan = s.etcdClient.Watch(ctx, s.cfg.Etcd.KeyPrefix, clientv3.WithPrefix(), clientv3.WithRev(startRev))
				continue
			}
			if err := watchResp.Err(); err != nil {
				s.logger.Error("etcd watch error", zap.Error(err))
				// Handle specific errors like `etcdserver: mvcc: required revision has been compacted`
				// which would require a new full sync.
				continue
			}

			for _, event := range watchResp.Events {
				s.processEvent(ctx, event)
			}
			// Update the revision to watch from
			startRev = watchResp.Header.Revision + 1
		}
	}
}

// processEvent handles a single etcd watch event.
func (s *Synchronizer) processEvent(ctx context.Context, event *clientv3.Event) {
	tenantID := s.getTenantIDFromKey(string(event.Kv.Key))
	if tenantID == "" {
		s.logger.Warn("could not extract tenant ID from key", zap.String("key", string(event.Kv.Key)))
		return
	}
	redisKey := s.getRedisKey(tenantID)

	switch event.Type {
	case clientv3.EventTypePut: // Handle create or update
		var tenantCfg storage.TenantConfig
		if err := json.Unmarshal(event.Kv.Value, &tenantCfg); err != nil {
			s.logger.Error("failed to unmarshal updated tenant config",
				zap.String("key", string(event.Kv.Key)),
				zap.Error(err),
			)
			return
		}
		
		cfgMap, err := tenantCfg.ToMap()
		if err != nil {
			s.logger.Error("failed to convert updated tenant config to map",
				zap.String("key", string(event.Kv.Key)),
				zap.Error(err),
			)
			return
		}

		if err := s.redisClient.HSet(ctx, redisKey, cfgMap).Err(); err != nil {
			s.logger.Error("failed to update redis hash", zap.String("redis_key", redisKey), zap.Error(err))
		} else {
			s.logger.Info("successfully updated tenant config in redis", zap.String("tenant_id", tenantID))
		}

	case clientv3.EventTypeDelete: // Handle deletion
		if err := s.redisClient.Del(ctx, redisKey).Err(); err != nil {
			s.logger.Error("failed to delete redis key", zap.String("redis_key", redisKey), zap.Error(err))
		} else {
			s.logger.Info("successfully deleted tenant config from redis", zap.String("tenant_id", tenantID))
		}
	}
}

// getRedisKey constructs the key used for storing tenant config in Redis.
func (s *Synchronizer) getRedisKey(tenantID string) string {
	return fmt.Sprintf("tenant:%s:config", tenantID)
}

// getTenantIDFromKey extracts the tenant ID from an etcd key.
// e.g., "/tenants/tenant-123" -> "tenant-123"
func (s *Synchronizer) getTenantIDFromKey(key string) string {
	if !strings.HasPrefix(key, s.cfg.Etcd.KeyPrefix) {
		return ""
	}
	return strings.TrimPrefix(key, s.cfg.Etcd.KeyPrefix)
}

代码解析与关键点:

  1. Start方法: 这是入口。它首先执行fullSync来确保服务启动时,Redis的状态与etcd完全一致。这是一个关键的健壮性设计,防止服务在重启期间错过任何变更。完成全量同步后,它会获取当前的etcd版本号(Revision),并从这个版本号的下一个版本开始watch,确保无缝衔接。
  2. fullSync方法: 使用clientv3.WithPrefix()获取所有租户数据。为了提高效率,所有的Redis写入操作都通过Pipeline批量执行,这能显著减少网络往返次数。
  3. watchChanges方法: 这是增量同步的核心。它在一个无限循环中监听watchChan。这里包含了对上下文取消(ctx.Done())和watch channel关闭的健壮性处理。在真实的生产环境中,对watchResp.Err()的处理需要更精细,特别是当遇到ErrCompacted错误时,意味着watcher的起始版本号太旧,etcd已经将其压缩。此时唯一的恢复方法是重新执行一次fullSync
  4. processEvent方法: 根据事件类型(PUTDELETE)执行相应的Redis操作。PUT事件对应HSET,它会覆盖已存在的整个Hash,确保数据更新的原子性。DELETE事件对应DEL

4. 组装服务 main.go

最后,我们需要在main.go中将所有组件连接起来:加载配置、初始化客户端、创建Synchronizer实例并启动它。

// cmd/main.go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/redis/go-redis/v9"
	"go.etcd.io/etcd/client/v3"
	"go.uber.org/zap"

	"config-synchronizer/internal/config"
	"config-synchronizer/internal/service"
)

func main() {
	// Setup structured logging
	logger, err := zap.NewProduction()
	if err != nil {
		log.Fatalf("can't initialize zap logger: %v", err)
	}
	defer logger.Sync()

	// Load configuration
	cfg, err := config.Load("config.yaml")
	if err != nil {
		logger.Fatal("failed to load configuration", zap.Error(err))
	}

	// Create a context that can be cancelled on shutdown
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Initialize etcd client
	etcdClient, err := clientv3.New(clientv3.Config{
		Endpoints:   cfg.Etcd.Endpoints,
		DialTimeout: cfg.Etcd.DialTimeout,
		Username:    cfg.Etcd.Username,
		Password:    cfg.Etcd.Password,
	})
	if err != nil {
		logger.Fatal("failed to connect to etcd", zap.Error(err))
	}
	defer etcdClient.Close()
	logger.Info("successfully connected to etcd")

	// Initialize Redis client
	redisClient := redis.NewClient(&redis.Options{
		Addr:         cfg.Redis.Address,
		Password:     cfg.Redis.Password,
		DB:           cfg.Redis.DB,
		DialTimeout:  cfg.Redis.DialTimeout,
	})
	if _, err := redisClient.Ping(ctx).Result(); err != nil {
		logger.Fatal("failed to connect to redis", zap.Error(err))
	}
	defer redisClient.Close()
	logger.Info("successfully connected to redis")

	// Create and start the synchronizer
	syncService := service.NewSynchronizer(cfg, etcdClient, redisClient, logger)
	if err := syncService.Start(ctx); err != nil {
		logger.Fatal("failed to start synchronizer", zap.Error(err))
	}

	// Setup a simple health check endpoint
	router := gin.New()
	router.GET("/healthz", func(c *gin.Context) {
		// A more thorough health check would ping etcd and redis
		c.JSON(200, gin.H{"status": "ok"})
	})
	go func() {
		if err := router.Run(":8080"); err != nil {
			logger.Error("health check server failed", zap.Error(err))
		}
	}()

	// Wait for termination signal
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
	<-quit

	logger.Info("shutting down server...")
	cancel() // Cancel the context to stop the watcher
	// Give some time for graceful shutdown
	time.Sleep(2 * time.Second)
}

单元测试思路

对这种服务进行测试至关重要。

  • Synchronizer单元测试: 使用etcd-testing-servergo-redis-mock来模拟etcd和Redis环境。
    • 测试fullSync:在mock etcd中预置数据,运行fullSync,然后验证mock Redis中是否包含了正确的数据。
    • 测试watchChanges:启动watcher后,在mock etcd中模拟PUTDELETE事件,验证mock Redis中是否发生了相应的HSETDEL操作。
  • model测试: 测试TenantConfigToMap方法是否能正确处理各种字段类型。

方案的局限性与未来迭代

当前实现是一个健壮的单体同步服务,但在更大规模的部署下,仍有其局限性和优化空间。

  1. 单点瓶颈: Synchronizer服务本身是单点的。如果它崩溃,配置更新将会中断。一个直接的改进是运行多个Synchronizer实例。但这会引入写竞争问题(多个实例同时写Redis)。解决方案是利用etcd的Leader Election机制,确保在任何时刻只有一个实例是active的,负责执行同步任务,而其他实例作为standby。

  2. 大规模租户下的性能: 当租户数量达到数十万级别时,fullSync过程可能会变得很慢,给服务启动带来较长的延迟。此外,单一watcher监听大量key的变化也可能成为瓶颈。可以考虑对同步服务进行分片(Sharding),例如,实例A负责同步ID以0-7结尾的租户,实例B负责同步ID以8-F结尾的租户。每个实例只watch自己负责的key子集。

  3. 数据校验: 当前实现完全信任etcd中的数据。一个更安全的做法是在processEvent中增加一个校验层,确保反序列化后的配置对象符合业务规则(例如,RateLimit不能为负数),防止错误或恶意的数据污染缓存。

  4. 可观测性: 生产级的服务必须有完善的监控。需要通过Prometheus等工具暴露关键指标,例如:

    • sync_latency_seconds:从etcd事件发生到Redis更新完成的延迟。
    • sync_events_total:处理的PUT/DELETE事件总数。
    • sync_errors_total:同步过程中发生的错误总数。
    • last_successful_sync_timestamp:最后一次成功同步的时间戳,用于监控服务活性。

  目录