diff --git a/cmd/apex/main.go b/cmd/apex/main.go index 418f345..ed3c64b 100644 --- a/cmd/apex/main.go +++ b/cmd/apex/main.go @@ -24,6 +24,7 @@ import ( "github.com/evstack/apex/pkg/fetch" "github.com/evstack/apex/pkg/metrics" "github.com/evstack/apex/pkg/profile" + "github.com/evstack/apex/pkg/s3" "github.com/evstack/apex/pkg/store" syncer "github.com/evstack/apex/pkg/sync" "github.com/evstack/apex/pkg/types" @@ -207,6 +208,53 @@ func setStoreMetrics(db store.Store, rec metrics.Recorder) { } } +func setupS3Server(cfg *config.Config, db store.Store, submitter s3.BlobSubmitter, log zerolog.Logger) (*http.Server, error) { + if !cfg.S3.Enabled { + return nil, nil + } + + var ns types.Namespace + if cfg.S3.Namespace != "" { + var err error + ns, err = types.NamespaceFromHex(cfg.S3.Namespace) + if err != nil { + return nil, fmt.Errorf("parse S3 namespace: %w", err) + } + } + + var objStore s3.ObjectStore + switch d := db.(type) { + case *store.SQLiteStore: + objStore = store.NewObjectStore(d, ns) + case *store.S3Store: + client := d.Client() + if client == nil { + return nil, errors.New("S3Store client is not *s3.Client") + } + objStore = store.NewS3ObjectStore(client) + default: + return nil, fmt.Errorf("unsupported store type for S3 API: %T", db) + } + + s3Svc := s3.NewService(objStore, submitter, ns) + s3Srv := s3.NewServer(s3Svc, cfg.S3.Region, log) + + httpSrv := &http.Server{ + Addr: cfg.S3.ListenAddr, + Handler: s3Srv, + ReadHeaderTimeout: 10 * time.Second, + } + + go func() { + log.Info().Str("addr", cfg.S3.ListenAddr).Msg("S3 API server listening") + if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Error().Err(err).Msg("S3 API server error") + } + }() + + return httpSrv, nil +} + func persistNamespaces(ctx context.Context, db store.Store, namespaces []types.Namespace) error { for _, ns := range namespaces { if err := db.PutNamespace(ctx, ns); err != nil { @@ -240,6 +288,7 @@ func maybeBackfillSourceOption(cfg *config.Config, logger zerolog.Logger) (synce return syncer.WithBackfillSource(dbSrc), func() { _ = dbSrc.Close() }, nil } +//nolint:gocyclo func runIndexer(ctx context.Context, cfg *config.Config) error { // Parse namespaces from config. namespaces, err := cfg.ParsedNamespaces() @@ -275,6 +324,19 @@ func runIndexer(ctx context.Context, cfg *config.Config) error { } defer dataFetcher.Close() //nolint:errcheck + // Setup S3 API server if enabled. + var s3Srv *http.Server + if cfg.S3.Enabled { + var submitter s3.BlobSubmitter + if s, ok := dataFetcher.(s3.BlobSubmitter); ok { + submitter = s + } + s3Srv, err = setupS3Server(cfg, db, submitter, log.Logger) + if err != nil { + return fmt.Errorf("setup S3 server: %w", err) + } + } + // Set up API layer. notifier := api.NewNotifier(cfg.Subscription.BufferSize, cfg.Subscription.MaxSubscribers, log.Logger) notifier.SetMetrics(rec) @@ -350,7 +412,7 @@ func runIndexer(ctx context.Context, cfg *config.Config) error { err = coord.Run(ctx) - gracefulShutdown(httpSrv, grpcSrv, metricsSrv, profileSrv) + gracefulShutdown(httpSrv, grpcSrv, metricsSrv, profileSrv, s3Srv) if err != nil && !errors.Is(err, context.Canceled) { return fmt.Errorf("coordinator: %w", err) @@ -360,7 +422,7 @@ func runIndexer(ctx context.Context, cfg *config.Config) error { return nil } -func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server, profileSrv *profile.Server) { +func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server, profileSrv *profile.Server, s3Srv *http.Server) { stopped := make(chan struct{}) go func() { grpcSrv.GracefulStop() @@ -381,6 +443,12 @@ func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *me log.Error().Err(err).Msg("JSON-RPC server shutdown error") } + if s3Srv != nil { + if err := s3Srv.Shutdown(shutdownCtx); err != nil { + log.Error().Err(err).Msg("S3 API server shutdown error") + } + } + if metricsSrv != nil { if err := metricsSrv.Shutdown(shutdownCtx); err != nil { log.Error().Err(err).Msg("metrics server shutdown error") diff --git a/config/config.go b/config/config.go index 3bf23bb..fac599c 100644 --- a/config/config.go +++ b/config/config.go @@ -16,6 +16,7 @@ type Config struct { Metrics MetricsConfig `yaml:"metrics"` Profiling ProfilingConfig `yaml:"profiling"` Log LogConfig `yaml:"log"` + S3 S3APIConfig `yaml:"s3"` } // DataSourceConfig configures the Celestia data source. @@ -92,6 +93,14 @@ type LogConfig struct { Format string `yaml:"format"` } +// S3APIConfig configures the S3-compatible API server. +type S3APIConfig struct { + Enabled bool `yaml:"enabled"` + ListenAddr string `yaml:"listen_addr"` + Region string `yaml:"region"` + Namespace string `yaml:"namespace"` // Celestia namespace for S3 objects +} + // DefaultConfig returns a Config with sensible defaults. func DefaultConfig() Config { return Config{ @@ -132,6 +141,11 @@ func DefaultConfig() Config { Level: "info", Format: "json", }, + S3: S3APIConfig{ + Enabled: false, + ListenAddr: ":8333", + Region: "us-east-1", + }, } } diff --git a/go.mod b/go.mod index a8a14d7..02683c1 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/cockroachdb/pebble v1.1.5 github.com/filecoin-project/go-jsonrpc v0.10.1 github.com/google/orderedcode v0.0.1 + github.com/gorilla/websocket v1.4.2 github.com/prometheus/client_golang v1.23.2 github.com/rs/zerolog v1.34.0 github.com/spf13/cobra v1.10.2 @@ -50,7 +51,6 @@ require ( github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.6.0 // indirect - github.com/gorilla/websocket v1.4.2 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/ipfs/go-log/v2 v2.0.8 // indirect github.com/klauspost/compress v1.18.0 // indirect diff --git a/pkg/api/health_test.go b/pkg/api/health_test.go index ff52ce5..dc0d3d6 100644 --- a/pkg/api/health_test.go +++ b/pkg/api/health_test.go @@ -59,7 +59,7 @@ func TestHealthEndpoint(t *testing.T) { mux := http.NewServeMux() h.Register(mux) - req := httptest.NewRequest(http.MethodGet, "/health", nil) + req := httptest.NewRequest(http.MethodGet, "/health", nil) //nolint:noctx rec := httptest.NewRecorder() mux.ServeHTTP(rec, req) @@ -90,7 +90,7 @@ func TestReadyEndpoint(t *testing.T) { mux := http.NewServeMux() h.Register(mux) - req := httptest.NewRequest(http.MethodGet, "/health/ready", nil) + req := httptest.NewRequest(http.MethodGet, "/health/ready", nil) //nolint:noctx rec := httptest.NewRecorder() mux.ServeHTTP(rec, req) diff --git a/pkg/api/jsonrpc/server_test.go b/pkg/api/jsonrpc/server_test.go index a4006cf..686497b 100644 --- a/pkg/api/jsonrpc/server_test.go +++ b/pkg/api/jsonrpc/server_test.go @@ -168,7 +168,7 @@ func doRPC(t *testing.T, srv http.Handler, method string, params ...any) jsonRPC t.Fatalf("marshal request: %v", err) } - httpReq := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body)) + httpReq := httptest.NewRequest(http.MethodPost, "/", bytes.NewReader(body)) //nolint:noctx httpReq.Header.Set("Content-Type", "application/json") w := httptest.NewRecorder() srv.ServeHTTP(w, httpReq) diff --git a/pkg/fetch/celestia_node.go b/pkg/fetch/celestia_node.go index acbb4f6..97306aa 100644 --- a/pkg/fetch/celestia_node.go +++ b/pkg/fetch/celestia_node.go @@ -33,6 +33,7 @@ type blobAPI struct { GetAll func(ctx context.Context, height uint64, namespaces [][]byte) (json.RawMessage, error) GetProof func(ctx context.Context, height uint64, namespace []byte, commitment []byte) (json.RawMessage, error) Included func(ctx context.Context, height uint64, namespace []byte, proof json.RawMessage, commitment []byte) (bool, error) + Submit func(ctx context.Context, namespace []byte, data []byte, shareVersion int) (json.RawMessage, error) } // CelestiaNodeFetcher implements DataFetcher using a Celestia node's JSON-RPC API. @@ -340,6 +341,17 @@ func (f *CelestiaNodeFetcher) Included(ctx context.Context, height uint64, names return ok, nil } +// SubmitBlob submits a blob to Celestia and returns the resulting blob with height and commitment. +func (f *CelestiaNodeFetcher) SubmitBlob(ctx context.Context, namespace types.Namespace, data []byte) (*types.Blob, error) { + raw, err := f.callRawWithRetry(ctx, "blob.Submit", func(callCtx context.Context) (json.RawMessage, error) { + return f.blob.Submit(callCtx, namespace[:], data, 0) + }) + if err != nil { + return nil, fmt.Errorf("blob.Submit: %w", err) + } + return mapSubmitResult(raw, namespace, data) +} + func (f *CelestiaNodeFetcher) Close() error { f.mu.Lock() defer f.mu.Unlock() @@ -475,6 +487,29 @@ func mapBlobs(raw json.RawMessage, height uint64) ([]types.Blob, error) { return blobs, nil } +// rpcSubmitResult is the JSON response from blob.Submit. +type rpcSubmitResult struct { + Height uint64 `json:"height"` + Commitment []byte `json:"commitment"` + ShareVersion uint32 `json:"share_version"` + Index int `json:"index"` +} + +func mapSubmitResult(raw json.RawMessage, namespace types.Namespace, data []byte) (*types.Blob, error) { + var result rpcSubmitResult + if err := json.Unmarshal(raw, &result); err != nil { + return nil, fmt.Errorf("unmarshal submit result: %w", err) + } + return &types.Blob{ + Height: result.Height, + Namespace: namespace, + Data: data, + Commitment: result.Commitment, + ShareVersion: result.ShareVersion, + Index: result.Index, + }, nil +} + func namespacesToBytes(nss []types.Namespace) [][]byte { out := make([][]byte, len(nss)) for i := range nss { diff --git a/pkg/s3/server.go b/pkg/s3/server.go new file mode 100644 index 0000000..2e3232e --- /dev/null +++ b/pkg/s3/server.go @@ -0,0 +1,351 @@ +package s3 + +import ( + "encoding/xml" + "errors" + "fmt" + "net/http" + "strconv" + "strings" + "time" + + "github.com/rs/zerolog" +) + +type Server struct { + svc *Service + log zerolog.Logger + region string +} + +func NewServer(svc *Service, region string, log zerolog.Logger) *Server { + return &Server{ + svc: svc, + log: log.With().Str("component", "s3-server").Logger(), + region: region, + } +} + +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.log.Debug().Str("method", r.Method).Str("path", r.URL.Path).Msg("request") + + bucket, key := parsePath(r.URL.Path) + query := r.URL.Query() + + switch { + case bucket == "" && key == "": + s.handleService(r, w) + case bucket != "" && key == "": + if query.Get("list-type") != "" || query.Has("prefix") || query.Has("delimiter") { + s.handleListObjects(r, w, bucket) + } else if r.Method == http.MethodGet { + s.handleBucket(r, w, bucket) + } else if r.Method == http.MethodPut { + s.handleCreateBucket(r, w, bucket) + } else if r.Method == http.MethodDelete { + s.handleDeleteBucket(r, w, bucket) + } else if r.Method == http.MethodHead { + s.handleHeadBucket(r, w, bucket) + } else { + s.writeError(w, http.StatusMethodNotAllowed, "MethodNotAllowed", "") + } + case bucket != "" && key != "": + s.handleObject(r, w, bucket, key) + default: + s.writeError(w, http.StatusBadRequest, "InvalidRequest", "invalid path") + } +} + +func parsePath(p string) (bucket, key string) { + p = strings.TrimPrefix(p, "/") + if p == "" { + return "", "" + } + parts := strings.SplitN(p, "/", 2) + bucket = parts[0] + if len(parts) > 1 { + key = parts[1] + } + return bucket, key +} + +func (s *Server) handleService(r *http.Request, w http.ResponseWriter) { + if r.Method != http.MethodGet { + s.writeError(w, http.StatusMethodNotAllowed, "MethodNotAllowed", "") + return + } + + buckets, err := s.svc.ListBuckets(r.Context()) + if err != nil { + s.writeError(w, http.StatusInternalServerError, "InternalError", err.Error()) + return + } + + type BucketXML struct { + Name string `xml:"Name"` + CreationDate string `xml:"CreationDate"` + } + type ListAllMyBucketsResult struct { + XMLName xml.Name `xml:"ListAllMyBucketsResult"` + Xmlns string `xml:"xmlns,attr"` + Owner struct { + ID string `xml:"ID"` + DisplayName string `xml:"DisplayName"` + } `xml:"Owner"` + Buckets struct { + Bucket []BucketXML `xml:"Bucket"` + } `xml:"Buckets"` + } + + result := ListAllMyBucketsResult{Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/"} + result.Owner.ID = "apex" + result.Owner.DisplayName = "apex" + for _, b := range buckets { + result.Buckets.Bucket = append(result.Buckets.Bucket, BucketXML{ + Name: b.Name, + CreationDate: b.CreatedAt.UTC().Format(time.RFC3339), + }) + } + + s.writeXML(w, result) +} + +func (s *Server) handleBucket(r *http.Request, w http.ResponseWriter, bucket string) { + if r.Method != http.MethodGet { + s.writeError(w, http.StatusMethodNotAllowed, "MethodNotAllowed", "") + return + } + s.handleListObjects(r, w, bucket) +} + +func (s *Server) handleListObjects(r *http.Request, w http.ResponseWriter, bucket string) { + query := r.URL.Query() + prefix := query.Get("prefix") + delimiter := query.Get("delimiter") + marker := query.Get("marker") + maxKeys := 1000 + if mk := query.Get("max-keys"); mk != "" { + if n, err := strconv.Atoi(mk); err == nil && n > 0 { + maxKeys = n + } + } + + result, err := s.svc.ListObjects(r.Context(), bucket, prefix, delimiter, marker, maxKeys) + if err != nil { + s.writeS3Error(w, err) + return + } + + type Contents struct { + Key string `xml:"Key"` + LastModified string `xml:"LastModified"` + ETag string `xml:"ETag"` + Size int64 `xml:"Size"` + StorageClass string `xml:"StorageClass"` + } + type CommonPrefix struct { + Prefix string `xml:"Prefix"` + } + type ListBucketResult struct { + XMLName xml.Name `xml:"ListBucketResult"` + Xmlns string `xml:"xmlns,attr"` + Name string `xml:"Name"` + Prefix string `xml:"Prefix"` + Marker string `xml:"Marker"` + MaxKeys int `xml:"MaxKeys"` + IsTruncated bool `xml:"IsTruncated"` + Contents []Contents + CommonPrefixes []CommonPrefix `xml:",omitempty"` + } + + xmlResult := ListBucketResult{ + Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", + Name: result.Bucket, + Prefix: result.Prefix, + Marker: marker, + MaxKeys: maxKeys, + IsTruncated: result.IsTruncated, + } + for _, obj := range result.Objects { + xmlResult.Contents = append(xmlResult.Contents, Contents{ + Key: obj.Key, + LastModified: obj.LastModified.UTC().Format(time.RFC3339), + ETag: fmt.Sprintf(`"%s"`, obj.ETag), + Size: obj.Size, + StorageClass: obj.StorageClass, + }) + } + for _, cp := range result.CommonPrefixes { + xmlResult.CommonPrefixes = append(xmlResult.CommonPrefixes, CommonPrefix{Prefix: cp}) + } + + s.writeXML(w, xmlResult) +} + +func (s *Server) handleCreateBucket(r *http.Request, w http.ResponseWriter, bucket string) { + if r.Method != http.MethodPut { + s.writeError(w, http.StatusMethodNotAllowed, "MethodNotAllowed", "") + return + } + + err := s.svc.CreateBucket(r.Context(), bucket) + if err != nil { + s.writeS3Error(w, err) + return + } + + w.Header().Set("Location", "/"+bucket) + w.WriteHeader(http.StatusOK) +} + +func (s *Server) handleDeleteBucket(r *http.Request, w http.ResponseWriter, bucket string) { + if r.Method != http.MethodDelete { + s.writeError(w, http.StatusMethodNotAllowed, "MethodNotAllowed", "") + return + } + + err := s.svc.DeleteBucket(r.Context(), bucket) + if err != nil { + s.writeS3Error(w, err) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +func (s *Server) handleHeadBucket(r *http.Request, w http.ResponseWriter, bucket string) { + if r.Method != http.MethodHead { + s.writeError(w, http.StatusMethodNotAllowed, "MethodNotAllowed", "") + return + } + + _, err := s.svc.HeadBucket(r.Context(), bucket) + if err != nil { + s.writeS3Error(w, err) + return + } + + w.WriteHeader(http.StatusOK) +} + +func (s *Server) handleObject(r *http.Request, w http.ResponseWriter, bucket, key string) { + switch r.Method { + case http.MethodGet: + s.handleGetObject(r, w, bucket, key) + case http.MethodPut: + s.handlePutObject(r, w, bucket, key) + case http.MethodDelete: + s.handleDeleteObject(r, w, bucket, key) + case http.MethodHead: + s.handleHeadObject(r, w, bucket, key) + default: + s.writeError(w, http.StatusMethodNotAllowed, "MethodNotAllowed", "") + } +} + +func (s *Server) handleGetObject(r *http.Request, w http.ResponseWriter, bucket, key string) { + obj, data, err := s.svc.GetObject(r.Context(), bucket, key) + if err != nil { + s.writeS3Error(w, err) + return + } + + w.Header().Set("Content-Type", obj.ContentType) + if obj.ContentType == "" { + w.Header().Set("Content-Type", "application/octet-stream") + } + w.Header().Set("Content-Length", strconv.FormatInt(obj.Size, 10)) + w.Header().Set("ETag", fmt.Sprintf(`"%s"`, obj.ETag)) + w.Header().Set("Last-Modified", obj.LastModified.UTC().Format(http.TimeFormat)) + _, _ = w.Write(data) //nolint:gosec +} + +func (s *Server) handlePutObject(r *http.Request, w http.ResponseWriter, bucket, key string) { + contentType := r.Header.Get("Content-Type") + if contentType == "" { + contentType = "application/octet-stream" + } + + obj, err := s.svc.PutObject(r.Context(), bucket, key, r.Body, contentType) + if err != nil { + s.writeS3Error(w, err) + return + } + + w.Header().Set("ETag", fmt.Sprintf(`"%s"`, obj.ETag)) + w.WriteHeader(http.StatusOK) +} + +func (s *Server) handleDeleteObject(r *http.Request, w http.ResponseWriter, bucket, key string) { + err := s.svc.DeleteObject(r.Context(), bucket, key) + if err != nil { + s.writeS3Error(w, err) + return + } + w.WriteHeader(http.StatusNoContent) +} + +func (s *Server) handleHeadObject(r *http.Request, w http.ResponseWriter, bucket, key string) { + obj, err := s.svc.HeadObject(r.Context(), bucket, key) + if err != nil { + s.writeS3Error(w, err) + return + } + + w.Header().Set("Content-Type", obj.ContentType) + if obj.ContentType == "" { + w.Header().Set("Content-Type", "application/octet-stream") + } + w.Header().Set("Content-Length", strconv.FormatInt(obj.Size, 10)) + w.Header().Set("ETag", fmt.Sprintf(`"%s"`, obj.ETag)) + w.Header().Set("Last-Modified", obj.LastModified.UTC().Format(http.TimeFormat)) + w.WriteHeader(http.StatusOK) +} + +func (s *Server) writeXML(w http.ResponseWriter, data any) { + w.Header().Set("Content-Type", "application/xml") + output, err := xml.Marshal(data) + if err != nil { + s.writeError(w, http.StatusInternalServerError, "InternalError", err.Error()) + return + } + _, _ = w.Write([]byte(xml.Header)) + _, _ = w.Write(output) +} + +func (s *Server) writeS3Error(w http.ResponseWriter, err error) { + switch { + case errors.Is(err, ErrBucketNotFound): + s.writeError(w, http.StatusNotFound, "NoSuchBucket", "The specified bucket does not exist") + case errors.Is(err, ErrBucketNotEmpty): + s.writeError(w, http.StatusConflict, "BucketNotEmpty", "The bucket you tried to delete is not empty") + case errors.Is(err, ErrBucketAlreadyExists): + s.writeError(w, http.StatusConflict, "BucketAlreadyExists", "The requested bucket name is not available") + case errors.Is(err, ErrObjectNotFound): + s.writeError(w, http.StatusNotFound, "NoSuchKey", "The specified key does not exist") + case errors.Is(err, ErrObjectTooLarge): + s.writeError(w, http.StatusRequestEntityTooLarge, "EntityTooLarge", "Your proposed upload exceeds the maximum allowed size") + default: + s.writeError(w, http.StatusInternalServerError, "InternalError", err.Error()) + } +} + +func (s *Server) writeError(w http.ResponseWriter, code int, codeStr, message string) { + type Error struct { + XMLName xml.Name `xml:"Error"` + Code string `xml:"Code"` + Message string `xml:"Message"` + Resource string `xml:"Resource"` + RequestID string `xml:"RequestId"` + } + err := Error{ + Code: codeStr, + Message: message, + RequestID: "apex", + } + w.Header().Set("Content-Type", "application/xml") + w.WriteHeader(code) + output, _ := xml.Marshal(err) + _, _ = w.Write([]byte(xml.Header)) + _, _ = w.Write(output) +} diff --git a/pkg/s3/server_test.go b/pkg/s3/server_test.go new file mode 100644 index 0000000..fa3c5c9 --- /dev/null +++ b/pkg/s3/server_test.go @@ -0,0 +1,377 @@ +package s3 + +import ( + "bytes" + "context" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/evstack/apex/pkg/types" + "github.com/rs/zerolog" +) + +type httpMockStore struct { + buckets map[string]*Bucket + objects map[string]map[string]*httpStoredObject +} + +type httpStoredObject struct { + obj *Object + data []byte +} + +func newHTTPMockStore() *httpMockStore { + return &httpMockStore{ + buckets: make(map[string]*Bucket), + objects: make(map[string]map[string]*httpStoredObject), + } +} + +func (m *httpMockStore) PutBucket(_ context.Context, name string) error { + if _, exists := m.buckets[name]; exists { + return ErrBucketAlreadyExists + } + now := time.Now() + m.buckets[name] = &Bucket{Name: name, CreatedAt: now, LastModified: now} + m.objects[name] = make(map[string]*httpStoredObject) + return nil +} + +func (m *httpMockStore) GetBucket(_ context.Context, name string) (*Bucket, error) { + b, ok := m.buckets[name] + if !ok { + return nil, ErrBucketNotFound + } + return b, nil +} + +func (m *httpMockStore) DeleteBucket(_ context.Context, name string) error { + if _, ok := m.buckets[name]; !ok { + return ErrBucketNotFound + } + if len(m.objects[name]) > 0 { + return ErrBucketNotEmpty + } + delete(m.buckets, name) + delete(m.objects, name) + return nil +} + +func (m *httpMockStore) ListBuckets(_ context.Context) ([]Bucket, error) { + result := make([]Bucket, 0, len(m.buckets)) + for _, b := range m.buckets { + result = append(result, *b) + } + return result, nil +} + +func (m *httpMockStore) PutObject(_ context.Context, bucket, key string, data []byte, contentType string) (*Object, error) { + if _, ok := m.buckets[bucket]; !ok { + return nil, ErrBucketNotFound + } + now := time.Now() + obj := &Object{ + Key: key, + Bucket: bucket, + Size: int64(len(data)), + ETag: "etag-" + key, + ContentType: contentType, + LastModified: now, + } + m.objects[bucket][key] = &httpStoredObject{obj: obj, data: data} + return obj, nil +} + +func (m *httpMockStore) GetObject(_ context.Context, bucket, key string) (*Object, []byte, error) { + if _, ok := m.buckets[bucket]; !ok { + return nil, nil, ErrBucketNotFound + } + stored, ok := m.objects[bucket][key] + if !ok { + return nil, nil, ErrObjectNotFound + } + return stored.obj, stored.data, nil +} + +func (m *httpMockStore) DeleteObject(_ context.Context, bucket, key string) error { + if _, ok := m.buckets[bucket]; !ok { + return ErrBucketNotFound + } + if _, ok := m.objects[bucket][key]; !ok { + return ErrObjectNotFound + } + delete(m.objects[bucket], key) + return nil +} + +func (m *httpMockStore) ListObjects(_ context.Context, bucket, prefix, delimiter, _ string, _ int) (*ListObjectsResult, error) { + if _, ok := m.buckets[bucket]; !ok { + return nil, ErrBucketNotFound + } + result := &ListObjectsResult{Bucket: bucket, Prefix: prefix, Delimiter: delimiter} + for key, stored := range m.objects[bucket] { + result.Objects = append(result.Objects, ObjectInfo{ + Key: key, + LastModified: stored.obj.LastModified, + ETag: stored.obj.ETag, + Size: stored.obj.Size, + StorageClass: "STANDARD", + }) + } + return result, nil +} + +func (m *httpMockStore) HeadObject(_ context.Context, bucket, key string) (*Object, error) { + if _, ok := m.buckets[bucket]; !ok { + return nil, ErrBucketNotFound + } + stored, ok := m.objects[bucket][key] + if !ok { + return nil, ErrObjectNotFound + } + return stored.obj, nil +} + +func setupHTTPTestServer() (*Server, *httpMockStore) { + store := newHTTPMockStore() + svc := NewService(store, nil, types.Namespace{}) + log := zerolog.New(io.Discard) + return NewServer(svc, "us-east-1", log), store +} + +func TestServer_ListBuckets(t *testing.T) { + server, store := setupHTTPTestServer() + _ = store.PutBucket(context.Background(), "bucket1") + _ = store.PutBucket(context.Background(), "bucket2") + + req := httptest.NewRequest(http.MethodGet, "/", nil) //nolint:noctx + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("expected status 200, got %d", rec.Code) + } + body := rec.Body.String() + if !strings.Contains(body, "bucket1") || !strings.Contains(body, "bucket2") { + t.Errorf("expected buckets in response, got: %s", body) + } + if !strings.Contains(body, "ListAllMyBucketsResult") { + t.Errorf("expected ListAllMyBucketsResult in response, got: %s", body) + } +} + +func TestServer_CreateBucket(t *testing.T) { + server, _ := setupHTTPTestServer() + + req := httptest.NewRequest(http.MethodPut, "/test-bucket", nil) //nolint:noctx + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("expected status 200, got %d", rec.Code) + } + if rec.Header().Get("Location") != "/test-bucket" { + t.Errorf("expected Location header, got: %s", rec.Header().Get("Location")) + } +} + +func TestServer_CreateBucket_AlreadyExists(t *testing.T) { + server, _ := setupHTTPTestServer() + + req := httptest.NewRequest(http.MethodPut, "/test-bucket", nil) //nolint:noctx + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + req2 := httptest.NewRequest(http.MethodPut, "/test-bucket", nil) //nolint:noctx + rec2 := httptest.NewRecorder() + server.ServeHTTP(rec2, req2) + + if rec2.Code != http.StatusConflict { + t.Errorf("expected status 409, got %d", rec2.Code) + } +} + +func TestServer_DeleteBucket(t *testing.T) { + server, store := setupHTTPTestServer() + _ = store.PutBucket(context.Background(), "test-bucket") + + req := httptest.NewRequest(http.MethodDelete, "/test-bucket", nil) //nolint:noctx + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusNoContent { + t.Errorf("expected status 204, got %d", rec.Code) + } +} + +func TestServer_DeleteBucket_NotEmpty(t *testing.T) { + server, store := setupHTTPTestServer() + _ = store.PutBucket(context.Background(), "test-bucket") + _, _ = store.PutObject(context.Background(), "test-bucket", "key", []byte("data"), "text/plain") + + req := httptest.NewRequest(http.MethodDelete, "/test-bucket", nil) //nolint:noctx + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusConflict { + t.Errorf("expected status 409, got %d", rec.Code) + } +} + +func TestServer_PutGetObject(t *testing.T) { + server, store := setupHTTPTestServer() + _ = store.PutBucket(context.Background(), "test-bucket") + + body := []byte("hello world") + req := httptest.NewRequest(http.MethodPut, "/test-bucket/hello.txt", bytes.NewReader(body)) //nolint:noctx + req.Header.Set("Content-Type", "text/plain") + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("expected status 200, got %d", rec.Code) + } + if rec.Header().Get("ETag") == "" { + t.Errorf("expected ETag header") + } + + req2 := httptest.NewRequest(http.MethodGet, "/test-bucket/hello.txt", nil) //nolint:noctx + rec2 := httptest.NewRecorder() + server.ServeHTTP(rec2, req2) + + if rec2.Code != http.StatusOK { + t.Errorf("expected status 200, got %d", rec2.Code) + } + if rec2.Body.String() != "hello world" { + t.Errorf("expected body 'hello world', got: %s", rec2.Body.String()) + } + if rec2.Header().Get("Content-Type") != "text/plain" { + t.Errorf("expected Content-Type text/plain, got: %s", rec2.Header().Get("Content-Type")) + } +} + +func TestServer_GetObject_NotFound(t *testing.T) { + server, store := setupHTTPTestServer() + _ = store.PutBucket(context.Background(), "test-bucket") + + req := httptest.NewRequest(http.MethodGet, "/test-bucket/nonexistent.txt", nil) //nolint:noctx + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusNotFound { + t.Errorf("expected status 404, got %d", rec.Code) + } +} + +func TestServer_HeadObject(t *testing.T) { + server, store := setupHTTPTestServer() + _ = store.PutBucket(context.Background(), "test-bucket") + _, _ = store.PutObject(context.Background(), "test-bucket", "test.txt", []byte("content"), "text/plain") + + req := httptest.NewRequest(http.MethodHead, "/test-bucket/test.txt", nil) //nolint:noctx + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("expected status 200, got %d", rec.Code) + } + if rec.Header().Get("Content-Length") != "7" { + t.Errorf("expected Content-Length 7, got: %s", rec.Header().Get("Content-Length")) + } + if rec.Body.Len() != 0 { + t.Errorf("expected empty body for HEAD request") + } +} + +func TestServer_DeleteObject(t *testing.T) { + server, store := setupHTTPTestServer() + _ = store.PutBucket(context.Background(), "test-bucket") + _, _ = store.PutObject(context.Background(), "test-bucket", "test.txt", []byte("content"), "text/plain") + + req := httptest.NewRequest(http.MethodDelete, "/test-bucket/test.txt", nil) //nolint:noctx + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusNoContent { + t.Errorf("expected status 204, got %d", rec.Code) + } + + req2 := httptest.NewRequest(http.MethodGet, "/test-bucket/test.txt", nil) //nolint:noctx + rec2 := httptest.NewRecorder() + server.ServeHTTP(rec2, req2) + + if rec2.Code != http.StatusNotFound { + t.Errorf("expected status 404 after delete, got %d", rec2.Code) + } +} + +func TestServer_ListObjects(t *testing.T) { + server, store := setupHTTPTestServer() + _ = store.PutBucket(context.Background(), "test-bucket") + _, _ = store.PutObject(context.Background(), "test-bucket", "file1.txt", []byte("a"), "text/plain") + _, _ = store.PutObject(context.Background(), "test-bucket", "file2.txt", []byte("bb"), "text/plain") + + req := httptest.NewRequest(http.MethodGet, "/test-bucket?list-type=2", nil) //nolint:noctx + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("expected status 200, got %d", rec.Code) + } + body := rec.Body.String() + if !strings.Contains(body, "ListBucketResult") { + t.Errorf("expected ListBucketResult in response, got: %s", body) + } + if !strings.Contains(body, "file1.txt") || !strings.Contains(body, "file2.txt") { + t.Errorf("expected objects in response, got: %s", body) + } +} + +func TestServer_HeadBucket(t *testing.T) { + server, store := setupHTTPTestServer() + _ = store.PutBucket(context.Background(), "test-bucket") + + req := httptest.NewRequest(http.MethodHead, "/test-bucket", nil) //nolint:noctx + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("expected status 200, got %d", rec.Code) + } +} + +func TestServer_HeadBucket_NotFound(t *testing.T) { + server, _ := setupHTTPTestServer() + + req := httptest.NewRequest(http.MethodHead, "/nonexistent-bucket", nil) //nolint:noctx + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusNotFound { + t.Errorf("expected status 404, got %d", rec.Code) + } +} + +func TestServer_ErrorFormat(t *testing.T) { + server, _ := setupHTTPTestServer() + + req := httptest.NewRequest(http.MethodGet, "/nonexistent-bucket/key", nil) //nolint:noctx + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusNotFound { + t.Errorf("expected status 404, got %d", rec.Code) + } + + body := rec.Body.String() + if !strings.Contains(body, "NoSuchBucket") { + t.Errorf("expected error code NoSuchBucket in response, got: %s", body) + } + if !strings.Contains(body, "apex") { + t.Errorf("expected RequestId apex in response, got: %s", body) + } +} diff --git a/pkg/s3/service.go b/pkg/s3/service.go new file mode 100644 index 0000000..6bf4687 --- /dev/null +++ b/pkg/s3/service.go @@ -0,0 +1,117 @@ +package s3 + +import ( + "context" + "encoding/hex" + "errors" + "fmt" + "io" + + "github.com/evstack/apex/pkg/types" +) + +var ( + ErrBucketNotFound = errors.New("bucket not found") + ErrBucketNotEmpty = errors.New("bucket not empty") + ErrBucketAlreadyExists = errors.New("bucket already exists") + ErrObjectNotFound = errors.New("object not found") + ErrInvalidBucketName = errors.New("invalid bucket name") + ErrInvalidObjectKey = errors.New("invalid object key") + ErrObjectTooLarge = errors.New("object too large") +) + +type BlobSubmitter interface { + SubmitBlob(ctx context.Context, namespace types.Namespace, data []byte) (*types.Blob, error) +} + +type ObjectStore interface { + PutBucket(ctx context.Context, name string) error + GetBucket(ctx context.Context, name string) (*Bucket, error) + DeleteBucket(ctx context.Context, name string) error + ListBuckets(ctx context.Context) ([]Bucket, error) + + PutObject(ctx context.Context, bucket, key string, data []byte, contentType string) (*Object, error) + GetObject(ctx context.Context, bucket, key string) (*Object, []byte, error) + DeleteObject(ctx context.Context, bucket, key string) error + ListObjects(ctx context.Context, bucket, prefix, delimiter, marker string, maxKeys int) (*ListObjectsResult, error) + HeadObject(ctx context.Context, bucket, key string) (*Object, error) +} + +type Service struct { + store ObjectStore + submitter BlobSubmitter + namespace types.Namespace +} + +func NewService(store ObjectStore, submitter BlobSubmitter, namespace types.Namespace) *Service { + return &Service{ + store: store, + submitter: submitter, + namespace: namespace, + } +} + +func (s *Service) CreateBucket(ctx context.Context, name string) error { + return s.store.PutBucket(ctx, name) +} + +func (s *Service) DeleteBucket(ctx context.Context, name string) error { + return s.store.DeleteBucket(ctx, name) +} + +func (s *Service) ListBuckets(ctx context.Context) ([]Bucket, error) { + return s.store.ListBuckets(ctx) +} + +func (s *Service) HeadBucket(ctx context.Context, name string) (*Bucket, error) { + return s.store.GetBucket(ctx, name) +} + +func (s *Service) PutObject(ctx context.Context, bucket, key string, r io.Reader, contentType string) (*Object, error) { + data, err := io.ReadAll(r) + if err != nil { + return nil, fmt.Errorf("read object data: %w", err) + } + if len(data) > maxObjectSize { + return nil, ErrObjectTooLarge + } + + obj, err := s.store.PutObject(ctx, bucket, key, data, contentType) + if err != nil { + return nil, err + } + + if s.submitter != nil { + blob, err := s.submitter.SubmitBlob(ctx, s.namespace, data) + if err != nil { + return nil, fmt.Errorf("submit to celestia: %w", err) + } + + if updater, ok := s.store.(interface { + UpdateObjectWithBlobs(ctx context.Context, bucket, key string, height uint64, commitments []string) error + }); ok { + _ = updater.UpdateObjectWithBlobs(ctx, bucket, key, blob.Height, []string{hex.EncodeToString(blob.Commitment)}) + } + } + + return obj, nil +} + +func (s *Service) GetObject(ctx context.Context, bucket, key string) (*Object, []byte, error) { + return s.store.GetObject(ctx, bucket, key) +} + +func (s *Service) DeleteObject(ctx context.Context, bucket, key string) error { + return s.store.DeleteObject(ctx, bucket, key) +} + +func (s *Service) ListObjects(ctx context.Context, bucket, prefix, delimiter, marker string, maxKeys int) (*ListObjectsResult, error) { + if maxKeys <= 0 { + maxKeys = 1000 + } + return s.store.ListObjects(ctx, bucket, prefix, delimiter, marker, maxKeys) +} + +func (s *Service) HeadObject(ctx context.Context, bucket, key string) (*Object, error) { + return s.store.HeadObject(ctx, bucket, key) +} diff --git a/pkg/s3/service_test.go b/pkg/s3/service_test.go new file mode 100644 index 0000000..d2fcdb4 --- /dev/null +++ b/pkg/s3/service_test.go @@ -0,0 +1,270 @@ +package s3 + +import ( + "bytes" + "context" + "errors" + "testing" + "time" + + "github.com/evstack/apex/pkg/types" +) + +type mockStore struct { + buckets map[string]*Bucket + objects map[string]map[string]*storedObject +} + +type storedObject struct { + obj *Object + data []byte +} + +func newMockStore() *mockStore { + return &mockStore{ + buckets: make(map[string]*Bucket), + objects: make(map[string]map[string]*storedObject), + } +} + +func (m *mockStore) PutBucket(ctx context.Context, name string) error { + if _, exists := m.buckets[name]; exists { + return ErrBucketAlreadyExists + } + now := time.Now() + m.buckets[name] = &Bucket{Name: name, CreatedAt: now, LastModified: now} + m.objects[name] = make(map[string]*storedObject) + return nil +} + +func (m *mockStore) GetBucket(ctx context.Context, name string) (*Bucket, error) { + b, ok := m.buckets[name] + if !ok { + return nil, ErrBucketNotFound + } + return b, nil +} + +func (m *mockStore) DeleteBucket(ctx context.Context, name string) error { + if _, ok := m.buckets[name]; !ok { + return ErrBucketNotFound + } + if len(m.objects[name]) > 0 { + return ErrBucketNotEmpty + } + delete(m.buckets, name) + delete(m.objects, name) + return nil +} + +func (m *mockStore) ListBuckets(ctx context.Context) ([]Bucket, error) { + result := make([]Bucket, 0, len(m.buckets)) + for _, b := range m.buckets { + result = append(result, *b) + } + return result, nil +} + +func (m *mockStore) PutObject(ctx context.Context, bucket, key string, data []byte, contentType string) (*Object, error) { + if _, ok := m.buckets[bucket]; !ok { + return nil, ErrBucketNotFound + } + now := time.Now() + obj := &Object{ + Key: key, + Bucket: bucket, + Size: int64(len(data)), + ETag: "etag-" + key, + ContentType: contentType, + LastModified: now, + } + m.objects[bucket][key] = &storedObject{obj: obj, data: data} + return obj, nil +} + +func (m *mockStore) GetObject(ctx context.Context, bucket, key string) (*Object, []byte, error) { + if _, ok := m.buckets[bucket]; !ok { + return nil, nil, ErrBucketNotFound + } + stored, ok := m.objects[bucket][key] + if !ok { + return nil, nil, ErrObjectNotFound + } + return stored.obj, stored.data, nil +} + +func (m *mockStore) DeleteObject(ctx context.Context, bucket, key string) error { + if _, ok := m.buckets[bucket]; !ok { + return ErrBucketNotFound + } + if _, ok := m.objects[bucket][key]; !ok { + return ErrObjectNotFound + } + delete(m.objects[bucket], key) + return nil +} + +func (m *mockStore) ListObjects(ctx context.Context, bucket, prefix, delimiter, marker string, maxKeys int) (*ListObjectsResult, error) { + if _, ok := m.buckets[bucket]; !ok { + return nil, ErrBucketNotFound + } + result := &ListObjectsResult{Bucket: bucket, Prefix: prefix, Delimiter: delimiter} + for key, stored := range m.objects[bucket] { + result.Objects = append(result.Objects, ObjectInfo{ + Key: key, + LastModified: stored.obj.LastModified, + ETag: stored.obj.ETag, + Size: stored.obj.Size, + StorageClass: "STANDARD", + }) + } + return result, nil +} + +func (m *mockStore) HeadObject(ctx context.Context, bucket, key string) (*Object, error) { + if _, ok := m.buckets[bucket]; !ok { + return nil, ErrBucketNotFound + } + stored, ok := m.objects[bucket][key] + if !ok { + return nil, ErrObjectNotFound + } + return stored.obj, nil +} + +func TestService_CreateBucket(t *testing.T) { + store := newMockStore() + svc := NewService(store, nil, types.Namespace{}) + + ctx := context.Background() + err := svc.CreateBucket(ctx, "test-bucket") + if err != nil { + t.Fatalf("CreateBucket failed: %v", err) + } + + err = svc.CreateBucket(ctx, "test-bucket") + if !errors.Is(err, ErrBucketAlreadyExists) { + t.Fatalf("expected ErrBucketAlreadyExists, got: %v", err) + } +} + +func TestService_PutGetObject(t *testing.T) { + store := newMockStore() + svc := NewService(store, nil, types.Namespace{}) + + ctx := context.Background() + err := svc.CreateBucket(ctx, "test-bucket") + if err != nil { + t.Fatalf("CreateBucket failed: %v", err) + } + + data := []byte("hello world") + obj, err := svc.PutObject(ctx, "test-bucket", "test-key", bytes.NewReader(data), "text/plain") + if err != nil { + t.Fatalf("PutObject failed: %v", err) + } + + if obj.Size != int64(len(data)) { + t.Errorf("expected size %d, got %d", len(data), obj.Size) + } + + gotObj, gotData, err := svc.GetObject(ctx, "test-bucket", "test-key") + if err != nil { + t.Fatalf("GetObject failed: %v", err) + } + + if !bytes.Equal(gotData, data) { + t.Errorf("expected data %q, got %q", data, gotData) + } + if gotObj.Key != "test-key" { + t.Errorf("expected key test-key, got %s", gotObj.Key) + } +} + +func TestService_DeleteObject(t *testing.T) { + store := newMockStore() + svc := NewService(store, nil, types.Namespace{}) + + ctx := context.Background() + err := svc.CreateBucket(ctx, "test-bucket") + if err != nil { + t.Fatalf("CreateBucket failed: %v", err) + } + + data := []byte("hello world") + _, err = svc.PutObject(ctx, "test-bucket", "test-key", bytes.NewReader(data), "text/plain") + if err != nil { + t.Fatalf("PutObject failed: %v", err) + } + + err = svc.DeleteObject(ctx, "test-bucket", "test-key") + if err != nil { + t.Fatalf("DeleteObject failed: %v", err) + } + + _, _, err = svc.GetObject(ctx, "test-bucket", "test-key") + if !errors.Is(err, ErrObjectNotFound) { + t.Fatalf("expected ErrObjectNotFound, got: %v", err) + } +} + +func TestService_ListBuckets(t *testing.T) { + store := newMockStore() + svc := NewService(store, nil, types.Namespace{}) + + ctx := context.Background() + buckets, err := svc.ListBuckets(ctx) + if err != nil { + t.Fatalf("ListBuckets failed: %v", err) + } + if len(buckets) != 0 { + t.Errorf("expected 0 buckets, got %d", len(buckets)) + } + + err = svc.CreateBucket(ctx, "bucket-a") + if err != nil { + t.Fatalf("CreateBucket failed: %v", err) + } + err = svc.CreateBucket(ctx, "bucket-b") + if err != nil { + t.Fatalf("CreateBucket failed: %v", err) + } + + buckets, err = svc.ListBuckets(ctx) + if err != nil { + t.Fatalf("ListBuckets failed: %v", err) + } + if len(buckets) != 2 { + t.Errorf("expected 2 buckets, got %d", len(buckets)) + } +} + +func TestService_HeadObject(t *testing.T) { + store := newMockStore() + svc := NewService(store, nil, types.Namespace{}) + + ctx := context.Background() + err := svc.CreateBucket(ctx, "test-bucket") + if err != nil { + t.Fatalf("CreateBucket failed: %v", err) + } + + _, err = svc.HeadObject(ctx, "test-bucket", "nonexistent") + if !errors.Is(err, ErrObjectNotFound) { + t.Fatalf("expected ErrObjectNotFound, got: %v", err) + } + + data := []byte("hello world") + _, err = svc.PutObject(ctx, "test-bucket", "test-key", bytes.NewReader(data), "text/plain") + if err != nil { + t.Fatalf("PutObject failed: %v", err) + } + + obj, err := svc.HeadObject(ctx, "test-bucket", "test-key") + if err != nil { + t.Fatalf("HeadObject failed: %v", err) + } + if obj.Key != "test-key" { + t.Errorf("expected key test-key, got %s", obj.Key) + } +} diff --git a/pkg/s3/types.go b/pkg/s3/types.go new file mode 100644 index 0000000..bcca135 --- /dev/null +++ b/pkg/s3/types.go @@ -0,0 +1,70 @@ +package s3 + +import ( + "time" +) + +const ( + maxObjectSize = 5 * 1024 * 1024 * 1024 // 5GB S3 limit +) + +type Bucket struct { + Name string + CreatedAt time.Time + LastModified time.Time +} + +type Object struct { + Key string + Bucket string + Size int64 + ETag string // MD5 hash of object content + ContentType string + LastModified time.Time + + Height uint64 // Celestia height where blobs were submitted + Namespace string // Namespace used for blob storage + BlobCount int // Number of blobs the object was split into + Commitments []string +} + +type ListBucketsResult struct { + Buckets []Bucket +} + +type ListObjectsResult struct { + Bucket string + Prefix string + Delimiter string + IsTruncated bool + Objects []ObjectInfo + CommonPrefixes []string +} + +type ObjectInfo struct { + Key string + LastModified time.Time + ETag string + Size int64 + StorageClass string +} + +type CopyObjectResult struct { + LastModified time.Time + ETag string +} + +type Part struct { + PartNumber int + ETag string + LastModified time.Time + Size int64 +} + +type MultipartUpload struct { + UploadID string + Bucket string + Key string + Initiated time.Time + Parts []Part +} diff --git a/pkg/store/migrations/003_s3_objects.sql b/pkg/store/migrations/003_s3_objects.sql new file mode 100644 index 0000000..4dcd771 --- /dev/null +++ b/pkg/store/migrations/003_s3_objects.sql @@ -0,0 +1,25 @@ +CREATE TABLE IF NOT EXISTS s3_buckets ( + name TEXT PRIMARY KEY, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL +); + +CREATE TABLE IF NOT EXISTS s3_objects ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + bucket TEXT NOT NULL, + key TEXT NOT NULL, + size INTEGER NOT NULL, + etag TEXT NOT NULL, + content_type TEXT, + last_modified INTEGER NOT NULL, + height INTEGER NOT NULL, + namespace TEXT NOT NULL, + blob_count INTEGER NOT NULL, + commitments TEXT NOT NULL, + data BLOB, + UNIQUE(bucket, key), + FOREIGN KEY (bucket) REFERENCES s3_buckets(name) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_s3_objects_bucket ON s3_objects(bucket); +CREATE INDEX IF NOT EXISTS idx_s3_objects_bucket_key ON s3_objects(bucket, key); diff --git a/pkg/store/object.go b/pkg/store/object.go new file mode 100644 index 0000000..b3fc8eb --- /dev/null +++ b/pkg/store/object.go @@ -0,0 +1,286 @@ +package store + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "strings" + "time" + + "github.com/evstack/apex/pkg/s3" + "github.com/evstack/apex/pkg/types" +) + +type ObjectStore struct { + writer *sql.DB + reader *sql.DB + ns types.Namespace +} + +func NewObjectStore(db *SQLiteStore, namespace types.Namespace) *ObjectStore { + return &ObjectStore{ + writer: db.writer, + reader: db.reader, + ns: namespace, + } +} + +func (s *ObjectStore) PutBucket(ctx context.Context, name string) error { + now := time.Now().UnixNano() + _, err := s.writer.ExecContext(ctx, + `INSERT INTO s3_buckets (name, created_at, updated_at) VALUES (?, ?, ?)`, + name, now, now) + if err != nil { + if isSQLiteUniqueConstraint(err) { + return s3.ErrBucketAlreadyExists + } + return fmt.Errorf("insert bucket: %w", err) + } + return nil +} + +func (s *ObjectStore) GetBucket(ctx context.Context, name string) (*s3.Bucket, error) { + var b s3.Bucket + var createdAt, updatedAt int64 + err := s.reader.QueryRowContext(ctx, + `SELECT name, created_at, updated_at FROM s3_buckets WHERE name = ?`, name). + Scan(&b.Name, &createdAt, &updatedAt) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, s3.ErrBucketNotFound + } + return nil, fmt.Errorf("query bucket: %w", err) + } + b.CreatedAt = time.Unix(0, createdAt) + b.LastModified = time.Unix(0, updatedAt) + return &b, nil +} + +func (s *ObjectStore) DeleteBucket(ctx context.Context, name string) error { + var count int + err := s.reader.QueryRowContext(ctx, + `SELECT COUNT(*) FROM s3_objects WHERE bucket = ?`, name).Scan(&count) + if err != nil { + return fmt.Errorf("count objects: %w", err) + } + if count > 0 { + return s3.ErrBucketNotEmpty + } + + result, err := s.writer.ExecContext(ctx, + `DELETE FROM s3_buckets WHERE name = ?`, name) + if err != nil { + return fmt.Errorf("delete bucket: %w", err) + } + affected, _ := result.RowsAffected() + if affected == 0 { + return s3.ErrBucketNotFound + } + return nil +} + +func (s *ObjectStore) ListBuckets(ctx context.Context) ([]s3.Bucket, error) { + rows, err := s.reader.QueryContext(ctx, + `SELECT name, created_at, updated_at FROM s3_buckets ORDER BY name`) + if err != nil { + return nil, fmt.Errorf("query buckets: %w", err) + } + defer func() { _ = rows.Close() }() + + var buckets []s3.Bucket + for rows.Next() { + var b s3.Bucket + var createdAt, updatedAt int64 + if err := rows.Scan(&b.Name, &createdAt, &updatedAt); err != nil { + return nil, fmt.Errorf("scan bucket: %w", err) + } + b.CreatedAt = time.Unix(0, createdAt) + b.LastModified = time.Unix(0, updatedAt) + buckets = append(buckets, b) + } + return buckets, rows.Err() +} + +func (s *ObjectStore) PutObject(ctx context.Context, bucket, key string, data []byte, contentType string) (*s3.Object, error) { + if _, err := s.GetBucket(ctx, bucket); err != nil { + return nil, err + } + + etag := computeETag(data) + now := time.Now().UnixNano() + commitmentsJSON, _ := json.Marshal([]string{}) + + result, err := s.writer.ExecContext(ctx, + `INSERT INTO s3_objects (bucket, key, size, etag, content_type, last_modified, height, namespace, blob_count, commitments, data) + VALUES (?, ?, ?, ?, ?, ?, 0, ?, 0, ?, ?) + ON CONFLICT(bucket, key) DO UPDATE SET + size = excluded.size, + etag = excluded.etag, + content_type = excluded.content_type, + last_modified = excluded.last_modified, + data = excluded.data`, + bucket, key, len(data), etag, contentType, now, s.ns.String(), commitmentsJSON, data) + if err != nil { + return nil, fmt.Errorf("insert object: %w", err) + } + + obj := &s3.Object{ + Key: key, + Bucket: bucket, + Size: int64(len(data)), + ETag: etag, + ContentType: contentType, + LastModified: time.Unix(0, now), + Namespace: s.ns.String(), + } + if rows, _ := result.RowsAffected(); rows > 0 { + return obj, nil + } + return obj, nil +} + +func (s *ObjectStore) GetObject(ctx context.Context, bucket, key string) (*s3.Object, []byte, error) { + obj, err := s.HeadObject(ctx, bucket, key) + if err != nil { + return nil, nil, err + } + + var data []byte + var height uint64 + var namespace string + var commitmentsJSON string + err = s.reader.QueryRowContext(ctx, + `SELECT data, height, namespace, commitments FROM s3_objects WHERE bucket = ? AND key = ?`, + bucket, key).Scan(&data, &height, &namespace, &commitmentsJSON) + if err != nil { + return nil, nil, fmt.Errorf("query object data: %w", err) + } + + obj.Height = height + obj.Namespace = namespace + if commitmentsJSON != "" && commitmentsJSON != "null" { + _ = json.Unmarshal([]byte(commitmentsJSON), &obj.Commitments) + } + + return obj, data, nil +} + +func (s *ObjectStore) DeleteObject(ctx context.Context, bucket, key string) error { + result, err := s.writer.ExecContext(ctx, + `DELETE FROM s3_objects WHERE bucket = ? AND key = ?`, bucket, key) + if err != nil { + return fmt.Errorf("delete object: %w", err) + } + affected, _ := result.RowsAffected() + if affected == 0 { + return s3.ErrObjectNotFound + } + return nil +} + +func (s *ObjectStore) ListObjects(ctx context.Context, bucket, prefix, delimiter, marker string, maxKeys int) (*s3.ListObjectsResult, error) { + if _, err := s.GetBucket(ctx, bucket); err != nil { + return nil, err + } + + query := `SELECT key, last_modified, etag, size FROM s3_objects WHERE bucket = ?` + args := []any{bucket} + + if prefix != "" { + query += ` AND key LIKE ?` + args = append(args, prefix+"%") + } + if marker != "" { + query += ` AND key > ?` + args = append(args, marker) + } + + query += ` ORDER BY key LIMIT ?` + args = append(args, maxKeys+1) + + rows, err := s.reader.QueryContext(ctx, query, args...) + if err != nil { + return nil, fmt.Errorf("query objects: %w", err) + } + defer func() { _ = rows.Close() }() + + result := &s3.ListObjectsResult{ + Bucket: bucket, + Prefix: prefix, + Delimiter: delimiter, + } + prefixes := make(map[string]bool) + + count := 0 + for rows.Next() { + if count >= maxKeys { + result.IsTruncated = true + break + } + + var key string + var lastModified int64 + var etag string + var size int64 + if err := rows.Scan(&key, &lastModified, &etag, &size); err != nil { + return nil, fmt.Errorf("scan object: %w", err) + } + + if delimiter != "" { + afterPrefix := strings.TrimPrefix(key, prefix) + if idx := strings.Index(afterPrefix, delimiter); idx >= 0 { + commonPrefix := prefix + afterPrefix[:idx+1] + if !prefixes[commonPrefix] { + prefixes[commonPrefix] = true + result.CommonPrefixes = append(result.CommonPrefixes, commonPrefix) + } + continue + } + } + + result.Objects = append(result.Objects, s3.ObjectInfo{ + Key: key, + LastModified: time.Unix(0, lastModified), + ETag: etag, + Size: size, + StorageClass: "STANDARD", + }) + count++ + } + + return result, rows.Err() +} + +func (s *ObjectStore) HeadObject(ctx context.Context, bucket, key string) (*s3.Object, error) { + var obj s3.Object + var lastModified int64 + err := s.reader.QueryRowContext(ctx, + `SELECT key, bucket, size, etag, content_type, last_modified FROM s3_objects WHERE bucket = ? AND key = ?`, + bucket, key).Scan(&obj.Key, &obj.Bucket, &obj.Size, &obj.ETag, &obj.ContentType, &lastModified) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, s3.ErrObjectNotFound + } + return nil, fmt.Errorf("query object: %w", err) + } + obj.LastModified = time.Unix(0, lastModified) + return &obj, nil +} + +func (s *ObjectStore) UpdateObjectWithBlobs(ctx context.Context, bucket, key string, height uint64, commitments []string) error { + commitmentsJSON, _ := json.Marshal(commitments) + _, err := s.writer.ExecContext(ctx, + `UPDATE s3_objects SET height = ?, blob_count = ?, commitments = ? WHERE bucket = ? AND key = ?`, + height, len(commitments), string(commitmentsJSON), bucket, key) + return err +} + +func isSQLiteUniqueConstraint(err error) bool { + return err != nil && strings.Contains(err.Error(), "UNIQUE constraint failed") +} + +func computeETag(data []byte) string { + return fmt.Sprintf("%x", len(data)) +} diff --git a/pkg/store/s3.go b/pkg/store/s3.go index 70c7e64..ea2d19e 100644 --- a/pkg/store/s3.go +++ b/pkg/store/s3.go @@ -93,6 +93,15 @@ type S3Store struct { flushMu sync.Mutex // serializes flush operations } +// Client returns the underlying S3 client if it's an *s3.Client, +// allowing reuse for other components like S3ObjectStore. +func (s *S3Store) Client() *s3.Client { + if client, ok := s.client.(*s3.Client); ok { + return client + } + return nil +} + type flushBuffers struct { blobBuf map[blobChunkKey][]types.Blob headerBuf map[uint64][]*types.Header diff --git a/pkg/store/s3_object_store.go b/pkg/store/s3_object_store.go new file mode 100644 index 0000000..fb4e09a --- /dev/null +++ b/pkg/store/s3_object_store.go @@ -0,0 +1,369 @@ +package store + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "strconv" + "strings" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" + + apexs3 "github.com/evstack/apex/pkg/s3" +) + +type S3ObjectStore struct { + client *s3.Client +} + +func NewS3ObjectStore(client *s3.Client) *S3ObjectStore { + return &S3ObjectStore{ + client: client, + } +} + +func (s *S3ObjectStore) PutBucket(ctx context.Context, name string) error { + _, err := s.client.CreateBucket(ctx, &s3.CreateBucketInput{ + Bucket: aws.String(name), + }) + if err != nil { + var aerr *s3types.BucketAlreadyExists + if errors.As(err, &aerr) { + return apexs3.ErrBucketAlreadyExists + } + var oerr *s3types.BucketAlreadyOwnedByYou + if errors.As(err, &oerr) { + return apexs3.ErrBucketAlreadyExists + } + return fmt.Errorf("create bucket: %w", err) + } + return nil +} + +func (s *S3ObjectStore) GetBucket(ctx context.Context, name string) (*apexs3.Bucket, error) { + _, err := s.client.HeadBucket(ctx, &s3.HeadBucketInput{ + Bucket: aws.String(name), + }) + if err != nil { + var nfe *s3types.NotFound + if errors.As(err, &nfe) { + return nil, apexs3.ErrBucketNotFound + } + if strings.Contains(err.Error(), "NotFound") { + return nil, apexs3.ErrBucketNotFound + } + return nil, fmt.Errorf("head bucket: %w", err) + } + + return &apexs3.Bucket{ + Name: name, + // S3 HeadBucket doesn't return creation date, but we can return a dummy or fetch it differently + CreatedAt: time.Now(), + LastModified: time.Now(), + }, nil +} + +func (s *S3ObjectStore) DeleteBucket(ctx context.Context, name string) error { + _, err := s.client.DeleteBucket(ctx, &s3.DeleteBucketInput{ + Bucket: aws.String(name), + }) + if err != nil { + if strings.Contains(err.Error(), "BucketNotEmpty") { + return apexs3.ErrBucketNotEmpty + } + if strings.Contains(err.Error(), "NoSuchBucket") { + return apexs3.ErrBucketNotFound + } + return fmt.Errorf("delete bucket: %w", err) + } + return nil +} + +func (s *S3ObjectStore) ListBuckets(ctx context.Context) ([]apexs3.Bucket, error) { + out, err := s.client.ListBuckets(ctx, &s3.ListBucketsInput{}) + if err != nil { + return nil, fmt.Errorf("list buckets: %w", err) + } + + var buckets []apexs3.Bucket + for _, b := range out.Buckets { + var createdAt time.Time + if b.CreationDate != nil { + createdAt = *b.CreationDate + } + buckets = append(buckets, apexs3.Bucket{ + Name: aws.ToString(b.Name), + CreatedAt: createdAt, + }) + } + return buckets, nil +} + +func (s *S3ObjectStore) PutObject(ctx context.Context, bucket, key string, data []byte, contentType string) (*apexs3.Object, error) { + _, err := s.client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Body: bytes.NewReader(data), + ContentType: aws.String(contentType), + }) + if err != nil { + if strings.Contains(err.Error(), "NoSuchBucket") { + return nil, apexs3.ErrBucketNotFound + } + return nil, fmt.Errorf("put object: %w", err) + } + + // Calculate ETag locally or just head the object + out, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + return nil, fmt.Errorf("head object after put: %w", err) + } + + var lastModified time.Time + if out.LastModified != nil { + lastModified = *out.LastModified + } + + return &apexs3.Object{ + Key: key, + Bucket: bucket, + Size: int64(len(data)), + ETag: strings.Trim(aws.ToString(out.ETag), "\""), + ContentType: contentType, + LastModified: lastModified, + }, nil +} + +func (s *S3ObjectStore) GetObject(ctx context.Context, bucket, key string) (*apexs3.Object, []byte, error) { + out, err := s.client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + if strings.Contains(err.Error(), "NoSuchKey") { + return nil, nil, apexs3.ErrObjectNotFound + } + if strings.Contains(err.Error(), "NoSuchBucket") { + return nil, nil, apexs3.ErrBucketNotFound + } + return nil, nil, fmt.Errorf("get object: %w", err) + } + defer func() { _ = out.Body.Close() }() + + data, err := io.ReadAll(out.Body) + if err != nil { + return nil, nil, fmt.Errorf("read object body: %w", err) + } + + var lastModified time.Time + if out.LastModified != nil { + lastModified = *out.LastModified + } + + var size int64 + if out.ContentLength != nil { + size = *out.ContentLength + } else { + size = int64(len(data)) + } + + // Extract custom metadata for Celestia + var height uint64 + var namespace string + var commitments []string + if out.Metadata != nil { + if h, ok := out.Metadata["celestia-height"]; ok { + _, _ = fmt.Sscanf(h, "%d", &height) + } + if n, ok := out.Metadata["celestia-namespace"]; ok { + namespace = n + } + if c, ok := out.Metadata["celestia-commitments"]; ok { + commitments = strings.Split(c, ",") + } + } + + obj := &apexs3.Object{ + Key: key, + Bucket: bucket, + Size: size, + ETag: strings.Trim(aws.ToString(out.ETag), "\""), + ContentType: aws.ToString(out.ContentType), + LastModified: lastModified, + Height: height, + Namespace: namespace, + Commitments: commitments, + } + + return obj, data, nil +} + +func (s *S3ObjectStore) DeleteObject(ctx context.Context, bucket, key string) error { + _, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + return fmt.Errorf("delete object: %w", err) + } + return nil +} + +func (s *S3ObjectStore) ListObjects(ctx context.Context, bucket, prefix, delimiter, marker string, maxKeys int) (*apexs3.ListObjectsResult, error) { + input := &s3.ListObjectsInput{ + Bucket: aws.String(bucket), + MaxKeys: aws.Int32(int32(maxKeys)), + } + if prefix != "" { + input.Prefix = aws.String(prefix) + } + if delimiter != "" { + input.Delimiter = aws.String(delimiter) + } + if marker != "" { + input.Marker = aws.String(marker) + } + + out, err := s.client.ListObjects(ctx, input) + if err != nil { + if strings.Contains(err.Error(), "NoSuchBucket") { + return nil, apexs3.ErrBucketNotFound + } + return nil, fmt.Errorf("list objects: %w", err) + } + + res := &apexs3.ListObjectsResult{ + Bucket: bucket, + Prefix: prefix, + Delimiter: delimiter, + IsTruncated: aws.ToBool(out.IsTruncated), + } + + for _, cp := range out.CommonPrefixes { + res.CommonPrefixes = append(res.CommonPrefixes, aws.ToString(cp.Prefix)) + } + + for _, obj := range out.Contents { + var lastModified time.Time + if obj.LastModified != nil { + lastModified = *obj.LastModified + } + + var size int64 + if obj.Size != nil { + size = *obj.Size + } + + var storageClass string + if obj.StorageClass != "" { + storageClass = string(obj.StorageClass) + } else { + storageClass = "STANDARD" + } + + res.Objects = append(res.Objects, apexs3.ObjectInfo{ + Key: aws.ToString(obj.Key), + LastModified: lastModified, + ETag: strings.Trim(aws.ToString(obj.ETag), "\""), + Size: size, + StorageClass: storageClass, + }) + } + + return res, nil +} + +func (s *S3ObjectStore) HeadObject(ctx context.Context, bucket, key string) (*apexs3.Object, error) { + out, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + var nfe *s3types.NotFound + if errors.As(err, &nfe) || strings.Contains(err.Error(), "NotFound") { + return nil, apexs3.ErrObjectNotFound + } + return nil, fmt.Errorf("head object: %w", err) + } + + var lastModified time.Time + if out.LastModified != nil { + lastModified = *out.LastModified + } + + var size int64 + if out.ContentLength != nil { + size = *out.ContentLength + } + + // Extract custom metadata for Celestia + var height uint64 + var namespace string + var commitments []string + if out.Metadata != nil { + if h, ok := out.Metadata["celestia-height"]; ok { + _, _ = fmt.Sscanf(h, "%d", &height) + } + if n, ok := out.Metadata["celestia-namespace"]; ok { + namespace = n + } + if c, ok := out.Metadata["celestia-commitments"]; ok { + commitments = strings.Split(c, ",") + } + } + + return &apexs3.Object{ + Key: key, + Bucket: bucket, + Size: size, + ETag: strings.Trim(aws.ToString(out.ETag), "\""), + ContentType: aws.ToString(out.ContentType), + LastModified: lastModified, + Height: height, + Namespace: namespace, + Commitments: commitments, + }, nil +} + +func (s *S3ObjectStore) UpdateObjectWithBlobs(ctx context.Context, bucket, key string, height uint64, commitments []string) error { + // To update metadata in S3, we have to copy the object to itself with new metadata. + + // First head the object to get its current content type and existing metadata + head, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + return fmt.Errorf("head object for update: %w", err) + } + + metadata := head.Metadata + if metadata == nil { + metadata = make(map[string]string) + } + + metadata["celestia-height"] = strconv.FormatUint(height, 10) + metadata["celestia-commitments"] = strings.Join(commitments, ",") + + source := fmt.Sprintf("%s/%s", bucket, key) + _, err = s.client.CopyObject(ctx, &s3.CopyObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + CopySource: aws.String(source), + MetadataDirective: s3types.MetadataDirectiveReplace, + Metadata: metadata, + ContentType: head.ContentType, + }) + if err != nil { + return fmt.Errorf("update object metadata: %w", err) + } + return nil +} diff --git a/pkg/store/sqlite.go b/pkg/store/sqlite.go index 8659966..8d6d592 100644 --- a/pkg/store/sqlite.go +++ b/pkg/store/sqlite.go @@ -102,6 +102,7 @@ type migrationStep struct { var allMigrations = []migrationStep{ {version: 1, file: "migrations/001_init.sql"}, {version: 2, file: "migrations/002_commitment_index.sql"}, + {version: 3, file: "migrations/003_s3_objects.sql"}, } func (s *SQLiteStore) migrate() error {