diff --git a/cdc_iterator.go b/cdc_iterator.go index 18baa3f..86acbe1 100644 --- a/cdc_iterator.go +++ b/cdc_iterator.go @@ -15,12 +15,14 @@ import ( type ( cdcIteratorConfig struct { - tableName string - projectID string - instanceID string - databaseID string - position *common.CDCPosition - client *spanner.Client + tableName string + projectID string + instanceID string + databaseID string + position *common.CDCPosition + client *spanner.Client + adminClient *database.DatabaseAdminClient + endpoint string } cdcIterator struct { reader *changestreams.Reader @@ -33,12 +35,6 @@ type ( var _ common.Iterator = new(cdcIterator) func newCdcIterator(ctx context.Context, config *cdcIteratorConfig) (*cdcIterator, error) { - adminClient, err := database.NewDatabaseAdminClient(ctx) - if err != nil { - return nil, err - } - defer adminClient.Close() - databaseName := fmt.Sprintf( "projects/%s/instances/%s/databases/%s", config.projectID, config.instanceID, config.databaseID, @@ -58,19 +54,20 @@ func newCdcIterator(ctx context.Context, config *cdcIteratorConfig) (*cdcIterato if !streamExists { stmt := fmt.Sprint("CREATE CHANGE STREAM ", streamID, " FOR ", config.tableName) - op, err := adminClient.UpdateDatabaseDdl(ctx, &databasepb.UpdateDatabaseDdlRequest{ + op, err := config.adminClient.UpdateDatabaseDdl(ctx, &databasepb.UpdateDatabaseDdlRequest{ Database: databaseName, Statements: []string{stmt}, }) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create change stream %s: %w", streamID, err) } if err := op.Wait(ctx); err != nil { - return nil, err + return nil, fmt.Errorf("failed to wait for change stream %s creation: %w", streamID, err) } } changestreamsConfig := changestreams.Config{ + SpannerClientOptions: common.ClientOptions(config.endpoint), SpannerClientConfig: spanner.ClientConfig{ SessionPoolConfig: spanner.DefaultSessionPoolConfig, }, @@ -85,7 +82,7 @@ func newCdcIterator(ctx context.Context, config *cdcIteratorConfig) (*cdcIterato changestreamsConfig, ) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create reader for change stream %s: %w", streamID, err) } iterator := &cdcIterator{ diff --git a/cdc_iterator_test.go b/cdc_iterator_test.go index 7cb31a9..0d7314b 100644 --- a/cdc_iterator_test.go +++ b/cdc_iterator_test.go @@ -12,13 +12,16 @@ import ( func testCdcIterator(ctx context.Context, is *is.I) (common.Iterator, func()) { client := testutils.NewClient(ctx, is) + adminClient := testutils.NewDatabaseAdminClient(ctx, is) iterator, err := newCdcIterator(ctx, &cdcIteratorConfig{ - tableName: "Singers", - projectID: testutils.ProjectID, - instanceID: testutils.InstanceID, - databaseID: testutils.DatabaseID, - client: client, + tableName: "Singers", + projectID: testutils.ProjectID, + instanceID: testutils.InstanceID, + databaseID: testutils.DatabaseID, + client: client, + adminClient: adminClient, + endpoint: testutils.EmulatorHost, }) is.NoErr(err) @@ -30,6 +33,7 @@ func testCdcIteratorAtPosition( sdkPos opencdc.Position, ) (common.Iterator, func()) { client := testutils.NewClient(ctx, is) + adminClient := testutils.NewDatabaseAdminClient(ctx, is) pos, err := common.ParseSDKPosition(sdkPos) is.NoErr(err) @@ -39,12 +43,14 @@ func testCdcIteratorAtPosition( is.True(pos.SnapshotPosition == nil) iterator, err := newCdcIterator(ctx, &cdcIteratorConfig{ - tableName: "Singers", - projectID: testutils.ProjectID, - instanceID: testutils.InstanceID, - databaseID: testutils.DatabaseID, - client: client, - position: pos.CDCPosition, + tableName: "Singers", + projectID: testutils.ProjectID, + instanceID: testutils.InstanceID, + databaseID: testutils.DatabaseID, + client: client, + adminClient: adminClient, + position: pos.CDCPosition, + endpoint: testutils.EmulatorHost, }) is.NoErr(err) diff --git a/common/utils.go b/common/utils.go index 767c90e..330b6b0 100644 --- a/common/utils.go +++ b/common/utils.go @@ -6,7 +6,7 @@ import ( "time" "cloud.google.com/go/spanner" - sdk "github.com/conduitio/conduit-connector-sdk" + database "cloud.google.com/go/spanner/admin/database/apiv1" "google.golang.org/api/option" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -24,12 +24,41 @@ func NewClient(ctx context.Context, config NewClientConfig) (*spanner.Client, er } if config.Endpoint != "" { options = append(options, option.WithEndpoint(config.Endpoint)) - sdk.Logger(ctx).Info().Msg("using custom endpoint") } return spanner.NewClient(ctx, config.DatabaseName, options...) } +func NewDatabaseAdminClient(ctx context.Context) (*database.DatabaseAdminClient, error) { + return NewDatabaseAdminClientWithEndpoint(ctx, "") +} + +func NewDatabaseAdminClientWithEndpoint( + ctx context.Context, endpoint string, +) (*database.DatabaseAdminClient, error) { + options := []option.ClientOption{ + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + option.WithoutAuthentication(), + } + if endpoint != "" { + options = append(options, option.WithEndpoint(endpoint)) + } + + return database.NewDatabaseAdminClient(ctx, options...) +} + +func ClientOptions(endpoint string) []option.ClientOption { + options := []option.ClientOption{ + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + option.WithoutAuthentication(), + } + if endpoint != "" { + options = append(options, option.WithEndpoint(endpoint)) + } + + return options +} + func FormatValue(val any) any { switch v := val.(type) { case *time.Time: diff --git a/snapshot_iterator.go b/snapshot_iterator.go index cba8aca..b3d3bce 100644 --- a/snapshot_iterator.go +++ b/snapshot_iterator.go @@ -87,6 +87,7 @@ func (s *snapshotIterator) Read(ctx context.Context) (rec opencdc.Record, err er return rec, ErrSnapshotIteratorDone case data := <-s.dataC: + sdk.Logger(ctx).Trace().Msg("received data from fetcher") s.acks.Add(1) return s.buildRecord(data), nil } diff --git a/test/testutils.go b/test/testutils.go index 8757848..4a41d25 100644 --- a/test/testutils.go +++ b/test/testutils.go @@ -94,13 +94,10 @@ func createInstance(ctx context.Context, is *is.I) { } func NewDatabaseAdminClient(ctx context.Context, is *is.I) *database.DatabaseAdminClient { - databaseAdminClient, err := database.NewDatabaseAdminClient(ctx, - option.WithEndpoint(EmulatorHost), - option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), - option.WithoutAuthentication()) + client, err := common.NewDatabaseAdminClientWithEndpoint(ctx, EmulatorHost) is.NoErr(err) - return databaseAdminClient + return client } func SetupDatabase(ctx context.Context, is *is.I) {