feat(cache): add digest-based cache reuse and GC for models #31
Conversation
Signed-off-by: chlins <chlins.zhang@gmail.com>
Code Review
This pull request introduces a local caching mechanism for model files to optimize pull operations. It includes logic to resolve model digests, store models in a centralized cache directory using hardlinks to save space, and a background garbage collection process to remove expired entries. A critical issue was identified in the PullModel implementation where the use of singleflight causes concurrent requests for the same model to fail for all but the first caller due to volume-specific side effects being trapped inside the shared execution block. Additionally, the garbage collection logic lacks awareness of inflight pull operations, which could lead to premature deletion of cache entries.
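The shared-result semantics that make this a bug can be seen in a stdlib-only sketch of the singleflight pattern. The `Group` type below is a hypothetical stand-in for `golang.org/x/sync/singleflight.Group`, not the real package: all concurrent callers of `Do` with the same key get the result of a single execution of the callback, so any caller-specific side effect placed inside it runs only for the first caller.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// call is one in-flight execution shared by every caller of the same key.
type call struct {
	wg  sync.WaitGroup
	val string
}

// Group is a minimal stand-in for singleflight.Group: concurrent Do calls
// with the same key share a single execution of fn.
type Group struct {
	mu sync.Mutex
	m  map[string]*call
}

func (g *Group) Do(key string, fn func() string) string {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		g.mu.Unlock()
		c.wg.Wait() // duplicate caller: wait, then reuse the first result
		return c.val
	}
	c := &call{}
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	c.val = fn() // only this caller runs the callback
	c.wg.Done()

	g.mu.Lock()
	delete(g.m, key)
	g.mu.Unlock()
	return c.val
}

// demo launches n concurrent callers for one key; fn blocks until released,
// so all callers deterministically pile up inside the same flight.
func demo(n int) (executions int32, distinctResults int) {
	var g Group
	release := make(chan struct{})
	results := make(chan string, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			results <- g.Do("cache-sha256:abc", func() string {
				atomic.AddInt32(&executions, 1)
				<-release // hold every caller inside one shared flight
				return "linked to /cache/sha256/abc"
			})
		}()
	}
	time.Sleep(100 * time.Millisecond) // let every caller enter Do
	close(release)
	wg.Wait()
	close(results)
	seen := map[string]bool{}
	for r := range results {
		seen[r] = true
	}
	return executions, len(seen)
}

func main() {
	ex, distinct := demo(5)
	fmt.Printf("fn executions: %d, distinct results: %d\n", ex, distinct)
}
```

With five concurrent callers, the callback executes once and all callers observe one shared result, which is why per-volume work such as linking into a volume-specific directory must not live inside the callback.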
```go
_, err, _ = worker.inflight.Do("cache-"+resolvedDigest, func() (interface{}, error) {
	sourceModelDir, found, err := worker.getCachedModelDir(resolvedDigest)
	if err != nil {
		return nil, err
	}
	if !found {
		cacheTmpModelDir := cacheModelDir + ".pulling"
		if err := os.MkdirAll(filepath.Dir(cacheModelDir), 0755); err != nil {
			return nil, errors.Wrapf(err, "create cache dir: %s", cacheModelDir)
		}
		if err := os.RemoveAll(cacheTmpModelDir); err != nil {
			return nil, errors.Wrapf(err, "cleanup temporary cache model dir: %s", cacheTmpModelDir)
		}
		err = worker.pullModel(
			ctx,
			statusPath,
			volumeName,
			mountID,
			reference,
			resolvedDigest,
			cacheTmpModelDir,
			checkDiskQuota,
			excludeModelWeights,
			excludeFilePatterns,
		)
		if err != nil {
			_ = os.RemoveAll(cacheTmpModelDir)
			return nil, err
		}
		if err := os.RemoveAll(cacheModelDir); err != nil {
			_ = os.RemoveAll(cacheTmpModelDir)
			return nil, errors.Wrapf(err, "cleanup cache model dir before rename: %s", cacheModelDir)
		}
		if err := os.Rename(cacheTmpModelDir, cacheModelDir); err != nil {
			_ = os.RemoveAll(cacheTmpModelDir)
			return nil, errors.Wrapf(err, "rename cache model dir to %s", cacheModelDir)
		}
		sourceModelDir = cacheModelDir
	}

	if err := os.MkdirAll(filepath.Dir(modelDir), 0755); err != nil {
		return nil, errors.Wrapf(err, "create mount dir for model: %s", modelDir)
	}
	if err := linkModelDir(sourceModelDir, modelDir); err != nil {
		return nil, err
	}
	_, err = worker.sm.Set(statusPath, status.Status{
		VolumeName:     volumeName,
		MountID:        mountID,
		Reference:      reference,
		ResolvedDigest: resolvedDigest,
		Reused:         found,
		State:          status.StatePullSucceeded,
	})
	if err != nil {
		return nil, errors.Wrapf(err, "set model status after linking from cache")
	}
	logger.WithContext(ctx).Infof(
		"linked model from cache: reference=%s digest=%s source=%s target=%s reused=%t",
		reference, resolvedDigest, sourceModelDir, modelDir, found,
	)
	return nil, nil
})
```
There is a critical bug in the use of singleflight.Do here. The singleflight key is based on the resolvedDigest, which is shared across all volumes pulling the same model. However, the function inside Do performs a side effect—linking the model to a volume-specific modelDir. If multiple volumes request the same digest concurrently, only the first one will execute the linking logic for its specific directory. Subsequent callers will receive the result of the first call but their own modelDir will remain empty.
Additionally, the cache path in PullModel lacks a kmutex lock for the volume, which could lead to race conditions if multiple operations are performed on the same volume concurrently.
To fix this, the logic that ensures the model is in the cache should be separated from the logic that links the cached model to the volume directory.
```go
cacheKey := "cache-" + resolvedDigest
_, err, _ = worker.inflight.Do(cacheKey, func() (interface{}, error) {
	sourceModelDir, found, err := worker.getCachedModelDir(resolvedDigest)
	if err != nil {
		return nil, err
	}
	if found {
		return nil, nil
	}
	cacheTmpModelDir := cacheModelDir + ".pulling"
	if err := os.MkdirAll(filepath.Dir(cacheModelDir), 0755); err != nil {
		return nil, errors.Wrapf(err, "create cache dir: %s", cacheModelDir)
	}
	if err := os.RemoveAll(cacheTmpModelDir); err != nil {
		return nil, errors.Wrapf(err, "cleanup temporary cache model dir: %s", cacheTmpModelDir)
	}
	err = worker.pullModel(
		ctx,
		statusPath,
		volumeName,
		mountID,
		reference,
		resolvedDigest,
		cacheTmpModelDir,
		checkDiskQuota,
		excludeModelWeights,
		excludeFilePatterns,
	)
	if err != nil {
		_ = os.RemoveAll(cacheTmpModelDir)
		return nil, err
	}
	if err := os.RemoveAll(cacheModelDir); err != nil {
		_ = os.RemoveAll(cacheTmpModelDir)
		return nil, errors.Wrapf(err, "cleanup cache model dir before rename: %s", cacheModelDir)
	}
	if err := os.Rename(cacheTmpModelDir, cacheModelDir); err != nil {
		_ = os.RemoveAll(cacheTmpModelDir)
		return nil, errors.Wrapf(err, "rename cache model dir to %s", cacheModelDir)
	}
	return nil, nil
})
if err != nil {
	metrics.NodeOpObserve("pull_image", start, err)
	if !errors.Is(err, ErrConflict) {
		_ = worker.DeleteModel(ctx, isStaticVolume, volumeName, mountID)
	}
	return err
}

sourceModelDir, found, err := worker.getCachedModelDir(resolvedDigest)
if err != nil {
	return err
}
if err := os.MkdirAll(filepath.Dir(modelDir), 0755); err != nil {
	return errors.Wrapf(err, "create mount dir for model: %s", modelDir)
}
if err := linkModelDir(sourceModelDir, modelDir); err != nil {
	return err
}
_, err = worker.sm.Set(statusPath, status.Status{
	VolumeName:     volumeName,
	MountID:        mountID,
	Reference:      reference,
	ResolvedDigest: resolvedDigest,
	Reused:         found,
	State:          status.StatePullSucceeded,
})
if err != nil {
	return errors.Wrapf(err, "set model status after linking from cache")
}
logger.WithContext(ctx).Infof(
	"linked model from cache: reference=%s digest=%s source=%s target=%s reused=%t",
	reference, resolvedDigest, sourceModelDir, modelDir, found,
)
```

```go
func (cm *CacheManager) gc() error {
	active, err := cm.activeCacheKeys()
	if err != nil {
		return err
	}

	cacheRoot := cm.cfg.Get().GetCacheSHA256Dir()
	entries, err := os.ReadDir(cacheRoot)
	if err != nil {
		if os.IsNotExist(err) {
			return nil
		}
		return errors.Wrapf(err, "read cache root: %s", cacheRoot)
	}

	deadline := time.Now().Add(-CacheTTL)
	for _, entry := range entries {
		if !entry.IsDir() {
			continue
		}
		name := entry.Name()
		if _, ok := active[name]; ok {
			continue
		}
		info, err := entry.Info()
		if err != nil {
			logger.Logger().WithError(err).Warnf("stat cache dir: %s", name)
			continue
		}
		if info.ModTime().After(deadline) {
			continue
		}
		dirPath := filepath.Join(cacheRoot, name)
		if err := os.RemoveAll(dirPath); err != nil {
			logger.Logger().WithError(err).Warnf("remove cache dir: %s", dirPath)
			continue
		}
		logger.Logger().Infof("removed expired cache dir: %s", dirPath)
	}

	return nil
}
```
The garbage collection logic has a potential race condition. activeCacheKeys is calculated by scanning mounted volumes, but a PullModel operation for a new volume might be in progress and not yet reflected in the status files. If the GC runs at that moment and the cache entry is older than CacheTTL, it could be deleted while PullModel is attempting to link from it. While the 1-hour TTL provides a safety buffer, it's better to ensure that entries currently being pulled or linked are also considered active.
This pull request introduces a model cache and reuse mechanism to improve model mounting performance and reduce redundant downloads. It adds a cache directory structure, cache management with garbage collection (GC), and logic to reuse cached models via hardlinks when possible. The changes also enhance model status tracking with new fields.
Key changes include:
Model cache and reuse implementation
- Extended `RawConfig` with helper methods for cache path management, including `GetCacheDir`, `GetCacheSHA256Dir`, `GetCacheKey`, and `GetCacheModelDir` (`pkg/config/config.go`).
- Added a new `pkg/service/reuse.go` module with logic to resolve model digests, check for cached models, and hardlink cached model directories for reuse.
- Updated `Worker.PullModel` to resolve the digest for a model reference, check for a cached model, and hardlink it into place if available, falling back to a direct pull if needed (`pkg/service/worker.go`).
- Updated `pullModel` to accept and record the resolved digest and reuse status in the model status, and to support the new cache workflow (`pkg/service/worker.go`). [1] [2]

Cache management and garbage collection

- Added a `CacheManager` in `pkg/service/cache.go` to scan mounted models more cleanly, track active cache keys, and periodically remove unused cache directories older than a TTL (default 1 hour). Added GC and scan intervals as configuration variables. [1] [2] [3] [4] [5] [6]

Model status enhancements

- Extended the `status.Status` struct with `ResolvedDigest` and `Reused` fields to track the resolved digest and whether the model was reused from cache (`pkg/status/status.go`).

These changes together enable efficient model sharing between mounts, reduce redundant downloads, and provide a foundation for future improvements in cache management and reporting.