December 20, 2025
3 min read
By devshan

Table of Contents

This is a list of all the sections in this post. Click on any of them to jump to that section.

背景

在解决批量上传产生游离文件的问题后(通过互斥锁保证原子性),发现新的性能问题:

  • 每次上传完成都需要串行更新元数据
  • 每次更新包含:读取 S3 → 解密 → 修改 → 加密 → 上传 S3
  • 单次更新约 100-200ms,100 个文件额外增加 10-20 秒

需要在保证正确性的前提下优化性能。


设计方案

核心思路:自适应批量写入

利用 Go 的 channel + goroutine 实现生产者-消费者模式:

  1. 所有元数据更新请求发送到 channel
  2. 专门的 worker goroutine 收集请求
  3. 采用”时间 + 数量”双触发策略
  4. 批量合并后一次性写入

触发策略

┌─────────────────────────────────────────────────┐
│ 第一个请求到达 → 启动 50ms 计时器 │
│ │
│ 等待期间: │
│ ├─ 持续收集新请求 │
│ ├─ 达到 20 个 → 立即批量写入 │
│ └─ 50ms 超时 → 写入当前积攒的 │
└─────────────────────────────────────────────────┘

性能对比

场景原方案(互斥锁)新方案(批量写入)
20个文件并发20次读写1次读写
单个文件上传即时完成最多延迟50ms
元数据一致性✓ 保证✓ 保证

实现细节

1. 数据结构

// 元数据更新请求
type metaUpdateRequest struct {
    file *FileMetadata
    done chan error  // 用于通知调用者结果
}
 
// Server 添加 channel
type Server struct {
    // ...
    metaUpdateChan chan *metaUpdateRequest
}

2. Worker 初始化

func NewServer(cfg *config.Config) (*Server, error) {
    server := &Server{
        // ...
        metaUpdateChan: make(chan *metaUpdateRequest, 100), // 缓冲100个请求
    }
    
    // 启动批量写入 worker
    go server.metaWriterWorker()
    
    // ...
}

3. 更新接口(对调用者透明)

func (s *Server) updateMetadataIndex(ctx context.Context, file *FileMetadata) error {
    req := &metaUpdateRequest{
        file: file,
        done: make(chan error, 1),
    }
 
    select {
    case s.metaUpdateChan <- req:
        // 请求已发送
    case <-ctx.Done():
        return ctx.Err()
    }
 
    select {
    case err := <-req.done:
        return err
    case <-ctx.Done():
        return ctx.Err()
    }
}

4. Worker 核心逻辑

func (s *Server) metaWriterWorker() {
    const (
        maxBatchSize = 20
        batchTimeout = 50 * time.Millisecond
    )
 
    var pending []*metaUpdateRequest
    var timer <-chan time.Time
 
    for {
        select {
        case req := <-s.metaUpdateChan:
            pending = append(pending, req)
 
            if len(pending) == 1 {
                // 第一个请求,启动计时器
                timer = time.After(batchTimeout)
            }
 
            if len(pending) >= maxBatchSize {
                // 达到批量上限,立即写入
                s.flushMetaUpdates(pending)
                pending = nil
                timer = nil
            }
 
        case <-timer:
            // 超时,写入当前积攒的
            if len(pending) > 0 {
                s.flushMetaUpdates(pending)
                pending = nil
            }
            timer = nil
        }
    }
}

5. 批量写入

func (s *Server) flushMetaUpdates(requests []*metaUpdateRequest) {
    ctx := context.Background()
 
    // 1. 读取当前元数据
    meta, err := s.loadMetadataIndex(ctx)
    if err != nil {
        // 通知所有请求失败
        for _, req := range requests {
            req.done <- err
        }
        return
    }
 
    // 2. 批量添加所有文件
    for _, req := range requests {
        meta.Files[req.file.ID] = req.file
    }
 
    // 3. 一次性保存
    err = s.saveMetadataIndex(ctx, meta)
 
    // 4. 通知所有等待者
    for _, req := range requests {
        req.done <- err
    }
}

技术亮点

1. 零改动调用方

updateMetadataIndex 接口保持不变,所有调用方(上传、删除、重命名等)无需修改。

2. 背压处理

channel 缓冲 100 个请求,超出时调用者会阻塞,自然形成背压。

3. 上下文取消支持

使用 select 监听 ctx.Done(),支持请求超时和取消。

4. 优雅的批量合并

  • 高并发:多个请求自动合并
  • 低并发:最多 50ms 延迟
  • 可调参数:maxBatchSizebatchTimeout

参数选择依据

参数理由
maxBatchSize20平衡内存占用和合并效率
batchTimeout50ms几乎无感知的延迟
channel buffer100支持突发流量,避免频繁阻塞

后续优化方向(未实施)

  1. 删除/移动也走批量通道:统一所有元数据操作
  2. 内存缓存:避免每次都读 S3,但增加一致性风险
  3. 动态参数调整:根据负载自动调整超时和批量大小

总结

通过 Go channel + worker 模式实现了自适应批量写入:

  • 正确性:保证元数据一致性(单一 worker 串行写入)
  • 性能:高并发时 N 次读写合并为 1 次
  • 延迟:单个请求最多延迟 50ms
  • 优雅:调用方无需任何修改