Go实现缓存分页拉取

本文描述实现一个缓存分页拉取系统,并尽量减少远程调用。且本地缓存系统添加一个最大缓存容量(MaxSize)限制。当缓存的数据量超过该上限时,使用LRU(最近最少使用)策略淘汰旧的数据。

1. 功能目标

  • 入参:offset, size

  • 出参:length, output, err

  • 使用本地缓存减少远程调用

  • 如果本地有缓存的数据则直接返回,否则调用远程接口,并缓存结果

其中:

概念 含义 类比
offset 从哪里开始(起始位置) 从第几页看书
size 要读多少(长度/数量) 看几页内容

示例:

原始数据:       h   e   l   l   o       w   o   r   l   d
字节索引:        0   1   2   3   4   5   6   7   8   9  10
                |------------|
offset=6size=5

输出结果:        w   o   r   l   d

2. 简单版本

2.1. 接口定义

// Fetcher 是远程数据获取接口,返回 offset 起始的 size 个字节的数据。
type Fetcher interface {
    Fetch(offset int, size int) ([]byte, error)
}

// Cache 是本地缓存结构体。
type Cache struct {
    mu     sync.RWMutex
    data   map[int]byte // offset -> byte
    fetcher Fetcher
}

2.2. 方法实现

func NewCache(fetcher Fetcher) *Cache {
    return &Cache{
        data:    make(map[int]byte),
        fetcher: fetcher,
    }
}

// Read 尝试从本地缓存读取 offset 起 size 个字节数据,缺失的部分会调用 fetcher.Fetch。
func (c *Cache) Read(offset int, size int) (length int, output []byte, err error) {
    c.mu.RLock()
    defer c.mu.RUnlock()

    output = make([]byte, 0, size)
    missing := make([]int, 0)

    // 记录已有缓存数据和缺失的 offset
    for i := 0; i < size; i++ {
        off := offset + i
        if val, ok := c.data[off]; ok {
            output = append(output, val)
        } else {
            missing = append(missing, off)
        }
    }

    // 如果没有缺失,直接返回
    if len(missing) == 0 {
        return len(output), output, nil
    }

    c.mu.RUnlock()  // 需要升级为写锁
    c.mu.Lock()
    defer c.mu.Unlock()

    // 再次检查(避免并发重复拉取)
    retryMissing := make([]int, 0)
    for _, off := range missing {
        if _, ok := c.data[off]; !ok {
            retryMissing = append(retryMissing, off)
        }
    }

    if len(retryMissing) > 0 {
        // 合并成连续段拉取(这里简化处理:一次拉取整段)
        start := retryMissing[0]
        end := retryMissing[len(retryMissing)-1] + 1
        fetched, err := c.fetcher.Fetch(start, end-start)
        if err != nil {
            return 0, nil, err
        }
        for i, b := range fetched {
            c.data[start+i] = b
        }
    }

    // 最终重新构建输出
    output = make([]byte, 0, size)
    for i := 0; i < size; i++ {
        b, ok := c.data[offset+i]
        if !ok {
            return 0, nil, fmt.Errorf("missing data at offset %d", offset+i)
        }
        output = append(output, b)
    }

    return len(output), output, nil
}

2.3. 远程接口实现

type DummyFetcher struct {
    Source []byte
}

func (f *DummyFetcher) Fetch(offset int, size int) ([]byte, error) {
    if offset >= len(f.Source) {
        return nil, fmt.Errorf("offset out of range")
    }
    end := offset + size
    if end > len(f.Source) {
        end = len(f.Source)
    }
    return f.Source[offset:end], nil
}

3. 增强版本(LRU淘汰策略)

为上面的本地缓存系统添加一个最大缓存容量(MaxSize)限制。当缓存的数据量超过该上限时,使用LRU(最近最少使用)策略淘汰旧的数据。

3.1. 更新结构体:增加 LRU 支持和 MaxSize 限制

import (
    "container/list"
    "fmt"
    "sync"
)

