Skip to content

Commit

Permalink
fix: linters and cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
parikshitg committed Oct 11, 2024
1 parent 3bd6502 commit f19eb7b
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 28 deletions.
3 changes: 2 additions & 1 deletion internal/elasticsearch/v8/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func (c *Client) Search(ctx context.Context, index string, offset, size *int) (*
}

// Perform the request
ctx, _ = context.WithTimeout(ctx, 5*time.Second)
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
res, err := req.Do(ctx, c.es)
if err != nil {
return nil, fmt.Errorf("error getting search response: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion source/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Config struct {
// SHA256 hex fingerprint given by Elasticsearch on first launch.
CertificateFingerprint string `json:"certificateFingerprint"`
// The name of the indexes to read data from.
Indexes []string `json:"indexes"`
Indexes []string `json:"indexes" validate:"required"`
// The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10000`.
BatchSize int `json:"batchSize" default:"1000"`
// This period is used by workers to poll for new data at regular intervals.
Expand Down
4 changes: 3 additions & 1 deletion source/paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions source/position.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,15 @@ type Position struct {
IndexPositions map[string]int `json:"indexPositions"`
}

// NewPosition initializes a new position when sdk position is nil.
func NewPosition() *Position {
return &Position{IndexPositions: make(map[string]int)}
}

// ParseSDKPosition parses opencdc.Position and returns Position.
func ParseSDKPosition(position opencdc.Position) (*Position, error) {
var pos Position

if position == nil {
return nil, nil
}

if err := json.Unmarshal(position, &pos); err != nil {
return nil, fmt.Errorf("unmarshal opencdc.Position into Position: %w", err)
}
Expand Down
32 changes: 17 additions & 15 deletions source/position_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ import (
"github.com/matryer/is"
)

func TestNewPosition(t *testing.T) {
is := is.New(t)
pos := NewPosition()
is.True(pos != nil)
is.Equal(len(pos.IndexPositions), 0)
}

func TestParseSDKPosition(t *testing.T) {
t.Parallel()

Expand All @@ -31,11 +38,6 @@ func TestParseSDKPosition(t *testing.T) {
wantPos *Position
wantErr error
}{
{
name: "success_position_is_nil",
in: nil,
wantPos: nil,
},
{
name: "failure_unmarshal_error",
in: opencdc.Position("invalid"),
Expand Down Expand Up @@ -75,22 +77,22 @@ func TestMarshal(t *testing.T) {

tests := []struct {
name string
in Position
in *Position
want opencdc.Position
}{
{
name: "successful marshal",
in: Position{IndexPositions: map[string]int{"a": 1, "b": 2}},
in: &Position{IndexPositions: map[string]int{"a": 1, "b": 2}},
want: []byte(`{"indexPositions":{"a":1,"b":2}}`),
},
{
name: "marshal empty map",
in: Position{IndexPositions: map[string]int{}},
in: &Position{IndexPositions: map[string]int{}},
want: []byte(`{"indexPositions":{}}`),
},
{
name: "marshal with nil map",
in: Position{IndexPositions: nil},
in: &Position{IndexPositions: nil},
want: []byte(`{"indexPositions":null}`),
},
}
Expand All @@ -111,24 +113,24 @@ func TestUpdate(t *testing.T) {

tests := []struct {
name string
in Position
in *Position
updateIndex string
updatePos int
want Position
want *Position
}{
{
name: "update existing index",
in: Position{IndexPositions: map[string]int{"a": 1, "b": 2}},
in: &Position{IndexPositions: map[string]int{"a": 1, "b": 2}},
updateIndex: "a",
updatePos: 10,
want: Position{IndexPositions: map[string]int{"a": 10, "b": 2}},
want: &Position{IndexPositions: map[string]int{"a": 10, "b": 2}},
},
{
name: "update new index",
in: Position{IndexPositions: map[string]int{"a": 1}},
in: &Position{IndexPositions: map[string]int{"a": 1}},
updateIndex: "b",
updatePos: 5,
want: Position{IndexPositions: map[string]int{"a": 1, "b": 5}},
want: &Position{IndexPositions: map[string]int{"a": 1, "b": 5}},
},
}
for _, tt := range tests {
Expand Down
8 changes: 2 additions & 6 deletions source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ func (s *Source) Open(ctx context.Context, position opencdc.Position) error {
var err error

if position == nil {
s.position = &Position{
IndexPositions: make(map[string]int),
}
s.position = NewPosition()
} else {
s.position, err = ParseSDKPosition(position)
if err != nil {
Expand All @@ -93,7 +91,7 @@ func (s *Source) Open(ctx context.Context, position opencdc.Position) error {

for _, index := range s.config.Indexes {
s.wg.Add(1)
offset, _ := s.position.IndexPositions[index]
offset := s.position.IndexPositions[index]

// a new worker for a new index
NewWorker(ctx, s, index, offset)
Expand Down Expand Up @@ -127,9 +125,7 @@ func (s *Source) Ack(ctx context.Context, position opencdc.Position) error {
// Teardown gracefully shutdown connector.
func (s *Source) Teardown(ctx context.Context) error {
sdk.Logger(ctx).Info().Msg("Tearing down the ElasticSearch Source")

close(s.shutdown)

// wait for goroutines to finish
s.wg.Wait()
// close the read channel for write
Expand Down

0 comments on commit f19eb7b

Please sign in to comment.