Skip to content

Commit

Permalink
fix: refactor index name retrieval
Browse files Browse the repository at this point in the history
  • Loading branch information
Gaurav Sahil authored and Gaurav Sahil committed Oct 17, 2024
1 parent a0fdcd9 commit 1cb1a99
Show file tree
Hide file tree
Showing 22 changed files with 227 additions and 828 deletions.
80 changes: 46 additions & 34 deletions destination/client_moq_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 16 additions & 10 deletions destination/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/conduitio/conduit-commons/opencdc"
)

type IndexFn func(opencdc.Record) (string, error)

type Config struct {
// The version of the Elasticsearch service. One of: 5, 6, 7, 8.
Version elasticsearch.Version `json:"version" validate:"required"`
Expand Down Expand Up @@ -86,27 +88,31 @@ func (c Config) GetType() string {
return c.Type
}

// GetIndex determines the index for each record individually.
// IndexFunction returns a function that determines the index for each record individually.
// The function might be returning a static index name.
// If the index is neither static nor a template, an error is returned.
func (c Config) GetIndex(r opencdc.Record) (string, error) {
func (c Config) IndexFunction() (f IndexFn, err error) {
// Not a template, i.e. it's a static index name
if !strings.Contains(c.Index, "{{") && !strings.Contains(c.Index, "}}") {
return c.Index, nil
return func(_ opencdc.Record) (string, error) {
return c.Index, nil
}, nil
}

// Try to parse the index
t, err := template.New("index").Funcs(sprig.FuncMap()).Parse(c.Index)
if err != nil {
// The index is not a valid Go template.
return "", fmt.Errorf("index is neither a valid static table nor a valid Go template: %w", err)
return nil, fmt.Errorf("index is neither a valid static index nor a valid Go template: %w", err)
}

// The index is a valid template, return index
// The index is a valid template, return IndexFn.
var buf bytes.Buffer
buf.Reset()
if err := t.Execute(&buf, r); err != nil {
return "", fmt.Errorf("failed to execute index template: %w", err)
}
return buf.String(), nil
return func(r opencdc.Record) (string, error) {
buf.Reset()
if err := t.Execute(&buf, r); err != nil {
return "", fmt.Errorf("failed to execute index template: %w", err)
}
return buf.String(), nil
}, nil
}
91 changes: 72 additions & 19 deletions destination/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"testing"

"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/jaswdr/faker"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -57,29 +58,81 @@ func TestConfig_Getters(t *testing.T) {
require.Equal(t, serviceToken, config.GetServiceToken())
require.Equal(t, certificateFingerprint, config.GetCertificateFingerprint())
require.Equal(t, indexType, config.GetType())

record := opencdc.Record{}
index, err := config.GetIndex(record)
require.NoError(t, err)
require.Equal(t, indexName, index)
}

func TestConfig_GetIndex_Template(t *testing.T) {
fakerInstance := faker.New()
func TestConfig_IndexFunction(t *testing.T) {
t.Run("static index name", func(t *testing.T) {
config := Config{
Index: "test-index",
}

indexName := fakerInstance.Lorem().Word()
indexFn, err := config.IndexFunction()
require.NoError(t, err)
require.NotNil(t, indexFn)

config := Config{
Index: "{{ index .Metadata \"opencdc.collection\" }}",
}
index, err := indexFn(opencdc.Record{})
require.NoError(t, err)
require.Equal(t, "test-index", index)

record := opencdc.Record{
Metadata: map[string]string{
"opencdc.collection": indexName,
},
}
record := sdk.SourceUtil{}.NewRecordCreate(
nil,
map[string]string{"opencdc.collection": "other-index"},
nil,
nil,
)
index, err = indexFn(record)
require.NoError(t, err)
require.Equal(t, "test-index", index)
})

t.Run("template with metadata", func(t *testing.T) {
config := Config{
Index: "{{ index .Metadata \"opencdc.collection\" }}",
}

indexFn, err := config.IndexFunction()
require.NoError(t, err)
require.NotNil(t, indexFn)

record := sdk.SourceUtil{}.NewRecordCreate(
nil,
map[string]string{"opencdc.collection": "dynamic-index"},
nil,
nil,
)
index, err := indexFn(record)
require.NoError(t, err)
require.Equal(t, "dynamic-index", index)

record = sdk.SourceUtil{}.NewRecordCreate(nil, nil, nil, nil)
index, err = indexFn(record)
require.NoError(t, err)
require.Equal(t, "", index)
})

t.Run("invalid template syntax", func(t *testing.T) {
config := Config{
Index: "{{ invalid syntax }}",
}

indexFn, err := config.IndexFunction()
require.Error(t, err)
require.Nil(t, indexFn)
require.Contains(t, err.Error(), "index is neither a valid static index nor a valid Go template")
})

t.Run("template with invalid function", func(t *testing.T) {
config := Config{
Index: "{{ .InvalidFunction }}",
}

indexFn, err := config.IndexFunction()
require.NoError(t, err)
require.NotNil(t, indexFn)

index, err := config.GetIndex(record)
require.NoError(t, err)
require.Equal(t, indexName, index)
record := sdk.SourceUtil{}.NewRecordCreate(nil, nil, nil, nil)
_, err = indexFn(record)
require.Error(t, err)
require.Contains(t, err.Error(), "failed to execute index template")
})
}
Loading

0 comments on commit 1cb1a99

Please sign in to comment.