Skip to content

Commit

Permalink
fix TestCDCIterator_RestartOnPosition
Browse files Browse the repository at this point in the history
  • Loading branch information
Guillem committed Oct 13, 2024
1 parent f832ccc commit 010b671
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 7 deletions.
6 changes: 5 additions & 1 deletion cdc_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strconv"
"time"

"cloud.google.com/go/spanner"
database "cloud.google.com/go/spanner/admin/database/apiv1"
Expand Down Expand Up @@ -128,8 +129,11 @@ func (c *cdcIterator) startReader(ctx context.Context, streamID string) {
for _, changeRec := range result.ChangeRecords {
for _, dataChangeRec := range changeRec.DataChangeRecords {
for _, mod := range dataChangeRec.Mods {

Check failure on line 131 in cdc_iterator.go

View workflow job for this annotation

GitHub Actions / golangci-lint

unnecessary leading newline (whitespace)

// add a nanosecond so that the position doesn't include the record itself
start := dataChangeRec.CommitTimestamp.Add(time.Nanosecond)
position := common.CDCPosition{
Start: dataChangeRec.CommitTimestamp,
Start: start,
StreamID: streamID,
}

Expand Down
11 changes: 5 additions & 6 deletions cdc_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,12 @@ func TestCDCIterator_RestartOnPosition(t *testing.T) {

var latestPosition opencdc.Position

{ // read and ack 2 records
testutils.ReadAndAssertInsert(ctx, is, iterator, user1)
rec := testutils.ReadAndAssertInsert(ctx, is, iterator, user2)
teardown()
// read and ack 2 records
testutils.ReadAndAssertInsert(ctx, is, iterator, user1)
rec := testutils.ReadAndAssertInsert(ctx, is, iterator, user2)
teardown()

latestPosition = rec.Position
}
latestPosition = rec.Position

// then, try to read from the second record

Expand Down
2 changes: 2 additions & 0 deletions test/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,14 @@ func isDataEqual(is *is.I, a, b opencdc.Data) {
}

func assertMetadata(is *is.I, metadata opencdc.Metadata) {
is.Helper()
col, err := metadata.GetCollection()
is.NoErr(err)
is.Equal(col, "Singers")
}

func assertKey(is *is.I, rec opencdc.Record, singer Singer) {
is.Helper()
singerID := fmt.Sprint(singer.SingerID)
isDataEqual(is, rec.Key, opencdc.StructuredData{"SingerID": singerID})
}

0 comments on commit 010b671

Please sign in to comment.