diff --git a/cmd/connector/main.go b/cmd/connector/main.go index 3449493..92708ba 100644 --- a/cmd/connector/main.go +++ b/cmd/connector/main.go @@ -15,9 +15,8 @@ package main import ( - sdk "github.com/conduitio/conduit-connector-sdk" - weather "github.com/conduitio-labs/conduit-connector-weather" + sdk "github.com/conduitio/conduit-connector-sdk" ) func main() { diff --git a/config_test.go b/config_test.go index c393bed..9508770 100644 --- a/config_test.go +++ b/config_test.go @@ -32,7 +32,7 @@ var exampleConfig = map[string]string{ func TestParseConfig(t *testing.T) { is := is.New(t) var got SourceConfig - err := sdk.Util.ParseConfig(exampleConfig, &got) + err := sdk.Util.ParseConfig(ctx, exampleConfig, &got) want := SourceConfig{ APPID: "my-appid-key", City: "San Francisco", diff --git a/go.mod b/go.mod index 2682d8f..972deb1 100644 --- a/go.mod +++ b/go.mod @@ -1,7 +1,9 @@ module github.com/conduitio-labs/conduit-connector-weather -go 1.22.2 +go 1.23.2 + require ( + github.com/conduitio/conduit-commons v0.4.0 github.com/conduitio/conduit-connector-sdk v0.11.0 github.com/golangci/golangci-lint v1.61.0 github.com/matryer/is v1.4.1 @@ -46,7 +48,6 @@ require ( github.com/charithe/durationcheck v0.0.10 // indirect github.com/chavacava/garif v0.1.0 // indirect github.com/ckaznocha/intrange v0.2.0 // indirect - github.com/conduitio/conduit-commons v0.4.0 // indirect github.com/conduitio/conduit-connector-protocol v0.8.0 // indirect github.com/curioswitch/go-reassign v0.2.0 // indirect github.com/daixiang0/gci v0.13.5 // indirect diff --git a/paramgen_src.go b/paramgen_src.go index 982762f..304abb2 100644 --- a/paramgen_src.go +++ b/paramgen_src.go @@ -3,45 +3,43 @@ package weather -import ( - sdk "github.com/conduitio/conduit-connector-sdk" -) +import "github.com/conduitio/conduit-commons/config" -func (SourceConfig) Parameters() map[string]sdk.Parameter { - return map[string]sdk.Parameter{ +func (SourceConfig) Parameters() config.Parameters { + return map[string]config.Parameter{ "appid": { Default: "", Description: "your unique API key (you can always find it on your account page under https://home.openweathermap.org/api_keys)", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{ - sdk.ValidationRequired{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationRequired{}, }, }, "city": { Default: "new york", Description: "city name to get the current weather for, ex: California, San Francisco, london. you can find the cities list {city.list.json.gz} on http://bulk.openweathermap.org/sample/", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{}, }, "pollingPeriod": { Default: "5m", Description: "how often the connector will get data from the url", - Type: sdk.ParameterTypeDuration, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeDuration, + Validations: []config.Validation{}, }, "units": { Default: "imperial", Description: "units of measurement, for Fahrenheit use imperial, for Celsius use metric, for Kelvin use standard.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{ - sdk.ValidationInclusion{List: []string{"imperial", "standard", "metric"}}, + Type: config.ParameterTypeString, + Validations: []config.Validation{ + config.ValidationInclusion{List: []string{"imperial", "standard", "metric"}}, }, }, "url": { Default: "https://api.openweathermap.org/data/2.5/weather", Description: "url that contains the weather data", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, + Type: config.ParameterTypeString, + Validations: []config.Validation{}, }, } } diff --git a/source.go b/source.go index ab85a9e..097ec91 100644 --- a/source.go +++ b/source.go @@ -22,6 +22,8 @@ import ( "net/http" "time" + "github.com/conduitio/conduit-commons/config" + "github.com/conduitio/conduit-commons/opencdc" sdk "github.com/conduitio/conduit-connector-sdk" "golang.org/x/time/rate" ) @@ -39,14 +41,14 @@ func NewSource() sdk.Source { return &Source{} } -func (s *Source) Parameters() map[string]sdk.Parameter { +func (s *Source) Parameters() config.Parameters { return s.config.Parameters() } -func (s *Source) Configure(ctx context.Context, cfg map[string]string) error { +func (s *Source) Configure(ctx context.Context, cfg config.Config) error { sdk.Logger(ctx).Info().Msg("Configuring Weather Source Connector...") var config SourceConfig - err := sdk.Util.ParseConfig(cfg, &config) + err := sdk.Util.ParseConfig(ctx, cfg, &config, NewSource().Parameters()) if err != nil { return err } @@ -54,7 +56,7 @@ func (s *Source) Configure(ctx context.Context, cfg map[string]string) error { return nil } -func (s *Source) Open(ctx context.Context, _ sdk.Position) error { +func (s *Source) Open(ctx context.Context, _ opencdc.Position) error { s.client = &http.Client{} s.url = s.CreateRequestURL() // try pinging the URL with APPID @@ -76,19 +78,19 @@ func (s *Source) Open(ctx context.Context, _ sdk.Position) error { return nil } -func (s *Source) Read(ctx context.Context) (sdk.Record, error) { +func (s *Source) Read(ctx context.Context) (opencdc.Record, error) { err := s.limiter.Wait(ctx) if err != nil { - return sdk.Record{}, err + return opencdc.Record{}, err } rec, err := s.getRecord(ctx) if err != nil { - return sdk.Record{}, fmt.Errorf("error getting the weather data: %w", err) + return opencdc.Record{}, fmt.Errorf("error getting the weather data: %w", err) } return rec, nil } -func (s *Source) Ack(ctx context.Context, position sdk.Position) error { +func (s *Source) Ack(ctx context.Context, position opencdc.Position) error { sdk.Logger(ctx).Debug().Str("position", string(position)).Msg("got ack") return nil // no ack needed } @@ -104,47 +106,47 @@ func (s *Source) CreateRequestURL() string { return s.config.URL + "?" + "q=" + s.config.City + "&" + "APPID=" + s.config.APPID + "&" + "units=" + s.config.Units } -func (s *Source) getRecord(ctx context.Context) (sdk.Record, error) { +func (s *Source) getRecord(ctx context.Context) (opencdc.Record, error) { // create GET request req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.url, nil) if err != nil { - return sdk.Record{}, fmt.Errorf("error creating HTTP request: %w", err) + return opencdc.Record{}, fmt.Errorf("error creating HTTP request: %w", err) } // get response resp, err := s.client.Do(req) if err != nil { - return sdk.Record{}, fmt.Errorf("error getting data from URL: %w", err) + return opencdc.Record{}, fmt.Errorf("error getting data from URL: %w", err) } defer resp.Body.Close() // check response status if resp.StatusCode != http.StatusOK { - return sdk.Record{}, fmt.Errorf("response status should be %v, got status=%v", http.StatusOK, resp.StatusCode) + return opencdc.Record{}, fmt.Errorf("response status should be %v, got status=%v", http.StatusOK, resp.StatusCode) } // read body body, err := io.ReadAll(resp.Body) if err != nil { - return sdk.Record{}, fmt.Errorf("error reading body for response %v: %w", resp, err) + return opencdc.Record{}, fmt.Errorf("error reading body for response %v: %w", resp, err) } // parse json - var structData sdk.StructuredData + var structData opencdc.StructuredData err = json.Unmarshal(body, &structData) if err != nil { - return sdk.Record{}, fmt.Errorf("failed to unmarshal body as JSON: %w", err) + return opencdc.Record{}, fmt.Errorf("failed to unmarshal body as JSON: %w", err) } // create record now := time.Now().Unix() timestamp, ok := structData["dt"] if !ok { - return sdk.Record{}, fmt.Errorf("dt field not found in record: %w", err) + return opencdc.Record{}, fmt.Errorf("dt field not found in record: %w", err) } - rec := sdk.Record{ - Payload: sdk.Change{ + rec := opencdc.Record{ + Payload: opencdc.Change{ Before: nil, After: structData, }, - Operation: sdk.OperationCreate, - Position: sdk.Position(fmt.Sprintf("unix-%v", now)), - Key: sdk.RawData(fmt.Sprintf("%v", timestamp)), + Operation: opencdc.OperationCreate, + Position: opencdc.Position(fmt.Sprintf("unix-%v", now)), + Key: opencdc.RawData(fmt.Sprintf("%v", timestamp)), } return rec, nil } diff --git a/source_integration_test.go b/source_integration_test.go index 0f6939f..42163b4 100644 --- a/source_integration_test.go +++ b/source_integration_test.go @@ -21,7 +21,7 @@ import ( "time" weather "github.com/conduitio-labs/conduit-connector-weather" - sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/conduitio/conduit-commons/opencdc" "github.com/matryer/is" ) @@ -42,7 +42,7 @@ func TestSource_GetWeather(t *testing.T) { }) is.NoErr(err) ctx := context.Background() - err = con.Open(ctx, sdk.Position{}) + err = con.Open(ctx, opencdc.Position{}) is.NoErr(err) // first read should succeed timeBefore := time.Now() diff --git a/source_test.go b/source_test.go index 392a2bb..37410a5 100644 --- a/source_test.go +++ b/source_test.go @@ -19,7 +19,7 @@ import ( "testing" weather "github.com/conduitio-labs/conduit-connector-weather" - sdk "github.com/conduitio/conduit-connector-sdk" + "github.com/conduitio/conduit-commons/opencdc" "github.com/matryer/is" ) @@ -34,7 +34,7 @@ func TestSource_ConfigureAndOpen(t *testing.T) { }) is.NoErr(err) ctx := context.Background() - err = con.Open(ctx, sdk.Position{}) + err = con.Open(ctx, opencdc.Position{}) is.True(err != nil) // auth failed url := con.CreateRequestURL() is.Equal(url, "https://api.openweathermap.org/data/2.5/weather?q=london&APPID=my-id&units=metric") diff --git a/tools.go b/tools.go index b245373..d2ee24b 100644 --- a/tools.go +++ b/tools.go @@ -17,7 +17,7 @@ package main import ( - _ "github.com/conduitio/conduit-connector-sdk/cmd/paramgen" + _ "github.com/conduitio/conduit-commons/paramgen" _ "github.com/golangci/golangci-lint/cmd/golangci-lint" _ "go.uber.org/mock/mockgen" )