diff --git a/internal/elasticsearch/v8/search.go b/internal/elasticsearch/v8/search.go index f0e003d..7b43ef0 100644 --- a/internal/elasticsearch/v8/search.go +++ b/internal/elasticsearch/v8/search.go @@ -41,7 +41,8 @@ func (c *Client) Search(ctx context.Context, index string, offset, size *int) (* } // Perform the request - ctx, _ = context.WithTimeout(ctx, 5*time.Second) + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() res, err := req.Do(ctx, c.es) if err != nil { return nil, fmt.Errorf("error getting search response: %w", err) diff --git a/source/config.go b/source/config.go index dbedbdc..c79c520 100644 --- a/source/config.go +++ b/source/config.go @@ -40,7 +40,7 @@ type Config struct { // SHA256 hex fingerprint given by Elasticsearch on first launch. CertificateFingerprint string `json:"certificateFingerprint"` // The name of the indexes to read data from. - Indexes []string `json:"indexes"` + Indexes []string `json:"indexes" validate:"required"` // The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10000`. BatchSize int `json:"batchSize" default:"1000"` // This period is used by workers to poll for new data at regular intervals. diff --git a/source/paramgen.go b/source/paramgen.go index a00ce89..d9f45dd 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -59,7 +59,9 @@ func (Config) Parameters() map[string]config.Parameter { Default: "", Description: "The name of the indexes to read data from.", Type: config.ParameterTypeString, - Validations: []config.Validation{}, + Validations: []config.Validation{ + config.ValidationRequired{}, + }, }, ConfigPassword: { Default: "", diff --git a/source/position.go b/source/position.go index 6098682..0383fde 100644 --- a/source/position.go +++ b/source/position.go @@ -28,14 +28,15 @@ type Position struct { IndexPositions map[string]int `json:"indexPositions"` } +// NewPosition initializes a new position when sdk position is nil. +func NewPosition() *Position { + return &Position{IndexPositions: make(map[string]int)} +} + // ParseSDKPosition parses opencdc.Position and returns Position. func ParseSDKPosition(position opencdc.Position) (*Position, error) { var pos Position - if position == nil { - return nil, nil - } - if err := json.Unmarshal(position, &pos); err != nil { return nil, fmt.Errorf("unmarshal opencdc.Position into Position: %w", err) } diff --git a/source/position_test.go b/source/position_test.go index 6c7b87b..d0bed6a 100644 --- a/source/position_test.go +++ b/source/position_test.go @@ -22,6 +22,13 @@ import ( "github.com/matryer/is" ) +func TestNewPosition(t *testing.T) { + is := is.New(t) + pos := NewPosition() + is.True(pos != nil) + is.Equal(len(pos.IndexPositions), 0) +} + func TestParseSDKPosition(t *testing.T) { t.Parallel() @@ -31,11 +38,6 @@ func TestParseSDKPosition(t *testing.T) { wantPos *Position wantErr error }{ - { - name: "success_position_is_nil", - in: nil, - wantPos: nil, - }, { name: "failure_unmarshal_error", in: opencdc.Position("invalid"), @@ -75,22 +77,22 @@ func TestMarshal(t *testing.T) { tests := []struct { name string - in Position + in *Position want opencdc.Position }{ { name: "successful marshal", - in: Position{IndexPositions: map[string]int{"a": 1, "b": 2}}, + in: &Position{IndexPositions: map[string]int{"a": 1, "b": 2}}, want: []byte(`{"indexPositions":{"a":1,"b":2}}`), }, { name: "marshal empty map", - in: Position{IndexPositions: map[string]int{}}, + in: &Position{IndexPositions: map[string]int{}}, want: []byte(`{"indexPositions":{}}`), }, { name: "marshal with nil map", - in: Position{IndexPositions: nil}, + in: &Position{IndexPositions: nil}, want: []byte(`{"indexPositions":null}`), }, } @@ -111,24 +113,24 @@ func TestUpdate(t *testing.T) { tests := []struct { name string - in Position + in *Position updateIndex string updatePos int - want Position + want *Position }{ { name: "update existing index", - in: Position{IndexPositions: map[string]int{"a": 1, "b": 2}}, + in: &Position{IndexPositions: map[string]int{"a": 1, "b": 2}}, updateIndex: "a", updatePos: 10, - want: Position{IndexPositions: map[string]int{"a": 10, "b": 2}}, + want: &Position{IndexPositions: map[string]int{"a": 10, "b": 2}}, }, { name: "update new index", - in: Position{IndexPositions: map[string]int{"a": 1}}, + in: &Position{IndexPositions: map[string]int{"a": 1}}, updateIndex: "b", updatePos: 5, - want: Position{IndexPositions: map[string]int{"a": 1, "b": 5}}, + want: &Position{IndexPositions: map[string]int{"a": 1, "b": 5}}, }, } for _, tt := range tests { diff --git a/source/source.go b/source/source.go index 18939d4..c2da321 100644 --- a/source/source.go +++ b/source/source.go @@ -66,9 +66,7 @@ func (s *Source) Open(ctx context.Context, position opencdc.Position) error { var err error if position == nil { - s.position = &Position{ - IndexPositions: make(map[string]int), - } + s.position = NewPosition() } else { s.position, err = ParseSDKPosition(position) if err != nil { @@ -93,7 +91,7 @@ func (s *Source) Open(ctx context.Context, position opencdc.Position) error { for _, index := range s.config.Indexes { s.wg.Add(1) - offset, _ := s.position.IndexPositions[index] + offset := s.position.IndexPositions[index] // a new worker for a new index NewWorker(ctx, s, index, offset) @@ -127,9 +125,7 @@ func (s *Source) Ack(ctx context.Context, position opencdc.Position) error { // Teardown gracefully shutdown connector. func (s *Source) Teardown(ctx context.Context) error { sdk.Logger(ctx).Info().Msg("Tearing down the ElasticSearch Source") - close(s.shutdown) - // wait for goroutines to finish s.wg.Wait() // close the read channel for write