diff --git a/README.md b/README.md index e59bc07..718e797 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,8 @@ ElasticSearch source connector allows you to move data from multiple Elasticsear | `serviceToken` | [v: 7, 8] Service token for authorization; if set, overrides username/password. | `false` | | | `certificateFingerprint` | [v: 7, 8] SHA256 hex fingerprint given by Elasticsearch on first launch. | `false` | | | `indexes` | The name of the indexes to read the data from. | `true` | | +| `indexes.*.sortBy` | The sortby field for each index to be used by elasticsearch search api. | `true` | | +| `indexes.*.sortOrder` | The sortOrder (asc or desc) for each index to be used by elasticsearch search api. | `true` | | | `batchSize` | The number of items to fetch from an index. The minimum value is `1`, maximum value is `10000`. | `false` | `"1000"` | | `pollingPeriod` | The duration for polling the search api for fetching new records. | `false` | `"5s"` | diff --git a/source/config.go b/source/config.go index dc81811..96efb3c 100644 --- a/source/config.go +++ b/source/config.go @@ -39,17 +39,19 @@ type Config struct { ServiceToken string `json:"serviceToken"` // SHA256 hex fingerprint given by Elasticsearch on first launch. CertificateFingerprint string `json:"certificateFingerprint"` - // The name of the indexes to read data from. - Indexes []string `json:"indexes" validate:"required"` - // The sortbyField for each index to be used by elasticsearch search api. - IndexSortFields []string `json:"indexSortFields" validate:"required"` - // The sortOrders for each index to be used by elasticsearch search api. - SortOrders []string `json:"sortOrders" validate:"required"` + // The name of the indexes and sort details to read data from. + Indexes map[string]Sort `json:"indexes" validate:"required"` // The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10000`. BatchSize int `json:"batchSize" default:"1000"` // This period is used by workers to poll for new data at regular intervals. PollingPeriod time.Duration `json:"pollingPeriod" default:"5s"` } +type Sort struct { + // The sortbyField for each index to be used by elasticsearch search api. + SortBy string `json:"sortBy" validate:"required"` + // The sortOrders(asc or desc) for each index to be used by elasticsearch search api. + SortOrder string `json:"sortOrder" validate:"required"` +} func (c Config) GetHost() string { return c.Host diff --git a/source/paramgen.go b/source/paramgen.go index 0ec8473..fdbed5c 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -13,12 +13,11 @@ const ( ConfigCertificateFingerprint = "certificateFingerprint" ConfigCloudID = "cloudID" ConfigHost = "host" - ConfigIndexSortFields = "indexSortFields" - ConfigIndexes = "indexes" + ConfigIndexesSortBy = "indexes.*.sortBy" + ConfigIndexesSortOrder = "indexes.*.sortOrder" ConfigPassword = "password" ConfigPollingPeriod = "pollingPeriod" ConfigServiceToken = "serviceToken" - ConfigSortOrders = "sortOrders" ConfigUsername = "username" ConfigVersion = "version" ) @@ -57,7 +56,7 @@ func (Config) Parameters() map[string]config.Parameter { config.ValidationRequired{}, }, }, - ConfigIndexSortFields: { + ConfigIndexesSortBy: { Default: "", Description: "The sortbyField for each index to be used by elasticsearch search api.", Type: config.ParameterTypeString, @@ -65,9 +64,9 @@ func (Config) Parameters() map[string]config.Parameter { config.ValidationRequired{}, }, }, - ConfigIndexes: { + ConfigIndexesSortOrder: { Default: "", - Description: "The name of the indexes to read data from.", + Description: "The sortOrders(asc or desc) for each index to be used by elasticsearch search api.", Type: config.ParameterTypeString, Validations: []config.Validation{ config.ValidationRequired{}, @@ -91,14 +90,6 @@ func (Config) Parameters() map[string]config.Parameter { Type: config.ParameterTypeString, Validations: []config.Validation{}, }, - ConfigSortOrders: { - Default: "", - Description: "The sortOrders for each index to be used by elasticsearch search api.", - Type: config.ParameterTypeString, - Validations: []config.Validation{ - config.ValidationRequired{}, - }, - }, ConfigUsername: { Default: "", Description: "The username for HTTP Basic Authentication.", diff --git a/source/source.go b/source/source.go index 281dd1b..a0ab2d7 100644 --- a/source/source.go +++ b/source/source.go @@ -56,14 +56,6 @@ func (s *Source) Configure(ctx context.Context, cfgRaw config.Config) error { return err } - // custom validations - if len(s.config.Indexes) != len(s.config.IndexSortFields) { - return fmt.Errorf("each index should have a respective sort field") - } - if len(s.config.IndexSortFields) != len(s.config.SortOrders) { - return fmt.Errorf("each sortfield should have a respective sort order") - } - return nil } @@ -92,12 +84,12 @@ func (s *Source) Open(ctx context.Context, position opencdc.Position) error { s.shutdown = make(chan struct{}) s.wg = &sync.WaitGroup{} - for i, index := range s.config.Indexes { + for index, sort := range s.config.Indexes { s.wg.Add(1) lastRecordSortID := s.position.IndexPositions[index] // a new worker for a new index - NewWorker(ctx, s, index, s.config.IndexSortFields[i], s.config.SortOrders[i], lastRecordSortID) + NewWorker(ctx, s, index, sort.SortBy, sort.SortOrder, lastRecordSortID) } return nil