Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion TODO_s3_review_fixes.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
## Priority 1 (Critical)

- [x] 1. createMultipartUpload の IsTxn=true 化 — 分散環境での原子性保証
- [x] 2. セマフォ満杯時の GC 回復 — goroutine内でブロッキング待機に変更
- [x] 2. セマフォ満杯時の GC 回復 — non-blocking select でセマフォ満杯時に cleanup を skip(goroutine スポーン前に semaphore 取得済み、無限 goroutine 蓄積を防止)
- [x] 3. completeMultipartUpload リトライ最適化 — パート検証を retry 外へ分離

## Priority 2 (Medium)
Expand Down
104 changes: 69 additions & 35 deletions adapter/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,12 @@ type s3ObjectManifest struct {
}

type s3ObjectPart struct {
PartNo uint64 `json:"part_no"`
ETag string `json:"etag"`
SizeBytes int64 `json:"size_bytes"`
ChunkCount uint64 `json:"chunk_count"`
ChunkSizes []uint64 `json:"chunk_sizes,omitempty"`
PartNo uint64 `json:"part_no"`
ETag string `json:"etag"`
SizeBytes int64 `json:"size_bytes"`
ChunkCount uint64 `json:"chunk_count"`
ChunkSizes []uint64 `json:"chunk_sizes,omitempty"`
PartVersion uint64 `json:"part_version,omitempty"`
}

