Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 80 additions & 35 deletions cmd/airbyte-source/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package airbyte_source

import (
"encoding/json"
"errors"
"fmt"
"os"

Expand All @@ -22,32 +23,33 @@ func init() {

func ReadCommand(ch *Helper) *cobra.Command {
readCmd := &cobra.Command{
Use: "read",
Short: "Converts rows from a PlanetScale database into AirbyteRecordMessages",
Run: func(cmd *cobra.Command, args []string) {
Use: "read",
Short: "Converts rows from a PlanetScale database into AirbyteRecordMessages",
SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()

ch.Logger = internal.NewLogger(cmd.OutOrStdout())
if readSourceConfigFilePath == "" {
fmt.Fprintf(cmd.ErrOrStderr(), "Please pass path to a valid source config file via the [%v] argument", "config")
os.Exit(1)
return fmt.Errorf("missing config file path")
}

if readSourceCatalogPath == "" {
fmt.Fprintf(cmd.OutOrStdout(), "Please pass path to a valid source catalog file via the [%v] argument", "config")
os.Exit(1)
return fmt.Errorf("missing catalog file path")
}

psc, err := parseSource(ch.FileReader, readSourceConfigFilePath)
if err != nil {
fmt.Fprintln(cmd.OutOrStdout(), "Please provide path to a valid configuration file")
return
return err
}

ch.Logger.Log(internal.LOGLEVEL_INFO, "Ensure database")
if err := ch.EnsureDB(psc); err != nil {
fmt.Fprintln(cmd.OutOrStdout(), "Unable to connect to PlanetScale Database")
return
return err
}

defer func() {
Expand All @@ -60,19 +62,19 @@ func ReadCommand(ch *Helper) *cobra.Command {
cs, err := checkConnectionStatus(ctx, ch.Database, psc)
if err != nil {
ch.Logger.ConnectionStatus(cs)
return
return err
}

ch.Logger.Log(internal.LOGLEVEL_INFO, "Reading catalog")
catalog, err := readCatalog(readSourceCatalogPath)
if err != nil {
ch.Logger.Error(fmt.Sprintf("Unable to read catalog: %+v", err))
os.Exit(1)
return fmt.Errorf("unable to read catalog: %w", err)
}

if len(catalog.Streams) == 0 {
ch.Logger.Log(internal.LOGLEVEL_ERROR, "Catalog has no streams")
return
return nil
}

state := ""
Expand All @@ -81,7 +83,7 @@ func ReadCommand(ch *Helper) *cobra.Command {
b, err := os.ReadFile(stateFilePath)
if err != nil {
ch.Logger.Error(fmt.Sprintf("Unable to read state : %v", err))
os.Exit(1)
return fmt.Errorf("unable to read state: %w", err)
}
state = string(b)
}
Expand All @@ -90,52 +92,71 @@ func ReadCommand(ch *Helper) *cobra.Command {
shards, err := ch.Database.ListShards(ctx, psc)
if err != nil {
ch.Logger.Error(fmt.Sprintf("Unable to list shards : %v", err))
os.Exit(1)
return fmt.Errorf("unable to list shards: %w", err)
}

ch.Logger.Log(internal.LOGLEVEL_INFO, "Reading state")
syncState, err := readState(state, psc, catalog.Streams, shards, ch.Logger)
if err != nil {
ch.Logger.Error(fmt.Sprintf("Unable to read state : %v", err))
os.Exit(1)
return fmt.Errorf("unable to read state: %w", err)
}

var readErr error
for _, configuredStream := range catalog.Streams {
keyspaceOrDatabase := configuredStream.Stream.Namespace
if keyspaceOrDatabase == "" {
keyspaceOrDatabase = psc.Database
}
streamStateKey := keyspaceOrDatabase + ":" + configuredStream.Stream.Name
keyspaceOrDatabase, streamStateKey := streamStateKeyFor(configuredStream.Stream.Namespace, configuredStream.Stream.Name, psc.Database)
streamState, ok := syncState.Streams[streamStateKey]
if !ok {
ch.Logger.Error(fmt.Sprintf("Unable to read state for stream %v", streamStateKey))
os.Exit(1)
ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_INCOMPLETE)
return fmt.Errorf("unable to read state for stream %v", streamStateKey)
}

ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_STARTED)

streamFailed := false
for shardName, shardState := range streamState.Shards {
var tc *psdbconnectv1alpha1.TableCursor

tc, err = shardState.SerializedCursorToTableCursor(configuredStream)
ch.Logger.Log(internal.LOGLEVEL_INFO, fmt.Sprintf("Using serialized cursor for stream %s", streamStateKey))
if err != nil {
ch.Logger.Error(fmt.Sprintf("Invalid serialized cursor for stream %v, failed with [%v]", streamStateKey, err))
os.Exit(1)
streamFailed = true
// A bad cursor only affects this shard; keep going so the
// other shards in this stream can still sync.
continue
}

sc, err := ch.Database.Read(ctx, cmd.OutOrStdout(), psc, configuredStream, tc)
// Read can return a cursor reflecting the progress made so far
// alongside an error (e.g. on a server timeout), so persist it
// before handling the error to avoid re-reading already-synced
// data on the next attempt.
if sc != nil {
syncState.Streams[streamStateKey].Shards[shardName] = sc
}
// Checkpoint after every shard so that if we crash mid-stream
// the progress of shards that already completed isn't lost and
// re-read on the next attempt.
ch.Logger.StreamState(keyspaceOrDatabase, configuredStream.Stream.Name, syncState.Streams[streamStateKey])
if err != nil {
ch.Logger.Error(err.Error())
os.Exit(1)
streamFailed = true
// One shard failing shouldn't stop the others from syncing.
continue
}
}

if sc != nil {
// if we get any new state, we assign it here.
// otherwise, the older state is round-tripped back to Airbyte.
syncState.Streams[streamStateKey].Shards[shardName] = sc
}
ch.Logger.State(syncState)
if streamFailed {
ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_INCOMPLETE)
readErr = errors.Join(readErr, fmt.Errorf("read failed for stream %v", streamStateKey))
} else {
ch.Logger.StreamStatus(keyspaceOrDatabase, configuredStream.Stream.Name, internal.STREAM_STATUS_COMPLETE)
}
}

return readErr
},
}
readCmd.Flags().StringVar(&readSourceCatalogPath, "catalog", "", "Path to the PlanetScale catalog configuration")
Expand All @@ -148,23 +169,47 @@ type State struct {
Shards map[string]map[string]interface{} `json:"shards"`
}

// streamStateKeyFor computes the two identifiers used to address a stream's
// entry in the sync state.
//
// The first return value is the effective namespace: namespace itself when it
// is non-empty, otherwise database. The second return value is the composite
// lookup key, formed as "<effective namespace>:<streamName>".
//
// Centralizing the derivation here means every caller builds the key the same
// way instead of repeating the fallback and concatenation inline.
func streamStateKeyFor(namespace, streamName, database string) (string, string) {
	// Start from the fallback and override it only when the caller supplied
	// an explicit namespace.
	resolved := database
	if namespace != "" {
		resolved = namespace
	}

	stateKey := resolved + ":" + streamName
	return resolved, stateKey
}

func readState(state string, psc internal.PlanetScaleSource, streams []internal.ConfiguredStream, shards []string, logger internal.AirbyteLogger) (internal.SyncState, error) {
syncState := internal.SyncState{
Streams: map[string]internal.ShardStates{},
}
if state != "" {
err := json.Unmarshal([]byte(state), &syncState)
if err != nil {
return syncState, err
// Try parsing as Airbyte v2 per-stream state array first. An empty
// array is valid v2 state (no checkpoints yet) and must be treated as
// v2 rather than falling through to the legacy object parser, which
// would fail to unmarshal it; the stream loop below then initializes
// fresh cursors.
var perStreamStates []internal.AirbyteState
if err := json.Unmarshal([]byte(state), &perStreamStates); err == nil && (len(perStreamStates) == 0 || perStreamStates[0].Type == internal.STATE_TYPE_STREAM) {
logger.Log(internal.LOGLEVEL_INFO, fmt.Sprintf("Parsing Airbyte v2 per-stream state (%d streams)", len(perStreamStates)))
for _, s := range perStreamStates {
if s.Stream != nil && s.Stream.StreamState != nil {
_, key := streamStateKeyFor(s.Stream.StreamDescriptor.Namespace, s.Stream.StreamDescriptor.Name, psc.Database)
syncState.Streams[key] = *s.Stream.StreamState
}
}
} else {
// Fall back to legacy global state format
err := json.Unmarshal([]byte(state), &syncState)
if err != nil {
return syncState, err
}
}
}

for _, s := range streams {
keyspaceOrDatabase := s.Stream.Namespace
if keyspaceOrDatabase == "" {
keyspaceOrDatabase = psc.Database
}
stateKey := keyspaceOrDatabase + ":" + s.Stream.Name
keyspaceOrDatabase, stateKey := streamStateKeyFor(s.Stream.Namespace, s.Stream.Name, psc.Database)
logger.Log(internal.LOGLEVEL_INFO, fmt.Sprintf("Syncing stream %s with sync mode %s", s.Stream.Name, s.SyncMode))
ignoreCurrentCursor := !s.IncrementalSyncRequested()

Expand Down
Loading
Loading