Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ jobs:
go-version-file: go.mod
- run: go build -o bin/apex ./cmd/apex

e2e-submission:
name: E2E Submission
e2e:
name: E2E
runs-on: ubuntu-latest
timeout-minutes: 30
steps:
Expand All @@ -55,4 +55,4 @@ jobs:
go.sum
e2e/go.sum
- working-directory: e2e
run: go test -race -count=1 -timeout 20m -run TestSubmissionViaJSONRPC ./...
run: go test -race -count=1 -timeout 20m ./...
83 changes: 65 additions & 18 deletions cmd/apex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"github.com/evstack/apex/pkg/fetch"
"github.com/evstack/apex/pkg/metrics"
"github.com/evstack/apex/pkg/profile"
apexs3 "github.com/evstack/apex/pkg/s3"
"github.com/evstack/apex/pkg/store"
"github.com/evstack/apex/pkg/submit"
syncer "github.com/evstack/apex/pkg/sync"
Expand Down Expand Up @@ -214,6 +215,45 @@
}
}

func setupS3Server(cfg *config.Config, db store.Store, blobSubmitter submit.Submitter, log zerolog.Logger) (*http.Server, error) {
if !cfg.S3.Enabled {
return nil, nil
}

var ns types.Namespace
if cfg.S3.Namespace != "" {
var err error
ns, err = types.NamespaceFromHex(cfg.S3.Namespace)
if err != nil {
return nil, fmt.Errorf("parse S3 namespace: %w", err)
}
}

sqliteDB, ok := db.(*store.SQLiteStore)
if !ok {
return nil, fmt.Errorf("S3 API requires SQLite store, got %T", db)
}

objStore := store.NewObjectStore(sqliteDB, ns)
s3Svc := apexs3.NewService(objStore, blobSubmitter, ns)
s3Srv := apexs3.NewServer(s3Svc, cfg.S3.Region, log)

httpSrv := &http.Server{
Addr: cfg.S3.ListenAddr,
Handler: s3Srv,
ReadHeaderTimeout: 10 * time.Second,
}

go func() {
log.Info().Str("addr", cfg.S3.ListenAddr).Msg("S3 API server listening")
if err := httpSrv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Error().Err(err).Msg("S3 API server error")
}
}()

return httpSrv, nil
}

