背景
在解决批量上传产生游离文件的问题后(通过互斥锁保证原子性),发现新的性能问题:
- 每次上传完成都需要串行更新元数据
- 每次更新包含:读取 S3 → 解密 → 修改 → 加密 → 上传 S3
- 单次更新约 100-200ms,100 个文件额外增加 10-20 秒
需要在保证正确性的前提下优化性能。
设计方案
核心思路:自适应批量写入
利用 Go 的 channel + goroutine 实现生产者-消费者模式:
- 所有元数据更新请求发送到 channel
- 专门的 worker goroutine 收集请求
- 采用”时间 + 数量”双触发策略
- 批量合并后一次性写入
触发策略
┌─────────────────────────────────────────────────┐│ 第一个请求到达 → 启动 50ms 计时器 ││ ││ 等待期间: ││ ├─ 持续收集新请求 ││ ├─ 达到 20 个 → 立即批量写入 ││ └─ 50ms 超时 → 写入当前积攒的 │└─────────────────────────────────────────────────┘性能对比
| 场景 | 原方案(互斥锁) | 新方案(批量写入) |
|---|---|---|
| 20个文件并发 | 20次读写 | 1次读写 |
| 单个文件上传 | 即时完成 | 最多延迟50ms |
| 元数据一致性 | ✓ 保证 | ✓ 保证 |
实现细节
1. 数据结构
// 元数据更新请求
type metaUpdateRequest struct {
file *FileMetadata
done chan error // 用于通知调用者结果
}
// Server 添加 channel
type Server struct {
// ...
metaUpdateChan chan *metaUpdateRequest
}2. Worker 初始化
func NewServer(cfg *config.Config) (*Server, error) {
server := &Server{
// ...
metaUpdateChan: make(chan *metaUpdateRequest, 100), // 缓冲100个请求
}
// 启动批量写入 worker
go server.metaWriterWorker()
// ...
}3. 更新接口(对调用者透明)
func (s *Server) updateMetadataIndex(ctx context.Context, file *FileMetadata) error {
req := &metaUpdateRequest{
file: file,
done: make(chan error, 1),
}
select {
case s.metaUpdateChan <- req:
// 请求已发送
case <-ctx.Done():
return ctx.Err()
}
select {
case err := <-req.done:
return err
case <-ctx.Done():
return ctx.Err()
}
}4. Worker 核心逻辑
func (s *Server) metaWriterWorker() {
const (
maxBatchSize = 20
batchTimeout = 50 * time.Millisecond
)
var pending []*metaUpdateRequest
var timer <-chan time.Time
for {
select {
case req := <-s.metaUpdateChan:
pending = append(pending, req)
if len(pending) == 1 {
// 第一个请求,启动计时器
timer = time.After(batchTimeout)
}
if len(pending) >= maxBatchSize {
// 达到批量上限,立即写入
s.flushMetaUpdates(pending)
pending = nil
timer = nil
}
case <-timer:
// 超时,写入当前积攒的
if len(pending) > 0 {
s.flushMetaUpdates(pending)
pending = nil
}
timer = nil
}
}
}5. 批量写入
func (s *Server) flushMetaUpdates(requests []*metaUpdateRequest) {
ctx := context.Background()
// 1. 读取当前元数据
meta, err := s.loadMetadataIndex(ctx)
if err != nil {
// 通知所有请求失败
for _, req := range requests {
req.done <- err
}
return
}
// 2. 批量添加所有文件
for _, req := range requests {
meta.Files[req.file.ID] = req.file
}
// 3. 一次性保存
err = s.saveMetadataIndex(ctx, meta)
// 4. 通知所有等待者
for _, req := range requests {
req.done <- err
}
}技术亮点
1. 零改动调用方
updateMetadataIndex 接口保持不变,所有调用方(上传、删除、重命名等)无需修改。
2. 背压处理
channel 缓冲 100 个请求,超出时调用者会阻塞,自然形成背压。
3. 上下文取消支持
使用 select 监听 ctx.Done(),支持请求超时和取消。
4. 优雅的批量合并
- 高并发:多个请求自动合并
- 低并发:最多 50ms 延迟
- 可调参数:
maxBatchSize和batchTimeout
参数选择依据
| 参数 | 值 | 理由 |
|---|---|---|
| maxBatchSize | 20 | 平衡内存占用和合并效率 |
| batchTimeout | 50ms | 几乎无感知的延迟 |
| channel buffer | 100 | 支持突发流量,避免频繁阻塞 |
后续优化方向(未实施)
- 删除/移动也走批量通道:统一所有元数据操作
- 内存缓存:避免每次都读 S3,但增加一致性风险
- 动态参数调整:根据负载自动调整超时和批量大小
总结
通过 Go channel + worker 模式实现了自适应批量写入:
- 正确性:保证元数据一致性(单一 worker 串行写入)
- 性能:高并发时 N 次读写合并为 1 次
- 延迟:单个请求最多延迟 50ms
- 优雅:调用方无需任何修改