diff --git a/cmd/apex/main.go b/cmd/apex/main.go index 461b7a0..d108e0f 100644 --- a/cmd/apex/main.go +++ b/cmd/apex/main.go @@ -24,6 +24,7 @@ import ( "github.com/evstack/apex/pkg/fetch" "github.com/evstack/apex/pkg/metrics" "github.com/evstack/apex/pkg/profile" + apexs3 "github.com/evstack/apex/pkg/s3" "github.com/evstack/apex/pkg/store" "github.com/evstack/apex/pkg/submit" syncer "github.com/evstack/apex/pkg/sync" @@ -214,6 +215,45 @@ func setStoreMetrics(db store.Store, rec metrics.Recorder) { } } +func setupS3Server(cfg *config.Config, db store.Store, blobSubmitter submit.Submitter, log zerolog.Logger) (*http.Server, error) { + if !cfg.S3.Enabled { + return nil, nil + } + + var ns types.Namespace + if cfg.S3.Namespace != "" { + var err error + ns, err = types.NamespaceFromHex(cfg.S3.Namespace) + if err != nil { + return nil, fmt.Errorf("parse S3 namespace: %w", err) + } + } + + sqliteDB, ok := db.(*store.SQLiteStore) + if !ok { + return nil, fmt.Errorf("S3 API requires SQLite store, got %T", db) + } + + objStore := store.NewObjectStore(sqliteDB, ns) + s3Svc := apexs3.NewService(objStore, blobSubmitter, ns) + s3Srv := apexs3.NewServer(s3Svc, cfg.S3.Region, cfg.S3.AccessKeyID, cfg.S3.SecretAccessKey, log) + + httpSrv := &http.Server{ + Addr: cfg.S3.ListenAddr, + Handler: s3Srv, + ReadHeaderTimeout: 10 * time.Second, + } + + go func() { + log.Info().Str("addr", cfg.S3.ListenAddr).Msg("S3 API server listening") + if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Error().Err(err).Msg("S3 API server error") + } + }() + + return httpSrv, nil +} + func persistNamespaces(ctx context.Context, db store.Store, namespaces []types.Namespace) error { for _, ns := range namespaces { if err := db.PutNamespace(ctx, ns); err != nil { @@ -282,11 +322,21 @@ func runIndexer(ctx context.Context, cfg *config.Config) error { } defer dataFetcher.Close() //nolint:errcheck - svc, notifier, closeSubmitter, err := setupAPIService(cfg, db, 
dataFetcher, proofFwd, rec) + blobSubmitter, err := openBlobSubmitter(cfg) if err != nil { return err } - defer closeSubmitter() + if blobSubmitter != nil { + defer blobSubmitter.Close() //nolint:errcheck + } + + // Setup S3 API server if enabled. + s3Srv, err := setupS3Server(cfg, db, blobSubmitter, log.Logger) + if err != nil { + return fmt.Errorf("setup S3 server: %w", err) + } + + svc, notifier := setupAPIService(cfg, db, dataFetcher, proofFwd, rec, blobSubmitter) // Build and run the sync coordinator with observer hook. coordOpts, closeBackfill, err := buildCoordinatorOptions(cfg, notifier, rec) @@ -342,7 +392,7 @@ func runIndexer(ctx context.Context, cfg *config.Config) error { err = coord.Run(ctx) - gracefulShutdown(httpSrv, grpcSrv, metricsSrv, profileSrv) + gracefulShutdown(httpSrv, grpcSrv, metricsSrv, profileSrv, s3Srv) if err != nil && !errors.Is(err, context.Canceled) { return fmt.Errorf("coordinator: %w", err) @@ -385,19 +435,7 @@ func openBlobSubmitter(cfg *config.Config) (*submit.DirectSubmitter, error) { return blobSubmitter, nil } -func setupAPIService(cfg *config.Config, db store.Store, dataFetcher fetch.DataFetcher, proofFwd fetch.ProofForwarder, rec metrics.Recorder) (*api.Service, *api.Notifier, func(), error) { - blobSubmitter, err := openBlobSubmitter(cfg) - if err != nil { - return nil, nil, nil, err - } - - closeSubmitter := func() {} - if blobSubmitter != nil { - closeSubmitter = func() { - _ = blobSubmitter.Close() - } - } - +func setupAPIService(cfg *config.Config, db store.Store, dataFetcher fetch.DataFetcher, proofFwd fetch.ProofForwarder, rec metrics.Recorder, blobSubmitter submit.Submitter) (*api.Service, *api.Notifier) { notifier := api.NewNotifier(cfg.Subscription.BufferSize, cfg.Subscription.MaxSubscribers, log.Logger) notifier.SetMetrics(rec) @@ -407,7 +445,7 @@ func setupAPIService(cfg *config.Config, db store.Store, dataFetcher fetch.DataF } svc := api.NewService(db, dataFetcher, proofFwd, notifier, log.Logger, svcOpts...) 
- return svc, notifier, closeSubmitter, nil + return svc, notifier } func buildCoordinatorOptions(cfg *config.Config, notifier *api.Notifier, rec metrics.Recorder) ([]syncer.Option, func(), error) { @@ -436,7 +474,7 @@ func buildCoordinatorOptions(cfg *config.Config, notifier *api.Notifier, rec met return coordOpts, closeBackfill, nil } -func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server, profileSrv *profile.Server) { +func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server, profileSrv *profile.Server, s3Srv *http.Server) { stopped := make(chan struct{}) go func() { grpcSrv.GracefulStop() @@ -457,6 +495,12 @@ func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *me log.Error().Err(err).Msg("JSON-RPC server shutdown error") } + if s3Srv != nil { + if err := s3Srv.Shutdown(shutdownCtx); err != nil { + log.Error().Err(err).Msg("S3 API server shutdown error") + } + } + if metricsSrv != nil { if err := metricsSrv.Shutdown(shutdownCtx); err != nil { log.Error().Err(err).Msg("metrics server shutdown error") diff --git a/config/config.go b/config/config.go index 123e336..2a45cea 100644 --- a/config/config.go +++ b/config/config.go @@ -17,6 +17,7 @@ type Config struct { Profiling ProfilingConfig `yaml:"profiling"` Log LogConfig `yaml:"log"` Submission SubmissionConfig `yaml:"submission"` + S3 S3APIConfig `yaml:"s3"` } // DataSourceConfig configures the Celestia data source. @@ -94,6 +95,16 @@ type LogConfig struct { Format string `yaml:"format"` } +// S3APIConfig configures the S3-compatible API server. 
+type S3APIConfig struct { + Enabled bool `yaml:"enabled"` + ListenAddr string `yaml:"listen_addr"` + Region string `yaml:"region"` + Namespace string `yaml:"namespace"` // Celestia namespace for S3 objects (hex) + AccessKeyID string `yaml:"access_key_id"` // optional SigV4 access key for the S3 API + SecretAccessKey string `yaml:"secret_access_key"` // optional SigV4 secret key for the S3 API +} + // SubmissionConfig contains settings for the future blob submission pipeline. type SubmissionConfig struct { Enabled bool `yaml:"enabled"` @@ -155,6 +166,11 @@ func DefaultConfig() Config { Level: "info", Format: "json", }, + S3: S3APIConfig{ + Enabled: false, + ListenAddr: ":8333", + Region: "us-east-1", + }, } } diff --git a/config/load.go b/config/load.go index ca90809..9a0c696 100644 --- a/config/load.go +++ b/config/load.go @@ -95,6 +95,19 @@ storage: # endpoint: "" # custom endpoint for MinIO, R2, etc. # chunk_size: 64 # heights per S3 object +s3: + # Enable the S3-compatible HTTP API backed by SQLite object storage. + enabled: false + # Address for the S3-compatible API server. + listen_addr: ":8333" + # AWS region reported to clients. + region: "us-east-1" + # Namespace used when S3 uploads are submitted to Celestia. + namespace: "" + # Optional SigV4 credentials enforced by the S3 API. 
+ access_key_id: "" + secret_access_key: "" + rpc: # Address for the JSON-RPC API server (HTTP/WebSocket) listen_addr: ":8080" @@ -290,6 +303,9 @@ func validate(cfg *Config) error { if err := validateSubmission(&cfg.Submission); err != nil { return err } + if err := validateS3API(&cfg.S3, &cfg.Storage, &cfg.Submission); err != nil { + return err + } if !validLogLevels[cfg.Log.Level] { return fmt.Errorf("log.level %q is invalid; must be one of trace/debug/info/warn/error/fatal/panic", cfg.Log.Level) } @@ -381,6 +397,42 @@ func validateSubmission(s *SubmissionConfig) error { return nil } +func validateS3API(s3cfg *S3APIConfig, storage *StorageConfig, submission *SubmissionConfig) error { + s3cfg.ListenAddr = strings.TrimSpace(s3cfg.ListenAddr) + s3cfg.Region = strings.TrimSpace(s3cfg.Region) + s3cfg.Namespace = strings.TrimSpace(s3cfg.Namespace) + s3cfg.AccessKeyID = strings.TrimSpace(s3cfg.AccessKeyID) + s3cfg.SecretAccessKey = strings.TrimSpace(s3cfg.SecretAccessKey) + if s3cfg.Region == "" { + s3cfg.Region = DefaultConfig().S3.Region + } + if (s3cfg.AccessKeyID == "") != (s3cfg.SecretAccessKey == "") { + return errors.New("s3.access_key_id and s3.secret_access_key must be provided together") + } + if !s3cfg.Enabled { + return nil + } + if s3cfg.ListenAddr == "" { + return errors.New("s3.listen_addr is required when s3.enabled is true") + } + if storage.Type != "sqlite" && storage.Type != "" { + return errors.New("s3.enabled requires storage.type to be \"sqlite\"") + } + if s3cfg.Namespace != "" { + ns, err := types.NamespaceFromHex(s3cfg.Namespace) + if err != nil { + return fmt.Errorf("s3.namespace is invalid: %w", err) + } + if err := ns.ValidateForBlob(); err != nil { + return fmt.Errorf("s3.namespace is invalid: %w", err) + } + } + if submission.Enabled && s3cfg.Namespace == "" { + return errors.New("s3.namespace is required when both s3.enabled and submission.enabled are true") + } + return nil +} + func resolveSubmissionSignerKeyPath(s *SubmissionConfig, 
baseDir string) error { if !s.Enabled { return nil diff --git a/config/load_test.go b/config/load_test.go index 9dad243..5b1d911 100644 --- a/config/load_test.go +++ b/config/load_test.go @@ -330,3 +330,175 @@ log: t.Fatalf("unexpected error: %v", err) } } + +func TestGenerateIncludesS3Section(t *testing.T) { + t.Parallel() + + path := filepath.Join(t.TempDir(), "config.yaml") + if err := Generate(path); err != nil { + t.Fatalf("Generate: %v", err) + } + + data, err := os.ReadFile(path) + if err != nil { + t.Fatalf("ReadFile: %v", err) + } + content := string(data) + if !strings.Contains(content, "\ns3:\n") { + t.Fatalf("generated config missing s3 section:\n%s", content) + } + if !strings.Contains(content, `listen_addr: ":8333"`) { + t.Fatalf("generated config missing s3 listen addr default:\n%s", content) + } +} + +func TestLoadRejectsS3EnabledWithNonSQLiteStorage(t *testing.T) { + t.Parallel() + + path := filepath.Join(t.TempDir(), "config.yaml") + content := ` +data_source: + type: "node" + celestia_node_url: "http://localhost:26658" + +storage: + type: "s3" + s3: + bucket: "bucket" + region: "us-east-1" + +s3: + enabled: true + listen_addr: ":8333" + +log: + level: "info" + format: "json" +` + if err := os.WriteFile(path, []byte(content), 0o644); err != nil { + t.Fatalf("WriteFile: %v", err) + } + + _, err := Load(path) + if err == nil { + t.Fatal("expected validation error, got nil") + } + if !strings.Contains(err.Error(), `s3.enabled requires storage.type to be "sqlite"`) { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestLoadRejectsS3SubmissionWithoutNamespace(t *testing.T) { + t.Parallel() + + path := filepath.Join(t.TempDir(), "config.yaml") + keyPath := filepath.Join(t.TempDir(), "submit.key") + if err := os.WriteFile(keyPath, []byte("00112233"), 0o600); err != nil { + t.Fatalf("WriteFile: %v", err) + } + + content := ` +data_source: + type: "app" + celestia_app_grpc_addr: "localhost:9090" + +storage: + type: "sqlite" + db_path: "apex.db" + 
+submission: + enabled: true + app_grpc_addr: "localhost:9090" + chain_id: "mychain" + signer_key: "` + keyPath + `" + +s3: + enabled: true + listen_addr: ":8333" + +log: + level: "info" + format: "json" +` + if err := os.WriteFile(path, []byte(content), 0o644); err != nil { + t.Fatalf("WriteFile: %v", err) + } + + _, err := Load(path) + if err == nil { + t.Fatal("expected validation error, got nil") + } + if !strings.Contains(err.Error(), "s3.namespace is required") { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestLoadRejectsInvalidS3Namespace(t *testing.T) { + t.Parallel() + + path := filepath.Join(t.TempDir(), "config.yaml") + content := ` +data_source: + type: "node" + celestia_node_url: "http://localhost:26658" + +storage: + type: "sqlite" + db_path: "apex.db" + +s3: + enabled: true + listen_addr: ":8333" + namespace: "not-a-namespace" + +log: + level: "info" + format: "json" +` + if err := os.WriteFile(path, []byte(content), 0o644); err != nil { + t.Fatalf("WriteFile: %v", err) + } + + _, err := Load(path) + if err == nil { + t.Fatal("expected validation error, got nil") + } + if !strings.Contains(err.Error(), "s3.namespace is invalid") { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestLoadRejectsPartialS3Credentials(t *testing.T) { + t.Parallel() + + path := filepath.Join(t.TempDir(), "config.yaml") + content := ` +data_source: + type: "node" + celestia_node_url: "http://localhost:26658" + +storage: + type: "sqlite" + db_path: "apex.db" + +s3: + enabled: true + listen_addr: ":8333" + access_key_id: "app-key" + +log: + level: "info" + format: "json" +` + if err := os.WriteFile(path, []byte(content), 0o644); err != nil { + t.Fatalf("WriteFile: %v", err) + } + + _, err := Load(path) + if err == nil { + t.Fatal("expected validation error, got nil") + } + if !strings.Contains(err.Error(), "s3.access_key_id and s3.secret_access_key must be provided together") { + t.Fatalf("unexpected error: %v", err) + } +} diff --git a/e2e/go.mod 
b/e2e/go.mod index cfeeccd..0333164 100644 --- a/e2e/go.mod +++ b/e2e/go.mod @@ -14,12 +14,16 @@ replace ( ) require ( - github.com/celestiaorg/go-square/merkle v0.0.0-20240627094109-7d01436067a3 + github.com/aws/aws-sdk-go-v2 v1.41.1 + github.com/aws/aws-sdk-go-v2/config v1.32.9 + github.com/aws/aws-sdk-go-v2/credentials v1.19.9 + github.com/aws/aws-sdk-go-v2/service/s3 v1.96.0 github.com/celestiaorg/go-square/v3 v3.0.2 github.com/celestiaorg/tastora v0.16.0 github.com/cosmos/cosmos-sdk v0.50.12 github.com/cosmos/ibc-go/v8 v8.7.0 github.com/evstack/apex v0.0.0-00010101000000-000000000000 + google.golang.org/grpc v1.79.1 ) require ( @@ -43,10 +47,7 @@ require ( github.com/ProjectZKM/Ziren/crates/go-runtime/zkvm_runtime v0.0.0-20251001021608-1fe7b43fc4d6 // indirect github.com/StackExchange/wmi v1.2.1 // indirect github.com/avast/retry-go/v4 v4.6.1 // indirect - github.com/aws/aws-sdk-go-v2 v1.41.1 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4 // indirect - github.com/aws/aws-sdk-go-v2/config v1.32.9 // indirect - github.com/aws/aws-sdk-go-v2/credentials v1.19.9 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.17 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.17 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.17 // indirect @@ -56,7 +57,6 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.8 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.17 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.17 // indirect - github.com/aws/aws-sdk-go-v2/service/s3 v1.96.0 // indirect github.com/aws/aws-sdk-go-v2/service/signin v1.0.5 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.30.10 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.14 // indirect @@ -66,9 +66,14 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/bgentry/speakeasy v0.2.0 // indirect github.com/bits-and-blooms/bitset 
v1.20.0 // indirect + github.com/btcsuite/btcd v0.24.2 // indirect + github.com/btcsuite/btcd/btcec/v2 v2.3.5 // indirect + github.com/btcsuite/btcd/btcutil v1.1.6 // indirect + github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect github.com/bytedance/gopkg v0.1.3 // indirect github.com/bytedance/sonic v1.15.0 // indirect github.com/bytedance/sonic/loader v0.5.0 // indirect + github.com/celestiaorg/go-square/merkle v0.0.0-20240627094109-7d01436067a3 // indirect github.com/celestiaorg/nmt v0.24.2 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -237,7 +242,6 @@ require ( google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20251222181119-0a764e51fe1b // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260217215200-42d3e9bedb6d // indirect - google.golang.org/grpc v1.79.1 // indirect google.golang.org/protobuf v1.36.11 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect gotest.tools/v3 v3.5.2 // indirect diff --git a/e2e/go.sum b/e2e/go.sum index f575c49..78069d5 100644 --- a/e2e/go.sum +++ b/e2e/go.sum @@ -84,6 +84,7 @@ github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrd github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= github.com/adlio/schema v1.3.9 h1:MLYk1VX1dn7xHW7Kdm1ywKKLjh19DRnrc65axS5xQA8= github.com/adlio/schema v1.3.9/go.mod h1:GnxXztHzNh6pIc7qm3sw+jsmHrXgBy/x2RBSkKZ3L4w= +github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= @@ -157,11 +158,30 @@ 
github.com/bgentry/speakeasy v0.2.0 h1:tgObeVOf8WAvtuAX6DhJ4xks4CFNwPDZiqzGqIHE5 github.com/bgentry/speakeasy v0.2.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bits-and-blooms/bitset v1.20.0 h1:2F+rfL86jE2d/bmw7OhqUg2Sj/1rURkBn3MdfoPyRVU= github.com/bits-and-blooms/bitset v1.20.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= +github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= +github.com/btcsuite/btcd v0.22.0-beta.0.20220111032746-97732e52810c/go.mod h1:tjmYdS6MLJ5/s0Fj4DbLgSbDHbEqLJrtnHecBFkdz5M= +github.com/btcsuite/btcd v0.23.5-0.20231215221805-96c9fd8078fd/go.mod h1:nm3Bko6zh6bWP60UxwoT5LzdGJsQJaPo6HjduXq9p6A= github.com/btcsuite/btcd v0.24.2 h1:aLmxPguqxza+4ag8R1I2nnJjSu2iFn/kqtHTIImswcY= +github.com/btcsuite/btcd v0.24.2/go.mod h1:5C8ChTkl5ejr3WHj8tkQSCmydiMEPB0ZhQhehpq7Dgg= github.com/btcsuite/btcd/btcec/v2 v2.3.3 h1:6+iXlDKE8RMtKsvK0gshlXIuPbyWM/h84Ensb7o3sC0= github.com/btcsuite/btcd/btcec/v2 v2.3.3/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04= +github.com/btcsuite/btcd/btcutil v1.0.0/go.mod h1:Uoxwv0pqYWhD//tfTiipkxNfdhG9UrLwaeswfjfdF0A= +github.com/btcsuite/btcd/btcutil v1.1.0/go.mod h1:5OapHB7A2hBBWLm48mmw4MOHNJCcUBTwmWH/0Jn8VHE= +github.com/btcsuite/btcd/btcutil v1.1.5/go.mod h1:PSZZ4UitpLBWzxGd5VGOrLnmOjtPP/a6HaFo12zMs00= github.com/btcsuite/btcd/btcutil v1.1.6 h1:zFL2+c3Lb9gEgqKNzowKUPQNb8jV7v5Oaodi/AYFd6c= github.com/btcsuite/btcd/btcutil v1.1.6/go.mod h1:9dFymx8HpuLqBnsPELrImQeTQfKBQqzqGbbV3jK55aE= +github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= +github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 h1:59Kx4K6lzOW5w6nFlA0v5+lk/6sjybR934QNHSJZPTQ= +github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0/go.mod h1:7SFka0XMvUgj3hfZtydOrQY2mwhPclbT2snogU7SQQc= +github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= +github.com/btcsuite/btcutil 
v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= +github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg= +github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY= +github.com/btcsuite/goleveldb v1.0.0/go.mod h1:QiK9vBlgftBg6rWQIj6wFzbPfRjiykIEhBH4obrXJ/I= +github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc= +github.com/btcsuite/snappy-go v1.0.0/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc= +github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= +github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= github.com/bufbuild/protocompile v0.14.1 h1:iA73zAf/fyljNjQKwYzUHD6AD4R8KMasmwa/FBatYVw= github.com/bufbuild/protocompile v0.14.1/go.mod h1:ppVdAIhbr2H8asPk6k4pY7t9zB1OU5DoEw9xY/FUi1c= github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M= @@ -286,6 +306,7 @@ github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7Do github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/danieljoos/wincred v1.2.1 h1:dl9cBrupW8+r5250DYkYxocLeZ1Y4vB1kxgtjxw8GQs= github.com/danieljoos/wincred v1.2.1/go.mod h1:uGaFL9fDn3OLTvzCGulzE+SzjEe5NGlh5FdCcyfPwps= +github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -294,10 +315,13 @@ github.com/dchest/siphash v1.2.3 h1:QXwFc8cFOR2dSa/gE6o/HokBMWtLUaNDVd+22aKHeEA= 
github.com/dchest/siphash v1.2.3/go.mod h1:0NvQU092bT0ipiFN++/rXm69QG9tVxLAlQHIXMPAkHc= github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM= github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4= +github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc= github.com/decred/dcrd/crypto/blake256 v1.1.0 h1:zPMNGQCm0g4QTY27fOCorQW7EryeQ/U0x++OzVrdms8= github.com/decred/dcrd/crypto/blake256 v1.1.0/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 h1:NMZiJj8QnKe1LgsbDayM4UoHwbvwDRwnI3hwNaAHRnc= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0/go.mod h1:ZXNYxsqcloTdSy/rNShjYzMhyjf0LaoftYK0p+A3h40= +github.com/decred/dcrd/lru v1.0.0/go.mod h1:mxKOwFd7lFjN2GZYsiz/ecgqR6kkYAl+0pz0tEMk218= github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f/go.mod h1:xH/i4TFMt8koVQZ6WFms69WAsDWr2XsYL3Hkl7jkoLE= github.com/desertbit/timer v1.0.1 h1:yRpYNn5Vaaj6QXecdLMPMJsW81JLiI1eokUft5nBmeo= github.com/desertbit/timer v1.0.1/go.mod h1:htRrYeY5V/t4iu1xCJ5XsQvp4xve8QulXXctAzxqcwE= @@ -504,6 +528,7 @@ github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grafana/otel-profiling-go v0.5.1 
h1:stVPKAFZSa7eGiqbYuG25VcqYksR6iWvF3YH66t4qL8= @@ -599,6 +624,8 @@ github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLf github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= +github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jhump/protoreflect v1.15.3 h1:6SFRuqU45u9hIZPJAoZ8c28T3nK64BNdp9w6jFonzls= github.com/jhump/protoreflect v1.15.3/go.mod h1:4ORHmSBmlCW8fh3xHmJMGyul1zNqZK4Elxc8qKP+p1k= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= @@ -608,6 +635,7 @@ github.com/jmhodges/levigo v1.0.0 h1:q5EC36kV79HWeTBWsod3mG11EgStG3qArTKcvlksN1U github.com/jmhodges/levigo v1.0.0/go.mod h1:Q6Qx+uH3RAqyK4rFQroq9RL7mdkABMcfhEI+nNuzMJQ= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= +github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= @@ -619,6 +647,7 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= 
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= @@ -732,12 +761,14 @@ github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:v github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= github.com/onsi/ginkgo/v2 v2.1.3/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c= github.com/onsi/ginkgo/v2 v2.21.0 h1:7rg/4f3rB88pb5obDgNZrNHrQ4e6WpjonchcpuBRnZM= github.com/onsi/ginkgo/v2 v2.21.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= +github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= @@ -916,6 +947,7 @@ github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8 github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= github.com/supranational/blst 
v0.3.16-0.20250831170142-f48500c1fdbe h1:nbdqkIGOGfUAD54q1s2YBcBz/WcsxCO9HUQ4aGV5hUw= github.com/supranational/blst v0.3.16-0.20250831170142-f48500c1fdbe/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw= +github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d h1:vfofYNRScrDdvS342BElfbETmL1Aiz3i2t0zfRj16Hs= github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d/go.mod h1:RRCYJbIwD5jmqPI9XoAFR0OcDxqUctll6zUj/+B4S48= github.com/tendermint/go-amino v0.16.0 h1:GyhmgQKvqF82e2oZeuMSp9JTN0N09emoSZlb2lyGa2E= @@ -1020,6 +1052,7 @@ go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/arch v0.15.0 h1:QtOrQd0bTUnhNVNndMpLHNWrDmYzZ2KDqSrEymqInZw= golang.org/x/arch v0.15.0/go.mod h1:JmwW7aLIoRUKgaTzhkiEFxvcEiQGyOg9BMonBJUS7EE= +golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= @@ -1049,6 +1082,7 @@ golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= 
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1069,6 +1103,7 @@ golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200421231249-e086a090c8fd/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= @@ -1124,8 +1159,10 @@ golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod 
h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/e2e/s3_test.go b/e2e/s3_test.go new file mode 100644 index 0000000..bb3a170 --- /dev/null +++ b/e2e/s3_test.go @@ -0,0 +1,176 @@ +package e2e + +import ( + "bytes" + "context" + "io" + "path/filepath" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + awsconfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + awss3 "github.com/aws/aws-sdk-go-v2/service/s3" +) + +func TestS3ObjectLifecycle(t *testing.T) { + if testing.Short() { + t.Skip("skipping Docker-backed e2e test in short mode") + } + + ctx, cancel := context.WithTimeout(context.Background(), chainStartupTimeout) + defer cancel() + + grpcAddr, chainID, signerKeyHex, signerAddress := startSubmissionTestChain(t, ctx) + namespace := testNamespace(t, []byte("apex-s3")) + data := []byte("apex s3 e2e") + commitment := mustBlobCommitment(t, namespace, data) + accessKeyID := "settings-key" + secretKey := "settings-secret" + + apexBinary := buildApexBinary(t) + apexRPCAddr := reserveTCPAddr(t) + apexGRPCAddr := reserveTCPAddr(t) + apexS3Addr := reserveTCPAddr(t) + keyPath := writeSignerKey(t, signerKeyHex) + configPath := writeApexConfig(t, apexConfig{ + Namespace: namespace, + DataGRPCAddr: grpcAddr, + SubmissionGRPC: grpcAddr, + ChainID: chainID, + SignerKeyPath: keyPath, + StoragePath: filepath.Join(t.TempDir(), "apex.db"), + RPCListenAddr: apexRPCAddr, + GRPCListenAddr: apexGRPCAddr, + S3Enabled: true, + S3ListenAddr: apexS3Addr, + S3Region: "us-east-1", + S3Namespace: namespace, + S3AccessKeyID: accessKeyID, + S3SecretKey: secretKey, + GasPrice: submissionGasPrice, + MaxGasPrice: submissionGasPrice, + ConfirmTimeoutS: submissionConfirmTimeout, + }) + + proc := startApexProcess(t, apexBinary, configPath) + 
defer proc.Stop(t) + + waitForApexHTTP(t, proc, apexRPCAddr) + waitForS3HTTP(t, proc, apexS3Addr) + + client := newS3Client(t, "http://"+apexS3Addr, accessKeyID, secretKey) + + bucket := "apex-s3-e2e" + key := "hello.txt" + + listBuckets, err := client.ListBuckets(context.Background(), &awss3.ListBucketsInput{}) + if err != nil { + t.Fatalf("ListBuckets: %v", err) + } + if len(listBuckets.Buckets) != 0 { + t.Fatalf("expected no buckets, got %d", len(listBuckets.Buckets)) + } + + if _, err := client.CreateBucket(context.Background(), &awss3.CreateBucketInput{ + Bucket: aws.String(bucket), + }); err != nil { + t.Fatalf("CreateBucket: %v", err) + } + + putOut, err := client.PutObject(context.Background(), &awss3.PutObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + Body: bytes.NewReader(data), + ContentType: aws.String("text/plain"), + }) + if err != nil { + t.Fatalf("PutObject: %v", err) + } + if aws.ToString(putOut.ETag) == "" { + t.Fatal("expected ETag from PutObject") + } + + headOut, err := client.HeadObject(context.Background(), &awss3.HeadObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + t.Fatalf("HeadObject: %v", err) + } + if headOut.ContentLength == nil || aws.ToInt64(headOut.ContentLength) != int64(len(data)) { + t.Fatalf("content length = %v, want %d", headOut.ContentLength, len(data)) + } + if aws.ToString(headOut.ETag) == "" { + t.Fatal("expected ETag from HeadObject") + } + + getOut, err := client.GetObject(context.Background(), &awss3.GetObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }) + if err != nil { + t.Fatalf("GetObject: %v", err) + } + gotData, err := io.ReadAll(getOut.Body) + _ = getOut.Body.Close() + if err != nil { + t.Fatalf("read object body: %v", err) + } + if !bytes.Equal(gotData, data) { + t.Fatalf("object data = %q, want %q", gotData, data) + } + + listObjects, err := client.ListObjectsV2(context.Background(), &awss3.ListObjectsV2Input{ + Bucket: 
aws.String(bucket), + }) + if err != nil { + t.Fatalf("ListObjectsV2: %v", err) + } + if len(listObjects.Contents) != 1 { + t.Fatalf("expected 1 object, got %d", len(listObjects.Contents)) + } + if got := aws.ToString(listObjects.Contents[0].Key); got != key { + t.Fatalf("listed key = %q, want %q", got, key) + } + + waitForIndexedBlob(t, proc, apexRPCAddr, commitment, data, namespace, signerAddress) + + if _, err := client.DeleteObject(context.Background(), &awss3.DeleteObjectInput{ + Bucket: aws.String(bucket), + Key: aws.String(key), + }); err != nil { + t.Fatalf("DeleteObject: %v", err) + } + + if _, err := client.DeleteBucket(context.Background(), &awss3.DeleteBucketInput{ + Bucket: aws.String(bucket), + }); err != nil { + t.Fatalf("DeleteBucket: %v", err) + } + + listBuckets, err = client.ListBuckets(context.Background(), &awss3.ListBucketsInput{}) + if err != nil { + t.Fatalf("ListBuckets after delete: %v", err) + } + if len(listBuckets.Buckets) != 0 { + t.Fatalf("expected no buckets after delete, got %d", len(listBuckets.Buckets)) + } +} + +func newS3Client(t *testing.T, endpoint, accessKeyID, secretKey string) *awss3.Client { + t.Helper() + + cfg, err := awsconfig.LoadDefaultConfig(context.Background(), + awsconfig.WithRegion("us-east-1"), + awsconfig.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(accessKeyID, secretKey, "")), + ) + if err != nil { + t.Fatalf("load AWS config: %v", err) + } + + return awss3.NewFromConfig(cfg, func(o *awss3.Options) { + o.UsePathStyle = true + o.BaseEndpoint = aws.String(endpoint) + }) +} diff --git a/e2e/submission_test.go b/e2e/submission_test.go index fe4eb28..365d85b 100644 --- a/e2e/submission_test.go +++ b/e2e/submission_test.go @@ -117,6 +117,12 @@ type apexConfig struct { StoragePath string RPCListenAddr string GRPCListenAddr string + S3Enabled bool + S3ListenAddr string + S3Region string + S3Namespace []byte + S3AccessKeyID string + S3SecretKey string GasPrice float64 MaxGasPrice float64 
ConfirmTimeoutS int @@ -212,6 +218,10 @@ func writeApexConfig(t *testing.T, cfg apexConfig) string { t.Helper() configPath := filepath.Join(t.TempDir(), "apex.yaml") + s3Region := cfg.S3Region + if s3Region == "" { + s3Region = "us-east-1" + } configYAML := fmt.Sprintf(`data_source: type: "app" celestia_app_grpc_addr: "%s" @@ -232,6 +242,14 @@ storage: type: "sqlite" db_path: "%s" +s3: + enabled: %t + listen_addr: "%s" + region: "%s" + namespace: "%s" + access_key_id: "%s" + secret_access_key: "%s" + rpc: listen_addr: "%s" grpc_listen_addr: "%s" @@ -266,6 +284,12 @@ log: cfg.MaxGasPrice, cfg.ConfirmTimeoutS, cfg.StoragePath, + cfg.S3Enabled, + cfg.S3ListenAddr, + s3Region, + hex.EncodeToString(cfg.S3Namespace), + cfg.S3AccessKeyID, + cfg.S3SecretKey, cfg.RPCListenAddr, cfg.GRPCListenAddr, ) @@ -349,6 +373,31 @@ func waitForApexHTTP(t *testing.T, proc *apexProcess, rpcAddr string) { t.Fatalf("apex HTTP endpoint did not become reachable at %s\n%s", url, proc.logs.String()) } +func waitForS3HTTP(t *testing.T, proc *apexProcess, s3Addr string) { + t.Helper() + + client := &http.Client{Timeout: 2 * time.Second} + url := "http://" + s3Addr + "/" + + deadline := time.Now().Add(apexReadyTimeout) + for time.Now().Before(deadline) { + select { + case <-proc.done: + t.Fatalf("apex exited before S3 became ready: %v\n%s", proc.waitErr, proc.logs.String()) + default: + } + + resp, err := client.Get(url) + if err == nil { + _ = resp.Body.Close() + return + } + time.Sleep(submissionPollInterval) + } + + t.Fatalf("apex S3 endpoint did not become reachable at %s\n%s", url, proc.logs.String()) +} + func mustBlobCommitment(t *testing.T, namespace []byte, data []byte) []byte { t.Helper() diff --git a/go.mod b/go.mod index e0519ca..1b9b766 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/aws/aws-sdk-go-v2/credentials v1.19.9 github.com/aws/aws-sdk-go-v2/service/s3 v1.96.0 github.com/btcsuite/btcd/btcutil v1.1.6 + github.com/celestiaorg/go-square/merkle 
v0.0.0-20240627094109-7d01436067a3 github.com/celestiaorg/go-square/v3 v3.0.2 github.com/cockroachdb/pebble v1.1.5 github.com/cosmos/btcutil v1.0.5 @@ -46,6 +47,7 @@ require ( github.com/btcsuite/btcd v0.24.2 // indirect github.com/btcsuite/btcd/btcec/v2 v2.1.3 // indirect github.com/btcsuite/btcd/chaincfg/chainhash v1.1.0 // indirect + github.com/celestiaorg/nmt v0.24.2 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cockroachdb/errors v1.11.3 // indirect github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce // indirect @@ -82,6 +84,7 @@ require ( golang.org/x/crypto v0.46.0 // indirect golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect golang.org/x/net v0.48.0 // indirect + golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.41.0 // indirect golang.org/x/text v0.32.0 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect diff --git a/go.sum b/go.sum index 73d76a7..759fa78 100644 --- a/go.sum +++ b/go.sum @@ -71,8 +71,12 @@ github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku github.com/btcsuite/snappy-go v1.0.0/go.mod h1:8woku9dyThutzjeg+3xrA5iCpBRH8XEEg3lh6TiUghc= github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY= github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= +github.com/celestiaorg/go-square/merkle v0.0.0-20240627094109-7d01436067a3 h1:wP84mtwOCVNOTfS3zErICjxKLnh74Z1uf+tdrlSFjVM= +github.com/celestiaorg/go-square/merkle v0.0.0-20240627094109-7d01436067a3/go.mod h1:86qIYnEhmn/hfW+xvw98NOI3zGaDEB3x8JGjYo2FqLs= github.com/celestiaorg/go-square/v3 v3.0.2 h1:eSQOgNII8inK9IhiBZ+6GADQeWbRq4HYY72BOgcduA4= github.com/celestiaorg/go-square/v3 v3.0.2/go.mod h1:oFReMLsSDMRs82ICFEeFQFCqNvwdsbIM1BzCcb0f7dM= +github.com/celestiaorg/nmt v0.24.2 h1:LlpJSPOd6/Lw1Ig6HUhZuqiINHLka/ZSRTBzlNJpchg= +github.com/celestiaorg/nmt v0.24.2/go.mod 
h1:vgLBpWBi8F5KLxTdXSwb7AU4NhiIQ1AQRGa+PzdcLEA= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= @@ -145,6 +149,8 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= +github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/orderedcode v0.0.1 h1:UzfcAexk9Vhv8+9pNOgRu41f16lHq725vPwnSeiG/Us= github.com/google/orderedcode v0.0.1/go.mod h1:iVyU4/qPKHY5h/wSd6rZZCDcLJNxiWO6dvsYES2Sb20= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= @@ -243,6 +249,12 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY= github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= +github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= +github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty 
v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opencensus.io v0.22.3 h1:8sGtKOrtQqkN1bp2AtX+misvLIlOmsEsNd+9NIcPEm8= diff --git a/pkg/s3/auth.go b/pkg/s3/auth.go new file mode 100644 index 0000000..c4f598b --- /dev/null +++ b/pkg/s3/auth.go @@ -0,0 +1,256 @@ +package s3 + +import ( + "crypto/hmac" + "crypto/sha256" + "crypto/subtle" + "encoding/hex" + "errors" + "fmt" + "net/http" + "net/url" + "sort" + "strings" + "time" +) + +const ( + authorizationAlgorithm = "AWS4-HMAC-SHA256" + emptyPayloadSHA256 = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" +) + +type authConfig struct { + accessKeyID string + secretAccessKey string +} + +type authHeader struct { + credential string + signedHeaders []string + signature string +} + +type authError struct { + code string + message string +} + +func (e *authError) Error() string { + if e == nil { + return "" + } + return e.code + ": " + e.message +} + +func (s *Server) authenticateRequest(r *http.Request) *authError { + if s.auth == nil { + return nil + } + + raw := strings.TrimSpace(r.Header.Get("Authorization")) + if raw == "" { + return &authError{code: "AccessDenied", message: "AWS Signature Version 4 authorization is required"} + } + + authz, err := parseAuthHeader(raw) + if err != nil { + return &authError{code: "AccessDenied", message: err.Error()} + } + + credentialParts := strings.Split(authz.credential, "/") + if len(credentialParts) != 5 || credentialParts[4] != "aws4_request" { + return &authError{code: "AccessDenied", message: "invalid credential scope"} + } + if credentialParts[0] != s.auth.accessKeyID { + return &authError{code: "InvalidAccessKeyId", message: "The AWS Access Key Id you provided does not exist in our records."} + } + if credentialParts[3] != "s3" { + return &authError{code: 
"AccessDenied", message: "invalid service scope"} + } + if s.region != "" && credentialParts[2] != s.region { + return &authError{code: "SignatureDoesNotMatch", message: "credential scope region does not match the configured S3 region"} + } + + amzDate := strings.TrimSpace(r.Header.Get("X-Amz-Date")) + if amzDate == "" { + return &authError{code: "AccessDenied", message: "missing X-Amz-Date header"} + } + if _, err := time.Parse("20060102T150405Z", amzDate); err != nil { + return &authError{code: "AccessDenied", message: "invalid X-Amz-Date header"} + } + + payloadHash := strings.TrimSpace(r.Header.Get("X-Amz-Content-Sha256")) + if payloadHash == "" { + payloadHash = emptyPayloadSHA256 + } + + canonicalRequest, err := buildCanonicalRequest(r, authz.signedHeaders, payloadHash) + if err != nil { + return &authError{code: "AccessDenied", message: err.Error()} + } + + scope := strings.Join(credentialParts[1:], "/") + stringToSign := strings.Join([]string{ + authorizationAlgorithm, + amzDate, + scope, + hashHex([]byte(canonicalRequest)), + }, "\n") + + signingKey := deriveSigningKey(s.auth.secretAccessKey, credentialParts[1], credentialParts[2], credentialParts[3]) + expectedSignature := hex.EncodeToString(hmacSHA256(signingKey, stringToSign)) + if subtle.ConstantTimeCompare([]byte(expectedSignature), []byte(authz.signature)) != 1 { + return &authError{code: "SignatureDoesNotMatch", message: "The request signature we calculated does not match the signature you provided."} + } + + return nil +} + +func parseAuthHeader(raw string) (*authHeader, error) { + if !strings.HasPrefix(raw, authorizationAlgorithm+" ") { + return nil, errors.New("unsupported authorization algorithm") + } + + fields := strings.Split(strings.TrimPrefix(raw, authorizationAlgorithm+" "), ",") + values := make(map[string]string, len(fields)) + for _, field := range fields { + part := strings.TrimSpace(field) + key, value, ok := strings.Cut(part, "=") + if !ok { + return nil, fmt.Errorf("invalid 
authorization field %q", part) + } + values[key] = value + } + + credential := values["Credential"] + signedHeaders := values["SignedHeaders"] + signature := values["Signature"] + if credential == "" || signedHeaders == "" || signature == "" { + return nil, errors.New("authorization header is missing required fields") + } + + headers := strings.Split(signedHeaders, ";") + for i := range headers { + headers[i] = strings.TrimSpace(strings.ToLower(headers[i])) + if headers[i] == "" { + return nil, errors.New("authorization header contains an empty signed header") + } + } + + return &authHeader{ + credential: credential, + signedHeaders: headers, + signature: strings.TrimSpace(signature), + }, nil +} + +func buildCanonicalRequest(r *http.Request, signedHeaders []string, payloadHash string) (string, error) { + canonicalURI := r.URL.EscapedPath() + if canonicalURI == "" { + canonicalURI = "/" + } + + canonicalQuery := canonicalQueryString(r.URL.Query()) + + var headerBuilder strings.Builder + for _, headerName := range signedHeaders { + value, ok := canonicalHeaderValue(r, headerName) + if !ok { + return "", fmt.Errorf("missing signed header %q", headerName) + } + headerBuilder.WriteString(headerName) + headerBuilder.WriteByte(':') + headerBuilder.WriteString(value) + headerBuilder.WriteByte('\n') + } + + return strings.Join([]string{ + r.Method, + canonicalURI, + canonicalQuery, + headerBuilder.String(), + strings.Join(signedHeaders, ";"), + payloadHash, + }, "\n"), nil +} + +func canonicalQueryString(values url.Values) string { + if len(values) == 0 { + return "" + } + + type pair struct { + key string + value string + } + + pairs := make([]pair, 0, len(values)) + for key, vals := range values { + if len(vals) == 0 { + pairs = append(pairs, pair{key: key, value: ""}) + continue + } + for _, value := range vals { + pairs = append(pairs, pair{key: key, value: value}) + } + } + + sort.Slice(pairs, func(i, j int) bool { + if pairs[i].key == pairs[j].key { + return 
pairs[i].value < pairs[j].value + } + return pairs[i].key < pairs[j].key + }) + + parts := make([]string, len(pairs)) + for i, p := range pairs { + parts[i] = awsEncode(p.key) + "=" + awsEncode(p.value) + } + return strings.Join(parts, "&") +} + +func canonicalHeaderValue(r *http.Request, name string) (string, bool) { + if name == "host" { + if r.Host == "" { + return "", false + } + return normalizeHeaderValue(r.Host), true + } + + values, ok := r.Header[http.CanonicalHeaderKey(name)] + if !ok || len(values) == 0 { + return "", false + } + + normalized := make([]string, len(values)) + for i := range values { + normalized[i] = normalizeHeaderValue(values[i]) + } + return strings.Join(normalized, ","), true +} + +func normalizeHeaderValue(value string) string { + return strings.Join(strings.Fields(strings.TrimSpace(value)), " ") +} + +func awsEncode(value string) string { + return strings.ReplaceAll(url.QueryEscape(value), "+", "%20") +} + +func deriveSigningKey(secret, date, region, service string) []byte { + kDate := hmacSHA256([]byte("AWS4"+secret), date) + kRegion := hmacSHA256(kDate, region) + kService := hmacSHA256(kRegion, service) + return hmacSHA256(kService, "aws4_request") +} + +func hmacSHA256(key []byte, value string) []byte { + mac := hmac.New(sha256.New, key) + _, _ = mac.Write([]byte(value)) + return mac.Sum(nil) +} + +func hashHex(value []byte) string { + sum := sha256.Sum256(value) + return hex.EncodeToString(sum[:]) +} diff --git a/pkg/s3/integration_test.go b/pkg/s3/integration_test.go new file mode 100644 index 0000000..0b9bd4d --- /dev/null +++ b/pkg/s3/integration_test.go @@ -0,0 +1,297 @@ +package s3_test + +import ( + "context" + "encoding/xml" + "errors" + "io" + "net/http" + "net/http/httptest" + "path/filepath" + "strings" + "testing" + + gsquare "github.com/celestiaorg/go-square/v3/share" + "github.com/rs/zerolog" + + apexs3 "github.com/evstack/apex/pkg/s3" + "github.com/evstack/apex/pkg/store" + "github.com/evstack/apex/pkg/submit" + 
"github.com/evstack/apex/pkg/types" +) + +// mockBlobSubmitter records submissions without talking to Celestia. +type mockBlobSubmitter struct { + calls int +} + +func (m *mockBlobSubmitter) Submit(_ context.Context, req *submit.Request) (*submit.Result, error) { + m.calls++ + if req == nil || len(req.Blobs) != 1 { + return nil, errors.New("expected a single blob") + } + return &submit.Result{Height: uint64(100 + m.calls)}, nil +} + +func testNamespace() types.Namespace { + namespace := gsquare.MustNewV0Namespace([]byte("apexs3x")) + var ns types.Namespace + copy(ns[:], namespace.Bytes()) + return ns +} + +// setupIntegrationServer wires a real SQLite store + mock submitter into the S3 HTTP server. +func setupIntegrationServer(t *testing.T) *httptest.Server { + t.Helper() + + dbPath := filepath.Join(t.TempDir(), "integration.db") + sqliteStore, err := store.Open(dbPath) + if err != nil { + t.Fatalf("open store: %v", err) + } + t.Cleanup(func() { _ = sqliteStore.Close() }) + + objStore := store.NewObjectStore(sqliteStore, testNamespace()) + sub := &mockBlobSubmitter{} + svc := apexs3.NewService(objStore, sub, testNamespace()) + log := zerolog.New(io.Discard) + srv := apexs3.NewServer(svc, "us-east-1", "", "", log) + + return httptest.NewServer(srv) +} + +func TestIntegration_FullObjectLifecycle(t *testing.T) { + ts := setupIntegrationServer(t) + defer ts.Close() + + client := ts.Client() + base := ts.URL + + // 1. List buckets — should be empty. + resp := doReq(t, client, http.MethodGet, base+"/") //nolint:bodyclose // closed in readBody + assertStatus(t, resp, http.StatusOK) + body := readBody(t, resp) + if strings.Contains(body, "") { + t.Errorf("expected no buckets, got: %s", body) + } + + // 2. Create bucket. + resp = doReq(t, client, http.MethodPut, base+"/test-bucket") //nolint:bodyclose // closed in closeBody + assertStatus(t, resp, http.StatusOK) + closeBody(t, resp) + + // 3. Head bucket — should exist. 
+ resp = doReq(t, client, http.MethodHead, base+"/test-bucket") //nolint:bodyclose // closed in closeBody + assertStatus(t, resp, http.StatusOK) + closeBody(t, resp) + + // 4. Put object. + resp = doPutObject(t, client, base+"/test-bucket/hello.txt", "hello world", "text/plain") + assertStatus(t, resp, http.StatusOK) + etag := resp.Header.Get("ETag") + if etag == "" { + t.Error("expected ETag on PUT response") + } + closeBody(t, resp) + + // 5. Get object — verify data roundtrip. + resp = doReq(t, client, http.MethodGet, base+"/test-bucket/hello.txt") //nolint:bodyclose // closed in readBody + assertStatus(t, resp, http.StatusOK) + gotBody := readBody(t, resp) + if gotBody != "hello world" { + t.Errorf("expected 'hello world', got %q", gotBody) + } + + // 6. Head object. + resp = doReq(t, client, http.MethodHead, base+"/test-bucket/hello.txt") + assertStatus(t, resp, http.StatusOK) + if resp.Header.Get("Content-Length") != "11" { + t.Errorf("expected Content-Length 11, got %s", resp.Header.Get("Content-Length")) + } + closeBody(t, resp) + + // 7. List objects. + resp = doReq(t, client, http.MethodGet, base+"/test-bucket?list-type=2") //nolint:bodyclose // closed in readBody + assertStatus(t, resp, http.StatusOK) + listBody := readBody(t, resp) + if !strings.Contains(listBody, "hello.txt") { + t.Errorf("expected hello.txt in list response, got: %s", listBody) + } + + // 8. Put second object, list again. + resp = doPutObject(t, client, base+"/test-bucket/docs/readme.md", "# README", "text/markdown") //nolint:bodyclose // closed in closeBody + assertStatus(t, resp, http.StatusOK) + closeBody(t, resp) + + // 9. List with prefix. 
+ resp = doReq(t, client, http.MethodGet, base+"/test-bucket?prefix=docs/") //nolint:bodyclose // closed in readBody + assertStatus(t, resp, http.StatusOK) + prefixBody := readBody(t, resp) + if !strings.Contains(prefixBody, "readme.md") { + t.Errorf("expected readme.md in prefix list, got: %s", prefixBody) + } + if strings.Contains(prefixBody, "hello.txt") { + t.Errorf("hello.txt should not appear in docs/ prefix list") + } + + // 10. Delete object. + resp = doReq(t, client, http.MethodDelete, base+"/test-bucket/hello.txt") //nolint:bodyclose // closed in closeBody + assertStatus(t, resp, http.StatusNoContent) + closeBody(t, resp) + + // 11. Verify deleted. + resp = doReq(t, client, http.MethodGet, base+"/test-bucket/hello.txt") //nolint:bodyclose // closed in readBody + assertStatus(t, resp, http.StatusNotFound) + errBody := readBody(t, resp) + if !strings.Contains(errBody, "NoSuchKey") { + t.Errorf("expected NoSuchKey error, got: %s", errBody) + } + + // 12. Delete second object then bucket. + resp = doReq(t, client, http.MethodDelete, base+"/test-bucket/docs/readme.md") //nolint:bodyclose // closed in closeBody + assertStatus(t, resp, http.StatusNoContent) + closeBody(t, resp) + + resp = doReq(t, client, http.MethodDelete, base+"/test-bucket") //nolint:bodyclose // closed in closeBody + assertStatus(t, resp, http.StatusNoContent) + closeBody(t, resp) + + // 13. Verify bucket gone. + resp = doReq(t, client, http.MethodHead, base+"/test-bucket") //nolint:bodyclose // closed in closeBody + assertStatus(t, resp, http.StatusNotFound) + closeBody(t, resp) + + // 14. List buckets — should be empty again. 
+ resp = doReq(t, client, http.MethodGet, base+"/") //nolint:bodyclose // closed in readBody + assertStatus(t, resp, http.StatusOK) + finalBody := readBody(t, resp) + if strings.Contains(finalBody, "test-bucket") { + t.Errorf("expected no buckets after deletion, got: %s", finalBody) + } +} + +func TestIntegration_DeleteNonEmptyBucket(t *testing.T) { + ts := setupIntegrationServer(t) + defer ts.Close() + + client := ts.Client() + base := ts.URL + + resp := doReq(t, client, http.MethodPut, base+"/bucket") //nolint:bodyclose // closed in closeBody + closeBody(t, resp) + resp = doPutObject(t, client, base+"/bucket/file.txt", "data", "text/plain") //nolint:bodyclose // closed in closeBody + closeBody(t, resp) + + resp = doReq(t, client, http.MethodDelete, base+"/bucket") //nolint:bodyclose // closed in readBody + assertStatus(t, resp, http.StatusConflict) + body := readBody(t, resp) + if !strings.Contains(body, "BucketNotEmpty") { + t.Errorf("expected BucketNotEmpty error, got: %s", body) + } +} + +func TestIntegration_XMLStructure(t *testing.T) { + ts := setupIntegrationServer(t) + defer ts.Close() + + client := ts.Client() + base := ts.URL + + resp := doReq(t, client, http.MethodPut, base+"/xml-bucket") //nolint:bodyclose // closed in closeBody + closeBody(t, resp) + resp = doPutObject(t, client, base+"/xml-bucket/file.txt", "content", "text/plain") //nolint:bodyclose // closed in closeBody + closeBody(t, resp) + + resp = doReq(t, client, http.MethodGet, base+"/xml-bucket?list-type=2") //nolint:bodyclose // closed in readBody + assertStatus(t, resp, http.StatusOK) + + var result struct { + XMLName xml.Name `xml:"ListBucketResult"` + Name string `xml:"Name"` + Contents []struct { + Key string `xml:"Key"` + Size int64 `xml:"Size"` + ETag string `xml:"ETag"` + } `xml:",any"` + } + body := readBody(t, resp) + if err := xml.Unmarshal([]byte(body), &result); err != nil { + t.Fatalf("XML parse failed: %v\nbody: %s", err, body) + } + if result.Name != "xml-bucket" { + 
t.Errorf("expected bucket name 'xml-bucket', got %s", result.Name) + } +} + +func TestIntegration_EmptyObject(t *testing.T) { + ts := setupIntegrationServer(t) + defer ts.Close() + + client := ts.Client() + base := ts.URL + + resp := doReq(t, client, http.MethodPut, base+"/bucket") //nolint:bodyclose // closed in closeBody + closeBody(t, resp) + resp = doPutObject(t, client, base+"/bucket/empty.txt", "", "text/plain") //nolint:bodyclose // closed in closeBody + assertStatus(t, resp, http.StatusOK) + closeBody(t, resp) + + resp = doReq(t, client, http.MethodGet, base+"/bucket/empty.txt") //nolint:bodyclose // closed in readBody + assertStatus(t, resp, http.StatusOK) + if readBody(t, resp) != "" { + t.Error("expected empty body for empty object") + } +} + +// --- helpers --- + +func doReq(t *testing.T, client *http.Client, method, url string) *http.Response { + t.Helper() + req, err := http.NewRequestWithContext(context.Background(), method, url, nil) + if err != nil { + t.Fatalf("create request: %v", err) + } + resp, err := client.Do(req) + if err != nil { + t.Fatalf("%s %s failed: %v", method, url, err) + } + return resp +} + +func doPutObject(t *testing.T, client *http.Client, url, body, contentType string) *http.Response { + t.Helper() + req, err := http.NewRequestWithContext(context.Background(), http.MethodPut, url, strings.NewReader(body)) + if err != nil { + t.Fatalf("create PUT request: %v", err) + } + req.Header.Set("Content-Type", contentType) + resp, err := client.Do(req) + if err != nil { + t.Fatalf("PUT %s failed: %v", url, err) + } + return resp +} + +func assertStatus(t *testing.T, resp *http.Response, expected int) { + t.Helper() + if resp.StatusCode != expected { + body := readBody(t, resp) + t.Fatalf("expected status %d, got %d. 
Body: %s", expected, resp.StatusCode, body) + } +} + +func readBody(t *testing.T, resp *http.Response) string { + t.Helper() + data, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("read body: %v", err) + } + resp.Body.Close() //nolint:errcheck + return string(data) +} + +func closeBody(t *testing.T, resp *http.Response) { + t.Helper() + _, _ = io.Copy(io.Discard, resp.Body) + resp.Body.Close() //nolint:errcheck +} diff --git a/pkg/s3/server.go b/pkg/s3/server.go new file mode 100644 index 0000000..16e8268 --- /dev/null +++ b/pkg/s3/server.go @@ -0,0 +1,376 @@ +package s3 + +import ( + "encoding/xml" + "errors" + "fmt" + "net/http" + "net/url" + "strconv" + "strings" + "time" + + "github.com/rs/zerolog" +) + +const defaultContentType = "application/octet-stream" + +// Server implements an S3-compatible HTTP handler. +type Server struct { + svc *Service + log zerolog.Logger + region string + auth *authConfig +} + +// NewServer creates a new S3-compatible HTTP server. +func NewServer(svc *Service, region, accessKeyID, secretAccessKey string, log zerolog.Logger) *Server { + var auth *authConfig + if accessKeyID != "" || secretAccessKey != "" { + auth = &authConfig{ + accessKeyID: accessKeyID, + secretAccessKey: secretAccessKey, + } + } + return &Server{ + svc: svc, + log: log.With().Str("component", "s3-server").Logger(), + region: region, + auth: auth, + } +} + +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.log.Debug().Str("method", r.Method).Str("path", r.URL.Path).Msg("request") + + if authErr := s.authenticateRequest(r); authErr != nil { + s.writeError(w, http.StatusForbidden, authErr.code, authErr.message) + return + } + + bucket, key := parsePath(r.URL.Path) + query := r.URL.Query() + + switch { + case bucket == "" && key == "": + s.handleService(r, w) + case bucket != "" && key == "": + if query.Get("list-type") != "" || query.Has("prefix") || query.Has("delimiter") { + s.handleListObjects(r, w, bucket) + } else if r.Method 
== http.MethodGet { + s.handleBucket(r, w, bucket) + } else if r.Method == http.MethodPut { + s.handleCreateBucket(r, w, bucket) + } else if r.Method == http.MethodDelete { + s.handleDeleteBucket(r, w, bucket) + } else if r.Method == http.MethodHead { + s.handleHeadBucket(r, w, bucket) + } else { + s.writeError(w, http.StatusMethodNotAllowed, "MethodNotAllowed", "") + } + case bucket != "" && key != "": + s.handleObject(r, w, bucket, key) + default: + s.writeError(w, http.StatusBadRequest, "InvalidRequest", "invalid path") + } +} + +func parsePath(p string) (bucket, key string) { + p = strings.TrimPrefix(p, "/") + if p == "" { + return "", "" + } + + // URL-decode each component to handle special characters. + parts := strings.SplitN(p, "/", 2) + bucket, _ = url.PathUnescape(parts[0]) + if len(parts) > 1 { + key, _ = url.PathUnescape(parts[1]) + } + return bucket, key +} + +func (s *Server) handleService(r *http.Request, w http.ResponseWriter) { + if r.Method != http.MethodGet { + s.writeError(w, http.StatusMethodNotAllowed, "MethodNotAllowed", "") + return + } + + buckets, err := s.svc.ListBuckets(r.Context()) + if err != nil { + s.writeError(w, http.StatusInternalServerError, "InternalError", err.Error()) + return + } + + type BucketXML struct { + Name string `xml:"Name"` + CreationDate string `xml:"CreationDate"` + } + type ListAllMyBucketsResult struct { + XMLName xml.Name `xml:"ListAllMyBucketsResult"` + Xmlns string `xml:"xmlns,attr"` + Owner struct { + ID string `xml:"ID"` + DisplayName string `xml:"DisplayName"` + } `xml:"Owner"` + Buckets struct { + Bucket []BucketXML `xml:"Bucket"` + } `xml:"Buckets"` + } + + result := ListAllMyBucketsResult{Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/"} + result.Owner.ID = "apex" + result.Owner.DisplayName = "apex" + for _, b := range buckets { + result.Buckets.Bucket = append(result.Buckets.Bucket, BucketXML{ + Name: b.Name, + CreationDate: b.CreatedAt.UTC().Format(time.RFC3339), + }) + } + + s.writeXML(w, result) +} 
+ +func (s *Server) handleBucket(r *http.Request, w http.ResponseWriter, bucket string) { + if r.Method != http.MethodGet { + s.writeError(w, http.StatusMethodNotAllowed, "MethodNotAllowed", "") + return + } + s.handleListObjects(r, w, bucket) +} + +func (s *Server) handleListObjects(r *http.Request, w http.ResponseWriter, bucket string) { + query := r.URL.Query() + prefix := query.Get("prefix") + delimiter := query.Get("delimiter") + marker := query.Get("marker") + maxKeys := 1000 + if mk := query.Get("max-keys"); mk != "" { + if n, err := strconv.Atoi(mk); err == nil && n > 0 { + maxKeys = n + } + } + + result, err := s.svc.ListObjects(r.Context(), bucket, prefix, delimiter, marker, maxKeys) + if err != nil { + s.writeS3Error(w, err) + return + } + + type Contents struct { + Key string `xml:"Key"` + LastModified string `xml:"LastModified"` + ETag string `xml:"ETag"` + Size int64 `xml:"Size"` + StorageClass string `xml:"StorageClass"` + } + type CommonPrefix struct { + Prefix string `xml:"Prefix"` + } + type ListBucketResult struct { + XMLName xml.Name `xml:"ListBucketResult"` + Xmlns string `xml:"xmlns,attr"` + Name string `xml:"Name"` + Prefix string `xml:"Prefix"` + Marker string `xml:"Marker"` + NextMarker string `xml:"NextMarker,omitempty"` + MaxKeys int `xml:"MaxKeys"` + IsTruncated bool `xml:"IsTruncated"` + Contents []Contents `xml:",omitempty"` + CommonPrefixes []CommonPrefix `xml:",omitempty"` + } + + xmlResult := ListBucketResult{ + Xmlns: "http://s3.amazonaws.com/doc/2006-03-01/", + Name: result.Bucket, + Prefix: result.Prefix, + Marker: marker, + MaxKeys: maxKeys, + IsTruncated: result.IsTruncated, + NextMarker: result.NextMarker, + } + for _, obj := range result.Objects { + xmlResult.Contents = append(xmlResult.Contents, Contents{ + Key: obj.Key, + LastModified: obj.LastModified.UTC().Format(time.RFC3339), + ETag: fmt.Sprintf(`"%s"`, obj.ETag), + Size: obj.Size, + StorageClass: obj.StorageClass, + }) + } + for _, cp := range result.CommonPrefixes { + 
xmlResult.CommonPrefixes = append(xmlResult.CommonPrefixes, CommonPrefix{Prefix: cp}) + } + + s.writeXML(w, xmlResult) +} + +func (s *Server) handleCreateBucket(r *http.Request, w http.ResponseWriter, bucket string) { + if r.Method != http.MethodPut { + s.writeError(w, http.StatusMethodNotAllowed, "MethodNotAllowed", "") + return + } + + err := s.svc.CreateBucket(r.Context(), bucket) + if err != nil { + s.writeS3Error(w, err) + return + } + + w.Header().Set("Location", "/"+bucket) + w.WriteHeader(http.StatusOK) +} + +func (s *Server) handleDeleteBucket(r *http.Request, w http.ResponseWriter, bucket string) { + if r.Method != http.MethodDelete { + s.writeError(w, http.StatusMethodNotAllowed, "MethodNotAllowed", "") + return + } + + err := s.svc.DeleteBucket(r.Context(), bucket) + if err != nil { + s.writeS3Error(w, err) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +func (s *Server) handleHeadBucket(r *http.Request, w http.ResponseWriter, bucket string) { + if r.Method != http.MethodHead { + s.writeError(w, http.StatusMethodNotAllowed, "MethodNotAllowed", "") + return + } + + _, err := s.svc.HeadBucket(r.Context(), bucket) + if err != nil { + s.writeS3Error(w, err) + return + } + + w.WriteHeader(http.StatusOK) +} + +func (s *Server) handleObject(r *http.Request, w http.ResponseWriter, bucket, key string) { + switch r.Method { + case http.MethodGet: + s.handleGetObject(r, w, bucket, key) + case http.MethodPut: + s.handlePutObject(r, w, bucket, key) + case http.MethodDelete: + s.handleDeleteObject(r, w, bucket, key) + case http.MethodHead: + s.handleHeadObject(r, w, bucket, key) + default: + s.writeError(w, http.StatusMethodNotAllowed, "MethodNotAllowed", "") + } +} + +func (s *Server) handleGetObject(r *http.Request, w http.ResponseWriter, bucket, key string) { + obj, data, err := s.svc.GetObject(r.Context(), bucket, key) + if err != nil { + s.writeS3Error(w, err) + return + } + + ct := obj.ContentType + if ct == "" { + ct = defaultContentType + } + 
w.Header().Set("Content-Type", ct) + w.Header().Set("Content-Length", strconv.FormatInt(obj.Size, 10)) + w.Header().Set("ETag", fmt.Sprintf(`"%s"`, obj.ETag)) + w.Header().Set("Last-Modified", obj.LastModified.UTC().Format(http.TimeFormat)) + _, _ = w.Write(data) +} + +func (s *Server) handlePutObject(r *http.Request, w http.ResponseWriter, bucket, key string) { + contentType := r.Header.Get("Content-Type") + if contentType == "" { + contentType = defaultContentType + } + + obj, err := s.svc.PutObject(r.Context(), bucket, key, r.Body, contentType) + if err != nil { + s.writeS3Error(w, err) + return + } + + w.Header().Set("ETag", fmt.Sprintf(`"%s"`, obj.ETag)) + w.WriteHeader(http.StatusOK) +} + +func (s *Server) handleDeleteObject(r *http.Request, w http.ResponseWriter, bucket, key string) { + err := s.svc.DeleteObject(r.Context(), bucket, key) + if err != nil { + s.writeS3Error(w, err) + return + } + w.WriteHeader(http.StatusNoContent) +} + +func (s *Server) handleHeadObject(r *http.Request, w http.ResponseWriter, bucket, key string) { + obj, err := s.svc.HeadObject(r.Context(), bucket, key) + if err != nil { + s.writeS3Error(w, err) + return + } + + ct := obj.ContentType + if ct == "" { + ct = defaultContentType + } + w.Header().Set("Content-Type", ct) + w.Header().Set("Content-Length", strconv.FormatInt(obj.Size, 10)) + w.Header().Set("ETag", fmt.Sprintf(`"%s"`, obj.ETag)) + w.Header().Set("Last-Modified", obj.LastModified.UTC().Format(http.TimeFormat)) + w.WriteHeader(http.StatusOK) +} + +func (s *Server) writeXML(w http.ResponseWriter, data any) { + w.Header().Set("Content-Type", "application/xml") + output, err := xml.Marshal(data) + if err != nil { + s.writeError(w, http.StatusInternalServerError, "InternalError", err.Error()) + return + } + _, _ = w.Write([]byte(xml.Header)) + _, _ = w.Write(output) +} + +func (s *Server) writeS3Error(w http.ResponseWriter, err error) { + switch { + case errors.Is(err, ErrBucketNotFound): + s.writeError(w, 
http.StatusNotFound, "NoSuchBucket", "The specified bucket does not exist") + case errors.Is(err, ErrBucketNotEmpty): + s.writeError(w, http.StatusConflict, "BucketNotEmpty", "The bucket you tried to delete is not empty") + case errors.Is(err, ErrBucketAlreadyExists): + s.writeError(w, http.StatusConflict, "BucketAlreadyExists", "The requested bucket name is not available") + case errors.Is(err, ErrObjectNotFound): + s.writeError(w, http.StatusNotFound, "NoSuchKey", "The specified key does not exist") + case errors.Is(err, ErrObjectTooLarge): + s.writeError(w, http.StatusRequestEntityTooLarge, "EntityTooLarge", "Your proposed upload exceeds the maximum allowed size") + default: + s.writeError(w, http.StatusInternalServerError, "InternalError", err.Error()) + } +} + +func (s *Server) writeError(w http.ResponseWriter, code int, codeStr, message string) { + type Error struct { + XMLName xml.Name `xml:"Error"` + Code string `xml:"Code"` + Message string `xml:"Message"` + Resource string `xml:"Resource"` + RequestID string `xml:"RequestId"` + } + e := Error{ + Code: codeStr, + Message: message, + RequestID: "apex", + } + w.Header().Set("Content-Type", "application/xml") + w.WriteHeader(code) + output, _ := xml.Marshal(e) + _, _ = w.Write([]byte(xml.Header)) + _, _ = w.Write(output) +} diff --git a/pkg/s3/server_test.go b/pkg/s3/server_test.go new file mode 100644 index 0000000..ea980f6 --- /dev/null +++ b/pkg/s3/server_test.go @@ -0,0 +1,376 @@ +package s3 + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/hex" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" + "github.com/evstack/apex/pkg/types" + "github.com/rs/zerolog" +) + +func setupHTTPTestServer() (*Server, *mockStore) { + store := newMockStore() + svc := NewService(store, nil, types.Namespace{}) + log := zerolog.New(io.Discard) + return NewServer(svc, "us-east-1", "", "", log), store +} 
+ +func setupHTTPTestServerWithAuth(accessKeyID, secretAccessKey string) (*Server, *mockStore) { + store := newMockStore() + svc := NewService(store, nil, types.Namespace{}) + log := zerolog.New(io.Discard) + return NewServer(svc, "us-east-1", accessKeyID, secretAccessKey, log), store +} + +func TestServer_ListBuckets(t *testing.T) { + server, store := setupHTTPTestServer() + _ = store.PutBucket(context.Background(), "bucket1") + _ = store.PutBucket(context.Background(), "bucket2") + + req := httptest.NewRequest(http.MethodGet, "/", nil) + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("expected status 200, got %d", rec.Code) + } + body := rec.Body.String() + if !strings.Contains(body, "bucket1") || !strings.Contains(body, "bucket2") { + t.Errorf("expected buckets in response, got: %s", body) + } + if !strings.Contains(body, "ListAllMyBucketsResult") { + t.Errorf("expected ListAllMyBucketsResult in response, got: %s", body) + } +} + +func TestServer_CreateBucket(t *testing.T) { + server, _ := setupHTTPTestServer() + + req := httptest.NewRequest(http.MethodPut, "/test-bucket", nil) + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("expected status 200, got %d", rec.Code) + } + if rec.Header().Get("Location") != "/test-bucket" { + t.Errorf("expected Location header, got: %s", rec.Header().Get("Location")) + } +} + +func TestServer_CreateBucket_AlreadyExists(t *testing.T) { + server, _ := setupHTTPTestServer() + + req := httptest.NewRequest(http.MethodPut, "/test-bucket", nil) + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + req2 := httptest.NewRequest(http.MethodPut, "/test-bucket", nil) + rec2 := httptest.NewRecorder() + server.ServeHTTP(rec2, req2) + + if rec2.Code != http.StatusConflict { + t.Errorf("expected status 409, got %d", rec2.Code) + } +} + +func TestServer_DeleteBucket(t *testing.T) { + server, store := setupHTTPTestServer() 
+ _ = store.PutBucket(context.Background(), "test-bucket") + + req := httptest.NewRequest(http.MethodDelete, "/test-bucket", nil) + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusNoContent { + t.Errorf("expected status 204, got %d", rec.Code) + } +} + +func TestServer_DeleteBucket_NotEmpty(t *testing.T) { + server, store := setupHTTPTestServer() + _ = store.PutBucket(context.Background(), "test-bucket") + _, _ = store.PutObject(context.Background(), "test-bucket", "key", []byte("data"), "text/plain", 0, nil) + + req := httptest.NewRequest(http.MethodDelete, "/test-bucket", nil) + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusConflict { + t.Errorf("expected status 409, got %d", rec.Code) + } +} + +func TestServer_PutGetObject(t *testing.T) { + server, store := setupHTTPTestServer() + _ = store.PutBucket(context.Background(), "test-bucket") + + body := []byte("hello world") + req := httptest.NewRequest(http.MethodPut, "/test-bucket/hello.txt", bytes.NewReader(body)) + req.Header.Set("Content-Type", "text/plain") + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("expected status 200, got %d", rec.Code) + } + if rec.Header().Get("ETag") == "" { + t.Error("expected ETag header") + } + + req2 := httptest.NewRequest(http.MethodGet, "/test-bucket/hello.txt", nil) + rec2 := httptest.NewRecorder() + server.ServeHTTP(rec2, req2) + + if rec2.Code != http.StatusOK { + t.Errorf("expected status 200, got %d", rec2.Code) + } + if rec2.Body.String() != "hello world" { + t.Errorf("expected body 'hello world', got: %s", rec2.Body.String()) + } + if rec2.Header().Get("Content-Type") != "text/plain" { + t.Errorf("expected Content-Type text/plain, got: %s", rec2.Header().Get("Content-Type")) + } +} + +func TestServer_GetObject_NotFound(t *testing.T) { + server, store := setupHTTPTestServer() + _ = store.PutBucket(context.Background(), 
"test-bucket") + + req := httptest.NewRequest(http.MethodGet, "/test-bucket/nonexistent.txt", nil) + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusNotFound { + t.Errorf("expected status 404, got %d", rec.Code) + } +} + +func TestServer_HeadObject(t *testing.T) { + server, store := setupHTTPTestServer() + _ = store.PutBucket(context.Background(), "test-bucket") + _, _ = store.PutObject(context.Background(), "test-bucket", "test.txt", []byte("content"), "text/plain", 0, nil) + + req := httptest.NewRequest(http.MethodHead, "/test-bucket/test.txt", nil) + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("expected status 200, got %d", rec.Code) + } + if rec.Header().Get("Content-Length") != "7" { + t.Errorf("expected Content-Length 7, got: %s", rec.Header().Get("Content-Length")) + } + if rec.Body.Len() != 0 { + t.Error("expected empty body for HEAD request") + } +} + +func TestServer_DeleteObject(t *testing.T) { + server, store := setupHTTPTestServer() + _ = store.PutBucket(context.Background(), "test-bucket") + _, _ = store.PutObject(context.Background(), "test-bucket", "test.txt", []byte("content"), "text/plain", 0, nil) + + req := httptest.NewRequest(http.MethodDelete, "/test-bucket/test.txt", nil) + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusNoContent { + t.Errorf("expected status 204, got %d", rec.Code) + } + + req2 := httptest.NewRequest(http.MethodGet, "/test-bucket/test.txt", nil) + rec2 := httptest.NewRecorder() + server.ServeHTTP(rec2, req2) + + if rec2.Code != http.StatusNotFound { + t.Errorf("expected status 404 after delete, got %d", rec2.Code) + } +} + +func TestServer_ListObjects(t *testing.T) { + server, store := setupHTTPTestServer() + _ = store.PutBucket(context.Background(), "test-bucket") + _, _ = store.PutObject(context.Background(), "test-bucket", "file1.txt", []byte("a"), "text/plain", 0, nil) + _, _ = 
store.PutObject(context.Background(), "test-bucket", "file2.txt", []byte("bb"), "text/plain", 0, nil) + + req := httptest.NewRequest(http.MethodGet, "/test-bucket?list-type=2", nil) + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("expected status 200, got %d", rec.Code) + } + body := rec.Body.String() + if !strings.Contains(body, "ListBucketResult") { + t.Errorf("expected ListBucketResult in response, got: %s", body) + } + if !strings.Contains(body, "file1.txt") || !strings.Contains(body, "file2.txt") { + t.Errorf("expected objects in response, got: %s", body) + } +} + +func TestServer_HeadBucket(t *testing.T) { + server, store := setupHTTPTestServer() + _ = store.PutBucket(context.Background(), "test-bucket") + + req := httptest.NewRequest(http.MethodHead, "/test-bucket", nil) + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("expected status 200, got %d", rec.Code) + } +} + +func TestServer_HeadBucket_NotFound(t *testing.T) { + server, _ := setupHTTPTestServer() + + req := httptest.NewRequest(http.MethodHead, "/nonexistent-bucket", nil) + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusNotFound { + t.Errorf("expected status 404, got %d", rec.Code) + } +} + +func TestServer_ErrorFormat(t *testing.T) { + server, _ := setupHTTPTestServer() + + req := httptest.NewRequest(http.MethodGet, "/nonexistent-bucket/key", nil) + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusNotFound { + t.Errorf("expected status 404, got %d", rec.Code) + } + + body := rec.Body.String() + if !strings.Contains(body, "NoSuchBucket") { + t.Errorf("expected error code NoSuchBucket in response, got: %s", body) + } + if !strings.Contains(body, "apex") { + t.Errorf("expected RequestId apex in response, got: %s", body) + } +} + +func TestServer_URLDecoding(t *testing.T) { + server, store := 
setupHTTPTestServer() + _ = store.PutBucket(context.Background(), "test-bucket") + + // PUT with URL-encoded key + req := httptest.NewRequest(http.MethodPut, "/test-bucket/my%20file.txt", bytes.NewReader([]byte("data"))) + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Errorf("expected status 200, got %d", rec.Code) + } + + // GET with same URL-encoded key + req2 := httptest.NewRequest(http.MethodGet, "/test-bucket/my%20file.txt", nil) + rec2 := httptest.NewRecorder() + server.ServeHTTP(rec2, req2) + + if rec2.Code != http.StatusOK { + t.Errorf("expected status 200, got %d", rec2.Code) + } + if rec2.Body.String() != "data" { + t.Errorf("expected body 'data', got: %s", rec2.Body.String()) + } +} + +func TestServer_AuthRejectsMissingAuthorization(t *testing.T) { + server, _ := setupHTTPTestServerWithAuth("app-key", "app-secret") + + req := httptest.NewRequest(http.MethodGet, "/", nil) + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusForbidden { + t.Fatalf("expected status 403, got %d", rec.Code) + } + if !strings.Contains(rec.Body.String(), "AccessDenied") { + t.Fatalf("expected AccessDenied error, got: %s", rec.Body.String()) + } +} + +func TestServer_AuthAcceptsSignedRequest(t *testing.T) { + server, store := setupHTTPTestServerWithAuth("app-key", "app-secret") + _ = store.PutBucket(context.Background(), "bucket1") + + req := httptest.NewRequest(http.MethodGet, "/", nil) + signRequest(t, req, nil, "app-key", "app-secret", "us-east-1") + + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("expected status 200, got %d: %s", rec.Code, rec.Body.String()) + } +} + +func TestServer_AuthRejectsWrongSecret(t *testing.T) { + server, store := setupHTTPTestServerWithAuth("app-key", "app-secret") + _ = store.PutBucket(context.Background(), "bucket1") + + req := httptest.NewRequest(http.MethodGet, "/", nil) + signRequest(t, req, 
nil, "app-key", "wrong-secret", "us-east-1") + + rec := httptest.NewRecorder() + server.ServeHTTP(rec, req) + + if rec.Code != http.StatusForbidden { + t.Fatalf("expected status 403, got %d", rec.Code) + } + if !strings.Contains(rec.Body.String(), "SignatureDoesNotMatch") { + t.Fatalf("expected SignatureDoesNotMatch error, got: %s", rec.Body.String()) + } +} + +func signRequest(t *testing.T, req *http.Request, body []byte, accessKeyID, secretAccessKey, region string) { + t.Helper() + + if body != nil { + req.Body = io.NopCloser(bytes.NewReader(body)) + req.ContentLength = int64(len(body)) + } + + payloadHash := emptyPayloadSHA256 + if body != nil { + sum := sha256.Sum256(body) + payloadHash = hex.EncodeToString(sum[:]) + } + + signer := v4.NewSigner() + err := signer.SignHTTP( + context.Background(), + aws.Credentials{ + AccessKeyID: accessKeyID, + SecretAccessKey: secretAccessKey, + }, + req, + payloadHash, + "s3", + region, + time.Unix(1_700_000_000, 0).UTC(), + ) + if err != nil { + t.Fatalf("SignHTTP: %v", err) + } + + if body != nil { + req.Body = io.NopCloser(bytes.NewReader(body)) + } +} diff --git a/pkg/s3/service.go b/pkg/s3/service.go new file mode 100644 index 0000000..6e0f84d --- /dev/null +++ b/pkg/s3/service.go @@ -0,0 +1,125 @@ +package s3 + +import ( + "context" + "encoding/hex" + "errors" + "fmt" + "io" + + "github.com/evstack/apex/pkg/submit" + "github.com/evstack/apex/pkg/types" +) + +var ( + ErrBucketNotFound = errors.New("bucket not found") + ErrBucketNotEmpty = errors.New("bucket not empty") + ErrBucketAlreadyExists = errors.New("bucket already exists") + ErrObjectNotFound = errors.New("object not found") + ErrObjectTooLarge = errors.New("object too large") +) + +// ObjectStore is the persistence interface for S3 buckets and objects. 
+type ObjectStore interface { + PutBucket(ctx context.Context, name string) error + GetBucket(ctx context.Context, name string) (*Bucket, error) + DeleteBucket(ctx context.Context, name string) error + ListBuckets(ctx context.Context) ([]Bucket, error) + + PutObject(ctx context.Context, bucket, key string, data []byte, contentType string, height uint64, commitments []string) (*Object, error) + GetObject(ctx context.Context, bucket, key string) (*Object, []byte, error) + DeleteObject(ctx context.Context, bucket, key string) error + ListObjects(ctx context.Context, bucket, prefix, delimiter, marker string, maxKeys int) (*ListObjectsResult, error) + HeadObject(ctx context.Context, bucket, key string) (*Object, error) +} + +// Service implements S3 API business logic. +type Service struct { + store ObjectStore + submitter submit.Submitter + namespace types.Namespace +} + +// NewService creates a new S3 service. +func NewService(store ObjectStore, submitter submit.Submitter, namespace types.Namespace) *Service { + return &Service{ + store: store, + submitter: submitter, + namespace: namespace, + } +} + +func (s *Service) CreateBucket(ctx context.Context, name string) error { + return s.store.PutBucket(ctx, name) +} + +func (s *Service) DeleteBucket(ctx context.Context, name string) error { + return s.store.DeleteBucket(ctx, name) +} + +func (s *Service) ListBuckets(ctx context.Context) ([]Bucket, error) { + return s.store.ListBuckets(ctx) +} + +func (s *Service) HeadBucket(ctx context.Context, name string) (*Bucket, error) { + return s.store.GetBucket(ctx, name) +} + +// PutObject stores an object. If a submitter is configured, the blob is +// submitted to Celestia first; the SQLite write only happens on success. +// Empty objects (0 bytes) skip Celestia submission. 
+func (s *Service) PutObject(ctx context.Context, bucket, key string, r io.Reader, contentType string) (*Object, error) { + data, err := io.ReadAll(r) + if err != nil { + return nil, fmt.Errorf("read object data: %w", err) + } + if len(data) > maxObjectSize { + return nil, ErrObjectTooLarge + } + + var height uint64 + var commitments []string + + // Submit to Celestia first (if submitter configured and data non-empty). + if s.submitter != nil && len(data) > 0 { + blob, err := submit.BuildBlob(s.namespace, data, 0, nil) + if err != nil { + return nil, fmt.Errorf("build blob: %w", err) + } + result, submitErr := s.submitter.Submit(ctx, &submit.Request{ + Blobs: []submit.Blob{blob}, + }) + if submitErr != nil { + return nil, fmt.Errorf("submit to celestia: %w", submitErr) + } + height = result.Height + commitments = []string{hex.EncodeToString(blob.Commitment)} + } + + // Write to store only after successful Celestia submission. + obj, err := s.store.PutObject(ctx, bucket, key, data, contentType, height, commitments) + if err != nil { + return nil, err + } + + return obj, nil +} + +func (s *Service) GetObject(ctx context.Context, bucket, key string) (*Object, []byte, error) { + return s.store.GetObject(ctx, bucket, key) +} + +func (s *Service) DeleteObject(ctx context.Context, bucket, key string) error { + return s.store.DeleteObject(ctx, bucket, key) +} + +func (s *Service) ListObjects(ctx context.Context, bucket, prefix, delimiter, marker string, maxKeys int) (*ListObjectsResult, error) { + if maxKeys <= 0 { + maxKeys = 1000 + } + return s.store.ListObjects(ctx, bucket, prefix, delimiter, marker, maxKeys) +} + +func (s *Service) HeadObject(ctx context.Context, bucket, key string) (*Object, error) { + return s.store.HeadObject(ctx, bucket, key) +} diff --git a/pkg/s3/service_test.go b/pkg/s3/service_test.go new file mode 100644 index 0000000..aee5d94 --- /dev/null +++ b/pkg/s3/service_test.go @@ -0,0 +1,371 @@ +package s3 + +import ( + "bytes" + "context" + 
"encoding/hex" + "errors" + "testing" + "time" + + gsquare "github.com/celestiaorg/go-square/v3/share" + "github.com/evstack/apex/pkg/submit" + "github.com/evstack/apex/pkg/types" +) + +type mockStore struct { + buckets map[string]*Bucket + objects map[string]map[string]*storedObject +} + +type storedObject struct { + obj *Object + data []byte +} + +func newMockStore() *mockStore { + return &mockStore{ + buckets: make(map[string]*Bucket), + objects: make(map[string]map[string]*storedObject), + } +} + +func (m *mockStore) PutBucket(_ context.Context, name string) error { + if _, exists := m.buckets[name]; exists { + return ErrBucketAlreadyExists + } + now := time.Now() + m.buckets[name] = &Bucket{Name: name, CreatedAt: now, LastModified: now} + m.objects[name] = make(map[string]*storedObject) + return nil +} + +func (m *mockStore) GetBucket(_ context.Context, name string) (*Bucket, error) { + b, ok := m.buckets[name] + if !ok { + return nil, ErrBucketNotFound + } + return b, nil +} + +func (m *mockStore) DeleteBucket(_ context.Context, name string) error { + if _, ok := m.buckets[name]; !ok { + return ErrBucketNotFound + } + if len(m.objects[name]) > 0 { + return ErrBucketNotEmpty + } + delete(m.buckets, name) + delete(m.objects, name) + return nil +} + +func (m *mockStore) ListBuckets(_ context.Context) ([]Bucket, error) { + result := make([]Bucket, 0, len(m.buckets)) + for _, b := range m.buckets { + result = append(result, *b) + } + return result, nil +} + +func (m *mockStore) PutObject(_ context.Context, bucket, key string, data []byte, contentType string, height uint64, commitments []string) (*Object, error) { + if _, ok := m.buckets[bucket]; !ok { + return nil, ErrBucketNotFound + } + now := time.Now() + obj := &Object{ + Key: key, + Bucket: bucket, + Size: int64(len(data)), + ETag: "etag-" + key, + ContentType: contentType, + LastModified: now, + Height: height, + Commitments: commitments, + } + m.objects[bucket][key] = &storedObject{obj: obj, data: data} + 
return obj, nil +} + +func (m *mockStore) GetObject(_ context.Context, bucket, key string) (*Object, []byte, error) { + if _, ok := m.buckets[bucket]; !ok { + return nil, nil, ErrBucketNotFound + } + stored, ok := m.objects[bucket][key] + if !ok { + return nil, nil, ErrObjectNotFound + } + return stored.obj, stored.data, nil +} + +func (m *mockStore) DeleteObject(_ context.Context, bucket, key string) error { + if _, ok := m.buckets[bucket]; !ok { + return ErrBucketNotFound + } + if _, ok := m.objects[bucket][key]; !ok { + return ErrObjectNotFound + } + delete(m.objects[bucket], key) + return nil +} + +func (m *mockStore) ListObjects(_ context.Context, bucket, prefix, delimiter, _ string, _ int) (*ListObjectsResult, error) { + if _, ok := m.buckets[bucket]; !ok { + return nil, ErrBucketNotFound + } + result := &ListObjectsResult{Bucket: bucket, Prefix: prefix, Delimiter: delimiter} + for key, stored := range m.objects[bucket] { + result.Objects = append(result.Objects, ObjectInfo{ + Key: key, + LastModified: stored.obj.LastModified, + ETag: stored.obj.ETag, + Size: stored.obj.Size, + StorageClass: "STANDARD", + }) + } + return result, nil +} + +func (m *mockStore) HeadObject(_ context.Context, bucket, key string) (*Object, error) { + if _, ok := m.buckets[bucket]; !ok { + return nil, ErrBucketNotFound + } + stored, ok := m.objects[bucket][key] + if !ok { + return nil, ErrObjectNotFound + } + return stored.obj, nil +} + +type mockSubmitter struct { + calls int + err error + lastReq *submit.Request +} + +func (m *mockSubmitter) Submit(_ context.Context, req *submit.Request) (*submit.Result, error) { + m.calls++ + m.lastReq = req + if m.err != nil { + return nil, m.err + } + return &submit.Result{Height: uint64(100 + m.calls)}, nil +} + +func testNamespace() types.Namespace { + namespace := gsquare.MustNewV0Namespace([]byte("apexs3x")) + var ns types.Namespace + copy(ns[:], namespace.Bytes()) + return ns +} + +func TestService_CreateBucket(t *testing.T) { + store := 
newMockStore() + svc := NewService(store, nil, types.Namespace{}) + + ctx := context.Background() + if err := svc.CreateBucket(ctx, "test-bucket"); err != nil { + t.Fatalf("CreateBucket failed: %v", err) + } + + err := svc.CreateBucket(ctx, "test-bucket") + if !errors.Is(err, ErrBucketAlreadyExists) { + t.Fatalf("expected ErrBucketAlreadyExists, got: %v", err) + } +} + +func TestService_PutGetObject(t *testing.T) { + store := newMockStore() + svc := NewService(store, nil, types.Namespace{}) + + ctx := context.Background() + if err := svc.CreateBucket(ctx, "test-bucket"); err != nil { + t.Fatalf("CreateBucket failed: %v", err) + } + + data := []byte("hello world") + obj, err := svc.PutObject(ctx, "test-bucket", "test-key", bytes.NewReader(data), "text/plain") + if err != nil { + t.Fatalf("PutObject failed: %v", err) + } + if obj.Size != int64(len(data)) { + t.Errorf("expected size %d, got %d", len(data), obj.Size) + } + + gotObj, gotData, err := svc.GetObject(ctx, "test-bucket", "test-key") + if err != nil { + t.Fatalf("GetObject failed: %v", err) + } + if !bytes.Equal(gotData, data) { + t.Errorf("expected data %q, got %q", data, gotData) + } + if gotObj.Key != "test-key" { + t.Errorf("expected key test-key, got %s", gotObj.Key) + } +} + +func TestService_PutObject_WithSubmitter(t *testing.T) { + store := newMockStore() + sub := &mockSubmitter{} + svc := NewService(store, sub, testNamespace()) + + ctx := context.Background() + if err := svc.CreateBucket(ctx, "test-bucket"); err != nil { + t.Fatalf("CreateBucket failed: %v", err) + } + + data := []byte("celestia blob data") + obj, err := svc.PutObject(ctx, "test-bucket", "key1", bytes.NewReader(data), "application/octet-stream") + if err != nil { + t.Fatalf("PutObject failed: %v", err) + } + if sub.calls != 1 { + t.Errorf("expected 1 submit call, got %d", sub.calls) + } + if obj.Height == 0 { + t.Error("expected non-zero height from submission") + } + if sub.lastReq == nil || len(sub.lastReq.Blobs) != 1 { + 
t.Fatalf("expected a single submitted blob, got %#v", sub.lastReq) + } + if got := sub.lastReq.Blobs[0]; !bytes.Equal(got.Data, data) { + t.Fatalf("submitted data = %q, want %q", got.Data, data) + } + if len(obj.Commitments) != 1 || obj.Commitments[0] != hex.EncodeToString(sub.lastReq.Blobs[0].Commitment) { + t.Fatalf("commitments = %v, want [%s]", obj.Commitments, hex.EncodeToString(sub.lastReq.Blobs[0].Commitment)) + } +} + +func TestService_PutObject_SubmitterFails(t *testing.T) { + store := newMockStore() + failSub := &failingSubmitter{} + svc := NewService(store, failSub, types.Namespace{}) + + ctx := context.Background() + if err := svc.CreateBucket(ctx, "test-bucket"); err != nil { + t.Fatalf("CreateBucket failed: %v", err) + } + + _, err := svc.PutObject(ctx, "test-bucket", "key1", bytes.NewReader([]byte("data")), "text/plain") + if err == nil { + t.Fatal("expected error when submitter fails") + } + + // Object should NOT be in store (rollback behavior). + _, _, getErr := svc.GetObject(ctx, "test-bucket", "key1") + if !errors.Is(getErr, ErrObjectNotFound) { + t.Fatalf("expected ErrObjectNotFound after failed submission, got: %v", getErr) + } +} + +type failingSubmitter struct{} + +func (f *failingSubmitter) Submit(context.Context, *submit.Request) (*submit.Result, error) { + return nil, errors.New("celestia unavailable") +} + +func TestService_PutObject_EmptySkipsSubmission(t *testing.T) { + store := newMockStore() + sub := &mockSubmitter{} + svc := NewService(store, sub, testNamespace()) + + ctx := context.Background() + if err := svc.CreateBucket(ctx, "test-bucket"); err != nil { + t.Fatalf("CreateBucket failed: %v", err) + } + + _, err := svc.PutObject(ctx, "test-bucket", "empty", bytes.NewReader([]byte{}), "text/plain") + if err != nil { + t.Fatalf("PutObject failed: %v", err) + } + if sub.calls != 0 { + t.Errorf("expected 0 submit calls for empty object, got %d", sub.calls) + } +} + +func TestService_PutObject_TooLarge(t *testing.T) { + store := 
newMockStore() + svc := NewService(store, nil, types.Namespace{}) + + ctx := context.Background() + if err := svc.CreateBucket(ctx, "test-bucket"); err != nil { + t.Fatalf("CreateBucket failed: %v", err) + } + + bigData := make([]byte, maxObjectSize+1) + _, err := svc.PutObject(ctx, "test-bucket", "big", bytes.NewReader(bigData), "application/octet-stream") + if !errors.Is(err, ErrObjectTooLarge) { + t.Fatalf("expected ErrObjectTooLarge, got: %v", err) + } +} + +func TestService_DeleteObject(t *testing.T) { + store := newMockStore() + svc := NewService(store, nil, types.Namespace{}) + + ctx := context.Background() + if err := svc.CreateBucket(ctx, "test-bucket"); err != nil { + t.Fatalf("CreateBucket failed: %v", err) + } + + if _, err := svc.PutObject(ctx, "test-bucket", "key", bytes.NewReader([]byte("data")), "text/plain"); err != nil { + t.Fatalf("PutObject failed: %v", err) + } + + if err := svc.DeleteObject(ctx, "test-bucket", "key"); err != nil { + t.Fatalf("DeleteObject failed: %v", err) + } + + _, _, err := svc.GetObject(ctx, "test-bucket", "key") + if !errors.Is(err, ErrObjectNotFound) { + t.Fatalf("expected ErrObjectNotFound, got: %v", err) + } +} + +func TestService_ListBuckets(t *testing.T) { + store := newMockStore() + svc := NewService(store, nil, types.Namespace{}) + + ctx := context.Background() + buckets, err := svc.ListBuckets(ctx) + if err != nil { + t.Fatalf("ListBuckets failed: %v", err) + } + if len(buckets) != 0 { + t.Errorf("expected 0 buckets, got %d", len(buckets)) + } + + _ = svc.CreateBucket(ctx, "a") + _ = svc.CreateBucket(ctx, "b") + + buckets, err = svc.ListBuckets(ctx) + if err != nil { + t.Fatalf("ListBuckets failed: %v", err) + } + if len(buckets) != 2 { + t.Errorf("expected 2 buckets, got %d", len(buckets)) + } +} + +func TestService_HeadObject(t *testing.T) { + store := newMockStore() + svc := NewService(store, nil, types.Namespace{}) + + ctx := context.Background() + _ = svc.CreateBucket(ctx, "test-bucket") + + _, err := 
svc.HeadObject(ctx, "test-bucket", "nonexistent") + if !errors.Is(err, ErrObjectNotFound) { + t.Fatalf("expected ErrObjectNotFound, got: %v", err) + } + + _, _ = svc.PutObject(ctx, "test-bucket", "key", bytes.NewReader([]byte("data")), "text/plain") + + obj, err := svc.HeadObject(ctx, "test-bucket", "key") + if err != nil { + t.Fatalf("HeadObject failed: %v", err) + } + if obj.Key != "key" { + t.Errorf("expected key 'key', got %s", obj.Key) + } +} diff --git a/pkg/s3/types.go b/pkg/s3/types.go new file mode 100644 index 0000000..91256b4 --- /dev/null +++ b/pkg/s3/types.go @@ -0,0 +1,52 @@ +package s3 + +import ( + "time" +) + +const ( + // maxObjectSize is the maximum single-PUT object size. + // Capped at Celestia's blob size limit since each object maps to one blob. + maxObjectSize = 2 * 1024 * 1024 // 2MB +) + +// Bucket represents an S3 bucket. +type Bucket struct { + Name string + CreatedAt time.Time + LastModified time.Time +} + +// Object represents an S3 object with optional Celestia anchoring metadata. +type Object struct { + Key string + Bucket string + Size int64 + ETag string // MD5 hash of object content + ContentType string + LastModified time.Time + + Height uint64 // Celestia height where blob was submitted + Namespace string // Namespace used for blob storage + Commitments []string // Celestia blob commitments +} + +// ListObjectsResult is the result of a ListObjects call. +type ListObjectsResult struct { + Bucket string + Prefix string + Delimiter string + IsTruncated bool + NextMarker string + Objects []ObjectInfo + CommonPrefixes []string +} + +// ObjectInfo is a summary of an object for list responses. 
+type ObjectInfo struct {
+	Key          string
+	LastModified time.Time
+	ETag         string
+	Size         int64
+	StorageClass string
+}
diff --git a/pkg/store/migrations/004_s3_objects.sql b/pkg/store/migrations/004_s3_objects.sql
new file mode 100644
index 0000000..8ee2a2f
--- /dev/null
+++ b/pkg/store/migrations/004_s3_objects.sql
@@ -0,0 +1,24 @@
+-- Buckets keyed by name; timestamps stored as unix nanoseconds.
+CREATE TABLE IF NOT EXISTS s3_buckets (
+    name TEXT PRIMARY KEY,
+    created_at INTEGER NOT NULL,
+    updated_at INTEGER NOT NULL
+);
+
+-- Objects store raw data inline plus Celestia anchoring metadata
+-- (height/namespace/commitments); commitments is a JSON string array.
+CREATE TABLE IF NOT EXISTS s3_objects (
+    id INTEGER PRIMARY KEY AUTOINCREMENT,
+    bucket TEXT NOT NULL,
+    key TEXT NOT NULL,
+    size INTEGER NOT NULL,
+    etag TEXT NOT NULL,
+    content_type TEXT,
+    last_modified INTEGER NOT NULL,
+    height INTEGER NOT NULL DEFAULT 0,
+    namespace TEXT NOT NULL DEFAULT '',
+    commitments TEXT NOT NULL DEFAULT '[]',
+    data BLOB,
+    UNIQUE(bucket, key),
+    FOREIGN KEY (bucket) REFERENCES s3_buckets(name) ON DELETE CASCADE
+);
+
+-- NOTE(review): idx_s3_objects_bucket looks redundant — the (bucket, key)
+-- index (and the UNIQUE(bucket, key) constraint's implicit index) already
+-- serves bucket-prefixed lookups; confirm and consider dropping.
+CREATE INDEX IF NOT EXISTS idx_s3_objects_bucket ON s3_objects(bucket);
+CREATE INDEX IF NOT EXISTS idx_s3_objects_bucket_key ON s3_objects(bucket, key);
diff --git a/pkg/store/object.go b/pkg/store/object.go
new file mode 100644
index 0000000..1ae759d
--- /dev/null
+++ b/pkg/store/object.go
@@ -0,0 +1,292 @@
+package store
+
+import (
+	"context"
+	"crypto/md5" //nolint:gosec // MD5 required by S3 protocol for ETag
+	"database/sql"
+	"encoding/hex"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"strings"
+	"time"
+
+	"github.com/evstack/apex/pkg/s3"
+	"github.com/evstack/apex/pkg/types"
+)
+
+// ObjectStore implements s3.ObjectStore using SQLite.
+type ObjectStore struct {
+	writer *sql.DB // write connection
+	reader *sql.DB // read connection
+	ns     types.Namespace // namespace recorded on stored objects
+}
+
+// NewObjectStore creates an ObjectStore backed by the given SQLiteStore.
+func NewObjectStore(db *SQLiteStore, namespace types.Namespace) *ObjectStore { + return &ObjectStore{ + writer: db.writer, + reader: db.reader, + ns: namespace, + } +} + +func (o *ObjectStore) PutBucket(ctx context.Context, name string) error { + now := time.Now().UnixNano() + _, err := o.writer.ExecContext(ctx, + `INSERT INTO s3_buckets (name, created_at, updated_at) VALUES (?, ?, ?)`, + name, now, now) + if err != nil { + if isSQLiteUniqueConstraint(err) { + return s3.ErrBucketAlreadyExists + } + return fmt.Errorf("insert bucket: %w", err) + } + return nil +} + +func (o *ObjectStore) GetBucket(ctx context.Context, name string) (*s3.Bucket, error) { + var b s3.Bucket + var createdAt, updatedAt int64 + err := o.reader.QueryRowContext(ctx, + `SELECT name, created_at, updated_at FROM s3_buckets WHERE name = ?`, name). + Scan(&b.Name, &createdAt, &updatedAt) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, s3.ErrBucketNotFound + } + return nil, fmt.Errorf("query bucket: %w", err) + } + b.CreatedAt = time.Unix(0, createdAt) + b.LastModified = time.Unix(0, updatedAt) + return &b, nil +} + +func (o *ObjectStore) DeleteBucket(ctx context.Context, name string) error { + var count int + err := o.reader.QueryRowContext(ctx, + `SELECT COUNT(*) FROM s3_objects WHERE bucket = ?`, name).Scan(&count) + if err != nil { + return fmt.Errorf("count objects: %w", err) + } + if count > 0 { + return s3.ErrBucketNotEmpty + } + + result, err := o.writer.ExecContext(ctx, + `DELETE FROM s3_buckets WHERE name = ?`, name) + if err != nil { + return fmt.Errorf("delete bucket: %w", err) + } + affected, _ := result.RowsAffected() + if affected == 0 { + return s3.ErrBucketNotFound + } + return nil +} + +func (o *ObjectStore) ListBuckets(ctx context.Context) ([]s3.Bucket, error) { + rows, err := o.reader.QueryContext(ctx, + `SELECT name, created_at, updated_at FROM s3_buckets ORDER BY name`) + if err != nil { + return nil, fmt.Errorf("query buckets: %w", err) + } + defer 
func() { _ = rows.Close() }()
+
+	var buckets []s3.Bucket
+	for rows.Next() {
+		var b s3.Bucket
+		var createdAt, updatedAt int64
+		if err := rows.Scan(&b.Name, &createdAt, &updatedAt); err != nil {
+			return nil, fmt.Errorf("scan bucket: %w", err)
+		}
+		b.CreatedAt = time.Unix(0, createdAt)
+		b.LastModified = time.Unix(0, updatedAt)
+		buckets = append(buckets, b)
+	}
+	return buckets, rows.Err()
+}
+
+// PutObject inserts or replaces the object at bucket/key, storing the raw
+// data alongside its Celestia anchoring metadata.
+func (o *ObjectStore) PutObject(ctx context.Context, bucket, key string, data []byte, contentType string, height uint64, commitments []string) (*s3.Object, error) {
+	// The FK constraint would also reject an unknown bucket, but the explicit
+	// check yields the proper s3.ErrBucketNotFound sentinel.
+	if _, err := o.GetBucket(ctx, bucket); err != nil {
+		return nil, err
+	}
+
+	etag := computeETag(data)
+	now := time.Now().UnixNano()
+	if commitments == nil {
+		commitments = []string{}
+	}
+	// json.Marshal of a []string cannot fail.
+	commitmentsJSON, _ := json.Marshal(commitments)
+
+	_, err := o.writer.ExecContext(ctx,
+		`INSERT INTO s3_objects (bucket, key, size, etag, content_type, last_modified, height, namespace, commitments, data)
+		 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+		 ON CONFLICT(bucket, key) DO UPDATE SET
+			size = excluded.size,
+			etag = excluded.etag,
+			content_type = excluded.content_type,
+			last_modified = excluded.last_modified,
+			height = excluded.height,
+			namespace = excluded.namespace,
+			commitments = excluded.commitments,
+			data = excluded.data`,
+		bucket, key, len(data), etag, contentType, now, height, o.ns.String(), string(commitmentsJSON), data)
+	if err != nil {
+		return nil, fmt.Errorf("insert object: %w", err)
+	}
+
+	return &s3.Object{
+		Key:          key,
+		Bucket:       bucket,
+		Size:         int64(len(data)),
+		ETag:         etag,
+		ContentType:  contentType,
+		LastModified: time.Unix(0, now),
+		Height:       height,
+		Namespace:    o.ns.String(),
+		Commitments:  commitments,
+	}, nil
+}
+
+// GetObject returns the object's metadata and raw data, or s3.ErrObjectNotFound.
+func (o *ObjectStore) GetObject(ctx context.Context, bucket, key string) (*s3.Object, []byte, error) {
+	var obj s3.Object
+	var lastModified int64
+	var data []byte
+	var commitmentsJSON string
+
+	err := o.reader.QueryRowContext(ctx,
+		`SELECT key, bucket, size, etag, content_type, last_modified, height, namespace, commitments, data
+		 FROM s3_objects WHERE bucket = ? AND key = ?`,
+		bucket, key).Scan(&obj.Key, &obj.Bucket, &obj.Size, &obj.ETag, &obj.ContentType,
+		&lastModified, &obj.Height, &obj.Namespace, &commitmentsJSON, &data)
+	if err != nil {
+		if errors.Is(err, sql.ErrNoRows) {
+			return nil, nil, s3.ErrObjectNotFound
+		}
+		return nil, nil, fmt.Errorf("query object: %w", err)
+	}
+	obj.LastModified = time.Unix(0, lastModified)
+	// Tolerate legacy rows whose commitments column is empty or JSON null.
+	if commitmentsJSON != "" && commitmentsJSON != "null" {
+		_ = json.Unmarshal([]byte(commitmentsJSON), &obj.Commitments)
+	}
+
+	return &obj, data, nil
+}
+
+// DeleteObject removes the object at bucket/key, or returns s3.ErrObjectNotFound.
+func (o *ObjectStore) DeleteObject(ctx context.Context, bucket, key string) error {
+	result, err := o.writer.ExecContext(ctx,
+		`DELETE FROM s3_objects WHERE bucket = ? AND key = ?`, bucket, key)
+	if err != nil {
+		return fmt.Errorf("delete object: %w", err)
+	}
+	affected, err := result.RowsAffected()
+	if err != nil {
+		return fmt.Errorf("delete object rows affected: %w", err)
+	}
+	if affected == 0 {
+		return s3.ErrObjectNotFound
+	}
+	return nil
+}
+
+// ListObjects lists keys in a bucket with S3 ListObjects (v1) semantics:
+// prefix filtering, optional delimiter roll-up into CommonPrefixes, marker
+// pagination, and maxKeys counting keys plus DISTINCT common prefixes — each
+// CommonPrefix consumes exactly one MaxKeys slot, however many keys it hides.
+func (o *ObjectStore) ListObjects(ctx context.Context, bucket, prefix, delimiter, marker string, maxKeys int) (*s3.ListObjectsResult, error) {
+	if _, err := o.GetBucket(ctx, bucket); err != nil {
+		return nil, err
+	}
+	if maxKeys <= 0 {
+		maxKeys = 1000 // S3 default page size
+	}
+
+	query := `SELECT key, last_modified, etag, size FROM s3_objects WHERE bucket = ?`
+	args := []any{bucket}
+
+	if prefix != "" {
+		query += ` AND key LIKE ?`
+		args = append(args, prefix+"%")
+	}
+	if marker != "" {
+		query += ` AND key > ?`
+		args = append(args, marker)
+	}
+
+	// No SQL LIMIT: with a delimiter, many rows can collapse into a single
+	// common prefix, so the row count needed to fill a page is unbounded.
+	// The scan stops as soon as the page is known to be full.
+	query += ` ORDER BY key`
+
+	rows, err := o.reader.QueryContext(ctx, query, args...)
+	if err != nil {
+		return nil, fmt.Errorf("query objects: %w", err)
+	}
+	defer func() { _ = rows.Close() }()
+
+	result := &s3.ListObjectsResult{
+		Bucket:    bucket,
+		Prefix:    prefix,
+		Delimiter: delimiter,
+	}
+	seen := make(map[string]bool)
+	entries := 0 // keys + distinct common prefixes returned so far
+	lastReturned := ""
+
+	for rows.Next() {
+		var key string
+		var lastModified int64
+		var etag string
+		var size int64
+		if err := rows.Scan(&key, &lastModified, &etag, &size); err != nil {
+			return nil, fmt.Errorf("scan object: %w", err)
+		}
+
+		if delimiter != "" {
+			afterPrefix := strings.TrimPrefix(key, prefix)
+			if idx := strings.Index(afterPrefix, delimiter); idx >= 0 {
+				commonPrefix := prefix + afterPrefix[:idx+1]
+				if seen[commonPrefix] {
+					// Rolls up into a prefix already on this page; does not
+					// consume another MaxKeys slot.
+					continue
+				}
+				if entries >= maxKeys {
+					result.IsTruncated = true
+					break
+				}
+				seen[commonPrefix] = true
+				result.CommonPrefixes = append(result.CommonPrefixes, commonPrefix)
+				entries++
+				lastReturned = commonPrefix
+				continue
+			}
+		}
+
+		if entries >= maxKeys {
+			result.IsTruncated = true
+			break
+		}
+		result.Objects = append(result.Objects, s3.ObjectInfo{
+			Key:          key,
+			LastModified: time.Unix(0, lastModified),
+			ETag:         etag,
+			Size:         size,
+			StorageClass: "STANDARD",
+		})
+		entries++
+		lastReturned = key
+	}
+	if err := rows.Err(); err != nil {
+		return nil, err
+	}
+
+	// NextMarker is the last key OR common prefix returned, so pagination
+	// still advances when a page consists solely of common prefixes.
+	// NOTE(review): resuming with `key > NextMarker` re-scans keys inside a
+	// trailing common prefix and may repeat that prefix on the next page —
+	// confirm whether callers dedupe prefixes across pages.
+	if result.IsTruncated {
+		result.NextMarker = lastReturned
+	}
+
+	return result, nil
+}
+
+// HeadObject returns object metadata without the data payload. It now returns
+// the same Celestia anchoring fields as GetObject for consistency.
+func (o *ObjectStore) HeadObject(ctx context.Context, bucket, key string) (*s3.Object, error) {
+	var obj s3.Object
+	var lastModified int64
+	var commitmentsJSON string
+	err := o.reader.QueryRowContext(ctx,
+		`SELECT key, bucket, size, etag, content_type, last_modified, height, namespace, commitments
+		 FROM s3_objects WHERE bucket = ? AND key = ?`,
+		bucket, key).Scan(&obj.Key, &obj.Bucket, &obj.Size, &obj.ETag, &obj.ContentType,
+		&lastModified, &obj.Height, &obj.Namespace, &commitmentsJSON)
+	if err != nil {
+		if errors.Is(err, sql.ErrNoRows) {
+			return nil, s3.ErrObjectNotFound
+		}
+		return nil, fmt.Errorf("query object: %w", err)
+	}
+	obj.LastModified = time.Unix(0, lastModified)
+	if commitmentsJSON != "" && commitmentsJSON != "null" {
+		_ = json.Unmarshal([]byte(commitmentsJSON), &obj.Commitments)
+	}
+	return &obj, nil
+}
+
+// isSQLiteUniqueConstraint reports whether err is a SQLite UNIQUE violation.
+// String matching is used because the driver's error type is not imported here.
+func isSQLiteUniqueConstraint(err error) bool {
+	return err != nil && strings.Contains(err.Error(), "UNIQUE constraint failed")
+}
+
+// computeETag returns the MD5 hex digest of data, matching S3's ETag spec.
+func computeETag(data []byte) string {
+	h := md5.Sum(data) //nolint:gosec // MD5 required by S3 protocol
+	return hex.EncodeToString(h[:])
+}
diff --git a/pkg/store/object_test.go b/pkg/store/object_test.go
new file mode 100644
index 0000000..6d654f8
--- /dev/null
+++ b/pkg/store/object_test.go
@@ -0,0 +1,225 @@
+package store
+
+import (
+	"context"
+	"errors"
+	"path/filepath"
+	"testing"
+
+	"github.com/evstack/apex/pkg/s3"
+	"github.com/evstack/apex/pkg/types"
+)
+
+// openTestObjectStore opens a fresh SQLite-backed ObjectStore in a temp dir.
+func openTestObjectStore(t *testing.T) *ObjectStore {
+	t.Helper()
+	dbPath := filepath.Join(t.TempDir(), "test.db")
+	sqliteStore, err := Open(dbPath)
+	if err != nil {
+		t.Fatalf("open store: %v", err)
+	}
+	t.Cleanup(func() { _ = sqliteStore.Close() })
+	return NewObjectStore(sqliteStore, types.Namespace{})
+}
+
+func TestObjectStore_BucketCRUD(t *testing.T) {
+	store := openTestObjectStore(t)
+	ctx := context.Background()
+
+	// Create bucket.
+	if err := store.PutBucket(ctx, "my-bucket"); err != nil {
+		t.Fatalf("PutBucket: %v", err)
+	}
+
+	// Get bucket.
+	b, err := store.GetBucket(ctx, "my-bucket")
+	if err != nil {
+		t.Fatalf("GetBucket: %v", err)
+	}
+	if b.Name != "my-bucket" {
+		t.Errorf("expected name 'my-bucket', got %s", b.Name)
+	}
+
+	// Duplicate bucket.
+	err = store.PutBucket(ctx, "my-bucket")
+	if !errors.Is(err, s3.ErrBucketAlreadyExists) {
+		t.Fatalf("expected ErrBucketAlreadyExists, got: %v", err)
+	}
+
+	// List buckets.
+	buckets, err := store.ListBuckets(ctx)
+	if err != nil {
+		t.Fatalf("ListBuckets: %v", err)
+	}
+	if len(buckets) != 1 {
+		t.Fatalf("expected 1 bucket, got %d", len(buckets))
+	}
+
+	// Delete bucket.
+	if err := store.DeleteBucket(ctx, "my-bucket"); err != nil {
+		t.Fatalf("DeleteBucket: %v", err)
+	}
+
+	_, err = store.GetBucket(ctx, "my-bucket")
+	if !errors.Is(err, s3.ErrBucketNotFound) {
+		t.Fatalf("expected ErrBucketNotFound, got: %v", err)
+	}
+}
+
+// TestObjectStore_ObjectCRUD exercises the full put/get/head/delete object cycle.
+func TestObjectStore_ObjectCRUD(t *testing.T) {
+	store := openTestObjectStore(t)
+	ctx := context.Background()
+
+	_ = store.PutBucket(ctx, "bucket")
+
+	// Put object.
+	data := []byte("hello celestia")
+	obj, err := store.PutObject(ctx, "bucket", "greeting.txt", data, "text/plain", 42, []string{"abc123"})
+	if err != nil {
+		t.Fatalf("PutObject: %v", err)
+	}
+	if obj.Size != int64(len(data)) {
+		t.Errorf("expected size %d, got %d", len(data), obj.Size)
+	}
+	if obj.Height != 42 {
+		t.Errorf("expected height 42, got %d", obj.Height)
+	}
+
+	// Get object.
+	gotObj, gotData, err := store.GetObject(ctx, "bucket", "greeting.txt")
+	if err != nil {
+		t.Fatalf("GetObject: %v", err)
+	}
+	if string(gotData) != "hello celestia" {
+		t.Errorf("expected data 'hello celestia', got %q", gotData)
+	}
+	if gotObj.ETag == "" {
+		t.Error("expected non-empty ETag")
+	}
+	if len(gotObj.Commitments) != 1 || gotObj.Commitments[0] != "abc123" {
+		t.Errorf("expected commitments [abc123], got %v", gotObj.Commitments)
+	}
+
+	// Head object.
+	headObj, err := store.HeadObject(ctx, "bucket", "greeting.txt")
+	if err != nil {
+		t.Fatalf("HeadObject: %v", err)
+	}
+	if headObj.Key != "greeting.txt" {
+		t.Errorf("expected key greeting.txt, got %s", headObj.Key)
+	}
+
+	// Delete object.
+	if err := store.DeleteObject(ctx, "bucket", "greeting.txt"); err != nil {
+		t.Fatalf("DeleteObject: %v", err)
+	}
+
+	_, _, err = store.GetObject(ctx, "bucket", "greeting.txt")
+	if !errors.Is(err, s3.ErrObjectNotFound) {
+		t.Fatalf("expected ErrObjectNotFound, got: %v", err)
+	}
+}
+
+// TestObjectStore_ObjectUpsert verifies that a second PUT to the same key
+// replaces the stored data.
+func TestObjectStore_ObjectUpsert(t *testing.T) {
+	store := openTestObjectStore(t)
+	ctx := context.Background()
+
+	_ = store.PutBucket(ctx, "bucket")
+
+	_, err := store.PutObject(ctx, "bucket", "key", []byte("v1"), "text/plain", 0, nil)
+	if err != nil {
+		t.Fatalf("PutObject v1: %v", err)
+	}
+
+	_, err = store.PutObject(ctx, "bucket", "key", []byte("v2-updated"), "text/plain", 100, []string{"commit2"})
+	if err != nil {
+		t.Fatalf("PutObject v2: %v", err)
+	}
+
+	_, data, err := store.GetObject(ctx, "bucket", "key")
+	if err != nil {
+		t.Fatalf("GetObject: %v", err)
+	}
+	if string(data) != "v2-updated" {
+		t.Errorf("expected data 'v2-updated', got %q", data)
+	}
+}
+
+func TestObjectStore_ListObjects_PrefixAndPagination(t *testing.T) {
+	store := openTestObjectStore(t)
+	ctx := context.Background()
+
+	_ = store.PutBucket(ctx, "bucket")
+	_, _ = store.PutObject(ctx, "bucket", "docs/a.txt", []byte("a"), "text/plain", 0, nil)
+	_, _ = store.PutObject(ctx, "bucket", "docs/b.txt", []byte("b"), "text/plain", 0, nil)
+	_, _ = store.PutObject(ctx, "bucket", "images/cat.png", []byte("c"), "image/png", 0, nil)
+
+	// List with prefix.
+	result, err := store.ListObjects(ctx, "bucket", "docs/", "", "", 10)
+	if err != nil {
+		t.Fatalf("ListObjects: %v", err)
+	}
+	if len(result.Objects) != 2 {
+		t.Errorf("expected 2 objects with prefix 'docs/', got %d", len(result.Objects))
+	}
+
+	// List with pagination (maxKeys=1).
+	result, err = store.ListObjects(ctx, "bucket", "", "", "", 1)
+	if err != nil {
+		t.Fatalf("ListObjects: %v", err)
+	}
+	if len(result.Objects) != 1 {
+		t.Errorf("expected 1 object, got %d", len(result.Objects))
+	}
+	if !result.IsTruncated {
+		t.Error("expected IsTruncated=true")
+	}
+	if result.NextMarker == "" {
+		t.Error("expected non-empty NextMarker")
+	}
+}
+
+func TestObjectStore_ListObjects_Delimiter(t *testing.T) {
+	store := openTestObjectStore(t)
+	ctx := context.Background()
+
+	_ = store.PutBucket(ctx, "bucket")
+	_, _ = store.PutObject(ctx, "bucket", "photos/2024/jan.jpg", []byte("j"), "image/jpeg", 0, nil)
+	_, _ = store.PutObject(ctx, "bucket", "photos/2024/feb.jpg", []byte("f"), "image/jpeg", 0, nil)
+	_, _ = store.PutObject(ctx, "bucket", "photos/2025/mar.jpg", []byte("m"), "image/jpeg", 0, nil)
+
+	result, err := store.ListObjects(ctx, "bucket", "photos/", "/", "", 100)
+	if err != nil {
+		t.Fatalf("ListObjects: %v", err)
+	}
+
+	// Should group into common prefixes "photos/2024/" and "photos/2025/".
+	if len(result.CommonPrefixes) != 2 {
+		t.Errorf("expected 2 common prefixes, got %d: %v", len(result.CommonPrefixes), result.CommonPrefixes)
+	}
+	if len(result.Objects) != 0 {
+		t.Errorf("expected 0 direct objects, got %d", len(result.Objects))
+	}
+}
+
+func TestObjectStore_DeleteBucket_NotEmpty(t *testing.T) {
+	store := openTestObjectStore(t)
+	ctx := context.Background()
+
+	_ = store.PutBucket(ctx, "bucket")
+	_, _ = store.PutObject(ctx, "bucket", "key", []byte("data"), "text/plain", 0, nil)
+
+	err := store.DeleteBucket(ctx, "bucket")
+	if !errors.Is(err, s3.ErrBucketNotEmpty) {
+		t.Fatalf("expected ErrBucketNotEmpty, got: %v", err)
+	}
+}
+
+// TestObjectStore_ETag_IsMD5 pins the ETag to the S3-standard MD5 hex digest.
+func TestObjectStore_ETag_IsMD5(t *testing.T) {
+	data := []byte("hello world")
+	etag := computeETag(data)
+	// MD5 of "hello world" = 5eb63bbbe01eeed093cb22bb8f5acdc3
+	expected := "5eb63bbbe01eeed093cb22bb8f5acdc3"
+	if etag != expected {
+		t.Errorf("expected MD5 ETag %s, got %s", expected, etag)
+	}
+}
diff --git a/pkg/store/sqlite.go b/pkg/store/sqlite.go
index 852ef68..0485386 100644
--- a/pkg/store/sqlite.go
+++ b/pkg/store/sqlite.go
@@ -115,6 +115,7 @@ var allMigrations = []migrationStep{
 	{version: 1, file: "migrations/001_init.sql"},
 	{version: 2, file: "migrations/002_commitment_index.sql"},
 	{version: 3, file: "migrations/003_blob_index_unique.sql"},
+	{version: 4, file: "migrations/004_s3_objects.sql"},
 }
 
 func (s *SQLiteStore) migrate() error {
diff --git a/pkg/submit/celestia_blob.go b/pkg/submit/celestia_blob.go
index 630f1fb..a4b1508 100644
--- a/pkg/submit/celestia_blob.go
+++ b/pkg/submit/celestia_blob.go
@@ -3,9 +3,39 @@ package submit
 import (
 	"fmt"
 
+	"github.com/celestiaorg/go-square/merkle"
+	"github.com/celestiaorg/go-square/v3/inclusion"
 	share "github.com/celestiaorg/go-square/v3/share"
+
+	"github.com/evstack/apex/pkg/types"
 )
 
+// subtreeRootThreshold is passed to inclusion.CreateCommitment when computing
+// the blob commitment.
+const subtreeRootThreshold = 64
+
+// BuildBlob validates a Celestia blob payload, computes its commitment, and
+// returns the Apex-owned submission shape.
+func BuildBlob(namespace types.Namespace, data []byte, shareVersion uint32, signer []byte) (Blob, error) {
+	blob := Blob{
+		Namespace:    namespace,
+		Data:         data,
+		ShareVersion: shareVersion,
+		Signer:       signer,
+		Index:        -1, // no square index assigned at build time
+	}
+
+	// Round-tripping through go-square's constructors validates the
+	// namespace and share version before any commitment work.
+	squareBlob, err := convertSquareBlob(blob)
+	if err != nil {
+		return Blob{}, err
+	}
+
+	commitment, err := inclusion.CreateCommitment(squareBlob, merkle.HashFromByteSlices, subtreeRootThreshold)
+	if err != nil {
+		return Blob{}, fmt.Errorf("create blob commitment: %w", err)
+	}
+
+	blob.Commitment = commitment
+	return blob, nil
+}
+
 func convertSquareBlob(blob Blob) (*share.Blob, error) {
 	namespace, err := share.NewNamespaceFromBytes(blob.Namespace[:])
 	if err != nil {
diff --git a/pkg/submit/submit_test.go b/pkg/submit/submit_test.go
index cba9299..8dcfb30 100644
--- a/pkg/submit/submit_test.go
+++ b/pkg/submit/submit_test.go
@@ -189,6 +189,46 @@ func TestMarshalResultRejectsNil(t *testing.T) {
 	}
 }
 
+// TestBuildBlob checks the fields populated on a freshly built blob.
+func TestBuildBlob(t *testing.T) {
+	t.Parallel()
+
+	ns := testNamespace(9)
+	blob, err := BuildBlob(ns, []byte("hello"), 0, nil)
+	if err != nil {
+		t.Fatalf("BuildBlob: %v", err)
+	}
+	if blob.Namespace != ns {
+		t.Fatalf("namespace = %x, want %x", blob.Namespace, ns)
+	}
+	if blob.ShareVersion != 0 {
+		t.Fatalf("share version = %d, want 0", blob.ShareVersion)
+	}
+	if blob.Index != -1 {
+		t.Fatalf("index = %d, want -1", blob.Index)
+	}
+	if len(blob.Commitment) == 0 {
+		t.Fatal("expected non-empty commitment")
+	}
+}
+
+func TestBuildBlobRejectsInvalidShareVersion(t *testing.T) {
+	t.Parallel()
+
+	_, err := BuildBlob(testNamespace(10), []byte("hello"), uint32(gsquare.MaxShareVersion)+1, nil)
+	if err == nil {
+		t.Fatal("expected error for invalid share version")
+	}
+}
+
+func TestBuildBlobRejectsReservedNamespace(t *testing.T) {
+	t.Parallel()
+
+	_, err := BuildBlob(reservedNamespace(), []byte("hello"), 0, nil)
+	if err == nil {
+		t.Fatal("expected error for reserved namespace")
+	}
+}
+
 func testNamespace(b byte) types.Namespace {
namespace := gsquare.MustNewV0Namespace([]byte("apexns" + string([]byte{b}))) var ns types.Namespace