func persistNamespaces(ctx context.Context, db store.Store, namespaces []types.Namespace) error {
for _, ns := range namespaces {
if err := db.PutNamespace(ctx, ns); err != nil {
Expand Down Expand Up @@ -247,7 +287,7 @@
return syncer.WithBackfillSource(dbSrc), func() { _ = dbSrc.Close() }, nil
}

func runIndexer(ctx context.Context, cfg *config.Config) error {

Check failure on line 290 in cmd/apex/main.go

View workflow job for this annotation

GitHub Actions / Lint

cyclomatic complexity 16 of func `runIndexer` is high (> 15) (gocyclo)
// Parse namespaces from config.
namespaces, err := cfg.ParsedNamespaces()
if err != nil {
Expand Down Expand Up @@ -282,11 +322,24 @@
}
defer dataFetcher.Close() //nolint:errcheck

svc, notifier, closeSubmitter, err := setupAPIService(cfg, db, dataFetcher, proofFwd, rec)
blobSubmitter, err := openBlobSubmitter(cfg)
if err != nil {
return err
}
if blobSubmitter != nil {
defer blobSubmitter.Close() //nolint:errcheck
}

// Setup S3 API server if enabled.
s3Srv, err := setupS3Server(cfg, db, blobSubmitter, log.Logger)
if err != nil {
return fmt.Errorf("setup S3 server: %w", err)
}

svc, notifier, err := setupAPIService(cfg, db, dataFetcher, proofFwd, rec, blobSubmitter)
if err != nil {
return err
}
defer closeSubmitter()

// Build and run the sync coordinator with observer hook.
coordOpts, closeBackfill, err := buildCoordinatorOptions(cfg, notifier, rec)
Expand Down Expand Up @@ -342,7 +395,7 @@

err = coord.Run(ctx)

gracefulShutdown(httpSrv, grpcSrv, metricsSrv, profileSrv)
gracefulShutdown(httpSrv, grpcSrv, metricsSrv, profileSrv, s3Srv)

if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("coordinator: %w", err)
Expand Down Expand Up @@ -385,19 +438,7 @@
return blobSubmitter, nil
}

func setupAPIService(cfg *config.Config, db store.Store, dataFetcher fetch.DataFetcher, proofFwd fetch.ProofForwarder, rec metrics.Recorder) (*api.Service, *api.Notifier, func(), error) {
blobSubmitter, err := openBlobSubmitter(cfg)
if err != nil {
return nil, nil, nil, err
}

closeSubmitter := func() {}
if blobSubmitter != nil {
closeSubmitter = func() {
_ = blobSubmitter.Close()
}
}

func setupAPIService(cfg *config.Config, db store.Store, dataFetcher fetch.DataFetcher, proofFwd fetch.ProofForwarder, rec metrics.Recorder, blobSubmitter submit.Submitter) (*api.Service, *api.Notifier, error) {

Check failure on line 441 in cmd/apex/main.go

View workflow job for this annotation

GitHub Actions / Lint

setupAPIService - result 2 (error) is always nil (unparam)
notifier := api.NewNotifier(cfg.Subscription.BufferSize, cfg.Subscription.MaxSubscribers, log.Logger)
notifier.SetMetrics(rec)

Expand All @@ -407,7 +448,7 @@
}

svc := api.NewService(db, dataFetcher, proofFwd, notifier, log.Logger, svcOpts...)
return svc, notifier, closeSubmitter, nil
return svc, notifier, nil
}

func buildCoordinatorOptions(cfg *config.Config, notifier *api.Notifier, rec metrics.Recorder) ([]syncer.Option, func(), error) {
Expand Down Expand Up @@ -436,7 +477,7 @@
return coordOpts, closeBackfill, nil
}

func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server, profileSrv *profile.Server) {
func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server, profileSrv *profile.Server, s3Srv *http.Server) {
stopped := make(chan struct{})
go func() {
grpcSrv.GracefulStop()
Expand All @@ -457,6 +498,12 @@
log.Error().Err(err).Msg("JSON-RPC server shutdown error")
}

if s3Srv != nil {
if err := s3Srv.Shutdown(shutdownCtx); err != nil {
log.Error().Err(err).Msg("S3 API server shutdown error")
}
}

if metricsSrv != nil {
if err := metricsSrv.Shutdown(shutdownCtx); err != nil {
log.Error().Err(err).Msg("metrics server shutdown error")
Expand Down
14 changes: 14 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Config struct {
Profiling ProfilingConfig `yaml:"profiling"`
Log LogConfig `yaml:"log"`
Submission SubmissionConfig `yaml:"submission"`
S3 S3APIConfig `yaml:"s3"`
}

// DataSourceConfig configures the Celestia data source.
Expand Down Expand Up @@ -94,6 +95,14 @@ type LogConfig struct {
Format string `yaml:"format"`
}

// S3APIConfig configures the S3-compatible API server.
type S3APIConfig struct {
Enabled bool `yaml:"enabled"`
ListenAddr string `yaml:"listen_addr"`
Region string `yaml:"region"`
Namespace string `yaml:"namespace"` // Celestia namespace for S3 objects (hex)
}

// SubmissionConfig contains settings for the future blob submission pipeline.
type SubmissionConfig struct {
Enabled bool `yaml:"enabled"`
Expand Down Expand Up @@ -155,6 +164,11 @@ func DefaultConfig() Config {
Level: "info",
Format: "json",
},
S3: S3APIConfig{
Enabled: false,
ListenAddr: ":8333",
Region: "us-east-1",
},
}
}

Expand Down
44 changes: 44 additions & 0 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ storage:
# endpoint: "" # custom endpoint for MinIO, R2, etc.
# chunk_size: 64 # heights per S3 object

s3:
# Enable the S3-compatible HTTP API backed by SQLite object storage.
enabled: false
# Address for the S3-compatible API server.
listen_addr: ":8333"
# AWS region reported to clients.
region: "us-east-1"
# Namespace used when S3 uploads are submitted to Celestia.
namespace: ""

rpc:
# Address for the JSON-RPC API server (HTTP/WebSocket)
listen_addr: ":8080"
Expand Down Expand Up @@ -290,6 +300,9 @@ func validate(cfg *Config) error {
if err := validateSubmission(&cfg.Submission); err != nil {
return err
}
if err := validateS3API(&cfg.S3, &cfg.Storage, &cfg.Submission); err != nil {
return err
}
if !validLogLevels[cfg.Log.Level] {
return fmt.Errorf("log.level %q is invalid; must be one of trace/debug/info/warn/error/fatal/panic", cfg.Log.Level)
}
Expand Down Expand Up @@ -381,6 +394,37 @@ func validateSubmission(s *SubmissionConfig) error {
return nil
}

func validateS3API(s3cfg *S3APIConfig, storage *StorageConfig, submission *SubmissionConfig) error {
s3cfg.ListenAddr = strings.TrimSpace(s3cfg.ListenAddr)
s3cfg.Region = strings.TrimSpace(s3cfg.Region)
s3cfg.Namespace = strings.TrimSpace(s3cfg.Namespace)
if s3cfg.Region == "" {
s3cfg.Region = DefaultConfig().S3.Region
}
if !s3cfg.Enabled {
return nil
}
if s3cfg.ListenAddr == "" {
return errors.New("s3.listen_addr is required when s3.enabled is true")
}
if storage.Type != "sqlite" && storage.Type != "" {
return errors.New("s3.enabled requires storage.type to be \"sqlite\"")
}
if s3cfg.Namespace != "" {
ns, err := types.NamespaceFromHex(s3cfg.Namespace)
if err != nil {
return fmt.Errorf("s3.namespace is invalid: %w", err)
}
if err := ns.ValidateForBlob(); err != nil {
return fmt.Errorf("s3.namespace is invalid: %w", err)
}
}
if submission.Enabled && s3cfg.Namespace == "" {
return errors.New("s3.namespace is required when both s3.enabled and submission.enabled are true")
}
return nil
}

func resolveSubmissionSignerKeyPath(s *SubmissionConfig, baseDir string) error {
if !s.Enabled {
return nil
Expand Down
137 changes: 137 additions & 0 deletions config/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,3 +330,140 @@ log:
t.Fatalf("unexpected error: %v", err)
}
}

func TestGenerateIncludesS3Section(t *testing.T) {
t.Parallel()

path := filepath.Join(t.TempDir(), "config.yaml")
if err := Generate(path); err != nil {
t.Fatalf("Generate: %v", err)
}

data, err := os.ReadFile(path)
if err != nil {
t.Fatalf("ReadFile: %v", err)
}
content := string(data)
if !strings.Contains(content, "\ns3:\n") {
t.Fatalf("generated config missing s3 section:\n%s", content)
}
if !strings.Contains(content, `listen_addr: ":8333"`) {
t.Fatalf("generated config missing s3 listen addr default:\n%s", content)
}
}

func TestLoadRejectsS3EnabledWithNonSQLiteStorage(t *testing.T) {
t.Parallel()

path := filepath.Join(t.TempDir(), "config.yaml")
content := `
data_source:
type: "node"
celestia_node_url: "http://localhost:26658"

storage:
type: "s3"
s3:
bucket: "bucket"
region: "us-east-1"

s3:
enabled: true
listen_addr: ":8333"

log:
level: "info"
format: "json"
`
if err := os.WriteFile(path, []byte(content), 0o644); err != nil {
t.Fatalf("WriteFile: %v", err)
}

_, err := Load(path)
if err == nil {
t.Fatal("expected validation error, got nil")
}
if !strings.Contains(err.Error(), `s3.enabled requires storage.type to be "sqlite"`) {
t.Fatalf("unexpected error: %v", err)
}
}

func TestLoadRejectsS3SubmissionWithoutNamespace(t *testing.T) {
t.Parallel()

path := filepath.Join(t.TempDir(), "config.yaml")
keyPath := filepath.Join(t.TempDir(), "submit.key")
if err := os.WriteFile(keyPath, []byte("00112233"), 0o600); err != nil {
t.Fatalf("WriteFile: %v", err)
}

content := `
data_source:
type: "app"
celestia_app_grpc_addr: "localhost:9090"

storage:
type: "sqlite"
db_path: "apex.db"

submission:
enabled: true
app_grpc_addr: "localhost:9090"
chain_id: "mychain"
signer_key: "` + keyPath + `"

s3:
enabled: true
listen_addr: ":8333"

log:
level: "info"
format: "json"
`
if err := os.WriteFile(path, []byte(content), 0o644); err != nil {
t.Fatalf("WriteFile: %v", err)
}

_, err := Load(path)
if err == nil {
t.Fatal("expected validation error, got nil")
}
if !strings.Contains(err.Error(), "s3.namespace is required") {
t.Fatalf("unexpected error: %v", err)
}
}

func TestLoadRejectsInvalidS3Namespace(t *testing.T) {
t.Parallel()

path := filepath.Join(t.TempDir(), "config.yaml")
content := `
data_source:
type: "node"
celestia_node_url: "http://localhost:26658"

storage:
type: "sqlite"
db_path: "apex.db"

s3:
enabled: true
listen_addr: ":8333"
namespace: "not-a-namespace"

log:
level: "info"
format: "json"
`
if err := os.WriteFile(path, []byte(content), 0o644); err != nil {
t.Fatalf("WriteFile: %v", err)
}

_, err := Load(path)
if err == nil {
t.Fatal("expected validation error, got nil")
}
if !strings.Contains(err.Error(), "s3.namespace is invalid") {
t.Fatalf("unexpected error: %v", err)
}
}
Loading
Loading