Skip to content

Commit

Permalink
decouple cdc iterator from google credentials env vars
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillem committed Oct 15, 2024
1 parent 1e6c1a3 commit 3556a8f
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 34 deletions.
29 changes: 13 additions & 16 deletions cdc_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ import (

type (
cdcIteratorConfig struct {
tableName string
projectID string
instanceID string
databaseID string
position *common.CDCPosition
client *spanner.Client
tableName string
projectID string
instanceID string
databaseID string
position *common.CDCPosition
client *spanner.Client
adminClient *database.DatabaseAdminClient
endpoint string
}
cdcIterator struct {
reader *changestreams.Reader
Expand All @@ -33,12 +35,6 @@ type (
var _ common.Iterator = new(cdcIterator)

func newCdcIterator(ctx context.Context, config *cdcIteratorConfig) (*cdcIterator, error) {
adminClient, err := database.NewDatabaseAdminClient(ctx)
if err != nil {
return nil, err
}
defer adminClient.Close()

databaseName := fmt.Sprintf(
"projects/%s/instances/%s/databases/%s",
config.projectID, config.instanceID, config.databaseID,
Expand All @@ -58,19 +54,20 @@ func newCdcIterator(ctx context.Context, config *cdcIteratorConfig) (*cdcIterato

if !streamExists {
stmt := fmt.Sprint("CREATE CHANGE STREAM ", streamID, " FOR ", config.tableName)
op, err := adminClient.UpdateDatabaseDdl(ctx, &databasepb.UpdateDatabaseDdlRequest{
op, err := config.adminClient.UpdateDatabaseDdl(ctx, &databasepb.UpdateDatabaseDdlRequest{
Database: databaseName,
Statements: []string{stmt},
})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create change stream %s: %w", streamID, err)
}
if err := op.Wait(ctx); err != nil {
return nil, err
return nil, fmt.Errorf("failed to wait for change stream %s creation: %w", streamID, err)
}
}

changestreamsConfig := changestreams.Config{
SpannerClientOptions: common.ClientOptions(config.endpoint),
SpannerClientConfig: spanner.ClientConfig{
SessionPoolConfig: spanner.DefaultSessionPoolConfig,
},
Expand All @@ -85,7 +82,7 @@ func newCdcIterator(ctx context.Context, config *cdcIteratorConfig) (*cdcIterato
changestreamsConfig,
)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create reader for change stream %s: %w", streamID, err)
}

iterator := &cdcIterator{
Expand Down
28 changes: 17 additions & 11 deletions cdc_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ import (

func testCdcIterator(ctx context.Context, is *is.I) (common.Iterator, func()) {
client := testutils.NewClient(ctx, is)
adminClient := testutils.NewDatabaseAdminClient(ctx, is)

iterator, err := newCdcIterator(ctx, &cdcIteratorConfig{
tableName: "Singers",
projectID: testutils.ProjectID,
instanceID: testutils.InstanceID,
databaseID: testutils.DatabaseID,
client: client,
tableName: "Singers",
projectID: testutils.ProjectID,
instanceID: testutils.InstanceID,
databaseID: testutils.DatabaseID,
client: client,
adminClient: adminClient,
endpoint: testutils.EmulatorHost,
})
is.NoErr(err)

Expand All @@ -30,6 +33,7 @@ func testCdcIteratorAtPosition(
sdkPos opencdc.Position,
) (common.Iterator, func()) {
client := testutils.NewClient(ctx, is)
adminClient := testutils.NewDatabaseAdminClient(ctx, is)

pos, err := common.ParseSDKPosition(sdkPos)
is.NoErr(err)
Expand All @@ -39,12 +43,14 @@ func testCdcIteratorAtPosition(
is.True(pos.SnapshotPosition == nil)

iterator, err := newCdcIterator(ctx, &cdcIteratorConfig{
tableName: "Singers",
projectID: testutils.ProjectID,
instanceID: testutils.InstanceID,
databaseID: testutils.DatabaseID,
client: client,
position: pos.CDCPosition,
tableName: "Singers",
projectID: testutils.ProjectID,
instanceID: testutils.InstanceID,
databaseID: testutils.DatabaseID,
client: client,
adminClient: adminClient,
position: pos.CDCPosition,
endpoint: testutils.EmulatorHost,
})
is.NoErr(err)

Expand Down
33 changes: 31 additions & 2 deletions common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"

"cloud.google.com/go/spanner"
sdk "github.com/conduitio/conduit-connector-sdk"
database "cloud.google.com/go/spanner/admin/database/apiv1"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
Expand All @@ -24,12 +24,41 @@ func NewClient(ctx context.Context, config NewClientConfig) (*spanner.Client, er
}
if config.Endpoint != "" {
options = append(options, option.WithEndpoint(config.Endpoint))
sdk.Logger(ctx).Info().Msg("using custom endpoint")
}

return spanner.NewClient(ctx, config.DatabaseName, options...)
}

func NewDatabaseAdminClient(ctx context.Context) (*database.DatabaseAdminClient, error) {
return NewDatabaseAdminClientWithEndpoint(ctx, "")
}

func NewDatabaseAdminClientWithEndpoint(
ctx context.Context, endpoint string,
) (*database.DatabaseAdminClient, error) {
options := []option.ClientOption{
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
option.WithoutAuthentication(),
}
if endpoint != "" {
options = append(options, option.WithEndpoint(endpoint))
}

return database.NewDatabaseAdminClient(ctx, options...)
}

func ClientOptions(endpoint string) []option.ClientOption {
options := []option.ClientOption{
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
option.WithoutAuthentication(),
}
if endpoint != "" {
options = append(options, option.WithEndpoint(endpoint))
}

return options
}

func FormatValue(val any) any {
switch v := val.(type) {
case *time.Time:
Expand Down
1 change: 1 addition & 0 deletions snapshot_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (s *snapshotIterator) Read(ctx context.Context) (rec opencdc.Record, err er

return rec, ErrSnapshotIteratorDone
case data := <-s.dataC:
sdk.Logger(ctx).Trace().Msg("received data from fetcher")
s.acks.Add(1)
return s.buildRecord(data), nil
}
Expand Down
7 changes: 2 additions & 5 deletions test/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,10 @@ func createInstance(ctx context.Context, is *is.I) {
}

func NewDatabaseAdminClient(ctx context.Context, is *is.I) *database.DatabaseAdminClient {
databaseAdminClient, err := database.NewDatabaseAdminClient(ctx,
option.WithEndpoint(EmulatorHost),
option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())),
option.WithoutAuthentication())
client, err := common.NewDatabaseAdminClientWithEndpoint(ctx, EmulatorHost)
is.NoErr(err)

return databaseAdminClient
return client
}

func SetupDatabase(ctx context.Context, is *is.I) {
Expand Down

0 comments on commit 3556a8f

Please sign in to comment.