diff --git a/TODO_s3_review_fixes.md b/TODO_s3_review_fixes.md index d839c308..60bd74a8 100644 --- a/TODO_s3_review_fixes.md +++ b/TODO_s3_review_fixes.md @@ -3,7 +3,7 @@ ## Priority 1 (Critical) - [x] 1. createMultipartUpload の IsTxn=true 化 — 分散環境での原子性保証 -- [x] 2. セマフォ満杯時の GC 回復 — goroutine内でブロッキング待機に変更 +- [x] 2. セマフォ満杯時の GC 回復 — non-blocking select でセマフォ満杯時に cleanup を skip(goroutine スポーン前に semaphore 取得済み、無限 goroutine 蓄積を防止) - [x] 3. completeMultipartUpload リトライ最適化 — パート検証を retry 外へ分離 ## Priority 2 (Medium) diff --git a/adapter/s3.go b/adapter/s3.go index b467f876..6e12db86 100644 --- a/adapter/s3.go +++ b/adapter/s3.go @@ -95,11 +95,12 @@ type s3ObjectManifest struct { } type s3ObjectPart struct { - PartNo uint64 `json:"part_no"` - ETag string `json:"etag"` - SizeBytes int64 `json:"size_bytes"` - ChunkCount uint64 `json:"chunk_count"` - ChunkSizes []uint64 `json:"chunk_sizes,omitempty"` + PartNo uint64 `json:"part_no"` + ETag string `json:"etag"` + SizeBytes int64 `json:"size_bytes"` + ChunkCount uint64 `json:"chunk_count"` + ChunkSizes []uint64 `json:"chunk_sizes,omitempty"` + PartVersion uint64 `json:"part_version,omitempty"` } type s3ContinuationToken struct { @@ -189,11 +190,12 @@ type s3UploadMeta struct { } type s3PartDescriptor struct { - PartNo uint64 `json:"part_no"` - ETag string `json:"etag"` - SizeBytes int64 `json:"size_bytes"` - ChunkCount uint64 `json:"chunk_count"` - ChunkSizes []uint64 `json:"chunk_sizes,omitempty"` + PartNo uint64 `json:"part_no"` + ETag string `json:"etag"` + SizeBytes int64 `json:"size_bytes"` + ChunkCount uint64 `json:"chunk_count"` + ChunkSizes []uint64 `json:"chunk_sizes,omitempty"` + PartVersion uint64 `json:"part_version,omitempty"` } type s3InitiateMultipartUploadResult struct { @@ -761,23 +763,43 @@ func (s *S3Server) getObject(w http.ResponseWriter, r *http.Request, bucket stri } rangeHeader := strings.TrimSpace(r.Header.Get("Range")) - if headOnly || rangeHeader == "" { - writeS3ObjectHeaders(w.Header(), manifest) - if rangeHeader != "" { - w.Header().Set("Accept-Ranges", "bytes") + + // HEAD requests: mirror GET range semantics but never write a body. + if headOnly { + if rangeHeader == "" { + writeS3ObjectHeaders(w.Header(), manifest) + w.WriteHeader(http.StatusOK) + return } - w.WriteHeader(http.StatusOK) - if headOnly { + rangeStart, rangeEnd, ok := parseS3RangeHeader(rangeHeader, manifest.SizeBytes) + if !ok { + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", manifest.SizeBytes)) + writeS3Error(w, http.StatusRequestedRangeNotSatisfiable, "InvalidRange", + "range not satisfiable", bucket, objectKey) return } + contentLength := rangeEnd - rangeStart + 1 + writeS3ObjectHeaders(w.Header(), manifest) + w.Header().Set("Content-Length", strconv.FormatInt(contentLength, 10)) + w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", rangeStart, rangeEnd, manifest.SizeBytes)) + w.Header().Set("Accept-Ranges", "bytes") + w.WriteHeader(http.StatusPartialContent) + return + } + + // GET without Range: stream the full object. + if rangeHeader == "" { + writeS3ObjectHeaders(w.Header(), manifest) + w.WriteHeader(http.StatusOK) s.streamObjectChunks(w, r, bucket, meta.Generation, objectKey, manifest, readTS, 0, manifest.SizeBytes) return } rangeStart, rangeEnd, ok := parseS3RangeHeader(rangeHeader, manifest.SizeBytes) if !ok { + w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", manifest.SizeBytes)) writeS3Error(w, http.StatusRequestedRangeNotSatisfiable, "InvalidRange", - fmt.Sprintf("bytes */%d", manifest.SizeBytes), bucket, objectKey) + "range not satisfiable", bucket, objectKey) return } @@ -817,7 +839,7 @@ func (s *S3Server) streamObjectChunks(w http.ResponseWriter, r *http.Request, bu ) return } - chunkKey := s3keys.BlobKey(bucket, generation, objectKey, manifest.UploadID, part.PartNo, chunkIndex) + chunkKey := s3keys.VersionedBlobKey(bucket, generation, objectKey, manifest.UploadID, part.PartNo, chunkIndex, part.PartVersion) chunk, err := s.store.GetAt(r.Context(), chunkKey, readTS) if err != nil { slog.ErrorContext(r.Context(), "streamObjectChunks: GetAt failed", @@ -1043,6 +1065,22 @@ func (s *S3Server) uploadPart(w http.ResponseWriter, r *http.Request, bucket str } r.Body = http.MaxBytesReader(w, r.Body, s3MaxPartSizeBytes) + + // Pre-allocate the part's commit timestamp before writing any blob chunks so + // that the same version identifier is used for every chunk in this attempt. + // Embedding partCommitTS as PartVersion in the blob keys isolates each + // UploadPart attempt in its own key space: a concurrent or retried request + // for the same partNo receives a different timestamp and therefore writes to + // different keys, leaving the chunk data referenced by an in-progress + // CompleteMultipartUpload untouched. + partReadTS := s.readTS() + partStartTS := s.txnStartTS(partReadTS) + partCommitTS, err := s.nextTxnCommitTS(partStartTS) + if err != nil { + writeS3InternalError(w, err) + return + } + hasher := md5.New() //nolint:gosec // S3 ETag compatibility requires MD5. sizeBytes := int64(0) chunkNo := uint64(0) @@ -1084,7 +1122,7 @@ func (s *S3Server) uploadPart(w http.ResponseWriter, r *http.Request, bucket str writeS3InternalError(w, err) return } - chunkKey := s3keys.BlobKey(bucket, meta.Generation, objectKey, uploadID, partNo, chunkNo) + chunkKey := s3keys.VersionedBlobKey(bucket, meta.Generation, objectKey, uploadID, partNo, chunkNo, partCommitTS) pendingBatch = append(pendingBatch, &kv.Elem[kv.OP]{Op: kv.Put, Key: chunkKey, Value: chunk}) cs, err := uint64FromInt(n) if err != nil { @@ -1120,11 +1158,12 @@ func (s *S3Server) uploadPart(w http.ResponseWriter, r *http.Request, bucket str etag := hex.EncodeToString(hasher.Sum(nil)) partDesc := &s3PartDescriptor{ - PartNo: partNo, - ETag: etag, - SizeBytes: sizeBytes, - ChunkCount: chunkNo, - ChunkSizes: chunkSizes, + PartNo: partNo, + ETag: etag, + SizeBytes: sizeBytes, + ChunkCount: chunkNo, + ChunkSizes: chunkSizes, + PartVersion: partCommitTS, } descBody, err := json.Marshal(partDesc) if err != nil { @@ -1132,22 +1171,16 @@ func (s *S3Server) uploadPart(w http.ResponseWriter, r *http.Request, bucket str return } partKey := s3keys.UploadPartKey(bucket, meta.Generation, objectKey, uploadID, partNo) - startTS := s.txnStartTS(readTS) - partCommitTS, err := s.nextTxnCommitTS(startTS) - if err != nil { - writeS3InternalError(w, err) - return - } if _, err := s.coordinator.Dispatch(r.Context(), &kv.OperationGroup[kv.OP]{ IsTxn: true, - StartTS: startTS, + StartTS: partStartTS, CommitTS: partCommitTS, Elems: []*kv.Elem[kv.OP]{ {Op: kv.Put, Key: partKey, Value: descBody}, }, }); err != nil { // Clean up orphaned blob chunks so they don't accumulate in the store. - s.cleanupPartBlobsAsync(bucket, meta.Generation, objectKey, uploadID, partNo, chunkNo) + s.cleanupPartBlobsAsync(bucket, meta.Generation, objectKey, uploadID, partNo, chunkNo, partCommitTS) writeS3InternalError(w, err) return } @@ -1522,7 +1555,8 @@ func (s *S3Server) cleanupUploadParts(ctx context.Context, bucket string, genera // cleanupPartBlobsAsync asynchronously deletes the blob chunk keys for a single // upload part. It is used to garbage-collect orphaned chunks when a part // descriptor write fails after the chunks have already been committed. -func (s *S3Server) cleanupPartBlobsAsync(bucket string, generation uint64, objectKey string, uploadID string, partNo uint64, chunkCount uint64) { +// partVersion must match the value used when writing the chunk keys. +func (s *S3Server) cleanupPartBlobsAsync(bucket string, generation uint64, objectKey string, uploadID string, partNo uint64, chunkCount uint64, partVersion uint64) { select { case s.cleanupSem <- struct{}{}: default: @@ -1552,7 +1586,7 @@ func (s *S3Server) cleanupPartBlobsAsync(bucket string, generation uint64, objec for i := uint64(0); i < chunkCount; i++ { pending = append(pending, &kv.Elem[kv.OP]{ Op: kv.Del, - Key: s3keys.BlobKey(bucket, generation, objectKey, uploadID, partNo, i), + Key: s3keys.VersionedBlobKey(bucket, generation, objectKey, uploadID, partNo, i, partVersion), }) if len(pending) >= s3ChunkBatchOps { flush() @@ -1859,7 +1893,7 @@ func (s *S3Server) appendPartBlobKeys(pending []*kv.Elem[kv.OP], bucket string, } pending = append(pending, &kv.Elem[kv.OP]{ Op: kv.Del, - Key: s3keys.BlobKey(bucket, generation, objectKey, uploadID, part.PartNo, chunkIndex), + Key: s3keys.VersionedBlobKey(bucket, generation, objectKey, uploadID, part.PartNo, chunkIndex, part.PartVersion), }) if len(pending) >= s3ChunkBatchOps { flush() diff --git a/adapter/s3_auth.go b/adapter/s3_auth.go index 02c79c42..09a8dab8 100644 --- a/adapter/s3_auth.go +++ b/adapter/s3_auth.go @@ -264,18 +264,14 @@ func normalizeS3PayloadHash(raw string) string { const s3PresignMaxExpiry = 7 * 24 * 60 * 60 // 604800 seconds (7 days) // checkPresignExpiry validates the expiry of a presigned request. -// It returns an *s3AuthError if the request has expired or the expiry is invalid. +// It returns an *s3AuthError if X-Amz-Expires is missing, invalid, or the URL has expired. func checkPresignExpiry(expiresStr string, signingTime time.Time) *s3AuthError { if expiresStr == "" { - // No explicit expiry: fall back to clock skew check. - skew := time.Now().UTC().Sub(signingTime.UTC()) - if skew < 0 { - skew = -skew - } - if skew > s3RequestTimeMaxSkew { - return &s3AuthError{Status: http.StatusForbidden, Code: "AccessDenied", Message: "presigned URL has expired"} + return &s3AuthError{ + Status: http.StatusForbidden, + Code: "AuthorizationQueryParametersError", + Message: "X-Amz-Expires is required for presigned URLs", } - return nil } expires, err := strconv.Atoi(expiresStr) if err != nil || expires <= 0 || expires > s3PresignMaxExpiry { diff --git a/adapter/s3_test.go b/adapter/s3_test.go index 04aabdfe..643f98a2 100644 --- a/adapter/s3_test.go +++ b/adapter/s3_test.go @@ -878,6 +878,97 @@ func TestS3Server_RangeReadEmptyObject(t *testing.T) { require.Equal(t, http.StatusRequestedRangeNotSatisfiable, rec.Code) } +func TestS3Server_HeadWithRange(t *testing.T) { + t.Parallel() + + st := store.NewMVCCStore() + server := NewS3Server(nil, "", st, newLocalAdapterCoordinator(st), nil) + + rec := httptest.NewRecorder() + server.handle(rec, newS3TestRequest(http.MethodPut, "/bucket-head-range", nil)) + require.Equal(t, http.StatusOK, rec.Code) + + payload := "hello ranged head" + rec = httptest.NewRecorder() + server.handle(rec, newS3TestRequest(http.MethodPut, "/bucket-head-range/obj.txt", strings.NewReader(payload))) + require.Equal(t, http.StatusOK, rec.Code) + + // HEAD with valid Range: expect 206 + Content-Range, no body. + rec = httptest.NewRecorder() + req := newS3TestRequest(http.MethodHead, "/bucket-head-range/obj.txt", nil) + req.Header.Set("Range", "bytes=0-4") + server.handle(rec, req) + require.Equal(t, http.StatusPartialContent, rec.Code) + require.Equal(t, "bytes 0-4/17", rec.Header().Get("Content-Range")) + require.Equal(t, "5", rec.Header().Get("Content-Length")) + require.Empty(t, rec.Body.String()) + + // HEAD with out-of-range Range: expect 416 + Content-Range header. + rec = httptest.NewRecorder() + req = newS3TestRequest(http.MethodHead, "/bucket-head-range/obj.txt", nil) + req.Header.Set("Range", "bytes=999-1000") + server.handle(rec, req) + require.Equal(t, http.StatusRequestedRangeNotSatisfiable, rec.Code) + require.Equal(t, "bytes */17", rec.Header().Get("Content-Range")) +} + +func TestS3Server_InvalidRangeContentRangeHeader(t *testing.T) { + t.Parallel() + + st := store.NewMVCCStore() + server := NewS3Server(nil, "", st, newLocalAdapterCoordinator(st), nil) + + rec := httptest.NewRecorder() + server.handle(rec, newS3TestRequest(http.MethodPut, "/bucket-inv-range", nil)) + require.Equal(t, http.StatusOK, rec.Code) + + payload := "abcdefghij" // 10 bytes + rec = httptest.NewRecorder() + server.handle(rec, newS3TestRequest(http.MethodPut, "/bucket-inv-range/obj.txt", strings.NewReader(payload))) + require.Equal(t, http.StatusOK, rec.Code) + + // GET with out-of-range Range: 416 response must include Content-Range header. + rec = httptest.NewRecorder() + req := newS3TestRequest(http.MethodGet, "/bucket-inv-range/obj.txt", nil) + req.Header.Set("Range", "bytes=100-200") + server.handle(rec, req) + require.Equal(t, http.StatusRequestedRangeNotSatisfiable, rec.Code) + require.Equal(t, "bytes */10", rec.Header().Get("Content-Range")) +} + +func TestS3Server_PresignedURLMissingExpires(t *testing.T) { + t.Parallel() + + st := store.NewMVCCStore() + server := NewS3Server( + nil, "", st, newLocalAdapterCoordinator(st), nil, + WithS3Region(testS3Region), + WithS3StaticCredentials(map[string]string{testS3AccessKey: testS3SecretKey}), + ) + + // Build a presigned URL without X-Amz-Expires: server must reject it. + signingTime := currentS3SigningTime() + presignReq, err := http.NewRequestWithContext(context.Background(), http.MethodGet, "http://localhost/bucket/obj.txt", nil) + require.NoError(t, err) + signer := v4.NewSigner(func(opts *v4.SignerOptions) { + opts.DisableURIPathEscaping = true + }) + creds := aws.Credentials{AccessKeyID: testS3AccessKey, SecretAccessKey: testS3SecretKey, Source: "test"} + presignedURL, _, err := signer.PresignHTTP(context.Background(), creds, presignReq, + s3UnsignedPayload, "s3", testS3Region, signingTime) + require.NoError(t, err) + + parsedURL, err := url.Parse(presignedURL) + require.NoError(t, err) + presignGetReq := newS3TestRequest(http.MethodGet, parsedURL.RequestURI(), nil) + presignGetReq.Host = "localhost" + rec := httptest.NewRecorder() + server.handle(rec, presignGetReq) + require.Equal(t, http.StatusForbidden, rec.Code) + require.Contains(t, rec.Body.String(), "AuthorizationQueryParametersError") + require.Contains(t, rec.Body.String(), "X-Amz-Expires") +} + func TestS3Server_MultipartUploadETagComputation(t *testing.T) { t.Parallel() @@ -1094,6 +1185,10 @@ func TestS3Server_PresignedURL(t *testing.T) { // Build a presigned GET URL using a proper absolute URL. presignReq, err := http.NewRequestWithContext(context.Background(), http.MethodGet, "http://localhost/bucket-presign/obj.txt", nil) require.NoError(t, err) + // Set X-Amz-Expires to satisfy the server's presigned URL requirement (900 s validity). + presignQuery := presignReq.URL.Query() + presignQuery.Set("X-Amz-Expires", "900") + presignReq.URL.RawQuery = presignQuery.Encode() signer := v4.NewSigner(func(opts *v4.SignerOptions) { opts.DisableURIPathEscaping = true }) @@ -1134,11 +1229,16 @@ func TestS3Server_PresignedURLExpired(t *testing.T) { WithS3StaticCredentials(map[string]string{testS3AccessKey: testS3SecretKey}), ) - // Build a presigned URL with the default 900s expiry that's already expired. - // Signing 20 minutes ago means expiry was 5 minutes ago. + // Build a presigned URL with a 60 s expiry that's already expired. + // Signing 20 minutes ago means expiry was ~19 minutes ago. oldTime := time.Now().UTC().Add(-20 * time.Minute) presignReq, err := http.NewRequestWithContext(context.Background(), http.MethodGet, "http://localhost/bucket-presign/obj.txt", nil) require.NoError(t, err) + // Set a 60 s expiry that will have elapsed given the signing time of 20 minutes ago, + // ensuring the presigned URL is expired when the server validates it. + presignQuery := presignReq.URL.Query() + presignQuery.Set("X-Amz-Expires", "60") + presignReq.URL.RawQuery = presignQuery.Encode() signer := v4.NewSigner(func(opts *v4.SignerOptions) { opts.DisableURIPathEscaping = true }) diff --git a/internal/s3keys/keys.go b/internal/s3keys/keys.go index f0acc3d2..d40cd4a9 100644 --- a/internal/s3keys/keys.go +++ b/internal/s3keys/keys.go @@ -96,6 +96,34 @@ func BlobKey(bucket string, generation uint64, object string, uploadID string, p return buildObjectKey(blobPrefixBytes, bucket, generation, object, uploadID, partNo, chunkNo) } +// VersionedBlobKey returns the blob key for a specific part attempt identified by +// partVersion (typically the part's commit timestamp). When partVersion is 0 the +// result is identical to BlobKey, preserving backward compatibility with data +// written before versioned keys were introduced. When partVersion is non-zero the +// key includes an extra u64 suffix so concurrent re-uploads of the same part +// (e.g. retry or parallel UploadPart for the same partNo) write to a distinct key +// space, making blob data immutable per attempt. +func VersionedBlobKey(bucket string, generation uint64, object string, uploadID string, partNo uint64, chunkNo uint64, partVersion uint64) []byte { + if partVersion == 0 { + return BlobKey(bucket, generation, object, uploadID, partNo, chunkNo) + } + // Capacity breakdown: + // - len(BlobPrefix) prefix bytes + // - len(bucket)+len(object)+len(uploadID) segment content + // - buildObjectExtraBytes escape/terminator overhead for 3 segments (bucket, object, uploadID) + // - 4*u64Bytes four fixed-width u64 fields: generation, partNo, chunkNo, partVersion + out := make([]byte, 0, len(BlobPrefix)+len(bucket)+len(object)+len(uploadID)+buildObjectExtraBytes+4*u64Bytes) + out = append(out, blobPrefixBytes...) + out = append(out, EncodeSegment([]byte(bucket))...) + out = appendU64(out, generation) + out = append(out, EncodeSegment([]byte(object))...) + out = append(out, EncodeSegment([]byte(uploadID))...) + out = appendU64(out, partNo) + out = appendU64(out, chunkNo) + out = appendU64(out, partVersion) + return out +} + func GCUploadKey(bucket string, generation uint64, object string, uploadID string) []byte { return buildObjectKey(gcUploadPrefixBytes, bucket, generation, object, uploadID, 0, 0) } diff --git a/internal/s3keys/keys_test.go b/internal/s3keys/keys_test.go index 70563c5b..4af3533f 100644 --- a/internal/s3keys/keys_test.go +++ b/internal/s3keys/keys_test.go @@ -149,6 +149,41 @@ func TestUploadPartPrefixForUpload_IsPrefixOfPartKeys(t *testing.T) { require.False(t, bytes.HasPrefix(otherKey, prefix)) } +func TestVersionedBlobKey_ZeroVersionMatchesBlobKey(t *testing.T) { + t.Parallel() + + bucket := "bucket-v" + generation := uint64(5) + object := "file.bin" + uploadID := "upload-v" + partNo := uint64(2) + chunkNo := uint64(3) + + // VersionedBlobKey with version=0 must produce the same key as BlobKey. + require.Equal(t, BlobKey(bucket, generation, object, uploadID, partNo, chunkNo), + VersionedBlobKey(bucket, generation, object, uploadID, partNo, chunkNo, 0)) +} + +func TestVersionedBlobKey_NonZeroVersionDiffersFromBlobKey(t *testing.T) { + t.Parallel() + + bucket := "bucket-v" + generation := uint64(5) + object := "file.bin" + uploadID := "upload-v" + partNo := uint64(2) + chunkNo := uint64(3) + version := uint64(999) + + versionedKey := VersionedBlobKey(bucket, generation, object, uploadID, partNo, chunkNo, version) + unversionedKey := BlobKey(bucket, generation, object, uploadID, partNo, chunkNo) + require.NotEqual(t, unversionedKey, versionedKey) + + // Different part versions must produce different keys. + otherVersionKey := VersionedBlobKey(bucket, generation, object, uploadID, partNo, chunkNo, version+1) + require.NotEqual(t, versionedKey, otherVersionKey) +} + func TestBlobPrefixForUpload_IsPrefixOfBlobKeys(t *testing.T) { t.Parallel()