From 1cb1a99baa423f772ba9f14462f547e800cb5d03 Mon Sep 17 00:00:00 2001 From: Gaurav Sahil Date: Thu, 17 Oct 2024 06:51:46 +0530 Subject: [PATCH] fix: refactor index name retrieval --- destination/client_moq_test.go | 80 +++++---- destination/config.go | 26 +-- destination/config_test.go | 91 ++++++++-- destination/destination.go | 33 ++-- destination/destination_test.go | 10 +- internal/elasticsearch/client.go | 6 +- internal/elasticsearch/v5/client.go | 32 +--- internal/elasticsearch/v5/client_test.go | 103 +---------- internal/elasticsearch/v5/config.go | 3 - internal/elasticsearch/v5/config_moq_test.go | 45 ----- internal/elasticsearch/v6/client.go | 32 +--- internal/elasticsearch/v6/client_test.go | 103 +---------- internal/elasticsearch/v6/config.go | 3 - internal/elasticsearch/v6/config_moq_test.go | 45 ----- internal/elasticsearch/v7/client.go | 32 +--- internal/elasticsearch/v7/client_test.go | 109 ++---------- internal/elasticsearch/v7/config.go | 3 - internal/elasticsearch/v7/config_moq_test.go | 45 ----- internal/elasticsearch/v8/client.go | 32 +--- internal/elasticsearch/v8/client_test.go | 174 +------------------ internal/elasticsearch/v8/config.go | 3 - internal/elasticsearch/v8/config_moq_test.go | 45 ----- 22 files changed, 227 insertions(+), 828 deletions(-) diff --git a/destination/client_moq_test.go b/destination/client_moq_test.go index 7d0756c..3434ac0 100644 --- a/destination/client_moq_test.go +++ b/destination/client_moq_test.go @@ -26,13 +26,13 @@ var _ client = &clientMock{} // PingFunc: func(ctx context.Context) error { // panic("mock out the Ping method") // }, -// PrepareCreateOperationFunc: func(item opencdc.Record) (interface{}, interface{}, error) { +// PrepareCreateOperationFunc: func(item opencdc.Record, index string) (interface{}, interface{}, error) { // panic("mock out the PrepareCreateOperation method") // }, -// PrepareDeleteOperationFunc: func(key string, item opencdc.Record) (interface{}, error) { +// PrepareDeleteOperationFunc: func(key string, index string) (interface{}, error) { // panic("mock out the PrepareDeleteOperation method") // }, -// PrepareUpsertOperationFunc: func(key string, item opencdc.Record) (interface{}, interface{}, error) { +// PrepareUpsertOperationFunc: func(key string, item opencdc.Record, index string) (interface{}, interface{}, error) { // panic("mock out the PrepareUpsertOperation method") // }, // } @@ -49,13 +49,13 @@ type clientMock struct { PingFunc func(ctx context.Context) error // PrepareCreateOperationFunc mocks the PrepareCreateOperation method. - PrepareCreateOperationFunc func(item opencdc.Record) (interface{}, interface{}, error) + PrepareCreateOperationFunc func(item opencdc.Record, index string) (interface{}, interface{}, error) // PrepareDeleteOperationFunc mocks the PrepareDeleteOperation method. - PrepareDeleteOperationFunc func(key string, item opencdc.Record) (interface{}, error) + PrepareDeleteOperationFunc func(key string, index string) (interface{}, error) // PrepareUpsertOperationFunc mocks the PrepareUpsertOperation method. - PrepareUpsertOperationFunc func(key string, item opencdc.Record) (interface{}, interface{}, error) + PrepareUpsertOperationFunc func(key string, item opencdc.Record, index string) (interface{}, interface{}, error) // calls tracks calls to the methods. calls struct { @@ -75,13 +75,15 @@ type clientMock struct { PrepareCreateOperation []struct { // Item is the item argument value. Item opencdc.Record + // Index is the index argument value. + Index string } // PrepareDeleteOperation holds details about calls to the PrepareDeleteOperation method. PrepareDeleteOperation []struct { // Key is the key argument value. Key string - // Item is the item argument value. - Item opencdc.Record + // Index is the index argument value. + Index string } // PrepareUpsertOperation holds details about calls to the PrepareUpsertOperation method. PrepareUpsertOperation []struct { @@ -89,6 +91,8 @@ type clientMock struct { Key string // Item is the item argument value. Item opencdc.Record + // Index is the index argument value. + Index string } } lockBulk sync.RWMutex @@ -167,19 +171,21 @@ func (mock *clientMock) PingCalls() []struct { } // PrepareCreateOperation calls PrepareCreateOperationFunc. -func (mock *clientMock) PrepareCreateOperation(item opencdc.Record) (interface{}, interface{}, error) { +func (mock *clientMock) PrepareCreateOperation(item opencdc.Record, index string) (interface{}, interface{}, error) { if mock.PrepareCreateOperationFunc == nil { panic("clientMock.PrepareCreateOperationFunc: method is nil but client.PrepareCreateOperation was just called") } callInfo := struct { - Item opencdc.Record + Item opencdc.Record + Index string }{ - Item: item, + Item: item, + Index: index, } mock.lockPrepareCreateOperation.Lock() mock.calls.PrepareCreateOperation = append(mock.calls.PrepareCreateOperation, callInfo) mock.lockPrepareCreateOperation.Unlock() - return mock.PrepareCreateOperationFunc(item) + return mock.PrepareCreateOperationFunc(item, index) } // PrepareCreateOperationCalls gets all the calls that were made to PrepareCreateOperation. @@ -187,10 +193,12 @@ func (mock *clientMock) PrepareCreateOperation(item opencdc.Record) (interface{} // // len(mockedclient.PrepareCreateOperationCalls()) func (mock *clientMock) PrepareCreateOperationCalls() []struct { - Item opencdc.Record + Item opencdc.Record + Index string } { var calls []struct { - Item opencdc.Record + Item opencdc.Record + Index string } mock.lockPrepareCreateOperation.RLock() calls = mock.calls.PrepareCreateOperation @@ -199,21 +207,21 @@ func (mock *clientMock) PrepareCreateOperationCalls() []struct { } // PrepareDeleteOperation calls PrepareDeleteOperationFunc. -func (mock *clientMock) PrepareDeleteOperation(key string, item opencdc.Record) (interface{}, error) { +func (mock *clientMock) PrepareDeleteOperation(key string, index string) (interface{}, error) { if mock.PrepareDeleteOperationFunc == nil { panic("clientMock.PrepareDeleteOperationFunc: method is nil but client.PrepareDeleteOperation was just called") } callInfo := struct { - Key string - Item opencdc.Record + Key string + Index string }{ - Key: key, - Item: item, + Key: key, + Index: index, } mock.lockPrepareDeleteOperation.Lock() mock.calls.PrepareDeleteOperation = append(mock.calls.PrepareDeleteOperation, callInfo) mock.lockPrepareDeleteOperation.Unlock() - return mock.PrepareDeleteOperationFunc(key, item) + return mock.PrepareDeleteOperationFunc(key, index) } // PrepareDeleteOperationCalls gets all the calls that were made to PrepareDeleteOperation. @@ -221,12 +229,12 @@ func (mock *clientMock) PrepareDeleteOperation(key string, item opencdc.Record) // // len(mockedclient.PrepareDeleteOperationCalls()) func (mock *clientMock) PrepareDeleteOperationCalls() []struct { - Key string - Item opencdc.Record + Key string + Index string } { var calls []struct { - Key string - Item opencdc.Record + Key string + Index string } mock.lockPrepareDeleteOperation.RLock() calls = mock.calls.PrepareDeleteOperation @@ -235,21 +243,23 @@ func (mock *clientMock) PrepareDeleteOperationCalls() []struct { } // PrepareUpsertOperation calls PrepareUpsertOperationFunc. -func (mock *clientMock) PrepareUpsertOperation(key string, item opencdc.Record) (interface{}, interface{}, error) { +func (mock *clientMock) PrepareUpsertOperation(key string, item opencdc.Record, index string) (interface{}, interface{}, error) { if mock.PrepareUpsertOperationFunc == nil { panic("clientMock.PrepareUpsertOperationFunc: method is nil but client.PrepareUpsertOperation was just called") } callInfo := struct { - Key string - Item opencdc.Record + Key string + Item opencdc.Record + Index string }{ - Key: key, - Item: item, + Key: key, + Item: item, + Index: index, } mock.lockPrepareUpsertOperation.Lock() mock.calls.PrepareUpsertOperation = append(mock.calls.PrepareUpsertOperation, callInfo) mock.lockPrepareUpsertOperation.Unlock() - return mock.PrepareUpsertOperationFunc(key, item) + return mock.PrepareUpsertOperationFunc(key, item, index) } // PrepareUpsertOperationCalls gets all the calls that were made to PrepareUpsertOperation. @@ -257,12 +267,14 @@ func (mock *clientMock) PrepareUpsertOperation(key string, item opencdc.Record) // // len(mockedclient.PrepareUpsertOperationCalls()) func (mock *clientMock) PrepareUpsertOperationCalls() []struct { - Key string - Item opencdc.Record + Key string + Item opencdc.Record + Index string } { var calls []struct { - Key string - Item opencdc.Record + Key string + Item opencdc.Record + Index string } mock.lockPrepareUpsertOperation.RLock() calls = mock.calls.PrepareUpsertOperation diff --git a/destination/config.go b/destination/config.go index 4448a47..cd6a6b2 100644 --- a/destination/config.go +++ b/destination/config.go @@ -27,6 +27,8 @@ import ( "github.com/conduitio/conduit-commons/opencdc" ) +type IndexFn func(opencdc.Record) (string, error) + type Config struct { // The version of the Elasticsearch service. One of: 5, 6, 7, 8. Version elasticsearch.Version `json:"version" validate:"required"` @@ -86,27 +88,31 @@ func (c Config) GetType() string { return c.Type } -// GetIndex determines the index for each record individually. +// IndexFunction returns a function that determines the index for each record individually. // The function might be returning a static index name. // If the index is neither static nor a template, an error is returned. -func (c Config) GetIndex(r opencdc.Record) (string, error) { +func (c Config) IndexFunction() (f IndexFn, err error) { // Not a template, i.e. it's a static index name if !strings.Contains(c.Index, "{{") && !strings.Contains(c.Index, "}}") { - return c.Index, nil + return func(_ opencdc.Record) (string, error) { + return c.Index, nil + }, nil } // Try to parse the index t, err := template.New("index").Funcs(sprig.FuncMap()).Parse(c.Index) if err != nil { // The index is not a valid Go template. - return "", fmt.Errorf("index is neither a valid static table nor a valid Go template: %w", err) + return nil, fmt.Errorf("index is neither a valid static index nor a valid Go template: %w", err) } - // The index is a valid template, return index + // The index is a valid template, return IndexFn. var buf bytes.Buffer - buf.Reset() - if err := t.Execute(&buf, r); err != nil { - return "", fmt.Errorf("failed to execute index template: %w", err) - } - return buf.String(), nil + return func(r opencdc.Record) (string, error) { + buf.Reset() + if err := t.Execute(&buf, r); err != nil { + return "", fmt.Errorf("failed to execute index template: %w", err) + } + return buf.String(), nil + }, nil } diff --git a/destination/config_test.go b/destination/config_test.go index ac2a1b9..67e50c1 100644 --- a/destination/config_test.go +++ b/destination/config_test.go @@ -18,6 +18,7 @@ import ( "testing" "github.com/conduitio/conduit-commons/opencdc" + sdk "github.com/conduitio/conduit-connector-sdk" "github.com/jaswdr/faker" "github.com/stretchr/testify/require" ) @@ -57,29 +58,81 @@ func TestConfig_Getters(t *testing.T) { require.Equal(t, serviceToken, config.GetServiceToken()) require.Equal(t, certificateFingerprint, config.GetCertificateFingerprint()) require.Equal(t, indexType, config.GetType()) - - record := opencdc.Record{} - index, err := config.GetIndex(record) - require.NoError(t, err) - require.Equal(t, indexName, index) } -func TestConfig_GetIndex_Template(t *testing.T) { - fakerInstance := faker.New() +func TestConfig_IndexFunction(t *testing.T) { + t.Run("static index name", func(t *testing.T) { + config := Config{ + Index: "test-index", + } - indexName := fakerInstance.Lorem().Word() + indexFn, err := config.IndexFunction() + require.NoError(t, err) + require.NotNil(t, indexFn) - config := Config{ - Index: "{{ index .Metadata \"opencdc.collection\" }}", - } + index, err := indexFn(opencdc.Record{}) + require.NoError(t, err) + require.Equal(t, "test-index", index) - record := opencdc.Record{ - Metadata: map[string]string{ - "opencdc.collection": indexName, - }, - } + record := sdk.SourceUtil{}.NewRecordCreate( + nil, + map[string]string{"opencdc.collection": "other-index"}, + nil, + nil, + ) + index, err = indexFn(record) + require.NoError(t, err) + require.Equal(t, "test-index", index) + }) + + t.Run("template with metadata", func(t *testing.T) { + config := Config{ + Index: "{{ index .Metadata \"opencdc.collection\" }}", + } + + indexFn, err := config.IndexFunction() + require.NoError(t, err) + require.NotNil(t, indexFn) + + record := sdk.SourceUtil{}.NewRecordCreate( + nil, + map[string]string{"opencdc.collection": "dynamic-index"}, + nil, + nil, + ) + index, err := indexFn(record) + require.NoError(t, err) + require.Equal(t, "dynamic-index", index) + + record = sdk.SourceUtil{}.NewRecordCreate(nil, nil, nil, nil) + index, err = indexFn(record) + require.NoError(t, err) + require.Equal(t, "", index) + }) + + t.Run("invalid template syntax", func(t *testing.T) { + config := Config{ + Index: "{{ invalid syntax }}", + } + + indexFn, err := config.IndexFunction() + require.Error(t, err) + require.Nil(t, indexFn) + require.Contains(t, err.Error(), "index is neither a valid static index nor a valid Go template") + }) + + t.Run("template with invalid function", func(t *testing.T) { + config := Config{ + Index: "{{ .InvalidFunction }}", + } + + indexFn, err := config.IndexFunction() + require.NoError(t, err) + require.NotNil(t, indexFn) - index, err := config.GetIndex(record) - require.NoError(t, err) - require.Equal(t, indexName, index) + record := sdk.SourceUtil{}.NewRecordCreate(nil, nil, nil, nil) + _, err = indexFn(record) + require.Error(t, err) + require.Contains(t, err.Error(), "failed to execute index template") + }) } diff --git a/destination/destination.go b/destination/destination.go index fbafe8e..a1aa50c 100644 --- a/destination/destination.go +++ b/destination/destination.go @@ -34,7 +34,9 @@ func NewDestination() sdk.Destination { type Destination struct { sdk.UnimplementedDestination - config Config + config Config + getIndexName IndexFn + client client } @@ -55,6 +57,12 @@ func (d *Destination) Configure(ctx context.Context, cfg config.Config) (err err if err != nil { return err } + + d.getIndexName, err = d.config.IndexFunction() + if err != nil { + return fmt.Errorf("invalid index name or index function: %w", err) + } + return } @@ -154,6 +162,11 @@ func (d *Destination) prepareBulkRequestPayload(records []opencdc.Record) (*byte data := &bytes.Buffer{} for _, record := range records { + index, err := d.getIndexName(record) + if err != nil { + return nil, err + } + var key string if record.Key != nil { key = string(record.Key.Bytes()) @@ -165,17 +178,17 @@ func (d *Destination) prepareBulkRequestPayload(records []opencdc.Record) (*byte } switch { case key == "": - if err := d.writeInsertOperation(data, record); err != nil { + if err := d.writeInsertOperation(data, record, index); err != nil { return nil, err } case op == opencdc.OperationSnapshot || op == opencdc.OperationCreate || op == opencdc.OperationUpdate: - if err := d.writeUpsertOperation(key, data, record); err != nil { + if err := d.writeUpsertOperation(key, data, record, index); err != nil { return nil, err } case op == opencdc.OperationDelete: - if err := d.writeDeleteOperation(key, data, record); err != nil { + if err := d.writeDeleteOperation(key, data, index); err != nil { return nil, err } @@ -188,11 +201,11 @@ func (d *Destination) prepareBulkRequestPayload(records []opencdc.Record) (*byte } // writeInsertOperation adds create new Document without ID request into Bulk API request. -func (d *Destination) writeInsertOperation(data *bytes.Buffer, item opencdc.Record) error { +func (d *Destination) writeInsertOperation(data *bytes.Buffer, item opencdc.Record, index string) error { jsonEncoder := json.NewEncoder(data) // Prepare data - metadata, payload, err := d.client.PrepareCreateOperation(item) + metadata, payload, err := d.client.PrepareCreateOperation(item, index) if err != nil { return fmt.Errorf("failed to prepare metadata: %w", err) } @@ -211,11 +224,11 @@ func (d *Destination) writeInsertOperation(data *bytes.Buffer, item opencdc.Reco } // writeUpsertOperation adds upsert a Document with ID request into Bulk API request. -func (d *Destination) writeUpsertOperation(key string, data *bytes.Buffer, item opencdc.Record) error { +func (d *Destination) writeUpsertOperation(key string, data *bytes.Buffer, item opencdc.Record, index string) error { jsonEncoder := json.NewEncoder(data) // Prepare data - metadata, payload, err := d.client.PrepareUpsertOperation(key, item) + metadata, payload, err := d.client.PrepareUpsertOperation(key, item, index) if err != nil { return fmt.Errorf("failed to prepare metadata with key=%s: %w", key, err) } @@ -234,11 +247,11 @@ func (d *Destination) writeUpsertOperation(key string, data *bytes.Buffer, item } // writeDeleteOperation adds delete a Document by ID request into Bulk API request. -func (d *Destination) writeDeleteOperation(key string, data *bytes.Buffer, record opencdc.Record) error { +func (d *Destination) writeDeleteOperation(key string, data *bytes.Buffer, index string) error { jsonEncoder := json.NewEncoder(data) // Prepare data - metadata, err := d.client.PrepareDeleteOperation(key, record) + metadata, err := d.client.PrepareDeleteOperation(key, index) if err != nil { return fmt.Errorf("failed to prepare metadata with key=%s: %w", key, err) } diff --git a/destination/destination_test.go b/destination/destination_test.go index b523d69..0e8d5a3 100644 --- a/destination/destination_test.go +++ b/destination/destination_test.go @@ -29,6 +29,8 @@ import ( "github.com/stretchr/testify/require" ) +const indexName = "someIndexName" + func TestNewDestination(t *testing.T) { t.Run("New Destination can be created", func(t *testing.T) { require.IsType(t, &Destination{}, NewDestination()) @@ -53,6 +55,9 @@ func TestDestination_Write(t *testing.T) { destination := Destination{ config: Config{}, + getIndexName: func(_ opencdc.Record) (string, error) { + return indexName, nil + }, client: &esClientMock, } @@ -72,7 +77,7 @@ func TestDestination_Write(t *testing.T) { ) esClientMock := clientMock{ - PrepareCreateOperationFunc: func(_ opencdc.Record) (interface{}, interface{}, error) { + PrepareCreateOperationFunc: func(_ opencdc.Record, _ string) (interface{}, interface{}, error) { return operationMetadata, operationPayload, nil }, @@ -103,6 +108,9 @@ func TestDestination_Write(t *testing.T) { BulkSize: 1, Retries: 2, }, + getIndexName: func(_ opencdc.Record) (string, error) { + return indexName, nil + }, client: &esClientMock, } diff --git a/internal/elasticsearch/client.go b/internal/elasticsearch/client.go index dfa306a..eafdd9e 100644 --- a/internal/elasticsearch/client.go +++ b/internal/elasticsearch/client.go @@ -32,11 +32,11 @@ type Client interface { Bulk(ctx context.Context, reader io.Reader) (io.ReadCloser, error) // PrepareCreateOperation prepares insert operation definition for Bulk API query. - PrepareCreateOperation(item opencdc.Record) (metadata interface{}, payload interface{}, err error) + PrepareCreateOperation(item opencdc.Record, index string) (metadata interface{}, payload interface{}, err error) // PrepareUpsertOperation prepares upsert operation definition for Bulk API query. - PrepareUpsertOperation(key string, item opencdc.Record) (metadata interface{}, payload interface{}, err error) + PrepareUpsertOperation(key string, item opencdc.Record, index string) (metadata interface{}, payload interface{}, err error) // PrepareDeleteOperation prepares delete operation definition for Bulk API query. - PrepareDeleteOperation(key string, item opencdc.Record) (metadata interface{}, err error) + PrepareDeleteOperation(key string, index string) (metadata interface{}, err error) } diff --git a/internal/elasticsearch/v5/client.go b/internal/elasticsearch/v5/client.go index 1037d65..eb9583a 100644 --- a/internal/elasticsearch/v5/client.go +++ b/internal/elasticsearch/v5/client.go @@ -94,17 +94,11 @@ func (c *Client) Bulk(ctx context.Context, reader io.Reader) (io.ReadCloser, err return result.Body, nil } -func (c *Client) PrepareCreateOperation(item opencdc.Record) (interface{}, interface{}, error) { - // Determine the index name - indexName, err := c.cfg.GetIndex(item) - if err != nil { - return nil, nil, fmt.Errorf("failed to determine index name: %w", err) - } - +func (c *Client) PrepareCreateOperation(item opencdc.Record, index string) (interface{}, interface{}, error) { // Prepare metadata metadata := bulkRequestActionAndMetadata{ Index: &bulkRequestIndexAction{ - Index: indexName, + Index: index, Type: c.cfg.GetType(), }, } @@ -118,23 +112,19 @@ func (c *Client) PrepareCreateOperation(item opencdc.Record) (interface{}, inter return metadata, bulkRequestCreateSource(payload), nil } -func (c *Client) PrepareUpsertOperation(key string, item opencdc.Record) (interface{}, interface{}, error) { - // Determine the index name - indexName, err := c.cfg.GetIndex(item) - if err != nil { - return nil, nil, fmt.Errorf("failed to determine index name: %w", err) - } - +func (c *Client) PrepareUpsertOperation(key string, item opencdc.Record, index string) (interface{}, interface{}, error) { // Prepare metadata metadata := bulkRequestActionAndMetadata{ Update: &bulkRequestUpdateAction{ ID: key, - Index: indexName, + Index: index, Type: c.cfg.GetType(), }, } // Prepare payload + var err error + payload := bulkRequestUpdateSource{ Doc: nil, DocAsUpsert: true, @@ -148,17 +138,11 @@ func (c *Client) PrepareUpsertOperation(key string, item opencdc.Record) (interf return metadata, payload, nil } -func (c *Client) PrepareDeleteOperation(key string, item opencdc.Record) (interface{}, error) { - // Determine the index name - indexName, err := c.cfg.GetIndex(item) - if err != nil { - return nil, fmt.Errorf("failed to determine index name: %w", err) - } - +func (c *Client) PrepareDeleteOperation(key string, index string) (interface{}, error) { return bulkRequestActionAndMetadata{ Delete: &bulkRequestDeleteAction{ ID: key, - Index: indexName, + Index: index, Type: c.cfg.GetType(), }, }, nil diff --git a/internal/elasticsearch/v5/client_test.go b/internal/elasticsearch/v5/client_test.go index cf66fc9..af27ba1 100644 --- a/internal/elasticsearch/v5/client_test.go +++ b/internal/elasticsearch/v5/client_test.go @@ -16,7 +16,6 @@ package v5 import ( "encoding/json" - "fmt" "testing" "github.com/conduitio/conduit-commons/opencdc" @@ -53,9 +52,6 @@ func TestClient_PrepareCreateOperation(t *testing.T) { t.Run("Fails when payload could not be prepared", func(t *testing.T) { client := Client{ cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return indexName, nil - }, GetTypeFunc: func() string { return indexType }, @@ -69,45 +65,16 @@ func TestClient_PrepareCreateOperation(t *testing.T) { opencdc.StructuredData{ "foo": complex64(1 + 2i), }, - )) + ), indexName) require.Nil(t, metadata) require.Nil(t, payload) require.EqualError(t, err, "json: unsupported type: complex64") }) - t.Run("Fails when index name could not be determined", func(t *testing.T) { - client := Client{ - cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return "", fmt.Errorf("failed to determine index") - }, - GetTypeFunc: func() string { - return indexType - }, - }, - } - - metadata, payload, err := client.PrepareCreateOperation(sdk.SourceUtil{}.NewRecordCreate( - nil, - nil, - nil, - opencdc.StructuredData{ - "foo": "bar", - }, - )) - - require.Nil(t, metadata) - require.Nil(t, payload) - require.EqualError(t, err, "failed to determine index name: failed to determine index") - }) - t.Run("Successfully prepares create operation", func(t *testing.T) { client := Client{ cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return indexName, nil - }, GetTypeFunc: func() string { return indexType }, @@ -121,7 +88,7 @@ func TestClient_PrepareCreateOperation(t *testing.T) { opencdc.StructuredData{ "foo": "bar", }, - )) + ), indexName) require.NoError(t, err) require.NotNil(t, metadata) @@ -145,9 +112,6 @@ func TestClient_PrepareUpsertOperation(t *testing.T) { t.Run("Fails when payload could not be prepared", func(t *testing.T) { client := Client{ cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return indexName, nil - }, GetTypeFunc: func() string { return indexType }, @@ -167,6 +131,7 @@ func TestClient_PrepareUpsertOperation(t *testing.T) { "foo": complex64(1 + 2i), }, ), + indexName, ) require.Nil(t, metadata) @@ -174,44 +139,9 @@ func TestClient_PrepareUpsertOperation(t *testing.T) { require.EqualError(t, err, "json: unsupported type: complex64") }) - t.Run("Fails when index name could not be determined", func(t *testing.T) { - client := Client{ - cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return "", fmt.Errorf("failed to determine index") - }, - GetTypeFunc: func() string { - return indexType - }, - }, - } - - metadata, payload, err := client.PrepareUpsertOperation( - "key", - sdk.SourceUtil{}.NewRecordUpdate( - nil, - nil, - nil, - opencdc.StructuredData{ - "foo": "bar", - }, - opencdc.StructuredData{ - "foo": "baz", - }, - ), - ) - - require.Nil(t, metadata) - require.Nil(t, payload) - require.EqualError(t, err, "failed to determine index name: failed to determine index") - }) - t.Run("Successfully prepares upsert operation", func(t *testing.T) { client := Client{ cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return indexName, nil - }, GetTypeFunc: func() string { return indexType }, @@ -231,6 +161,7 @@ func TestClient_PrepareUpsertOperation(t *testing.T) { "foo": "baz", }, ), + indexName, ) require.NoError(t, err) @@ -256,33 +187,9 @@ func TestClient_PrepareUpsertOperation(t *testing.T) { } func TestClient_PrepareDeleteOperation(t *testing.T) { - t.Run("Fails when index name could not be determined", func(t *testing.T) { - client := Client{ - cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return "", fmt.Errorf("failed to determine index") - }, - GetTypeFunc: func() string { - return indexType - }, - }, - } - - metadata, err := client.PrepareDeleteOperation( - "key", - sdk.SourceUtil{}.NewRecordDelete(nil, nil, nil, nil), - ) - - require.Nil(t, metadata) - require.EqualError(t, err, "failed to determine index name: failed to determine index") - }) - t.Run("Successfully prepares delete operation", func(t *testing.T) { client := Client{ cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return indexName, nil - }, GetTypeFunc: func() string { return indexType }, @@ -291,7 +198,7 @@ func TestClient_PrepareDeleteOperation(t *testing.T) { metadata, err := client.PrepareDeleteOperation( "key", - sdk.SourceUtil{}.NewRecordDelete(nil, nil, nil, nil), + indexName, ) require.NoError(t, err) diff --git a/internal/elasticsearch/v5/config.go b/internal/elasticsearch/v5/config.go index 2a53ee9..181a87c 100644 --- a/internal/elasticsearch/v5/config.go +++ b/internal/elasticsearch/v5/config.go @@ -14,13 +14,10 @@ package v5 -import "github.com/conduitio/conduit-commons/opencdc" - //go:generate moq -out config_moq_test.go . config type config interface { GetHost() string GetUsername() string GetPassword() string GetType() string - GetIndex(r opencdc.Record) (string, error) } diff --git a/internal/elasticsearch/v5/config_moq_test.go b/internal/elasticsearch/v5/config_moq_test.go index bd3692c..d0579eb 100644 --- a/internal/elasticsearch/v5/config_moq_test.go +++ b/internal/elasticsearch/v5/config_moq_test.go @@ -4,7 +4,6 @@ package v5 import ( - "github.com/conduitio/conduit-commons/opencdc" "sync" ) @@ -21,9 +20,6 @@ var _ config = &configMock{} // GetHostFunc: func() string { // panic("mock out the GetHost method") // }, -// GetIndexFunc: func(r opencdc.Record) (string, error) { -// panic("mock out the GetIndex method") -// }, // GetPasswordFunc: func() string { // panic("mock out the GetPassword method") // }, @@ -43,9 +39,6 @@ type configMock struct { // GetHostFunc mocks the GetHost method. GetHostFunc func() string - // GetIndexFunc mocks the GetIndex method. - GetIndexFunc func(r opencdc.Record) (string, error) - // GetPasswordFunc mocks the GetPassword method. GetPasswordFunc func() string @@ -60,11 +53,6 @@ type configMock struct { // GetHost holds details about calls to the GetHost method. GetHost []struct { } - // GetIndex holds details about calls to the GetIndex method. - GetIndex []struct { - // R is the r argument value. - R opencdc.Record - } // GetPassword holds details about calls to the GetPassword method. GetPassword []struct { } @@ -76,7 +64,6 @@ type configMock struct { } } lockGetHost sync.RWMutex - lockGetIndex sync.RWMutex lockGetPassword sync.RWMutex lockGetType sync.RWMutex lockGetUsername sync.RWMutex @@ -109,38 +96,6 @@ func (mock *configMock) GetHostCalls() []struct { return calls } -// GetIndex calls GetIndexFunc. -func (mock *configMock) GetIndex(r opencdc.Record) (string, error) { - if mock.GetIndexFunc == nil { - panic("configMock.GetIndexFunc: method is nil but config.GetIndex was just called") - } - callInfo := struct { - R opencdc.Record - }{ - R: r, - } - mock.lockGetIndex.Lock() - mock.calls.GetIndex = append(mock.calls.GetIndex, callInfo) - mock.lockGetIndex.Unlock() - return mock.GetIndexFunc(r) -} - -// GetIndexCalls gets all the calls that were made to GetIndex. -// Check the length with: -// -// len(mockedconfig.GetIndexCalls()) -func (mock *configMock) GetIndexCalls() []struct { - R opencdc.Record -} { - var calls []struct { - R opencdc.Record - } - mock.lockGetIndex.RLock() - calls = mock.calls.GetIndex - mock.lockGetIndex.RUnlock() - return calls -} - // GetPassword calls GetPasswordFunc. func (mock *configMock) GetPassword() string { if mock.GetPasswordFunc == nil { diff --git a/internal/elasticsearch/v6/client.go b/internal/elasticsearch/v6/client.go index 5987682..f8163fe 100644 --- a/internal/elasticsearch/v6/client.go +++ b/internal/elasticsearch/v6/client.go @@ -96,17 +96,11 @@ func (c *Client) Bulk(ctx context.Context, reader io.Reader) (io.ReadCloser, err return result.Body, nil } -func (c *Client) PrepareCreateOperation(item opencdc.Record) (interface{}, interface{}, error) { - // Determine the index name - indexName, err := c.cfg.GetIndex(item) - if err != nil { - return nil, nil, fmt.Errorf("failed to determine index name: %w", err) - } - +func (c *Client) PrepareCreateOperation(item opencdc.Record, index string) (interface{}, interface{}, error) { // Prepare metadata metadata := bulkRequestActionAndMetadata{ Index: &bulkRequestIndexAction{ - Index: indexName, + Index: index, Type: c.cfg.GetType(), }, } @@ -120,24 +114,20 @@ func (c *Client) PrepareCreateOperation(item opencdc.Record) (interface{}, inter return metadata, bulkRequestCreateSource(payload), nil } -func (c *Client) PrepareUpsertOperation(key string, item opencdc.Record) (interface{}, interface{}, error) { - // Determine the index name - indexName, err := c.cfg.GetIndex(item) - if err != nil { - return nil, nil, fmt.Errorf("failed to determine index name: %w", err) - } - +func (c *Client) PrepareUpsertOperation(key string, item opencdc.Record, index string) (interface{}, interface{}, error) { // Prepare metadata metadata := bulkRequestActionAndMetadata{ Update: &bulkRequestUpdateAction{ ID: key, - Index: indexName, + Index: index, Type: c.cfg.GetType(), RetryOnConflict: 3, }, } // Prepare payload + var err error + payload := bulkRequestOptionalSource{ Doc: nil, DocAsUpsert: true, @@ -151,17 +141,11 @@ func (c *Client) PrepareUpsertOperation(key string, item opencdc.Record) (interf return metadata, payload, nil } -func (c *Client) PrepareDeleteOperation(key string, item opencdc.Record) (interface{}, error) { - // Determine the index name - indexName, err := c.cfg.GetIndex(item) - if err != nil { - return nil, fmt.Errorf("failed to determine index name: %w", err) - } - +func (c *Client) PrepareDeleteOperation(key string, index string) (interface{}, error) { return bulkRequestActionAndMetadata{ Delete: &bulkRequestDeleteAction{ ID: key, - Index: indexName, + Index: index, Type: c.cfg.GetType(), }, }, nil diff --git a/internal/elasticsearch/v6/client_test.go b/internal/elasticsearch/v6/client_test.go index bf31581..721ad81 100644 --- a/internal/elasticsearch/v6/client_test.go +++ b/internal/elasticsearch/v6/client_test.go @@ -16,7 +16,6 @@ package v6 import ( "encoding/json" - "fmt" "testing" "github.com/conduitio/conduit-commons/opencdc" @@ -53,9 +52,6 @@ func TestClient_PrepareCreateOperation(t *testing.T) { t.Run("Fails when payload could not be prepared", func(t *testing.T) { client := Client{ cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return indexName, nil - }, GetTypeFunc: func() string { return indexType }, @@ -69,45 +65,16 @@ func TestClient_PrepareCreateOperation(t *testing.T) { opencdc.StructuredData{ "foo": complex64(1 + 2i), }, - )) + ), indexName) require.Nil(t, metadata) require.Nil(t, payload) require.EqualError(t, err, "json: unsupported type: complex64") }) - t.Run("Fails when index name could not be determined", func(t *testing.T) { - client := Client{ - cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return "", fmt.Errorf("failed to determine index") - }, - GetTypeFunc: func() string { - return indexType - }, - }, - } - - metadata, payload, err := client.PrepareCreateOperation(sdk.SourceUtil{}.NewRecordCreate( - nil, - nil, - nil, - opencdc.StructuredData{ - "foo": "bar", - }, - )) - - require.Nil(t, metadata) - require.Nil(t, payload) - require.EqualError(t, err, "failed to determine index name: failed to determine index") - }) - t.Run("Successfully prepares create operation", func(t *testing.T) { client := Client{ cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return indexName, nil - }, GetTypeFunc: func() string { return indexType }, @@ -121,7 +88,7 @@ func TestClient_PrepareCreateOperation(t *testing.T) { opencdc.StructuredData{ "foo": "bar", }, - )) + ), indexName) require.NoError(t, err) require.NotNil(t, metadata) @@ -145,9 +112,6 @@ func TestClient_PrepareUpsertOperation(t *testing.T) { t.Run("Fails when payload could not be prepared", func(t *testing.T) { client := Client{ cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return indexName, nil - }, GetTypeFunc: func() string { return indexType }, @@ -167,6 +131,7 @@ func TestClient_PrepareUpsertOperation(t *testing.T) { "foo": complex64(1 + 2i), }, ), + indexName, ) require.Nil(t, metadata) @@ -174,44 +139,9 @@ func TestClient_PrepareUpsertOperation(t *testing.T) { require.EqualError(t, err, "json: unsupported type: complex64") }) - t.Run("Fails when index name could not be determined", func(t *testing.T) { - client := Client{ - cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return "", fmt.Errorf("failed to determine index") - }, - GetTypeFunc: func() string { - return indexType - }, - }, - } - - metadata, payload, err := client.PrepareUpsertOperation( - "key", - sdk.SourceUtil{}.NewRecordUpdate( - nil, - nil, - nil, - opencdc.StructuredData{ - "foo": "bar", - }, - opencdc.StructuredData{ - "foo": "baz", - }, - ), - ) - - require.Nil(t, metadata) - require.Nil(t, payload) - require.EqualError(t, err, "failed to determine index name: failed to determine index") - }) - t.Run("Successfully prepares upsert operation", func(t *testing.T) { client := Client{ cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return indexName, nil - }, GetTypeFunc: func() string { return indexType }, @@ -231,6 +161,7 @@ func TestClient_PrepareUpsertOperation(t *testing.T) { "foo": "baz", }, ), + indexName, ) require.NoError(t, err) @@ -257,33 +188,9 @@ func TestClient_PrepareUpsertOperation(t *testing.T) { } func TestClient_PrepareDeleteOperation(t *testing.T) { - t.Run("Fails when index name could not be determined", func(t *testing.T) { - client := Client{ - cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return "", fmt.Errorf("failed to determine index") - }, - GetTypeFunc: func() string { - return indexType - }, - }, - } - - metadata, err := client.PrepareDeleteOperation( - "key", - sdk.SourceUtil{}.NewRecordDelete(nil, nil, nil, nil), - ) - - require.Nil(t, metadata) - require.EqualError(t, err, "failed to determine index name: failed to determine index") - }) - t.Run("Successfully prepares delete operation", func(t *testing.T) { client := Client{ cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return indexName, nil - }, GetTypeFunc: func() string { return indexType }, @@ -292,7 +199,7 @@ func TestClient_PrepareDeleteOperation(t *testing.T) { metadata, err := client.PrepareDeleteOperation( "key", - sdk.SourceUtil{}.NewRecordDelete(nil, nil, nil, nil), + indexName, ) require.NoError(t, err) diff --git a/internal/elasticsearch/v6/config.go b/internal/elasticsearch/v6/config.go index 54f7843..4bd1501 100644 --- a/internal/elasticsearch/v6/config.go +++ b/internal/elasticsearch/v6/config.go @@ -14,8 +14,6 @@ package v6 -import "github.com/conduitio/conduit-commons/opencdc" - //go:generate moq -out config_moq_test.go . config type config interface { GetHost() string @@ -24,5 +22,4 @@ type config interface { GetCloudID() string GetAPIKey() string GetType() string - GetIndex(r opencdc.Record) (string, error) } diff --git a/internal/elasticsearch/v6/config_moq_test.go b/internal/elasticsearch/v6/config_moq_test.go index f677d84..f7694d9 100644 --- a/internal/elasticsearch/v6/config_moq_test.go +++ b/internal/elasticsearch/v6/config_moq_test.go @@ -4,7 +4,6 @@ package v6 import ( - "github.com/conduitio/conduit-commons/opencdc" "sync" ) @@ -27,9 +26,6 @@ var _ config = &configMock{} // GetHostFunc: func() string { // panic("mock out the GetHost method") // }, -// GetIndexFunc: func(r opencdc.Record) (string, error) { -// panic("mock out the GetIndex method") -// }, // GetPasswordFunc: func() string { // panic("mock out the GetPassword method") // }, @@ -55,9 +51,6 @@ type configMock struct { // GetHostFunc mocks the GetHost method. GetHostFunc func() string - // GetIndexFunc mocks the GetIndex method. - GetIndexFunc func(r opencdc.Record) (string, error) - // GetPasswordFunc mocks the GetPassword method. GetPasswordFunc func() string @@ -78,11 +71,6 @@ type configMock struct { // GetHost holds details about calls to the GetHost method. GetHost []struct { } - // GetIndex holds details about calls to the GetIndex method. - GetIndex []struct { - // R is the r argument value. - R opencdc.Record - } // GetPassword holds details about calls to the GetPassword method. GetPassword []struct { } @@ -96,7 +84,6 @@ type configMock struct { lockGetAPIKey sync.RWMutex lockGetCloudID sync.RWMutex lockGetHost sync.RWMutex - lockGetIndex sync.RWMutex lockGetPassword sync.RWMutex lockGetType sync.RWMutex lockGetUsername sync.RWMutex @@ -183,38 +170,6 @@ func (mock *configMock) GetHostCalls() []struct { return calls } -// GetIndex calls GetIndexFunc. -func (mock *configMock) GetIndex(r opencdc.Record) (string, error) { - if mock.GetIndexFunc == nil { - panic("configMock.GetIndexFunc: method is nil but config.GetIndex was just called") - } - callInfo := struct { - R opencdc.Record - }{ - R: r, - } - mock.lockGetIndex.Lock() - mock.calls.GetIndex = append(mock.calls.GetIndex, callInfo) - mock.lockGetIndex.Unlock() - return mock.GetIndexFunc(r) -} - -// GetIndexCalls gets all the calls that were made to GetIndex. -// Check the length with: -// -// len(mockedconfig.GetIndexCalls()) -func (mock *configMock) GetIndexCalls() []struct { - R opencdc.Record -} { - var calls []struct { - R opencdc.Record - } - mock.lockGetIndex.RLock() - calls = mock.calls.GetIndex - mock.lockGetIndex.RUnlock() - return calls -} - // GetPassword calls GetPasswordFunc. func (mock *configMock) GetPassword() string { if mock.GetPasswordFunc == nil { diff --git a/internal/elasticsearch/v7/client.go b/internal/elasticsearch/v7/client.go index a071b91..98e475a 100644 --- a/internal/elasticsearch/v7/client.go +++ b/internal/elasticsearch/v7/client.go @@ -98,17 +98,11 @@ func (c *Client) Bulk(ctx context.Context, reader io.Reader) (io.ReadCloser, err return result.Body, nil } -func (c *Client) PrepareCreateOperation(item opencdc.Record) (interface{}, interface{}, error) { - // Determine the index name - indexName, err := c.cfg.GetIndex(item) - if err != nil { - return nil, nil, fmt.Errorf("failed to determine index name: %w", err) - } - +func (c *Client) PrepareCreateOperation(item opencdc.Record, index string) (interface{}, interface{}, error) { // Prepare metadata metadata := bulkRequestActionAndMetadata{ Create: &bulkRequestCreateAction{ - Index: indexName, + Index: index, }, } @@ -121,23 +115,19 @@ func (c *Client) PrepareCreateOperation(item opencdc.Record) (interface{}, inter return metadata, bulkRequestCreateSource(payload), nil } -func (c *Client) PrepareUpsertOperation(key string, item opencdc.Record) (interface{}, interface{}, error) { - // Determine the index name - indexName, err := c.cfg.GetIndex(item) - if err != nil { - return nil, nil, fmt.Errorf("failed to determine index name: %w", err) - } - +func (c *Client) PrepareUpsertOperation(key string, item opencdc.Record, index string) (interface{}, interface{}, error) { // Prepare metadata metadata := bulkRequestActionAndMetadata{ Update: &bulkRequestUpdateAction{ ID: key, - Index: indexName, + Index: index, RetryOnConflict: 3, }, } // Prepare payload + var err error + payload := bulkRequestOptionalSource{ Doc: nil, DocAsUpsert: true, @@ -151,17 +141,11 @@ func (c *Client) PrepareUpsertOperation(key string, item opencdc.Record) (interf return metadata, payload, nil } -func (c *Client) PrepareDeleteOperation(key string, item opencdc.Record) (interface{}, error) { - // Determine the index name - indexName, err := c.cfg.GetIndex(item) - if err != nil { - return nil, fmt.Errorf("failed to determine index name: %w", err) - } - +func (c *Client) PrepareDeleteOperation(key string, index string) (interface{}, error) { return bulkRequestActionAndMetadata{ Delete: &bulkRequestDeleteAction{ ID: key, - Index: indexName, + Index: index, }, }, nil } diff --git a/internal/elasticsearch/v7/client_test.go b/internal/elasticsearch/v7/client_test.go index 384a4e3..41237fb 100644 --- a/internal/elasticsearch/v7/client_test.go +++ b/internal/elasticsearch/v7/client_test.go @@ -16,7 +16,6 @@ package v7 import ( "encoding/json" - "fmt" "testing" "github.com/conduitio/conduit-commons/opencdc" @@ -49,11 +48,7 @@ func TestClient_GetClient(t *testing.T) { func TestClient_PrepareCreateOperation(t *testing.T) { t.Run("Fails when payload could not be prepared", func(t *testing.T) { client := Client{ - cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return indexName, nil - }, - }, + cfg: &configMock{}, } metadata, payload, err := client.PrepareCreateOperation(sdk.SourceUtil{}.NewRecordCreate( @@ -63,43 +58,16 @@ func TestClient_PrepareCreateOperation(t *testing.T) { opencdc.StructuredData{ "foo": complex64(1 + 2i), }, - )) + ), indexName) require.Nil(t, metadata) require.Nil(t, payload) require.EqualError(t, err, "json: unsupported type: complex64") }) - t.Run("Fails when index name could not be determined", func(t *testing.T) { - client := Client{ - cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return "", fmt.Errorf("failed to determine index") - }, - }, - } - - metadata, payload, err := client.PrepareCreateOperation(sdk.SourceUtil{}.NewRecordCreate( - nil, - nil, - nil, - opencdc.StructuredData{ - "foo": "bar", - }, - )) - - require.Nil(t, metadata) - require.Nil(t, payload) - require.EqualError(t, err, "failed to determine index name: failed to determine index") - }) - t.Run("Successfully prepares create operation", func(t *testing.T) { client := Client{ - cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return indexName, nil - }, - }, + cfg: &configMock{}, } metadata, payload, err := client.PrepareCreateOperation(sdk.SourceUtil{}.NewRecordCreate( @@ -109,7 +77,7 @@ func TestClient_PrepareCreateOperation(t *testing.T) { opencdc.StructuredData{ "foo": "bar", }, - )) + ), indexName) require.NoError(t, err) require.NotNil(t, metadata) @@ -131,11 +99,7 @@ func TestClient_PrepareCreateOperation(t *testing.T) { func TestClient_PrepareUpsertOperation(t *testing.T) { t.Run("Fails when payload could not be prepared", func(t *testing.T) { client := Client{ - cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return indexName, nil - }, - }, + cfg: &configMock{}, } metadata, payload, err := client.PrepareUpsertOperation( @@ -151,6 +115,7 @@ func TestClient_PrepareUpsertOperation(t *testing.T) { "foo": complex64(1 + 2i), }, ), + indexName, ) require.Nil(t, metadata) @@ -158,42 +123,9 @@ func TestClient_PrepareUpsertOperation(t *testing.T) { require.EqualError(t, err, "json: unsupported type: complex64") }) - t.Run("Fails when index name could not be determined", func(t *testing.T) { - client := Client{ - cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return "", fmt.Errorf("failed to determine index") - }, - }, - } - - metadata, payload, err := client.PrepareUpsertOperation( - "key", - sdk.SourceUtil{}.NewRecordUpdate( - nil, - nil, - nil, - opencdc.StructuredData{ - "foo": "bar", - }, - opencdc.StructuredData{ - "foo": "baz", - }, - ), - ) - - require.Nil(t, metadata) - require.Nil(t, payload) - require.EqualError(t, err, "failed to determine index name: failed to determine index") - }) - t.Run("Successfully prepares upsert operation", func(t *testing.T) { client := Client{ - cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return indexName, nil - }, - }, + cfg: &configMock{}, } metadata, payload, err := client.PrepareUpsertOperation( @@ -209,6 +141,7 @@ func TestClient_PrepareUpsertOperation(t *testing.T) { "foo": "baz", }, ), + indexName, ) require.NoError(t, err) @@ -234,36 +167,14 @@ func TestClient_PrepareUpsertOperation(t *testing.T) { } func TestClient_PrepareDeleteOperation(t *testing.T) { - t.Run("Fails when index name could not be determined", func(t *testing.T) { - client := Client{ - cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return "", fmt.Errorf("failed to determine index") - }, - }, - } - - metadata, err := client.PrepareDeleteOperation( - "key", - sdk.SourceUtil{}.NewRecordDelete(nil, nil, nil, nil), - ) - - require.Nil(t, metadata) - require.EqualError(t, err, "failed to determine index name: failed to determine index") - }) - t.Run("Successfully prepares delete operation", func(t *testing.T) { client := Client{ - cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return indexName, nil - }, - }, + cfg: &configMock{}, } metadata, err := client.PrepareDeleteOperation( "key", - sdk.SourceUtil{}.NewRecordDelete(nil, nil, nil, nil), + indexName, ) require.NoError(t, err) diff --git a/internal/elasticsearch/v7/config.go b/internal/elasticsearch/v7/config.go index 7d96bc7..12a6e64 100644 --- a/internal/elasticsearch/v7/config.go +++ b/internal/elasticsearch/v7/config.go @@ -14,8 +14,6 @@ package v7 -import "github.com/conduitio/conduit-commons/opencdc" - //go:generate moq -out config_moq_test.go . config type config interface { GetHost() string @@ -25,5 +23,4 @@ type config interface { GetAPIKey() string GetServiceToken() string GetCertificateFingerprint() string - GetIndex(r opencdc.Record) (string, error) } diff --git a/internal/elasticsearch/v7/config_moq_test.go b/internal/elasticsearch/v7/config_moq_test.go index 9d2930a..167840b 100644 --- a/internal/elasticsearch/v7/config_moq_test.go +++ b/internal/elasticsearch/v7/config_moq_test.go @@ -4,7 +4,6 @@ package v7 import ( - "github.com/conduitio/conduit-commons/opencdc" "sync" ) @@ -30,9 +29,6 @@ var _ config = &configMock{} // GetHostFunc: func() string { // panic("mock out the GetHost method") // }, -// GetIndexFunc: func(r opencdc.Record) (string, error) { -// panic("mock out the GetIndex method") -// }, // GetPasswordFunc: func() string { // panic("mock out the GetPassword method") // }, @@ -61,9 +57,6 @@ type configMock struct { // GetHostFunc mocks the GetHost method. GetHostFunc func() string - // GetIndexFunc mocks the GetIndex method. - GetIndexFunc func(r opencdc.Record) (string, error) - // GetPasswordFunc mocks the GetPassword method. GetPasswordFunc func() string @@ -87,11 +80,6 @@ type configMock struct { // GetHost holds details about calls to the GetHost method. GetHost []struct { } - // GetIndex holds details about calls to the GetIndex method. - GetIndex []struct { - // R is the r argument value. - R opencdc.Record - } // GetPassword holds details about calls to the GetPassword method. GetPassword []struct { } @@ -106,7 +94,6 @@ type configMock struct { lockGetCertificateFingerprint sync.RWMutex lockGetCloudID sync.RWMutex lockGetHost sync.RWMutex - lockGetIndex sync.RWMutex lockGetPassword sync.RWMutex lockGetServiceToken sync.RWMutex lockGetUsername sync.RWMutex @@ -220,38 +207,6 @@ func (mock *configMock) GetHostCalls() []struct { return calls } -// GetIndex calls GetIndexFunc. -func (mock *configMock) GetIndex(r opencdc.Record) (string, error) { - if mock.GetIndexFunc == nil { - panic("configMock.GetIndexFunc: method is nil but config.GetIndex was just called") - } - callInfo := struct { - R opencdc.Record - }{ - R: r, - } - mock.lockGetIndex.Lock() - mock.calls.GetIndex = append(mock.calls.GetIndex, callInfo) - mock.lockGetIndex.Unlock() - return mock.GetIndexFunc(r) -} - -// GetIndexCalls gets all the calls that were made to GetIndex. -// Check the length with: -// -// len(mockedconfig.GetIndexCalls()) -func (mock *configMock) GetIndexCalls() []struct { - R opencdc.Record -} { - var calls []struct { - R opencdc.Record - } - mock.lockGetIndex.RLock() - calls = mock.calls.GetIndex - mock.lockGetIndex.RUnlock() - return calls -} - // GetPassword calls GetPasswordFunc. func (mock *configMock) GetPassword() string { if mock.GetPasswordFunc == nil { diff --git a/internal/elasticsearch/v8/client.go b/internal/elasticsearch/v8/client.go index 220b1f7..7eb7de3 100644 --- a/internal/elasticsearch/v8/client.go +++ b/internal/elasticsearch/v8/client.go @@ -98,17 +98,11 @@ func (c *Client) Bulk(ctx context.Context, reader io.Reader) (io.ReadCloser, err return result.Body, nil } -func (c *Client) PrepareCreateOperation(item opencdc.Record) (interface{}, interface{}, error) { - // Determine the index name - indexName, err := c.cfg.GetIndex(item) - if err != nil { - return nil, nil, fmt.Errorf("failed to determine index name: %w", err) - } - +func (c *Client) PrepareCreateOperation(item opencdc.Record, index string) (interface{}, interface{}, error) { // Prepare metadata metadata := bulkRequestActionAndMetadata{ Create: &bulkRequestCreateAction{ - Index: indexName, + Index: index, }, } @@ -121,23 +115,19 @@ func (c *Client) PrepareCreateOperation(item opencdc.Record) (interface{}, inter return metadata, bulkRequestCreateSource(payload), nil } -func (c *Client) PrepareUpsertOperation(key string, item opencdc.Record) (interface{}, interface{}, error) { - // Determine the index name - indexName, err := c.cfg.GetIndex(item) - if err != nil { - return nil, nil, fmt.Errorf("failed to determine index name: %w", err) - } - +func (c *Client) PrepareUpsertOperation(key string, item opencdc.Record, index string) (interface{}, interface{}, error) { // Prepare metadata metadata := bulkRequestActionAndMetadata{ Update: &bulkRequestUpdateAction{ ID: key, - Index: indexName, + Index: index, RetryOnConflict: 3, }, } // Prepare payload + var err error + payload := bulkRequestOptionalSource{ Doc: nil, DocAsUpsert: true, @@ -151,17 +141,11 @@ func (c *Client) PrepareUpsertOperation(key string, item opencdc.Record) (interf return metadata, payload, nil } -func (c *Client) PrepareDeleteOperation(key string, item opencdc.Record) (interface{}, error) { - // Determine the index name - indexName, err := c.cfg.GetIndex(item) - if err != nil { - return nil, fmt.Errorf("failed to determine index name: %w", err) - } - +func (c *Client) PrepareDeleteOperation(key string, index string) (interface{}, error) { return bulkRequestActionAndMetadata{ Delete: &bulkRequestDeleteAction{ ID: key, - Index: indexName, + Index: index, }, }, nil } diff --git a/internal/elasticsearch/v8/client_test.go b/internal/elasticsearch/v8/client_test.go index 8b9000d..9521a31 100644 --- a/internal/elasticsearch/v8/client_test.go +++ b/internal/elasticsearch/v8/client_test.go @@ -15,8 +15,6 @@ package v8 import ( - "encoding/json" - "fmt" "testing" "github.com/conduitio/conduit-commons/opencdc" @@ -49,11 +47,7 @@ func TestClient_GetClient(t *testing.T) { func TestClient_PrepareCreateOperation(t *testing.T) { t.Run("Fails when payload could not be prepared", func(t *testing.T) { client := Client{ - cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return indexName, nil - }, - }, + cfg: &configMock{}, } metadata, payload, err := client.PrepareCreateOperation(sdk.SourceUtil{}.NewRecordCreate( @@ -63,79 +57,18 @@ func TestClient_PrepareCreateOperation(t *testing.T) { opencdc.StructuredData{ "foo": complex64(1 + 2i), }, - )) + ), indexName) require.Nil(t, metadata) require.Nil(t, payload) require.EqualError(t, err, "json: unsupported type: complex64") }) - - t.Run("Fails when index name could not be determined", func(t *testing.T) { - client := Client{ - cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return "", fmt.Errorf("failed to determine index") - }, - }, - } - - metadata, payload, err := client.PrepareCreateOperation(sdk.SourceUtil{}.NewRecordCreate( - nil, - nil, - nil, - opencdc.StructuredData{ - "foo": "bar", - }, - )) - - require.Nil(t, metadata) - require.Nil(t, payload) - require.EqualError(t, err, "failed to determine index name: failed to determine index") - }) - - t.Run("Successfully prepares create operation", func(t *testing.T) { - client := Client{ - cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return indexName, nil - }, - }, - } - - metadata, payload, err := client.PrepareCreateOperation(sdk.SourceUtil{}.NewRecordCreate( - nil, - nil, - nil, - opencdc.StructuredData{ - "foo": "bar", - }, - )) - - require.NoError(t, err) - require.NotNil(t, metadata) - require.NotNil(t, payload) - - expectedMetadata := bulkRequestActionAndMetadata{ - Create: &bulkRequestCreateAction{ - Index: indexName, - }, - } - - expectedPayload := bulkRequestCreateSource([]byte(`{"foo":"bar"}`)) - - require.Equal(t, expectedMetadata, metadata) - require.Equal(t, expectedPayload, payload) - }) } func TestClient_PrepareUpsertOperation(t *testing.T) { t.Run("Fails when payload could not be prepared", func(t *testing.T) { client := Client{ - cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return indexName, nil - }, - }, + cfg: &configMock{}, } metadata, payload, err := client.PrepareUpsertOperation( @@ -151,119 +84,24 @@ func TestClient_PrepareUpsertOperation(t *testing.T) { "foo": complex64(1 + 2i), }, ), + indexName, ) require.Nil(t, metadata) require.Nil(t, payload) require.EqualError(t, err, "json: unsupported type: complex64") }) - - t.Run("Fails when index name could not be determined", func(t *testing.T) { - client := Client{ - cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return "", fmt.Errorf("failed to determine index") - }, - }, - } - - metadata, payload, err := client.PrepareUpsertOperation( - "key", - sdk.SourceUtil{}.NewRecordUpdate( - nil, - nil, - nil, - opencdc.StructuredData{ - "foo": "bar", - }, - opencdc.StructuredData{ - "foo": "baz", - }, - ), - ) - - require.Nil(t, metadata) - require.Nil(t, payload) - require.EqualError(t, err, "failed to determine index name: failed to determine index") - }) - - t.Run("Successfully prepares upsert operation", func(t *testing.T) { - client := Client{ - cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return indexName, nil - }, - }, - } - - metadata, payload, err := client.PrepareUpsertOperation( - "key", - sdk.SourceUtil{}.NewRecordUpdate( - nil, - nil, - nil, - opencdc.StructuredData{ - "foo": "bar", - }, - opencdc.StructuredData{ - "foo": "baz", - }, - ), - ) - - require.NoError(t, err) - require.NotNil(t, metadata) - require.NotNil(t, payload) - - expectedMetadata := bulkRequestActionAndMetadata{ - Update: &bulkRequestUpdateAction{ - ID: "key", - Index: indexName, - RetryOnConflict: 3, - }, - } - - expectedPayload := bulkRequestOptionalSource{ - Doc: json.RawMessage(`{"foo":"baz"}`), - DocAsUpsert: true, - } - - require.Equal(t, expectedMetadata, metadata) - require.Equal(t, expectedPayload, payload) - }) } func TestClient_PrepareDeleteOperation(t *testing.T) { - t.Run("Fails when index name could not be determined", func(t *testing.T) { - client := Client{ - cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return "", fmt.Errorf("failed to determine index") - }, - }, - } - - metadata, err := client.PrepareDeleteOperation( - "key", - sdk.SourceUtil{}.NewRecordDelete(nil, nil, nil, nil), - ) - - require.Nil(t, metadata) - require.EqualError(t, err, "failed to determine index name: failed to determine index") - }) - t.Run("Successfully prepares delete operation", func(t *testing.T) { client := Client{ - cfg: &configMock{ - GetIndexFunc: func(_ opencdc.Record) (string, error) { - return indexName, nil - }, - }, + cfg: &configMock{}, } metadata, err := client.PrepareDeleteOperation( "key", - sdk.SourceUtil{}.NewRecordDelete(nil, nil, nil, nil), + indexName, ) require.NoError(t, err) diff --git a/internal/elasticsearch/v8/config.go b/internal/elasticsearch/v8/config.go index 409e585..2556df9 100644 --- a/internal/elasticsearch/v8/config.go +++ b/internal/elasticsearch/v8/config.go @@ -14,8 +14,6 @@ package v8 -import "github.com/conduitio/conduit-commons/opencdc" - //go:generate moq -out config_moq_test.go . config type config interface { GetHost() string @@ -25,5 +23,4 @@ type config interface { GetAPIKey() string GetServiceToken() string GetCertificateFingerprint() string - GetIndex(r opencdc.Record) (string, error) } diff --git a/internal/elasticsearch/v8/config_moq_test.go b/internal/elasticsearch/v8/config_moq_test.go index 15c161a..026fada 100644 --- a/internal/elasticsearch/v8/config_moq_test.go +++ b/internal/elasticsearch/v8/config_moq_test.go @@ -4,7 +4,6 @@ package v8 import ( - "github.com/conduitio/conduit-commons/opencdc" "sync" ) @@ -30,9 +29,6 @@ var _ config = &configMock{} // GetHostFunc: func() string { // panic("mock out the GetHost method") // }, -// GetIndexFunc: func(r opencdc.Record) (string, error) { -// panic("mock out the GetIndex method") -// }, // GetPasswordFunc: func() string { // panic("mock out the GetPassword method") // }, @@ -61,9 +57,6 @@ type configMock struct { // GetHostFunc mocks the GetHost method. GetHostFunc func() string - // GetIndexFunc mocks the GetIndex method. - GetIndexFunc func(r opencdc.Record) (string, error) - // GetPasswordFunc mocks the GetPassword method. GetPasswordFunc func() string @@ -87,11 +80,6 @@ type configMock struct { // GetHost holds details about calls to the GetHost method. GetHost []struct { } - // GetIndex holds details about calls to the GetIndex method. - GetIndex []struct { - // R is the r argument value. - R opencdc.Record - } // GetPassword holds details about calls to the GetPassword method. GetPassword []struct { } @@ -106,7 +94,6 @@ type configMock struct { lockGetCertificateFingerprint sync.RWMutex lockGetCloudID sync.RWMutex lockGetHost sync.RWMutex - lockGetIndex sync.RWMutex lockGetPassword sync.RWMutex lockGetServiceToken sync.RWMutex lockGetUsername sync.RWMutex @@ -220,38 +207,6 @@ func (mock *configMock) GetHostCalls() []struct { return calls } -// GetIndex calls GetIndexFunc. -func (mock *configMock) GetIndex(r opencdc.Record) (string, error) { - if mock.GetIndexFunc == nil { - panic("configMock.GetIndexFunc: method is nil but config.GetIndex was just called") - } - callInfo := struct { - R opencdc.Record - }{ - R: r, - } - mock.lockGetIndex.Lock() - mock.calls.GetIndex = append(mock.calls.GetIndex, callInfo) - mock.lockGetIndex.Unlock() - return mock.GetIndexFunc(r) -} - -// GetIndexCalls gets all the calls that were made to GetIndex. -// Check the length with: -// -// len(mockedconfig.GetIndexCalls()) -func (mock *configMock) GetIndexCalls() []struct { - R opencdc.Record -} { - var calls []struct { - R opencdc.Record - } - mock.lockGetIndex.RLock() - calls = mock.calls.GetIndex - mock.lockGetIndex.RUnlock() - return calls -} - // GetPassword calls GetPasswordFunc. func (mock *configMock) GetPassword() string { if mock.GetPasswordFunc == nil {