// 缓存项
type cacheItem struct {
    offset int
    value  byte
}

type Cache struct {
    mu       sync.RWMutex
    data     map[int]*list.Element
    lruList  *list.List
    maxSize  int
    fetcher  Fetcher
}

func NewCache(fetcher Fetcher, maxSize int) *Cache {
    return &Cache{
        data:    make(map[int]*list.Element),
        lruList: list.New(),
        maxSize: maxSize,
        fetcher: fetcher,
    }
}

3.2. 插入数据并更新 LRU

// 更新本地缓存
func (c *Cache) put(offset int, b byte) {
    if elem, exists := c.data[offset]; exists {
        c.lruList.MoveToFront(elem)
        elem.Value.(*cacheItem).value = b
        return
    }

    // 插入新元素
    elem := c.lruList.PushFront(&cacheItem{offset: offset, value: b})
    c.data[offset] = elem

    // 检查容量限制
    if c.lruList.Len() > c.maxSize {
        // 淘汰最旧的元素
        last := c.lruList.Back()
        if last != nil {
            item := last.Value.(*cacheItem)
            delete(c.data, item.offset)
            c.lruList.Remove(last)
        }
    }
}

// 获取本地缓存
func (c *Cache) get(offset int) (byte, bool) {
    if elem, ok := c.data[offset]; ok {
        c.lruList.MoveToFront(elem)
        return elem.Value.(*cacheItem).value, true
    }
    return 0, false
}

3.3. 修改 Read 方法以支持 LRU 缓存逻辑

func (c *Cache) Read(offset int, size int) (length int, output []byte, err error) {
    c.mu.RLock()
    output = make([]byte, 0, size)
    missing := make([]int, 0)

    for i := 0; i < size; i++ {
        off := offset + i
        if val, ok := c.get(off); ok {
            output = append(output, val)
        } else {
            missing = append(missing, off)
        }
    }
    c.mu.RUnlock()

    if len(missing) == 0 {
        return len(output), output, nil
    }

    c.mu.Lock()
    defer c.mu.Unlock()

    retryMissing := make([]int, 0)
    for _, off := range missing {
        if _, ok := c.data[off]; !ok {
            retryMissing = append(retryMissing, off)
        }
    }

    if len(retryMissing) > 0 {
        start := retryMissing[0]
        end := retryMissing[len(retryMissing)-1] + 1
        fetched, err := c.fetcher.Fetch(start, end-start)
        if err != nil {
            return 0, nil, err
        }
        for i, b := range fetched {
            // 存入本地缓存
            c.put(start+i, b)
        }
    }

    // 构建完整输出
    output = make([]byte, 0, size)
    for i := 0; i < size; i++ {
        b, ok := c.get(offset + i)  // 获取本地缓存
        if !ok {
            return 0, nil, fmt.Errorf("missing data at offset %d", offset+i)
        }
        output = append(output, b)
    }

    return len(output), output, nil
}

3.4. 使用用例

func main() {
    // 假设这是远程数据源
    src := []byte("The quick brown fox jumps over the lazy dog.")

    // 创建一个最大容量为 1000 字节的缓存
    fetcher := &DummyFetcher{Source: src}
    cache := NewCache(fetcher, 1000)

    // 第一次读取 offset=10, size=10,需要远程拉取
    length, out, err := cache.Read(10, 10)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Length: %d, Output: %s\n", length, string(out))

    // 第二次读取相同范围,不会触发远程拉取
    length, out, err = cache.Read(10, 10)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Length: %d, Output: %s (from cache)\n", length, string(out))

    // 另一次读取不同范围 offset=20, size=15,可能触发远程拉取
    length, out, err = cache.Read(20, 15)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Length: %d, Output: %s\n", length, string(out))
}

// 输出
2025/06/17 11:23:12 Remote fetch: offset=10 size=10
Length: 10, Output: brown fox 
Length: 10, Output: brown fox  (from cache)
2025/06/17 11:23:12 Remote fetch: offset=20 size=15
Length: 15, Output: jumps over the