type s3ContinuationToken struct {
Expand Down Expand Up @@ -189,11 +190,12 @@ type s3UploadMeta struct {
}

type s3PartDescriptor struct {
PartNo uint64 `json:"part_no"`
ETag string `json:"etag"`
SizeBytes int64 `json:"size_bytes"`
ChunkCount uint64 `json:"chunk_count"`
ChunkSizes []uint64 `json:"chunk_sizes,omitempty"`
PartNo uint64 `json:"part_no"`
ETag string `json:"etag"`
SizeBytes int64 `json:"size_bytes"`
ChunkCount uint64 `json:"chunk_count"`
ChunkSizes []uint64 `json:"chunk_sizes,omitempty"`
PartVersion uint64 `json:"part_version,omitempty"`
}

type s3InitiateMultipartUploadResult struct {
Expand Down Expand Up @@ -761,23 +763,43 @@ func (s *S3Server) getObject(w http.ResponseWriter, r *http.Request, bucket stri
}

rangeHeader := strings.TrimSpace(r.Header.Get("Range"))
if headOnly || rangeHeader == "" {
writeS3ObjectHeaders(w.Header(), manifest)
if rangeHeader != "" {
w.Header().Set("Accept-Ranges", "bytes")

// HEAD requests: mirror GET range semantics but never write a body.
if headOnly {
if rangeHeader == "" {
writeS3ObjectHeaders(w.Header(), manifest)
w.WriteHeader(http.StatusOK)
return
}
w.WriteHeader(http.StatusOK)
if headOnly {
rangeStart, rangeEnd, ok := parseS3RangeHeader(rangeHeader, manifest.SizeBytes)
if !ok {
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", manifest.SizeBytes))
writeS3Error(w, http.StatusRequestedRangeNotSatisfiable, "InvalidRange",
"range not satisfiable", bucket, objectKey)
return
}
contentLength := rangeEnd - rangeStart + 1
writeS3ObjectHeaders(w.Header(), manifest)
w.Header().Set("Content-Length", strconv.FormatInt(contentLength, 10))
w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", rangeStart, rangeEnd, manifest.SizeBytes))
w.Header().Set("Accept-Ranges", "bytes")
w.WriteHeader(http.StatusPartialContent)
return
}

// GET without Range: stream the full object.
if rangeHeader == "" {
writeS3ObjectHeaders(w.Header(), manifest)
w.WriteHeader(http.StatusOK)
s.streamObjectChunks(w, r, bucket, meta.Generation, objectKey, manifest, readTS, 0, manifest.SizeBytes)
return
}

rangeStart, rangeEnd, ok := parseS3RangeHeader(rangeHeader, manifest.SizeBytes)
if !ok {
w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", manifest.SizeBytes))
writeS3Error(w, http.StatusRequestedRangeNotSatisfiable, "InvalidRange",
fmt.Sprintf("bytes */%d", manifest.SizeBytes), bucket, objectKey)
"range not satisfiable", bucket, objectKey)
return
}

Expand Down Expand Up @@ -817,7 +839,7 @@ func (s *S3Server) streamObjectChunks(w http.ResponseWriter, r *http.Request, bu
)
return
}
chunkKey := s3keys.BlobKey(bucket, generation, objectKey, manifest.UploadID, part.PartNo, chunkIndex)
chunkKey := s3keys.VersionedBlobKey(bucket, generation, objectKey, manifest.UploadID, part.PartNo, chunkIndex, part.PartVersion)
chunk, err := s.store.GetAt(r.Context(), chunkKey, readTS)
if err != nil {
slog.ErrorContext(r.Context(), "streamObjectChunks: GetAt failed",
Expand Down Expand Up @@ -1043,6 +1065,22 @@ func (s *S3Server) uploadPart(w http.ResponseWriter, r *http.Request, bucket str
}

r.Body = http.MaxBytesReader(w, r.Body, s3MaxPartSizeBytes)

// Pre-allocate the part's commit timestamp before writing any blob chunks so
// that the same version identifier is used for every chunk in this attempt.
// Embedding partCommitTS as PartVersion in the blob keys isolates each
// UploadPart attempt in its own key space: a concurrent or retried request
// for the same partNo receives a different timestamp and therefore writes to
// different keys, leaving the chunk data referenced by an in-progress
// CompleteMultipartUpload untouched.
partReadTS := s.readTS()
partStartTS := s.txnStartTS(partReadTS)
partCommitTS, err := s.nextTxnCommitTS(partStartTS)
if err != nil {
writeS3InternalError(w, err)
return
}

hasher := md5.New() //nolint:gosec // S3 ETag compatibility requires MD5.
sizeBytes := int64(0)
chunkNo := uint64(0)
Expand Down Expand Up @@ -1084,7 +1122,7 @@ func (s *S3Server) uploadPart(w http.ResponseWriter, r *http.Request, bucket str
writeS3InternalError(w, err)
return
}
chunkKey := s3keys.BlobKey(bucket, meta.Generation, objectKey, uploadID, partNo, chunkNo)
chunkKey := s3keys.VersionedBlobKey(bucket, meta.Generation, objectKey, uploadID, partNo, chunkNo, partCommitTS)
pendingBatch = append(pendingBatch, &kv.Elem[kv.OP]{Op: kv.Put, Key: chunkKey, Value: chunk})
cs, err := uint64FromInt(n)
if err != nil {
Expand Down Expand Up @@ -1120,34 +1158,29 @@ func (s *S3Server) uploadPart(w http.ResponseWriter, r *http.Request, bucket str

etag := hex.EncodeToString(hasher.Sum(nil))
partDesc := &s3PartDescriptor{
PartNo: partNo,
ETag: etag,
SizeBytes: sizeBytes,
ChunkCount: chunkNo,
ChunkSizes: chunkSizes,
PartNo: partNo,
ETag: etag,
SizeBytes: sizeBytes,
ChunkCount: chunkNo,
ChunkSizes: chunkSizes,
PartVersion: partCommitTS,
}
descBody, err := json.Marshal(partDesc)
if err != nil {
writeS3InternalError(w, err)
return
}
partKey := s3keys.UploadPartKey(bucket, meta.Generation, objectKey, uploadID, partNo)
startTS := s.txnStartTS(readTS)
partCommitTS, err := s.nextTxnCommitTS(startTS)
if err != nil {
writeS3InternalError(w, err)
return
}
if _, err := s.coordinator.Dispatch(r.Context(), &kv.OperationGroup[kv.OP]{
IsTxn: true,
StartTS: startTS,
StartTS: partStartTS,
CommitTS: partCommitTS,
Elems: []*kv.Elem[kv.OP]{
{Op: kv.Put, Key: partKey, Value: descBody},
},
}); err != nil {
// Clean up orphaned blob chunks so they don't accumulate in the store.
s.cleanupPartBlobsAsync(bucket, meta.Generation, objectKey, uploadID, partNo, chunkNo)
s.cleanupPartBlobsAsync(bucket, meta.Generation, objectKey, uploadID, partNo, chunkNo, partCommitTS)
writeS3InternalError(w, err)
return
}
Expand Down Expand Up @@ -1522,7 +1555,8 @@ func (s *S3Server) cleanupUploadParts(ctx context.Context, bucket string, genera
// cleanupPartBlobsAsync asynchronously deletes the blob chunk keys for a single
// upload part. It is used to garbage-collect orphaned chunks when a part
// descriptor write fails after the chunks have already been committed.
func (s *S3Server) cleanupPartBlobsAsync(bucket string, generation uint64, objectKey string, uploadID string, partNo uint64, chunkCount uint64) {
// partVersion must match the value used when writing the chunk keys.
func (s *S3Server) cleanupPartBlobsAsync(bucket string, generation uint64, objectKey string, uploadID string, partNo uint64, chunkCount uint64, partVersion uint64) {
select {
case s.cleanupSem <- struct{}{}:
default:
Expand Down Expand Up @@ -1552,7 +1586,7 @@ func (s *S3Server) cleanupPartBlobsAsync(bucket string, generation uint64, objec
for i := uint64(0); i < chunkCount; i++ {
pending = append(pending, &kv.Elem[kv.OP]{
Op: kv.Del,
Key: s3keys.BlobKey(bucket, generation, objectKey, uploadID, partNo, i),
Key: s3keys.VersionedBlobKey(bucket, generation, objectKey, uploadID, partNo, i, partVersion),
})
if len(pending) >= s3ChunkBatchOps {
flush()
Expand Down Expand Up @@ -1859,7 +1893,7 @@ func (s *S3Server) appendPartBlobKeys(pending []*kv.Elem[kv.OP], bucket string,
}
pending = append(pending, &kv.Elem[kv.OP]{
Op: kv.Del,
Key: s3keys.BlobKey(bucket, generation, objectKey, uploadID, part.PartNo, chunkIndex),
Key: s3keys.VersionedBlobKey(bucket, generation, objectKey, uploadID, part.PartNo, chunkIndex, part.PartVersion),
})
if len(pending) >= s3ChunkBatchOps {
flush()
Expand Down
14 changes: 5 additions & 9 deletions adapter/s3_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,18 +264,14 @@ func normalizeS3PayloadHash(raw string) string {
const s3PresignMaxExpiry = 7 * 24 * 60 * 60 // 604800 seconds (7 days)

// checkPresignExpiry validates the expiry of a presigned request.
// It returns an *s3AuthError if the request has expired or the expiry is invalid.
// It returns an *s3AuthError if X-Amz-Expires is missing, invalid, or the URL has expired.
func checkPresignExpiry(expiresStr string, signingTime time.Time) *s3AuthError {
if expiresStr == "" {
// No explicit expiry: fall back to clock skew check.
skew := time.Now().UTC().Sub(signingTime.UTC())
if skew < 0 {
skew = -skew
}
if skew > s3RequestTimeMaxSkew {
return &s3AuthError{Status: http.StatusForbidden, Code: "AccessDenied", Message: "presigned URL has expired"}
return &s3AuthError{
Status: http.StatusForbidden,
Code: "AuthorizationQueryParametersError",
Message: "X-Amz-Expires is required for presigned URLs",
}
return nil
}
expires, err := strconv.Atoi(expiresStr)
if err != nil || expires <= 0 || expires > s3PresignMaxExpiry {
Expand Down
104 changes: 102 additions & 2 deletions adapter/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,97 @@ func TestS3Server_RangeReadEmptyObject(t *testing.T) {
require.Equal(t, http.StatusRequestedRangeNotSatisfiable, rec.Code)
}

// TestS3Server_HeadWithRange checks HEAD range semantics: a satisfiable Range
// yields 206 with Content-Range/Content-Length and no body, while an
// unsatisfiable Range yields 416 with a "bytes */<size>" Content-Range.
func TestS3Server_HeadWithRange(t *testing.T) {
	t.Parallel()

	backing := store.NewMVCCStore()
	srv := NewS3Server(nil, "", backing, newLocalAdapterCoordinator(backing), nil)

	// do dispatches one request through the server and returns the recorder.
	do := func(req *http.Request) *httptest.ResponseRecorder {
		rr := httptest.NewRecorder()
		srv.handle(rr, req)
		return rr
	}

	// Create the bucket, then store a 17-byte object to range over.
	require.Equal(t, http.StatusOK, do(newS3TestRequest(http.MethodPut, "/bucket-head-range", nil)).Code)
	body := "hello ranged head"
	require.Equal(t, http.StatusOK,
		do(newS3TestRequest(http.MethodPut, "/bucket-head-range/obj.txt", strings.NewReader(body))).Code)

	// HEAD with a valid Range: 206 + Content-Range, empty body.
	headReq := newS3TestRequest(http.MethodHead, "/bucket-head-range/obj.txt", nil)
	headReq.Header.Set("Range", "bytes=0-4")
	resp := do(headReq)
	require.Equal(t, http.StatusPartialContent, resp.Code)
	require.Equal(t, "bytes 0-4/17", resp.Header().Get("Content-Range"))
	require.Equal(t, "5", resp.Header().Get("Content-Length"))
	require.Empty(t, resp.Body.String())

	// HEAD with an out-of-range Range: 416 + Content-Range advertising the size.
	headReq = newS3TestRequest(http.MethodHead, "/bucket-head-range/obj.txt", nil)
	headReq.Header.Set("Range", "bytes=999-1000")
	resp = do(headReq)
	require.Equal(t, http.StatusRequestedRangeNotSatisfiable, resp.Code)
	require.Equal(t, "bytes */17", resp.Header().Get("Content-Range"))
}

// TestS3Server_InvalidRangeContentRangeHeader verifies that a GET whose Range
// is unsatisfiable is answered with 416 and that the response carries the
// "bytes */<size>" Content-Range header required by the HTTP spec.
func TestS3Server_InvalidRangeContentRangeHeader(t *testing.T) {
	t.Parallel()

	backing := store.NewMVCCStore()
	srv := NewS3Server(nil, "", backing, newLocalAdapterCoordinator(backing), nil)

	// do dispatches one request through the server and returns the recorder.
	do := func(req *http.Request) *httptest.ResponseRecorder {
		rr := httptest.NewRecorder()
		srv.handle(rr, req)
		return rr
	}

	// Create the bucket and store a 10-byte object.
	require.Equal(t, http.StatusOK, do(newS3TestRequest(http.MethodPut, "/bucket-inv-range", nil)).Code)
	body := "abcdefghij" // 10 bytes
	require.Equal(t, http.StatusOK,
		do(newS3TestRequest(http.MethodPut, "/bucket-inv-range/obj.txt", strings.NewReader(body))).Code)

	// GET with an out-of-range Range: the 416 must include Content-Range.
	getReq := newS3TestRequest(http.MethodGet, "/bucket-inv-range/obj.txt", nil)
	getReq.Header.Set("Range", "bytes=100-200")
	resp := do(getReq)
	require.Equal(t, http.StatusRequestedRangeNotSatisfiable, resp.Code)
	require.Equal(t, "bytes */10", resp.Header().Get("Content-Range"))
}

// TestS3Server_PresignedURLMissingExpires verifies that a presigned GET whose
// query string lacks X-Amz-Expires is rejected with 403 and an
// AuthorizationQueryParametersError body mentioning the missing parameter.
func TestS3Server_PresignedURLMissingExpires(t *testing.T) {
	t.Parallel()

	st := store.NewMVCCStore()
	server := NewS3Server(
		nil, "", st, newLocalAdapterCoordinator(st), nil,
		WithS3Region(testS3Region),
		WithS3StaticCredentials(map[string]string{testS3AccessKey: testS3SecretKey}),
	)

	// Build a presigned URL without X-Amz-Expires: server must reject it.
	signingTime := currentS3SigningTime()
	presignReq, err := http.NewRequestWithContext(context.Background(), http.MethodGet, "http://localhost/bucket/obj.txt", nil)
	require.NoError(t, err)
	// Disable URI path escaping so the signed canonical request matches what
	// the server reconstructs from the raw request path.
	signer := v4.NewSigner(func(opts *v4.SignerOptions) {
		opts.DisableURIPathEscaping = true
	})
	creds := aws.Credentials{AccessKeyID: testS3AccessKey, SecretAccessKey: testS3SecretKey, Source: "test"}
	// Sign with an unsigned-payload hash; no X-Amz-Expires is added to the query.
	presignedURL, _, err := signer.PresignHTTP(context.Background(), creds, presignReq,
		s3UnsignedPayload, "s3", testS3Region, signingTime)
	require.NoError(t, err)

	// Replay the presigned URL against the server under the Host it was signed for.
	parsedURL, err := url.Parse(presignedURL)
	require.NoError(t, err)
	presignGetReq := newS3TestRequest(http.MethodGet, parsedURL.RequestURI(), nil)
	presignGetReq.Host = "localhost"
	rec := httptest.NewRecorder()
	server.handle(rec, presignGetReq)
	// Expect the auth layer to fail the request before any object lookup.
	require.Equal(t, http.StatusForbidden, rec.Code)
	require.Contains(t, rec.Body.String(), "AuthorizationQueryParametersError")
	require.Contains(t, rec.Body.String(), "X-Amz-Expires")
}

func TestS3Server_MultipartUploadETagComputation(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -1094,6 +1185,10 @@ func TestS3Server_PresignedURL(t *testing.T) {
// Build a presigned GET URL using a proper absolute URL.
presignReq, err := http.NewRequestWithContext(context.Background(), http.MethodGet, "http://localhost/bucket-presign/obj.txt", nil)
require.NoError(t, err)
// Set X-Amz-Expires to satisfy the server's presigned URL requirement (900 s validity).
presignQuery := presignReq.URL.Query()
presignQuery.Set("X-Amz-Expires", "900")
presignReq.URL.RawQuery = presignQuery.Encode()
signer := v4.NewSigner(func(opts *v4.SignerOptions) {
opts.DisableURIPathEscaping = true
})
Expand Down Expand Up @@ -1134,11 +1229,16 @@ func TestS3Server_PresignedURLExpired(t *testing.T) {
WithS3StaticCredentials(map[string]string{testS3AccessKey: testS3SecretKey}),
)

// Build a presigned URL with the default 900s expiry that's already expired.
// Signing 20 minutes ago means expiry was 5 minutes ago.
// Build a presigned URL with a 60 s expiry that's already expired.
// Signing 20 minutes ago means expiry was ~19 minutes ago.
oldTime := time.Now().UTC().Add(-20 * time.Minute)
presignReq, err := http.NewRequestWithContext(context.Background(), http.MethodGet, "http://localhost/bucket-presign/obj.txt", nil)
require.NoError(t, err)
// Set a 60 s expiry that will have elapsed given the signing time of 20 minutes ago,
// ensuring the presigned URL is expired when the server validates it.
presignQuery := presignReq.URL.Query()
presignQuery.Set("X-Amz-Expires", "60")
presignReq.URL.RawQuery = presignQuery.Encode()
signer := v4.NewSigner(func(opts *v4.SignerOptions) {
opts.DisableURIPathEscaping = true
})
Expand Down
28 changes: 28 additions & 0 deletions internal/s3keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,34 @@ func BlobKey(bucket string, generation uint64, object string, uploadID string, p
return buildObjectKey(blobPrefixBytes, bucket, generation, object, uploadID, partNo, chunkNo)
}

// VersionedBlobKey derives the blob key for one specific attempt at writing a
// part, where partVersion (typically the attempt's commit timestamp)
// namespaces the attempt. A partVersion of 0 falls back to the legacy BlobKey
// layout, so data written before versioned keys existed still resolves. Any
// non-zero partVersion is appended as a trailing fixed-width u64, giving every
// retry or concurrent re-upload of the same partNo its own key space and
// keeping blob data immutable per attempt.
func VersionedBlobKey(bucket string, generation uint64, object string, uploadID string, partNo uint64, chunkNo uint64, partVersion uint64) []byte {
	// Legacy path: unversioned keys keep their original shape.
	if partVersion == 0 {
		return BlobKey(bucket, generation, object, uploadID, partNo, chunkNo)
	}
	// Reserve exact capacity up front: prefix bytes, raw segment content,
	// escape/terminator overhead for the three variable-length segments
	// (bucket, object, uploadID), and four fixed-width u64 fields
	// (generation, partNo, chunkNo, partVersion).
	capacity := len(BlobPrefix) + len(bucket) + len(object) + len(uploadID) + buildObjectExtraBytes + 4*u64Bytes
	key := append(make([]byte, 0, capacity), blobPrefixBytes...)
	key = append(key, EncodeSegment([]byte(bucket))...)
	key = appendU64(key, generation)
	key = append(key, EncodeSegment([]byte(object))...)
	key = append(key, EncodeSegment([]byte(uploadID))...)
	key = appendU64(key, partNo)
	key = appendU64(key, chunkNo)
	return appendU64(key, partVersion)
}

func GCUploadKey(bucket string, generation uint64, object string, uploadID string) []byte {
return buildObjectKey(gcUploadPrefixBytes, bucket, generation, object, uploadID, 0, 0)
}
Expand Down
Loading
Loading