fix: config used map[string]type
parikshitg committed Oct 18, 2024
1 parent 73a2000 commit 633523f
Showing 4 changed files with 17 additions and 30 deletions.
2 changes: 2 additions & 0 deletions README.md
@@ -51,6 +51,8 @@ ElasticSearch source connector allows you to move data from multiple Elasticsearch
| `serviceToken` | [v: 7, 8] Service token for authorization; if set, overrides username/password. | `false` | |
| `certificateFingerprint` | [v: 7, 8] SHA256 hex fingerprint given by Elasticsearch on first launch. | `false` | |
| `indexes` | The name of the indexes to read the data from. | `true` | |
+| `indexes.*.sortBy` | The field to sort by for each index, used by the Elasticsearch search API. | `true` | |
+| `indexes.*.sortOrder` | The sort order (`asc` or `desc`) for each index, used by the Elasticsearch search API. | `true` | |
| `batchSize` | The number of items to fetch from an index. The minimum value is `1`, maximum value is `10000`. | `false` | `"1000"` |
| `pollingPeriod` | The duration for polling the search api for fetching new records. | `false` | `"5s"` |

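For reference, a minimal sketch of how the two new parameters might be supplied as flattened settings keys, assuming the `indexes.<index name>.sortBy` / `indexes.<index name>.sortOrder` convention implied by the wildcard names above; the index names and field values are made up:

```go
package source

// hypotheticalRawSettings sketches the assumed flattened form of the new
// parameters; "orders", "users" and the sort fields are illustrative only.
var hypotheticalRawSettings = map[string]string{
	"indexes.orders.sortBy":    "created_at",
	"indexes.orders.sortOrder": "asc",
	"indexes.users.sortBy":     "updated_at",
	"indexes.users.sortOrder":  "desc",
	"batchSize":                "1000",
	"pollingPeriod":            "5s",
}
```
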
14 changes: 8 additions & 6 deletions source/config.go
@@ -39,17 +39,19 @@ type Config struct {
	ServiceToken string `json:"serviceToken"`
	// SHA256 hex fingerprint given by Elasticsearch on first launch.
	CertificateFingerprint string `json:"certificateFingerprint"`
-	// The name of the indexes to read data from.
-	Indexes []string `json:"indexes" validate:"required"`
-	// The sortbyField for each index to be used by elasticsearch search api.
-	IndexSortFields []string `json:"indexSortFields" validate:"required"`
-	// The sortOrders for each index to be used by elasticsearch search api.
-	SortOrders []string `json:"sortOrders" validate:"required"`
+	// The names of the indexes to read data from, along with their sort settings.
+	Indexes map[string]Sort `json:"indexes" validate:"required"`
	// The number of items stored in bulk in the index. The minimum value is `1`, maximum value is `10000`.
	BatchSize int `json:"batchSize" default:"1000"`
	// This period is used by workers to poll for new data at regular intervals.
	PollingPeriod time.Duration `json:"pollingPeriod" default:"5s"`
}
+type Sort struct {
+	// The field to sort by for each index, used by the Elasticsearch search API.
+	SortBy string `json:"sortBy" validate:"required"`
+	// The sort order (asc or desc) for each index, used by the Elasticsearch search API.
+	SortOrder string `json:"sortOrder" validate:"required"`
+}

func (c Config) GetHost() string {
	return c.Host
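
Taken together, a minimal sketch of a populated Config under the new shape; the host, index names, and sort fields are hypothetical, and only the field names and types come from the struct above:

```go
package source

import "time"

// exampleConfig returns an illustrative value of the new map-based Config;
// the values shown are assumptions, not project defaults.
func exampleConfig() Config {
	return Config{
		Host: "http://localhost:9200",
		Indexes: map[string]Sort{
			"orders": {SortBy: "created_at", SortOrder: "asc"},
			"users":  {SortBy: "updated_at", SortOrder: "desc"},
		},
		BatchSize:     1000,
		PollingPeriod: 5 * time.Second,
	}
}
```
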
19 changes: 5 additions & 14 deletions source/paramgen.go

Some generated files are not rendered by default.

12 changes: 2 additions & 10 deletions source/source.go
@@ -56,14 +56,6 @@ func (s *Source) Configure(ctx context.Context, cfgRaw config.Config) error {
		return err
	}

-	// custom validations
-	if len(s.config.Indexes) != len(s.config.IndexSortFields) {
-		return fmt.Errorf("each index should have a respective sort field")
-	}
-	if len(s.config.IndexSortFields) != len(s.config.SortOrders) {
-		return fmt.Errorf("each sortfield should have a respective sort order")
-	}
-
	return nil
}

@@ -92,12 +84,12 @@ func (s *Source) Open(ctx context.Context, position opencdc.Position) error {
	s.shutdown = make(chan struct{})
	s.wg = &sync.WaitGroup{}

-	for i, index := range s.config.Indexes {
+	for index, sort := range s.config.Indexes {
		s.wg.Add(1)
		lastRecordSortID := s.position.IndexPositions[index]

		// a new worker for a new index
-		NewWorker(ctx, s, index, s.config.IndexSortFields[i], s.config.SortOrders[i], lastRecordSortID)
+		NewWorker(ctx, s, index, sort.SortBy, sort.SortOrder, lastRecordSortID)
	}

	return nil
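
A side effect of ranging over a map instead of a slice is that Go leaves map iteration order unspecified, so the per-index workers now start in arbitrary order; each worker resumes from its own entry in `s.position.IndexPositions`, so this does not affect correctness. If a reproducible start-up order were ever wanted, a sketch (not part of this commit, reusing the names from the loop above) could sort the keys first:

```go
// Sketch only: spawn workers over sorted index names for deterministic order.
names := make([]string, 0, len(s.config.Indexes))
for name := range s.config.Indexes {
	names = append(names, name)
}
sort.Strings(names) // would require importing "sort"
for _, name := range names {
	srt := s.config.Indexes[name]
	s.wg.Add(1)
	// a new worker for a new index, same as above
	NewWorker(ctx, s, name, srt.SortBy, srt.SortOrder, s.position.IndexPositions[name])
}
```
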
