
feat: implement S3 Phase 2 - multipart upload, range reads, presigned URLs#430

Open
bootjp wants to merge 11 commits into main from feature/s3-impl-p2

Conversation

@bootjp
Owner

@bootjp bootjp commented Mar 25, 2026

No description provided.

… URLs

- Multipart upload flow: CreateMultipartUpload, UploadPart,
  CompleteMultipartUpload, AbortMultipartUpload, ListParts
- Range reads with HTTP 206 Partial Content support
- SigV4 presigned URL authentication
- Abort/Complete race fencing via OCC transactions
- OCC retry for Complete and Abort operations
- Atomic UploadMeta + GCUploadKey creation (IsTxn=true)
- PartDescriptor write transaction safety
- Rolling readTS in deleteByPrefix to avoid compaction blocking
- Blocking cleanup semaphore to guarantee GC execution
- s3keys: ParseUploadPartKey, UploadPartPrefixForUpload, BlobPrefixForUpload
@bootjp bootjp requested a review from Copilot March 25, 2026 08:34
@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request delivers the second phase of S3 compatibility work, introducing multipart uploads, range reads, and presigned URLs. These additions let the S3 adapter handle large files, stream partial content efficiently, and grant secure, time-limited access to stored objects, improving overall S3 protocol adherence.

Highlights

  • Multipart Upload Implementation: Added full support for S3 multipart uploads, including initiation, part uploading, completion, and abortion of uploads.
  • Range Read Support: Enabled efficient partial content retrieval for S3 objects using HTTP Range headers.
  • Presigned URL Functionality: Implemented validation and authorization for S3 presigned URLs, allowing secure, time-limited access to resources.
  • S3 API Expansion: Introduced new API endpoints and internal data structures to handle the new S3 operations.
  • Comprehensive Testing: Added extensive unit tests to cover the new multipart upload, range read, and presigned URL features, including various edge cases and error conditions.
  • Asynchronous Cleanup: Implemented background cleanup mechanisms for orphaned multipart upload data.

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request significantly enhances the S3 adapter by implementing core S3 multipart upload functionality: initiate, upload parts, complete, abort, and list parts. It also adds support for S3 SigV4 presigned URL authorization and object range reads. New data structures, constants, and helper functions are introduced across adapter/s3.go, adapter/s3_auth.go, and internal/s3keys/keys.go, accompanied by extensive test coverage in adapter/s3_test.go and internal/s3keys/keys_test.go. The TODO_s3_review_fixes.md indicates that this PR addresses several S3-related issues. Feedback covers two points: a potential race condition in completeMultipartUpload, where the bucket's generation is not re-verified within the retry loop and could leave an orphaned object manifest; and errors silently ignored during chunk streaming in streamObjectChunks, which should be logged for better debugging and client error indication.

retryPin := s.pinReadTS(retryReadTS)
defer retryPin.Release()

// Re-verify upload still exists (fence against concurrent Abort).

high

Inside the retryS3Mutation loop for completeMultipartUpload, the existence of the upload is re-verified, but the bucket's existence and generation are not. If the bucket is deleted and possibly recreated between Phase 1 (validation) and Phase 2 (commit), this could lead to creating an orphaned object manifest under an old bucket generation. It would be safer to re-verify the bucket's status within the retry loop to prevent this.

// Re-verify bucket still exists and generation hasn't changed.
currentMeta, exists, err := s.loadBucketMetaAt(r.Context(), bucket, retryReadTS)
if err != nil {
	return errors.WithStack(err)
}
if !exists || currentMeta.Generation != generation {
	return &s3ResponseError{
		Status:  http.StatusConflict,
		Code:    "OperationAborted",
		Message: "Bucket has been modified, please retry.",
		Bucket:  bucket,
		Key:     objectKey,
	}
}

// Re-verify upload still exists (fence against concurrent Abort).
References
  1. This comment addresses a race condition in a read-modify-write operation where the bucket's state (existence, generation) can change between validation and commit, leading to an inconsistent snapshot. Re-verifying the state within the retry loop helps maintain consistency, aligning with the principle of ensuring a consistent snapshot in transactional operations.
  2. The issue described involves the potential use of stale data (an old bucket generation) in an atomic operation. Re-verifying the bucket's generation within the retry loop ensures that the operation proceeds with up-to-date state, preventing race conditions and the use of stale data, as per the guideline for atomic operations based on state variables.

chunk, err := s.store.GetAt(r.Context(), chunkKey, readTS)
if err != nil {
	writeS3InternalError(w, err)
	return

medium

Errors during chunk streaming are silently ignored, which will result in a truncated response for the client without any error indication. It's better to log these errors to aid in debugging. This also applies to other error checks in this function, such as w.Write failures.

slog.ErrorContext(r.Context(), "failed to get object chunk for streaming", "bucket", bucket, "object", objectKey, "generation", generation, "upload_id", manifest.UploadID, "err", err)
return

Copilot AI left a comment

Pull request overview

This PR extends the adapter/ S3-compatible API and internal/s3keys/ key helpers to support “S3 Phase 2” functionality: multipart uploads (init/upload part/list parts/complete/abort), HTTP Range reads, and SigV4 presigned URL authorization.

Changes:

  • Add multipart upload flows to the S3 server (init/upload part/list parts/complete/abort) plus background cleanup helpers.
  • Implement single-range Range: bytes=... reads for GET object responses.
  • Add presigned URL authorization, plus new s3keys helpers for multipart part/blob prefixing and parsing, and a broad test suite covering the new behaviors.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 7 comments.

Show a summary per file
File Description
internal/s3keys/keys.go Adds multipart key prefix builders and a parser for upload part keys.
internal/s3keys/keys_test.go Adds round-trip / prefix / zero-byte coverage for multipart key helpers.
adapter/s3.go Implements multipart upload endpoints, range reads, and cleanup-by-prefix helpers.
adapter/s3_auth.go Adds SigV4 presigned URL authorization path.
adapter/s3_test.go Adds integration-style tests for multipart flows, range reads, and presigned URLs.
TODO_s3_review_fixes.md Tracks review follow-ups addressed by this phase.

Comment on lines +851 to +860
if left == "" {
	// suffix range: bytes=-N
	n, err := strconv.ParseInt(right, 10, 64)
	if err != nil || n <= 0 {
		return 0, 0, false
	}
	if n > totalSize {
		n = totalSize
	}
	return totalSize - n, totalSize - 1, true
Copilot AI Mar 25, 2026

parseS3RangeHeader can return ok=true for suffix ranges when totalSize == 0 (e.g. bytes=-4), producing an invalid start=0,end=-1 and leading to malformed Content-Range/length handling. Consider explicitly rejecting Range requests when totalSize <= 0 (or handling empty objects as 416).
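The suggested guard can be sketched as a standalone parser (a hypothetical parseRange helper mirroring, but not identical to, the PR's parseS3RangeHeader) that rejects all Range requests against empty objects up front:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseRange parses a single-range "bytes=..." header against an object of
// totalSize bytes. Rejecting empty objects first means a suffix range like
// "bytes=-4" can never produce the invalid pair (start=0, end=-1).
func parseRange(header string, totalSize int64) (start, end int64, ok bool) {
	if totalSize <= 0 {
		return 0, 0, false // empty object: caller should answer 416
	}
	spec, found := strings.CutPrefix(header, "bytes=")
	if !found || strings.Contains(spec, ",") {
		return 0, 0, false // malformed or multi-range: not supported here
	}
	left, right, found := strings.Cut(spec, "-")
	if !found {
		return 0, 0, false
	}
	if left == "" { // suffix range: bytes=-N means the last N bytes
		n, err := strconv.ParseInt(right, 10, 64)
		if err != nil || n <= 0 {
			return 0, 0, false
		}
		if n > totalSize {
			n = totalSize
		}
		return totalSize - n, totalSize - 1, true
	}
	start, err := strconv.ParseInt(left, 10, 64)
	if err != nil || start >= totalSize {
		return 0, 0, false
	}
	end = totalSize - 1 // open-ended range: bytes=N-
	if right != "" {
		if end, err = strconv.ParseInt(right, 10, 64); err != nil || end < start {
			return 0, 0, false
		}
		if end > totalSize-1 {
			end = totalSize - 1
		}
	}
	return start, end, true
}

func main() {
	s, e, ok := parseRange("bytes=-4", 0)
	fmt.Println(s, e, ok) // 0 0 false: rejected instead of yielding (0, -1)
}
```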

Comment on lines +808 to 816
chunkIndex, err := uint64FromInt(chunkIdx)
if err != nil {
	writeS3InternalError(w, err)
	return
}
-chunkKey := s3keys.BlobKey(bucket, meta.Generation, objectKey, manifest.UploadID, part.PartNo, chunkIndex)
+chunkKey := s3keys.BlobKey(bucket, generation, objectKey, manifest.UploadID, part.PartNo, chunkIndex)
chunk, err := s.store.GetAt(r.Context(), chunkKey, readTS)
if err != nil {
	writeS3InternalError(w, err)
	return
}
Copilot AI Mar 25, 2026

streamObjectChunks currently drops errors from uint64FromInt and store.GetAt by just returning, after headers/status may already be written. This can result in clients receiving truncated 200/206 responses with no indication of an internal error. Consider returning an error from streamObjectChunks (and logging/writing an error before headers are committed when possible), or at least logging failures so partial responses are observable.

adapter/s3.go Outdated
Comment on lines +1294 to +1303
// Atomically: write manifest, delete UploadMeta + GCUpload (fence against Abort).
_, err = s.coordinator.Dispatch(r.Context(), &kv.OperationGroup[kv.OP]{
	IsTxn:    true,
	StartTS:  startTS,
	CommitTS: commitTS,
	Elems: []*kv.Elem[kv.OP]{
		{Op: kv.Put, Key: headKey, Value: manifestBody},
		{Op: kv.Del, Key: uploadMetaKey},
		{Op: kv.Del, Key: s3keys.GCUploadKey(bucket, meta.Generation, objectKey, uploadID)},
	},
Copilot AI Mar 25, 2026

completeMultipartUpload commits the object manifest without touching BucketMetaKey, so it won’t conflict with a concurrent bucket deletion. This can allow DELETE bucket to succeed while a multipart completion concurrently writes a new manifest, leaving orphaned data and a bucket that appears deleted but still has object state. Consider adding a bucket-meta fence write in the completion transaction (similar to putObject) or otherwise making bucket deletion conflict with multipart completion.

adapter/s3.go Outdated
Comment on lines +1713 to +1714
go func() {
	s.cleanupSem <- struct{}{}
Copilot AI Mar 25, 2026

Async cleanup now spawns a goroutine unconditionally and only then blocks on s.cleanupSem <- struct{}{}. Under sustained load (cleanup semaphore saturated), this can accumulate an unbounded number of blocked goroutines and increase memory/CPU pressure. Consider acquiring the semaphore before spawning (with a non-blocking try-acquire or bounded queue) to keep goroutine growth bounded.

Suggested change
-go func() {
-	s.cleanupSem <- struct{}{}
+// Try to acquire a cleanup slot without blocking the caller. If the semaphore
+// is saturated, skip scheduling this async cleanup to avoid unbounded
+// goroutine accumulation under load.
+select {
+case s.cleanupSem <- struct{}{}:
+	// acquired; proceed
+default:
+	// no capacity; drop this cleanup attempt
+	return
+}
+go func() {

Comment on lines +366 to +417
// Use the SDK's PresignHTTP to rebuild the expected presigned URL.
// Strip all presign query params to get a clean base request,
// then presign it with the same parameters.
verifyURL := *r.URL
q := verifyURL.Query()
for _, param := range []string{"X-Amz-Algorithm", "X-Amz-Credential", "X-Amz-Date", "X-Amz-Expires", "X-Amz-SignedHeaders", "X-Amz-Signature", "X-Amz-Security-Token"} {
	q.Del(param)
}
verifyURL.RawQuery = q.Encode()

verifyReq, err := http.NewRequestWithContext(context.Background(), r.Method, verifyURL.String(), nil)
if err != nil {
	return &s3AuthError{
		Status:  http.StatusForbidden,
		Code:    "SignatureDoesNotMatch",
		Message: "failed to verify presigned request signature",
	}
}
verifyReq.Host = r.Host
// Copy only the signed headers from the original request.
for _, h := range strings.Split(signedHeaders, ";") {
	h = strings.TrimSpace(h)
	if h == "" || strings.EqualFold(h, "host") {
		continue
	}
	verifyReq.Header.Set(h, r.Header.Get(h))
}

signer := v4.NewSigner(func(opts *v4.SignerOptions) {
	opts.DisableURIPathEscaping = true
})
creds := aws.Credentials{
	AccessKeyID:     accessKeyID,
	SecretAccessKey: secretAccessKey,
	Source:          "elastickv-s3-presign",
}
presignedURL, _, err := signer.PresignHTTP(context.Background(), creds, verifyReq, s3UnsignedPayload, "s3", s.effectiveRegion(), signingTime)
if err != nil {
	return &s3AuthError{
		Status:  http.StatusForbidden,
		Code:    "SignatureDoesNotMatch",
		Message: "failed to verify presigned request signature",
	}
}
expectedSig := extractPresignedSignature(presignedURL)
if expectedSig == "" || subtle.ConstantTimeCompare([]byte(signature), []byte(expectedSig)) != 1 {
	return &s3AuthError{
		Status:  http.StatusForbidden,
		Code:    "SignatureDoesNotMatch",
		Message: "presigned URL signature does not match",
	}
}
Copilot AI Mar 25, 2026

authorizePresignedRequest re-generates a presigned URL after deleting X-Amz-Expires (and other presign params) from the request, then calls PresignHTTP without reapplying the original expiry. This will only validate URLs created with the signer’s default expiry (typically 900s) and will reject valid presigned URLs with a custom X-Amz-Expires. Consider preserving and using the request’s X-Amz-Expires when recomputing the expected signature (or computing the expected signature directly from the canonical request/query parameters).

Comment on lines +1109 to +1117
if _, err := s.coordinator.Dispatch(r.Context(), &kv.OperationGroup[kv.OP]{
	IsTxn:    true,
	StartTS:  startTS,
	CommitTS: partCommitTS,
	Elems: []*kv.Elem[kv.OP]{
		{Op: kv.Put, Key: partKey, Value: descBody},
	},
}); err != nil {
	writeS3InternalError(w, err)
Copilot AI Mar 25, 2026

uploadPart writes blob chunks before it transactionally writes the part descriptor. If the descriptor transaction fails (or the request errors after some chunks were written), those chunks remain in the store (and if a retry uploads fewer chunks, old higher-index chunks for the same part can become permanent garbage). Consider adding cleanup on failures (e.g., delete the written chunk keys on error) or writing a small “in-progress” descriptor first so orphaned chunks can be discovered and garbage-collected reliably.

Suggested change
-if _, err := s.coordinator.Dispatch(r.Context(), &kv.OperationGroup[kv.OP]{
-	IsTxn:    true,
-	StartTS:  startTS,
-	CommitTS: partCommitTS,
-	Elems: []*kv.Elem[kv.OP]{
-		{Op: kv.Put, Key: partKey, Value: descBody},
-	},
-}); err != nil {
-	writeS3InternalError(w, err)
+var (
+	partWriteErr error
+	backoff      = s3TxnRetryInitialBackoff
+)
+for attempt := 0; attempt < s3TxnRetryMaxAttempts; attempt++ {
+	if _, partWriteErr = s.coordinator.Dispatch(r.Context(), &kv.OperationGroup[kv.OP]{
+		IsTxn:    true,
+		StartTS:  startTS,
+		CommitTS: partCommitTS,
+		Elems: []*kv.Elem[kv.OP]{
+			{Op: kv.Put, Key: partKey, Value: descBody},
+		},
+	}); partWriteErr == nil {
+		break
+	}
+	// Do not retry if the client has gone away or the context is done.
+	if errors.Is(partWriteErr, context.Canceled) || errors.Is(partWriteErr, context.DeadlineExceeded) {
+		break
+	}
+	// Back off before retrying, capping at the maximum backoff.
+	time.Sleep(backoff)
+	backoff *= s3TxnRetryBackoffFactor
+	if backoff > s3TxnRetryMaxBackoff {
+		backoff = s3TxnRetryMaxBackoff
+	}
+}
+if partWriteErr != nil {
+	writeS3InternalError(w, partWriteErr)

adapter/s3.go Outdated
Comment on lines +1141 to +1144

// Parts must be in ascending order.
for i := 1; i < len(completionReq.Parts); i++ {
	if completionReq.Parts[i].PartNumber <= completionReq.Parts[i-1].PartNumber {
Copilot AI Mar 25, 2026

completeMultipartUpload defines s3MaxPartsPerUpload but doesn’t enforce any upper bound on len(completionReq.Parts). Even though AWS caps at 10,000 parts, a large XML body could still drive significant CPU/memory work (and the constant suggests an intended limit). Consider rejecting requests with too many parts early (and/or validating PartNumber range) to bound work.

Suggested change
-// Parts must be in ascending order.
-for i := 1; i < len(completionReq.Parts); i++ {
-	if completionReq.Parts[i].PartNumber <= completionReq.Parts[i-1].PartNumber {
+if len(completionReq.Parts) > s3MaxPartsPerUpload {
+	writeS3Error(w, http.StatusBadRequest, "InvalidArgument", "too many parts in CompleteMultipartUpload request", bucket, objectKey)
+	return
+}
+// Parts must be in ascending order and within the allowed part number range.
+for i, part := range completionReq.Parts {
+	if part.PartNumber < s3MinPartNumber || part.PartNumber > s3MaxPartNumber {
+		writeS3Error(w, http.StatusBadRequest, "InvalidArgument", "part number out of allowed range", bucket, objectKey)
+		return
+	}
+	if i > 0 && part.PartNumber <= completionReq.Parts[i-1].PartNumber {

@bootjp
Owner Author

bootjp commented Mar 25, 2026

@copilot open a new pull request to apply changes based on the comments in this thread


Copilot AI commented Mar 25, 2026

@bootjp I've opened a new pull request, #431, to work on those changes. Once the pull request is ready, I'll request review from you.

Copilot AI and others added 4 commits March 25, 2026 09:27
- parseS3RangeHeader: reject suffix ranges on zero-size objects (416)
- streamObjectChunks: add slog error logging for silent failures
- completeMultipartUpload: add BucketMetaKey fence write to prevent race with DELETE bucket
- async cleanup helpers: use non-blocking select on semaphore before spawning goroutines
- authorizePresignedRequest: re-add X-Amz-Expires to verify URL before PresignHTTP
- uploadPart: add cleanupPartBlobsAsync on descriptor write failure
- completeMultipartUpload: enforce max parts limit and part number range validation
- tests: add TestS3Server_RangeReadEmptyObject, TestS3Server_CompleteMultipartUploadTooManyParts, TestS3Server_CompleteMultipartUploadOutOfRangePartNumber

Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
Agent-Logs-Url: https://github.com/bootjp/elastickv/sessions/4c9d1e6c-6404-43d9-9c0f-3fdb1727c6ec
- Fix gofmt: remove extra spaces in s3CompleteMultipartUploadRequest struct tags
- Add //nolint:cyclop to handleObject, streamObjectChunks, parseS3RangeHeader, listParts
- Extend //nolint on uploadPart to include gocognit
- Extend //nolint on completeMultipartUpload to include gocyclo
- Add //nolint:gosec G115 comments on bounded int<->uint64 conversions (lines 804, 1219, 1484, 1488)

Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
Agent-Logs-Url: https://github.com/bootjp/elastickv/sessions/55461242-6306-47fc-8b95-39557c7869dc
…ck, gosec G704)

- mnd: replace 1<<20 with existing s3ChunkSize constant in completeMultipartUpload
- mnd: add s3RangeSplitParts=2 constant; use in SplitN and len check in parseS3RangeHeader
- nestif: flatten 'if n > 0' block in uploadPart read loop by inverting condition
- nestif: extract checkPresignExpiry helper to reduce nesting in authorizePresignedRequest
- staticcheck S1016: use s3ObjectPart(desc) type conversion instead of struct literal
- staticcheck S1039: remove unnecessary fmt.Sprintf wrapper on raw string literal in test
- gosec G704: add nolint comment on http.NewRequestWithContext – URL is request-local, no outbound network call

Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
Agent-Logs-Url: https://github.com/bootjp/elastickv/sessions/18e808ef-b8dc-46c8-bd6d-e4d4e566a32a
fix: address S3 Phase 2 review – range reads, multipart safety, presigned URL, cleanup goroutines
Copilot AI left a comment

Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 6 comments.

Comment on lines +1193 to +1299
// Phase 1: Read and validate parts (outside retry — idempotent, avoids O(parts) re-reads).
readTS := s.readTS()
readPin := s.pinReadTS(readTS)

meta, exists, err := s.loadBucketMetaAt(r.Context(), bucket, readTS)
if err != nil {
	readPin.Release()
	writeS3InternalError(w, err)
	return
}
if !exists || meta == nil {
	readPin.Release()
	writeS3Error(w, http.StatusNotFound, "NoSuchBucket", "bucket not found", bucket, objectKey)
	return
}
generation := meta.Generation

uploadMetaKey := s3keys.UploadMetaKey(bucket, meta.Generation, objectKey, uploadID)
uploadMetaRaw, err := s.store.GetAt(r.Context(), uploadMetaKey, readTS)
if err != nil {
	readPin.Release()
	if errors.Is(err, store.ErrKeyNotFound) {
		writeS3Error(w, http.StatusNotFound, "NoSuchUpload", "upload not found", bucket, objectKey)
		return
	}
	writeS3InternalError(w, err)
	return
}
var uploadMeta s3UploadMeta
if err := json.Unmarshal(uploadMetaRaw, &uploadMeta); err != nil {
	readPin.Release()
	writeS3InternalError(w, err)
	return
}

manifestParts := make([]s3ObjectPart, 0, len(completionReq.Parts))
md5Concat := md5.New() //nolint:gosec // S3 composite ETag requires MD5.
totalSize := int64(0)

for i, reqPart := range completionReq.Parts {
	partKey := s3keys.UploadPartKey(bucket, meta.Generation, objectKey, uploadID, uint64(reqPart.PartNumber)) //nolint:gosec // G115: PartNumber validated in [1,10000].
	raw, err := s.store.GetAt(r.Context(), partKey, readTS)
	if err != nil {
		readPin.Release()
		if errors.Is(err, store.ErrKeyNotFound) {
			writeS3Error(w, http.StatusBadRequest, "InvalidPart", fmt.Sprintf("part %d not found", reqPart.PartNumber), bucket, objectKey)
			return
		}
		writeS3InternalError(w, err)
		return
	}
	var desc s3PartDescriptor
	if err := json.Unmarshal(raw, &desc); err != nil {
		readPin.Release()
		writeS3InternalError(w, err)
		return
	}
	reqETag := strings.Trim(reqPart.ETag, `"`)
	if reqETag != desc.ETag {
		readPin.Release()
		writeS3Error(w, http.StatusBadRequest, "InvalidPart",
			fmt.Sprintf("part %d ETag mismatch: got %q, want %q", reqPart.PartNumber, reqETag, desc.ETag), bucket, objectKey)
		return
	}
	if i < len(completionReq.Parts)-1 && desc.SizeBytes < int64(s3MinPartSize) {
		readPin.Release()
		writeS3Error(w, http.StatusBadRequest, "EntityTooSmall",
			fmt.Sprintf("part %d is too small (%d bytes)", reqPart.PartNumber, desc.SizeBytes), bucket, objectKey)
		return
	}

	partMD5, err := hex.DecodeString(desc.ETag)
	if err != nil {
		readPin.Release()
		writeS3InternalError(w, err)
		return
	}
	if _, err := md5Concat.Write(partMD5); err != nil {
		readPin.Release()
		writeS3InternalError(w, err)
		return
	}

	manifestParts = append(manifestParts, s3ObjectPart(desc))
	totalSize += desc.SizeBytes
}
readPin.Release()

compositeETag := hex.EncodeToString(md5Concat.Sum(nil)) + fmt.Sprintf("-%d", len(completionReq.Parts))

// Phase 2: Commit (inside retry — only fencing + manifest write).
var previous *s3ObjectManifest

err = s.retryS3Mutation(r.Context(), func() error {
	retryReadTS := s.readTS()
	startTS := s.txnStartTS(retryReadTS)
	retryPin := s.pinReadTS(retryReadTS)
	defer retryPin.Release()

	// Re-verify upload still exists (fence against concurrent Abort).
	if _, err := s.store.GetAt(r.Context(), uploadMetaKey, retryReadTS); err != nil {
		if errors.Is(err, store.ErrKeyNotFound) {
			return &s3ResponseError{
				Status:  http.StatusNotFound,
				Code:    "NoSuchUpload",
				Message: "upload not found",
				Bucket:  bucket,
Copilot AI Mar 25, 2026

completeMultipartUpload validates/loads part descriptors at an earlier readTS and then commits the manifest in a later transaction without any fencing against concurrent UploadPart updates to the same uploadID/part numbers. Because blob keys are overwritten in-place, a race can produce a manifest+ETag that no longer matches the bytes served. The commit phase needs a mechanism to ensure the referenced parts/chunks are immutable for the duration of completion (e.g., versioned chunk keys, or a completion lock/fence key that UploadPart must also mutate transactionally).

adapter/s3.go Outdated
Comment on lines +779 to +780
writeS3Error(w, http.StatusRequestedRangeNotSatisfiable, "InvalidRange",
fmt.Sprintf("bytes */%d", manifest.SizeBytes), bucket, objectKey)
Copilot AI Mar 25, 2026

For GET with an invalid Range, the response currently uses the Message field to emit a Content-Range-like string ("bytes */") and does not set the required Content-Range response header. For S3/RFC 7233 compatibility, return a descriptive error message (e.g. requested range not satisfiable) and include Content-Range: bytes */<size> in the HTTP headers for the 416 response.

Suggested change
-writeS3Error(w, http.StatusRequestedRangeNotSatisfiable, "InvalidRange",
-	fmt.Sprintf("bytes */%d", manifest.SizeBytes), bucket, objectKey)
+w.Header().Set("Content-Range", fmt.Sprintf("bytes */%d", manifest.SizeBytes))
+writeS3Error(w, http.StatusRequestedRangeNotSatisfiable, "InvalidRange",
+	"requested range not satisfiable", bucket, objectKey)

adapter/s3.go Outdated
Comment on lines +764 to +772
if headOnly || rangeHeader == "" {
	writeS3ObjectHeaders(w.Header(), manifest)
	if rangeHeader != "" {
		w.Header().Set("Accept-Ranges", "bytes")
	}
	w.WriteHeader(http.StatusOK)
	if headOnly {
		return
	}
Copilot AI Mar 25, 2026

HEAD requests with a Range header are currently handled by the headOnly || rangeHeader == "" branch, which ignores the Range and returns 200 + full Content-Length. S3 clients may send HEAD with Range and expect 206 + Content-Range/partial Content-Length (with no body). Consider applying the same range parsing/header logic for HEAD, but skipping the body write.

Suggested change
-if headOnly || rangeHeader == "" {
-	writeS3ObjectHeaders(w.Header(), manifest)
-	if rangeHeader != "" {
-		w.Header().Set("Accept-Ranges", "bytes")
-	}
-	w.WriteHeader(http.StatusOK)
-	if headOnly {
-		return
-	}
+// HEAD requests: mirror GET range semantics, but never write a body.
+if headOnly {
+	if rangeHeader == "" {
+		// Full-object HEAD: return metadata and 200 with full Content-Length.
+		writeS3ObjectHeaders(w.Header(), manifest)
+		w.WriteHeader(http.StatusOK)
+		return
+	}
+	// Ranged HEAD: validate range and return 206 + partial Content-Length/Content-Range.
+	rangeStart, rangeEnd, ok := parseS3RangeHeader(rangeHeader, manifest.SizeBytes)
+	if !ok {
+		writeS3Error(w, http.StatusRequestedRangeNotSatisfiable, "InvalidRange",
+			fmt.Sprintf("bytes */%d", manifest.SizeBytes), bucket, objectKey)
+		return
+	}
+	contentLength := rangeEnd - rangeStart + 1
+	writeS3ObjectHeaders(w.Header(), manifest)
+	w.Header().Set("Content-Length", strconv.FormatInt(contentLength, 10))
+	w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", rangeStart, rangeEnd, manifest.SizeBytes))
+	w.Header().Set("Accept-Ranges", "bytes")
+	w.WriteHeader(http.StatusPartialContent)
+	return
+}
+// GET requests without Range: stream the full object.
+if rangeHeader == "" {
+	writeS3ObjectHeaders(w.Header(), manifest)
+	w.WriteHeader(http.StatusOK)

Comment on lines +305 to +317
credential := strings.TrimSpace(query.Get("X-Amz-Credential"))
amzDate := strings.TrimSpace(query.Get("X-Amz-Date"))
expiresStr := strings.TrimSpace(query.Get("X-Amz-Expires"))
signedHeaders := strings.TrimSpace(query.Get("X-Amz-SignedHeaders"))
signature := strings.TrimSpace(query.Get("X-Amz-Signature"))

if credential == "" || amzDate == "" || signedHeaders == "" || signature == "" {
	return &s3AuthError{
		Status:  http.StatusForbidden,
		Code:    "AuthorizationQueryParametersError",
		Message: "missing required presigned URL parameters",
	}
}
Copilot AI Mar 25, 2026

SigV4 presigned URLs require X-Amz-Expires (along with Algorithm/Credential/Date/SignedHeaders/Signature). The current logic treats an empty X-Amz-Expires as valid and falls back to a clock-skew check, which deviates from AWS semantics and effectively changes the validity rules for presigned URLs. Consider requiring X-Amz-Expires and returning AuthorizationQueryParametersError when it is missing/invalid.

## Priority 1 (Critical)

- [x] 1. Make createMultipartUpload use IsTxn=true — guarantees atomicity in a distributed environment
- [x] 2. GC recovery when the semaphore is saturated — changed to a blocking wait inside the goroutine
Copilot AI Mar 25, 2026

This TODO item marks the semaphore-saturation GC behavior as fixed by “blocking wait”, but the implementation in adapter/s3.go cleanup helpers still uses a non-blocking select and returns early when the semaphore is full (skipping cleanup). Either update this TODO to match the current behavior, or implement the intended blocking/queued cleanup strategy.

Suggested change
- [x] 2. GC recovery when the semaphore is full: changed to a blocking wait inside the goroutine
- [ ] 2. GC recovery when the semaphore is full. Current: a non-blocking select skips cleanup when the semaphore is full. To do: implement a blocking wait inside the goroutine

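The difference this comment describes, a non-blocking select versus a blocking wait, comes down to one line with a channel-based semaphore. A minimal sketch with illustrative names, not the adapter's actual cleanup code:

```go
package main

import "fmt"

// tryAcquire is the current behavior: when the semaphore is full, the
// default branch fires and cleanup is silently skipped.
func tryAcquire(sem chan struct{}) bool {
	select {
	case sem <- struct{}{}:
		return true
	default:
		return false // full: GC work is dropped
	}
}

// acquireBlocking is the intended behavior from the TODO: inside the GC
// goroutine, wait until a slot frees up so cleanup always runs.
func acquireBlocking(sem chan struct{}) {
	sem <- struct{}{}
}

func main() {
	sem := make(chan struct{}, 1)
	fmt.Println(tryAcquire(sem)) // true: slot taken
	fmt.Println(tryAcquire(sem)) // false: full, cleanup would be skipped
	go func() { <-sem }()        // a running cleanup finishes...
	acquireBlocking(sem)         // ...so the blocking variant proceeds
	fmt.Println("acquired")
}
```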
Comment on lines +1086 to +1090
}
chunkKey := s3keys.BlobKey(bucket, meta.Generation, objectKey, uploadID, partNo, chunkNo)
pendingBatch = append(pendingBatch, &kv.Elem[kv.OP]{Op: kv.Put, Key: chunkKey, Value: chunk})
cs, err := uint64FromInt(n)
if err != nil {

Copilot AI Mar 25, 2026


uploadPart writes blob chunks under keys derived only from (bucket,generation,object,uploadID,partNo,chunkNo). This means a concurrent re-UploadPart (same uploadID+partNo) overwrites the same chunk keys, which can corrupt a concurrently completing upload (or even a just-completed object) because the manifest references these chunk keys by uploadID+partNo+chunkNo. Consider making chunk keys immutable per part attempt (e.g., include a per-part version/commitTS/etag in the blob key and store that reference in the part descriptor/manifest), and/or add a transactional fence so uploadPart cannot proceed once Complete/Abort starts.

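One way to realize the suggested immutability, sketched with an illustrative key layout (the PR's s3keys package uses its own encoding): fold a per-attempt version into the chunk key, so a retried UploadPart writes fresh keys instead of overwriting ones a concurrent Complete may already reference.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// versionedBlobKey includes a per-part attempt version (e.g. a commit
// timestamp) in the chunk key. The part descriptor would record the same
// version, so Complete references exactly the attempt it validated, and
// stale attempts can be garbage-collected by prefix. Layout is
// illustrative only.
func versionedBlobKey(bucket string, generation uint64, object, uploadID string, partNo, version, chunkNo uint64) []byte {
	key := []byte("s3b/" + bucket + "/")
	key = binary.BigEndian.AppendUint64(key, generation)
	key = append(key, '/')
	key = append(key, object...)
	key = append(key, '/')
	key = append(key, uploadID...)
	key = append(key, '/')
	key = binary.BigEndian.AppendUint64(key, partNo)
	key = binary.BigEndian.AppendUint64(key, version) // per-attempt fence
	key = binary.BigEndian.AppendUint64(key, chunkNo)
	return key
}

func main() {
	a := versionedBlobKey("b", 1, "o", "u", 3, 100, 0)
	b := versionedBlobKey("b", 1, "o", "u", 3, 101, 0) // retried attempt
	fmt.Println(string(a) == string(b))                // false: no collision
}
```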
@bootjp
Owner Author

bootjp commented Mar 26, 2026

@copilot open a new pull request to apply changes based on the comments in this thread

Contributor

Copilot AI commented Mar 26, 2026

@bootjp I've opened a new pull request, #434, to work on those changes. Once the pull request is ready, I'll request review from you.

Copilot AI and others added 2 commits March 26, 2026 02:30
…eys, presign expiry

Co-authored-by: bootjp <1306365+bootjp@users.noreply.github.com>
Agent-Logs-Url: https://github.com/bootjp/elastickv/sessions/c6b3d6f8-f304-4971-b814-702adb37beff
fix: S3 Phase 2 round-2 review – versioned blob keys, HEAD+Range, presign expiry, 416 headers
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.

Comment on lines +1193 to +1195
func (s *S3Server) completeMultipartUpload(w http.ResponseWriter, r *http.Request, bucket string, objectKey string, uploadID string) {
bodyBytes, err := io.ReadAll(io.LimitReader(r.Body, s3ChunkSize))
if err != nil {

Copilot AI Mar 26, 2026


completeMultipartUpload reads the request body with io.LimitReader(r.Body, s3ChunkSize). A valid CompleteMultipartUpload for large part counts can exceed 1 MiB and will be truncated, causing an incorrect MalformedXML response. Consider increasing the limit (e.g., based on s3MaxPartsPerUpload) or streaming XML decoding with an explicit max byte cap sized for the worst-case request.

Comment on lines +1133 to +1137
if len(pendingBatch) >= s3ChunkBatchOps {
if err := flushBatch(); err != nil {
writeS3InternalError(w, err)
return
}

Copilot AI Mar 26, 2026


uploadPart writes blob chunks incrementally, but on many early-return error paths (e.g., flushBatch failure or read errors after some chunks were dispatched) the already-written chunks are not cleaned up, which can leak orphaned data. Consider invoking cleanupPartBlobsAsync with the current chunkNo/partCommitTS on any failure after at least one chunk has been persisted.

slog.ErrorContext(r.Context(), "streamObjectChunks: GetAt failed",
"bucket", bucket,
"object_key", objectKey,
"chunk_key", string(chunkKey),

Copilot AI Mar 26, 2026


streamObjectChunks logs chunk_key using string(chunkKey), but these keys include binary u64 fields and may contain non-printable/zero bytes, making logs hard to read and potentially truncating output. Consider logging a stable encoded form (e.g., hex/base64) or structured fields like bucket/object/upload/part/chunk/version instead.

Suggested change
"chunk_key", string(chunkKey),
"chunk_key", hex.EncodeToString(chunkKey),

Comment on lines +1525 to +1527
PartNumber: int(desc.PartNo), //nolint:gosec // G115: PartNo is in [1,10000], safe for int.
ETag: quoteS3ETag(desc.ETag),
Size: desc.SizeBytes,

Copilot AI Mar 26, 2026


listParts builds s3ListPartEntry but never populates LastModified, so the XML response will emit an empty LastModified element. Since PartVersion is a commit HLC, consider setting LastModified via formatS3ISOTime(desc.PartVersion) (or store an explicit timestamp in the part descriptor) to match typical S3 ListParts responses.

Suggested change
PartNumber: int(desc.PartNo), //nolint:gosec // G115: PartNo is in [1,10000], safe for int.
ETag: quoteS3ETag(desc.ETag),
Size: desc.SizeBytes,
PartNumber: int(desc.PartNo), //nolint:gosec // G115: PartNo is in [1,10000], safe for int.
ETag: quoteS3ETag(desc.ETag),
Size: desc.SizeBytes,
LastModified: formatS3ISOTime(desc.PartVersion),

Comment on lines +1626 to +1628
if err != nil || len(kvs) == 0 {
return
}

Copilot AI Mar 26, 2026


deleteByPrefix silently returns when ScanAt returns an error (err != nil || len(kvs) == 0), which can hide cleanup failures and leave orphaned keys without any signal. Consider logging the ScanAt error (and possibly distinguishing error vs empty scan) so operators can diagnose cleanup issues.

Suggested change
if err != nil || len(kvs) == 0 {
return
}
if err != nil {
slog.ErrorContext(ctx, "deleteByPrefix: scan failed",
"bucket", bucket,
"generation", generation,
"object_key", objectKey,
"upload_id", uploadID,
"cursor", string(cursor),
"err", err,
)
return
}
if len(kvs) == 0 {
return
}

@bootjp
Owner Author

bootjp commented Mar 26, 2026

@copilot open a new pull request to apply changes based on the comments in this thread
