feat: implement S3 Phase 2 - multipart upload, range reads, presigned URLs#430
Conversation
… URLs

- Multipart upload flow: CreateMultipartUpload, UploadPart, CompleteMultipartUpload, AbortMultipartUpload, ListParts
- Range reads with HTTP 206 Partial Content support
- SigV4 presigned URL authentication
- Abort/Complete race fencing via OCC transactions
- OCC retry for Complete and Abort operations
- Atomic UploadMeta + GCUploadKey creation (IsTxn=true)
- PartDescriptor write transaction safety
- Rolling readTS in deleteByPrefix to avoid compaction blocking
- Blocking cleanup semaphore to guarantee GC execution
- s3keys: ParseUploadPartKey, UploadPartPrefixForUpload, BlobPrefixForUpload
Summary of Changes

This pull request delivers the second phase of S3 compatibility enhancements, introducing critical features such as multipart uploads, range reads, and presigned URLs. These additions significantly expand the S3 adapter's capabilities, enabling robust handling of large files, optimized data streaming, and secure, temporary access to stored objects, thereby improving overall S3 protocol adherence and utility.
Code Review
This pull request significantly enhances the S3 adapter by implementing core S3 multipart upload functionality, including initiate, upload parts, complete, abort, and list parts operations. It also adds support for S3 SigV4 presigned URL authorization and object range reads. New data structures, constants, and helper functions are introduced across adapter/s3.go, adapter/s3_auth.go, and internal/s3keys/keys.go, accompanied by extensive test coverage in adapter/s3_test.go and internal/s3keys/keys_test.go. The TODO_s3_review_fixes.md indicates that this PR addresses several S3-related issues. Feedback includes addressing a potential race condition in completeMultipartUpload where the bucket's generation is not re-verified within the retry loop, which could lead to an orphaned object manifest. Additionally, errors during chunk streaming in streamObjectChunks are silently ignored and should be logged for better debugging and client error indication.
retryPin := s.pinReadTS(retryReadTS)
defer retryPin.Release()

// Re-verify upload still exists (fence against concurrent Abort).
Inside the retryS3Mutation loop for completeMultipartUpload, the existence of the upload is re-verified, but the bucket's existence and generation are not. If the bucket is deleted and possibly recreated between Phase 1 (validation) and Phase 2 (commit), this could lead to creating an orphaned object manifest under an old bucket generation. It would be safer to re-verify the bucket's status within the retry loop to prevent this.
// Re-verify bucket still exists and generation hasn't changed.
currentMeta, exists, err := s.loadBucketMetaAt(r.Context(), bucket, retryReadTS)
if err != nil {
return errors.WithStack(err)
}
if !exists || currentMeta.Generation != generation {
return &s3ResponseError{
Status: http.StatusConflict,
Code: "OperationAborted",
Message: "Bucket has been modified, please retry.",
Bucket: bucket,
Key: objectKey,
}
}
// Re-verify upload still exists (fence against concurrent Abort).

References
- This comment addresses a race condition in a read-modify-write operation where the bucket's state (existence, generation) can change between validation and commit, leading to an inconsistent snapshot. Re-verifying the state within the retry loop helps maintain consistency, aligning with the principle of ensuring a consistent snapshot in transactional operations.
- The issue described involves the potential use of stale data (an old bucket generation) in an atomic operation. Re-verifying the bucket's generation within the retry loop ensures that the operation proceeds with up-to-date state, preventing race conditions and the use of stale data, as per the guideline for atomic operations based on state variables.
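The generation-fence idea above can be sketched with a toy in-memory store. Names such as `verifyBucketGeneration` and `bucketMeta` are illustrative, not elastickv's API; the point is only that each retry attempt re-reads the bucket state and rejects a changed generation:

```go
package main

import "fmt"

// bucketMeta is a hypothetical stand-in for the adapter's bucket metadata.
type bucketMeta struct{ Generation uint64 }

// store simulates snapshot reads of bucket metadata keyed by bucket name.
type store map[string]bucketMeta

// verifyBucketGeneration re-checks, inside a retry attempt, that the bucket
// still exists and its generation matches the one captured during validation.
func verifyBucketGeneration(s store, bucket string, want uint64) error {
	meta, ok := s[bucket]
	if !ok {
		return fmt.Errorf("NoSuchBucket: %s", bucket)
	}
	if meta.Generation != want {
		return fmt.Errorf("OperationAborted: bucket %s generation changed (%d != %d)", bucket, meta.Generation, want)
	}
	return nil
}

func main() {
	s := store{"photos": {Generation: 3}}
	if err := verifyBucketGeneration(s, "photos", 3); err != nil {
		panic(err)
	}
	// Simulate delete + recreate between validation and commit.
	s["photos"] = bucketMeta{Generation: 4}
	if err := verifyBucketGeneration(s, "photos", 3); err == nil {
		panic("expected generation mismatch")
	}
	fmt.Println("fence detected stale generation")
}
```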
chunk, err := s.store.GetAt(r.Context(), chunkKey, readTS)
if err != nil {
	writeS3InternalError(w, err)
	return
Errors during chunk streaming are silently ignored, which will result in a truncated response for the client without any error indication. It's better to log these errors to aid in debugging. This also applies to other error checks in this function, such as w.Write failures.
slog.ErrorContext(r.Context(), "failed to get object chunk for streaming", "bucket", bucket, "object", objectKey, "generation", generation, "upload_id", manifest.UploadID, "err", err)
return
Pull request overview
This PR extends the adapter/ S3-compatible API and internal/s3keys/ key helpers to support “S3 Phase 2” functionality: multipart uploads (init/upload part/list parts/complete/abort), HTTP Range reads, and SigV4 presigned URL authorization.
Changes:
- Add multipart upload flows to the S3 server (init/upload part/list parts/complete/abort) plus background cleanup helpers.
- Implement single-range `Range: bytes=...` reads for GET object responses.
- Add presigned URL authorization, plus new s3keys helpers for multipart part/blob prefixing and parsing, and a broad test suite covering the new behaviors.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| internal/s3keys/keys.go | Adds multipart key prefix builders and a parser for upload part keys. |
| internal/s3keys/keys_test.go | Adds round-trip / prefix / zero-byte coverage for multipart key helpers. |
| adapter/s3.go | Implements multipart upload endpoints, range reads, and cleanup-by-prefix helpers. |
| adapter/s3_auth.go | Adds SigV4 presigned URL authorization path. |
| adapter/s3_test.go | Adds integration-style tests for multipart flows, range reads, and presigned URLs. |
| TODO_s3_review_fixes.md | Tracks review follow-ups addressed by this phase. |
if left == "" {
	// suffix range: bytes=-N
	n, err := strconv.ParseInt(right, 10, 64)
	if err != nil || n <= 0 {
		return 0, 0, false
	}
	if n > totalSize {
		n = totalSize
	}
	return totalSize - n, totalSize - 1, true
parseS3RangeHeader can return ok=true for suffix ranges when totalSize == 0 (e.g. bytes=-4), producing an invalid start=0,end=-1 and leading to malformed Content-Range/length handling. Consider explicitly rejecting Range requests when totalSize <= 0 (or handling empty objects as 416).
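A minimal sketch of a single-range parser with the suggested zero-size guard is below. `parseRange` is a hypothetical stand-in for the adapter's `parseS3RangeHeader`, not its actual implementation; the key point is the early rejection when `totalSize <= 0`:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseRange parses "bytes=A-B", "bytes=A-", and "bytes=-N" single ranges,
// rejecting any Range request against a zero-size object so callers can
// respond with 416 instead of emitting an invalid start=0,end=-1 pair.
func parseRange(header string, totalSize int64) (start, end int64, ok bool) {
	if totalSize <= 0 {
		return 0, 0, false // empty object: every Range is unsatisfiable
	}
	spec, found := strings.CutPrefix(header, "bytes=")
	if !found || strings.Contains(spec, ",") {
		return 0, 0, false // only single ranges are supported here
	}
	left, right, found := strings.Cut(spec, "-")
	if !found {
		return 0, 0, false
	}
	if left == "" { // suffix range: bytes=-N
		n, err := strconv.ParseInt(right, 10, 64)
		if err != nil || n <= 0 {
			return 0, 0, false
		}
		if n > totalSize {
			n = totalSize
		}
		return totalSize - n, totalSize - 1, true
	}
	s, err := strconv.ParseInt(left, 10, 64)
	if err != nil || s < 0 || s >= totalSize {
		return 0, 0, false
	}
	e := totalSize - 1
	if right != "" {
		e, err = strconv.ParseInt(right, 10, 64)
		if err != nil || e < s {
			return 0, 0, false
		}
		if e > totalSize-1 {
			e = totalSize - 1
		}
	}
	return s, e, true
}

func main() {
	if _, _, ok := parseRange("bytes=-4", 0); ok {
		panic("suffix range on empty object must be rejected")
	}
	s, e, ok := parseRange("bytes=-4", 10)
	fmt.Println(s, e, ok) // 6 9 true
}
```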
chunkIndex, err := uint64FromInt(chunkIdx)
if err != nil {
	writeS3InternalError(w, err)
	return
}
-chunkKey := s3keys.BlobKey(bucket, meta.Generation, objectKey, manifest.UploadID, part.PartNo, chunkIndex)
+chunkKey := s3keys.BlobKey(bucket, generation, objectKey, manifest.UploadID, part.PartNo, chunkIndex)
chunk, err := s.store.GetAt(r.Context(), chunkKey, readTS)
if err != nil {
	writeS3InternalError(w, err)
	return
}
streamObjectChunks currently drops errors from uint64FromInt and store.GetAt by just returning, after headers/status may already be written. This can result in clients receiving truncated 200/206 responses with no indication of an internal error. Consider returning an error from streamObjectChunks (and logging/writing an error before headers are committed when possible), or at least logging failures so partial responses are observable.
adapter/s3.go (Outdated)
// Atomically: write manifest, delete UploadMeta + GCUpload (fence against Abort).
_, err = s.coordinator.Dispatch(r.Context(), &kv.OperationGroup[kv.OP]{
	IsTxn:    true,
	StartTS:  startTS,
	CommitTS: commitTS,
	Elems: []*kv.Elem[kv.OP]{
		{Op: kv.Put, Key: headKey, Value: manifestBody},
		{Op: kv.Del, Key: uploadMetaKey},
		{Op: kv.Del, Key: s3keys.GCUploadKey(bucket, meta.Generation, objectKey, uploadID)},
	},
completeMultipartUpload commits the object manifest without touching BucketMetaKey, so it won’t conflict with a concurrent bucket deletion. This can allow DELETE bucket to succeed while a multipart completion concurrently writes a new manifest, leaving orphaned data and a bucket that appears deleted but still has object state. Consider adding a bucket-meta fence write in the completion transaction (similar to putObject) or otherwise making bucket deletion conflict with multipart completion.
adapter/s3.go (Outdated)
go func() {
	s.cleanupSem <- struct{}{}
Async cleanup now spawns a goroutine unconditionally and only then blocks on s.cleanupSem <- struct{}{}. Under sustained load (cleanup semaphore saturated), this can accumulate an unbounded number of blocked goroutines and increase memory/CPU pressure. Consider acquiring the semaphore before spawning (with a non-blocking try-acquire or bounded queue) to keep goroutine growth bounded.
Suggested change:

-go func() {
-	s.cleanupSem <- struct{}{}
+// Try to acquire a cleanup slot without blocking the caller. If the semaphore
+// is saturated, skip scheduling this async cleanup to avoid unbounded
+// goroutine accumulation under load.
+select {
+case s.cleanupSem <- struct{}{}:
+	// acquired; proceed
+default:
+	// no capacity; drop this cleanup attempt
+	return
+}
+go func() {
// Use the SDK's PresignHTTP to rebuild the expected presigned URL.
// Strip all presign query params to get a clean base request,
// then presign it with the same parameters.
verifyURL := *r.URL
q := verifyURL.Query()
for _, param := range []string{"X-Amz-Algorithm", "X-Amz-Credential", "X-Amz-Date", "X-Amz-Expires", "X-Amz-SignedHeaders", "X-Amz-Signature", "X-Amz-Security-Token"} {
	q.Del(param)
}
verifyURL.RawQuery = q.Encode()

verifyReq, err := http.NewRequestWithContext(context.Background(), r.Method, verifyURL.String(), nil)
if err != nil {
	return &s3AuthError{
		Status:  http.StatusForbidden,
		Code:    "SignatureDoesNotMatch",
		Message: "failed to verify presigned request signature",
	}
}
verifyReq.Host = r.Host
// Copy only the signed headers from the original request.
for _, h := range strings.Split(signedHeaders, ";") {
	h = strings.TrimSpace(h)
	if h == "" || strings.EqualFold(h, "host") {
		continue
	}
	verifyReq.Header.Set(h, r.Header.Get(h))
}

signer := v4.NewSigner(func(opts *v4.SignerOptions) {
	opts.DisableURIPathEscaping = true
})
creds := aws.Credentials{
	AccessKeyID:     accessKeyID,
	SecretAccessKey: secretAccessKey,
	Source:          "elastickv-s3-presign",
}
presignedURL, _, err := signer.PresignHTTP(context.Background(), creds, verifyReq, s3UnsignedPayload, "s3", s.effectiveRegion(), signingTime)
if err != nil {
	return &s3AuthError{
		Status:  http.StatusForbidden,
		Code:    "SignatureDoesNotMatch",
		Message: "failed to verify presigned request signature",
	}
}
expectedSig := extractPresignedSignature(presignedURL)
if expectedSig == "" || subtle.ConstantTimeCompare([]byte(signature), []byte(expectedSig)) != 1 {
	return &s3AuthError{
		Status:  http.StatusForbidden,
		Code:    "SignatureDoesNotMatch",
		Message: "presigned URL signature does not match",
	}
}
authorizePresignedRequest re-generates a presigned URL after deleting X-Amz-Expires (and other presign params) from the request, then calls PresignHTTP without reapplying the original expiry. This will only validate URLs created with the signer’s default expiry (typically 900s) and will reject valid presigned URLs with a custom X-Amz-Expires. Consider preserving and using the request’s X-Amz-Expires when recomputing the expected signature (or computing the expected signature directly from the canonical request/query parameters).
if _, err := s.coordinator.Dispatch(r.Context(), &kv.OperationGroup[kv.OP]{
	IsTxn:    true,
	StartTS:  startTS,
	CommitTS: partCommitTS,
	Elems: []*kv.Elem[kv.OP]{
		{Op: kv.Put, Key: partKey, Value: descBody},
	},
}); err != nil {
	writeS3InternalError(w, err)
uploadPart writes blob chunks before it transactionally writes the part descriptor. If the descriptor transaction fails (or the request errors after some chunks were written), those chunks remain in the store (and if a retry uploads fewer chunks, old higher-index chunks for the same part can become permanent garbage). Consider adding cleanup on failures (e.g., delete the written chunk keys on error) or writing a small “in-progress” descriptor first so orphaned chunks can be discovered and garbage-collected reliably.
Suggested change:

-if _, err := s.coordinator.Dispatch(r.Context(), &kv.OperationGroup[kv.OP]{
-	IsTxn:    true,
-	StartTS:  startTS,
-	CommitTS: partCommitTS,
-	Elems: []*kv.Elem[kv.OP]{
-		{Op: kv.Put, Key: partKey, Value: descBody},
-	},
-}); err != nil {
-	writeS3InternalError(w, err)
+var (
+	partWriteErr error
+	backoff      = s3TxnRetryInitialBackoff
+)
+for attempt := 0; attempt < s3TxnRetryMaxAttempts; attempt++ {
+	if _, partWriteErr = s.coordinator.Dispatch(r.Context(), &kv.OperationGroup[kv.OP]{
+		IsTxn:    true,
+		StartTS:  startTS,
+		CommitTS: partCommitTS,
+		Elems: []*kv.Elem[kv.OP]{
+			{Op: kv.Put, Key: partKey, Value: descBody},
+		},
+	}); partWriteErr == nil {
+		break
+	}
+	// Do not retry if the client has gone away or the context is done.
+	if errors.Is(partWriteErr, context.Canceled) || errors.Is(partWriteErr, context.DeadlineExceeded) {
+		break
+	}
+	// Back off before retrying, capping at the maximum backoff.
+	time.Sleep(backoff)
+	backoff *= s3TxnRetryBackoffFactor
+	if backoff > s3TxnRetryMaxBackoff {
+		backoff = s3TxnRetryMaxBackoff
+	}
+}
+if partWriteErr != nil {
+	writeS3InternalError(w, partWriteErr)
adapter/s3.go (Outdated)
// Parts must be in ascending order.
for i := 1; i < len(completionReq.Parts); i++ {
	if completionReq.Parts[i].PartNumber <= completionReq.Parts[i-1].PartNumber {
completeMultipartUpload defines s3MaxPartsPerUpload but doesn’t enforce any upper bound on len(completionReq.Parts). Even though AWS caps at 10,000 parts, a large XML body could still drive significant CPU/memory work (and the constant suggests an intended limit). Consider rejecting requests with too many parts early (and/or validating PartNumber range) to bound work.
Suggested change:

-// Parts must be in ascending order.
-for i := 1; i < len(completionReq.Parts); i++ {
-	if completionReq.Parts[i].PartNumber <= completionReq.Parts[i-1].PartNumber {
+if len(completionReq.Parts) > s3MaxPartsPerUpload {
+	writeS3Error(w, http.StatusBadRequest, "InvalidArgument", "too many parts in CompleteMultipartUpload request", bucket, objectKey)
+	return
+}
+// Parts must be in ascending order and within the allowed part number range.
+for i, part := range completionReq.Parts {
+	if part.PartNumber < s3MinPartNumber || part.PartNumber > s3MaxPartNumber {
+		writeS3Error(w, http.StatusBadRequest, "InvalidArgument", "part number out of allowed range", bucket, objectKey)
+		return
+	}
+	if i > 0 && part.PartNumber <= completionReq.Parts[i-1].PartNumber {
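The three checks (count cap, per-part range, strict ascending order) can be shown as one self-contained validator. `validateParts` and the constants are illustrative sketches, not the adapter's code; AWS's documented limit of 10,000 parts per upload is the only external fact assumed:

```go
package main

import "fmt"

const (
	maxPartsPerUpload = 10000 // AWS caps CompleteMultipartUpload at 10,000 parts
	minPartNumber     = 1
	maxPartNumber     = 10000
)

// validateParts bounds the work done for a CompleteMultipartUpload request:
// reject oversized part lists early, then check each PartNumber's range and
// require strictly ascending order in a single pass.
func validateParts(partNumbers []int) error {
	if len(partNumbers) > maxPartsPerUpload {
		return fmt.Errorf("InvalidArgument: too many parts (%d)", len(partNumbers))
	}
	for i, n := range partNumbers {
		if n < minPartNumber || n > maxPartNumber {
			return fmt.Errorf("InvalidArgument: part number %d out of range", n)
		}
		if i > 0 && n <= partNumbers[i-1] {
			return fmt.Errorf("InvalidPartOrder: part %d after %d", n, partNumbers[i-1])
		}
	}
	return nil
}

func main() {
	fmt.Println(validateParts([]int{1, 2, 3}))     // <nil>
	fmt.Println(validateParts([]int{2, 1}) != nil) // true
	fmt.Println(validateParts([]int{0}) != nil)    // true
}
```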
@copilot open a new pull request to apply changes based on the comments in this thread
- parseS3RangeHeader: reject suffix ranges on zero-size objects (416)
- streamObjectChunks: add slog error logging for silent failures
- completeMultipartUpload: add BucketMetaKey fence write to prevent race with DELETE bucket
- async cleanup helpers: use non-blocking select on semaphore before spawning goroutines
- authorizePresignedRequest: re-add X-Amz-Expires to verify URL before PresignHTTP
- uploadPart: add cleanupPartBlobsAsync on descriptor write failure
- completeMultipartUpload: enforce max parts limit and part number range validation
- tests: add TestS3Server_RangeReadEmptyObject, TestS3Server_CompleteMultipartUploadTooManyParts, TestS3Server_CompleteMultipartUploadOutOfRangePartNumber

Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
Agent-Logs-Url: https://github.com/bootjp/elastickv/sessions/4c9d1e6c-6404-43d9-9c0f-3fdb1727c6ec
- Fix gofmt: remove extra spaces in s3CompleteMultipartUploadRequest struct tags
- Add //nolint:cyclop to handleObject, streamObjectChunks, parseS3RangeHeader, listParts
- Extend //nolint on uploadPart to include gocognit
- Extend //nolint on completeMultipartUpload to include gocyclo
- Add //nolint:gosec G115 comments on bounded int<->uint64 conversions (lines 804, 1219, 1484, 1488)

Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
Agent-Logs-Url: https://github.com/bootjp/elastickv/sessions/55461242-6306-47fc-8b95-39557c7869dc
…ck, gosec G704)

- mnd: replace 1<<20 with existing s3ChunkSize constant in completeMultipartUpload
- mnd: add s3RangeSplitParts=2 constant; use in SplitN and len check in parseS3RangeHeader
- nestif: flatten 'if n > 0' block in uploadPart read loop by inverting condition
- nestif: extract checkPresignExpiry helper to reduce nesting in authorizePresignedRequest
- staticcheck S1016: use s3ObjectPart(desc) type conversion instead of struct literal
- staticcheck S1039: remove unnecessary fmt.Sprintf wrapper on raw string literal in test
- gosec G704: add nolint comment on http.NewRequestWithContext – URL is request-local, no outbound network call

Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
Agent-Logs-Url: https://github.com/bootjp/elastickv/sessions/18e808ef-b8dc-46c8-bd6d-e4d4e566a32a
fix: address S3 Phase 2 review – range reads, multipart safety, presigned URL, cleanup goroutines
// Phase 1: Read and validate parts (outside retry — idempotent, avoids O(parts) re-reads).
readTS := s.readTS()
readPin := s.pinReadTS(readTS)

meta, exists, err := s.loadBucketMetaAt(r.Context(), bucket, readTS)
if err != nil {
	readPin.Release()
	writeS3InternalError(w, err)
	return
}
if !exists || meta == nil {
	readPin.Release()
	writeS3Error(w, http.StatusNotFound, "NoSuchBucket", "bucket not found", bucket, objectKey)
	return
}
generation := meta.Generation

uploadMetaKey := s3keys.UploadMetaKey(bucket, meta.Generation, objectKey, uploadID)
uploadMetaRaw, err := s.store.GetAt(r.Context(), uploadMetaKey, readTS)
if err != nil {
	readPin.Release()
	if errors.Is(err, store.ErrKeyNotFound) {
		writeS3Error(w, http.StatusNotFound, "NoSuchUpload", "upload not found", bucket, objectKey)
		return
	}
	writeS3InternalError(w, err)
	return
}
var uploadMeta s3UploadMeta
if err := json.Unmarshal(uploadMetaRaw, &uploadMeta); err != nil {
	readPin.Release()
	writeS3InternalError(w, err)
	return
}

manifestParts := make([]s3ObjectPart, 0, len(completionReq.Parts))
md5Concat := md5.New() //nolint:gosec // S3 composite ETag requires MD5.
totalSize := int64(0)

for i, reqPart := range completionReq.Parts {
	partKey := s3keys.UploadPartKey(bucket, meta.Generation, objectKey, uploadID, uint64(reqPart.PartNumber)) //nolint:gosec // G115: PartNumber validated in [1,10000].
	raw, err := s.store.GetAt(r.Context(), partKey, readTS)
	if err != nil {
		readPin.Release()
		if errors.Is(err, store.ErrKeyNotFound) {
			writeS3Error(w, http.StatusBadRequest, "InvalidPart", fmt.Sprintf("part %d not found", reqPart.PartNumber), bucket, objectKey)
			return
		}
		writeS3InternalError(w, err)
		return
	}
	var desc s3PartDescriptor
	if err := json.Unmarshal(raw, &desc); err != nil {
		readPin.Release()
		writeS3InternalError(w, err)
		return
	}
	reqETag := strings.Trim(reqPart.ETag, `"`)
	if reqETag != desc.ETag {
		readPin.Release()
		writeS3Error(w, http.StatusBadRequest, "InvalidPart",
			fmt.Sprintf("part %d ETag mismatch: got %q, want %q", reqPart.PartNumber, reqETag, desc.ETag), bucket, objectKey)
		return
	}
	if i < len(completionReq.Parts)-1 && desc.SizeBytes < int64(s3MinPartSize) {
		readPin.Release()
		writeS3Error(w, http.StatusBadRequest, "EntityTooSmall",
			fmt.Sprintf("part %d is too small (%d bytes)", reqPart.PartNumber, desc.SizeBytes), bucket, objectKey)
		return
	}

	partMD5, err := hex.DecodeString(desc.ETag)
	if err != nil {
		readPin.Release()
		writeS3InternalError(w, err)
		return
	}
	if _, err := md5Concat.Write(partMD5); err != nil {
		readPin.Release()
		writeS3InternalError(w, err)
		return
	}

	manifestParts = append(manifestParts, s3ObjectPart(desc))
	totalSize += desc.SizeBytes
}
readPin.Release()

compositeETag := hex.EncodeToString(md5Concat.Sum(nil)) + fmt.Sprintf("-%d", len(completionReq.Parts))

// Phase 2: Commit (inside retry — only fencing + manifest write).
var previous *s3ObjectManifest

err = s.retryS3Mutation(r.Context(), func() error {
	retryReadTS := s.readTS()
	startTS := s.txnStartTS(retryReadTS)
	retryPin := s.pinReadTS(retryReadTS)
	defer retryPin.Release()

	// Re-verify upload still exists (fence against concurrent Abort).
	if _, err := s.store.GetAt(r.Context(), uploadMetaKey, retryReadTS); err != nil {
		if errors.Is(err, store.ErrKeyNotFound) {
			return &s3ResponseError{
				Status:  http.StatusNotFound,
				Code:    "NoSuchUpload",
				Message: "upload not found",
				Bucket:  bucket,
completeMultipartUpload validates/loads part descriptors at an earlier readTS and then commits the manifest in a later transaction without any fencing against concurrent UploadPart updates to the same uploadID/part numbers. Because blob keys are overwritten in-place, a race can produce a manifest+ETag that no longer matches the bytes served. The commit phase needs a mechanism to ensure the referenced parts/chunks are immutable for the duration of completion (e.g., versioned chunk keys, or a completion lock/fence key that UploadPart must also mutate transactionally).
adapter/s3.go (Outdated)
writeS3Error(w, http.StatusRequestedRangeNotSatisfiable, "InvalidRange",
	fmt.Sprintf("bytes */%d", manifest.SizeBytes), bucket, objectKey)
For GET with an invalid Range, the response currently uses the Message field to emit a Content-Range-like string ("bytes */<size>") and does not set the required Content-Range response header. For S3/RFC 7233 compatibility, return a descriptive error message (e.g. "requested range not satisfiable") and include Content-Range: bytes */<size> in the HTTP headers for the 416 response.
Suggested change:

-writeS3Error(w, http.StatusRequestedRangeNotSatisfiable, "InvalidRange",
-	fmt.Sprintf("bytes */%d", manifest.SizeBytes), bucket, objectKey)
+w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", manifest.SizeBytes))
+writeS3Error(w, http.StatusRequestedRangeNotSatisfiable, "InvalidRange",
+	"requested range not satisfiable", bucket, objectKey)
adapter/s3.go (Outdated)
if headOnly || rangeHeader == "" {
	writeS3ObjectHeaders(w.Header(), manifest)
	if rangeHeader != "" {
		w.Header().Set("Accept-Ranges", "bytes")
	}
	w.WriteHeader(http.StatusOK)
	if headOnly {
		return
	}
HEAD requests with a Range header are currently handled by the headOnly || rangeHeader == "" branch, which ignores the Range and returns 200 + full Content-Length. S3 clients may send HEAD with Range and expect 206 + Content-Range/partial Content-Length (with no body). Consider applying the same range parsing/header logic for HEAD, but skipping the body write.
Suggested change:

-if headOnly || rangeHeader == "" {
-	writeS3ObjectHeaders(w.Header(), manifest)
-	if rangeHeader != "" {
-		w.Header().Set("Accept-Ranges", "bytes")
-	}
-	w.WriteHeader(http.StatusOK)
-	if headOnly {
-		return
-	}
+// HEAD requests: mirror GET range semantics, but never write a body.
+if headOnly {
+	if rangeHeader == "" {
+		// Full-object HEAD: return metadata and 200 with full Content-Length.
+		writeS3ObjectHeaders(w.Header(), manifest)
+		w.WriteHeader(http.StatusOK)
+		return
+	}
+	// Ranged HEAD: validate range and return 206 + partial Content-Length/Content-Range.
+	rangeStart, rangeEnd, ok := parseS3RangeHeader(rangeHeader, manifest.SizeBytes)
+	if !ok {
+		writeS3Error(w, http.StatusRequestedRangeNotSatisfiable, "InvalidRange",
+			fmt.Sprintf("bytes */%d", manifest.SizeBytes), bucket, objectKey)
+		return
+	}
+	contentLength := rangeEnd - rangeStart + 1
+	writeS3ObjectHeaders(w.Header(), manifest)
+	w.Header().Set("Content-Length", strconv.FormatInt(contentLength, 10))
+	w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", rangeStart, rangeEnd, manifest.SizeBytes))
+	w.Header().Set("Accept-Ranges", "bytes")
+	w.WriteHeader(http.StatusPartialContent)
+	return
+}
+// GET requests without Range: stream the full object.
+if rangeHeader == "" {
+	writeS3ObjectHeaders(w.Header(), manifest)
+	w.WriteHeader(http.StatusOK)
credential := strings.TrimSpace(query.Get("X-Amz-Credential"))
amzDate := strings.TrimSpace(query.Get("X-Amz-Date"))
expiresStr := strings.TrimSpace(query.Get("X-Amz-Expires"))
signedHeaders := strings.TrimSpace(query.Get("X-Amz-SignedHeaders"))
signature := strings.TrimSpace(query.Get("X-Amz-Signature"))

if credential == "" || amzDate == "" || signedHeaders == "" || signature == "" {
	return &s3AuthError{
		Status:  http.StatusForbidden,
		Code:    "AuthorizationQueryParametersError",
		Message: "missing required presigned URL parameters",
	}
}
SigV4 presigned URLs require X-Amz-Expires (along with Algorithm/Credential/Date/SignedHeaders/Signature). The current logic treats an empty X-Amz-Expires as valid and falls back to a clock-skew check, which deviates from AWS semantics and effectively changes the validity rules for presigned URLs. Consider requiring X-Amz-Expires and returning AuthorizationQueryParametersError when it is missing/invalid.
TODO_s3_review_fixes.md (Outdated)
## Priority 1 (Critical)

- [x] 1. Make createMultipartUpload use IsTxn=true — guarantees atomicity in a distributed environment
- [x] 2. GC recovery when the cleanup semaphore is full — changed to a blocking wait inside the goroutine
This TODO item marks the semaphore-saturation GC behavior as fixed by “blocking wait”, but the implementation in adapter/s3.go cleanup helpers still uses a non-blocking select and returns early when the semaphore is full (skipping cleanup). Either update this TODO to match the current behavior, or implement the intended blocking/queued cleanup strategy.
Suggested change:

-- [x] 2. GC recovery when the cleanup semaphore is full — changed to a blocking wait inside the goroutine
+- [ ] 2. GC recovery when the cleanup semaphore is full — current state: the non-blocking select skips cleanup when the semaphore is full / needed: implement the blocking wait inside the goroutine
}
chunkKey := s3keys.BlobKey(bucket, meta.Generation, objectKey, uploadID, partNo, chunkNo)
pendingBatch = append(pendingBatch, &kv.Elem[kv.OP]{Op: kv.Put, Key: chunkKey, Value: chunk})
cs, err := uint64FromInt(n)
if err != nil {
uploadPart writes blob chunks under keys derived only from (bucket,generation,object,uploadID,partNo,chunkNo). This means a concurrent re-UploadPart (same uploadID+partNo) overwrites the same chunk keys, which can corrupt a concurrently completing upload (or even a just-completed object) because the manifest references these chunk keys by uploadID+partNo+chunkNo. Consider making chunk keys immutable per part attempt (e.g., include a per-part version/commitTS/etag in the blob key and store that reference in the part descriptor/manifest), and/or add a transactional fence so uploadPart cannot proceed once Complete/Abort starts.
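The versioned-key idea can be sketched standalone. The `blobKey` layout below is illustrative, not elastickv's `s3keys.BlobKey`; the property being demonstrated is that including a per-part version (e.g. the part's commit HLC) in the key makes every re-upload write fresh keys instead of overwriting chunks a concurrent Complete may still reference:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// blobKey builds a chunk key that includes a per-part version, so a retried
// UploadPart for the same (uploadID, partNo) lands on distinct keys and
// cannot clobber chunks referenced by an in-flight or completed manifest.
func blobKey(bucket, object, uploadID string, partNo, version, chunkNo uint64) string {
	buf := make([]byte, 0, 24)
	buf = binary.BigEndian.AppendUint64(buf, partNo)
	buf = binary.BigEndian.AppendUint64(buf, version)
	buf = binary.BigEndian.AppendUint64(buf, chunkNo)
	return fmt.Sprintf("blob/%s/%s/%s/%x", bucket, object, uploadID, buf)
}

func main() {
	k1 := blobKey("b", "o", "up1", 1, 100, 0) // first attempt at part 1
	k2 := blobKey("b", "o", "up1", 1, 101, 0) // retry of part 1, new version
	fmt.Println(k1 != k2)                     // true: the retry cannot clobber k1
}
```

The part descriptor then records which version it committed, and the manifest copies that reference at completion time, making the referenced chunks effectively immutable.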
@copilot open a new pull request to apply changes based on the comments in this thread
…eys, presign expiry

Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
Agent-Logs-Url: https://github.com/bootjp/elastickv/sessions/c6b3d6f8-f304-4971-b814-702adb37beff
fix: S3 Phase 2 round-2 review – versioned blob keys, HEAD+Range, presign expiry, 416 headers
func (s *S3Server) completeMultipartUpload(w http.ResponseWriter, r *http.Request, bucket string, objectKey string, uploadID string) {
	bodyBytes, err := io.ReadAll(io.LimitReader(r.Body, s3ChunkSize))
	if err != nil {
completeMultipartUpload reads the request body with io.LimitReader(r.Body, s3ChunkSize). A valid CompleteMultipartUpload for large part counts can exceed 1 MiB and will be truncated, causing an incorrect MalformedXML response. Consider increasing the limit (e.g., based on s3MaxPartsPerUpload) or streaming XML decoding with an explicit max byte cap sized for the worst-case request.
if len(pendingBatch) >= s3ChunkBatchOps {
	if err := flushBatch(); err != nil {
		writeS3InternalError(w, err)
		return
	}
uploadPart writes blob chunks incrementally, but on many early-return error paths (e.g., flushBatch failure or read errors after some chunks were dispatched) the already-written chunks are not cleaned up, which can leak orphaned data. Consider invoking cleanupPartBlobsAsync with the current chunkNo/partCommitTS on any failure after at least one chunk has been persisted.
slog.ErrorContext(r.Context(), "streamObjectChunks: GetAt failed",
	"bucket", bucket,
	"object_key", objectKey,
	"chunk_key", string(chunkKey),
streamObjectChunks logs chunk_key using string(chunkKey), but these keys include binary u64 fields and may contain non-printable/zero bytes, making logs hard to read and potentially truncating output. Consider logging a stable encoded form (e.g., hex/base64) or structured fields like bucket/object/upload/part/chunk/version instead.
Suggested change:

-"chunk_key", string(chunkKey),
+"chunk_key", hex.EncodeToString(chunkKey),
PartNumber: int(desc.PartNo), //nolint:gosec // G115: PartNo is in [1,10000], safe for int.
ETag:       quoteS3ETag(desc.ETag),
Size:       desc.SizeBytes,
listParts builds s3ListPartEntry but never populates LastModified, so the XML response will emit an empty LastModified element. Since PartVersion is a commit HLC, consider setting LastModified via formatS3ISOTime(desc.PartVersion) (or store an explicit timestamp in the part descriptor) to match typical S3 ListParts responses.
Suggested change:

-PartNumber: int(desc.PartNo), //nolint:gosec // G115: PartNo is in [1,10000], safe for int.
-ETag:       quoteS3ETag(desc.ETag),
-Size:       desc.SizeBytes,
+PartNumber:   int(desc.PartNo), //nolint:gosec // G115: PartNo is in [1,10000], safe for int.
+ETag:         quoteS3ETag(desc.ETag),
+Size:         desc.SizeBytes,
+LastModified: formatS3ISOTime(desc.PartVersion),
if err != nil || len(kvs) == 0 {
	return
}
deleteByPrefix silently returns when ScanAt returns an error (err != nil || len(kvs) == 0), which can hide cleanup failures and leave orphaned keys without any signal. Consider logging the ScanAt error (and possibly distinguishing error vs empty scan) so operators can diagnose cleanup issues.
Suggested change:

-if err != nil || len(kvs) == 0 {
-	return
-}
+if err != nil {
+	slog.ErrorContext(ctx, "deleteByPrefix: scan failed",
+		"bucket", bucket,
+		"generation", generation,
+		"object_key", objectKey,
+		"upload_id", uploadID,
+		"cursor", string(cursor),
+		"err", err,
+	)
+	return
+}
+if len(kvs) == 0 {
+	return
+}
@copilot open a new pull request to apply changes based on the comments in this thread
No description provided.