Skip to content

Commit

Permalink
update to use conduit commons
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon committed Oct 18, 2024
1 parent 27a3539 commit 33aef92
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 48 deletions.
3 changes: 1 addition & 2 deletions cmd/connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,8 @@
package main

import (
sdk "github.com/conduitio/conduit-connector-sdk"

weather "github.com/conduitio-labs/conduit-connector-weather"
sdk "github.com/conduitio/conduit-connector-sdk"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var exampleConfig = map[string]string{
func TestParseConfig(t *testing.T) {
is := is.New(t)
var got SourceConfig
err := sdk.Util.ParseConfig(exampleConfig, &got)
err := sdk.Util.ParseConfig(ctx, exampleConfig, &got)

Check failure on line 35 in config_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: ctx

Check failure on line 35 in config_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

not enough arguments in call to sdk.Util.ParseConfig

Check failure on line 35 in config_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

undefined: ctx

Check failure on line 35 in config_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

not enough arguments in call to sdk.Util.ParseConfig

Check failure on line 35 in config_test.go

View workflow job for this annotation

GitHub Actions / build

undefined: ctx

Check failure on line 35 in config_test.go

View workflow job for this annotation

GitHub Actions / build

not enough arguments in call to sdk.Util.ParseConfig
want := SourceConfig{
APPID: "my-appid-key",
City: "San Francisco",
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
module github.com/conduitio-labs/conduit-connector-weather

go 1.22.2
go 1.23.2

require (
github.com/conduitio/conduit-commons v0.4.0
github.com/conduitio/conduit-connector-sdk v0.11.0
github.com/golangci/golangci-lint v1.61.0
github.com/matryer/is v1.4.1
Expand Down Expand Up @@ -46,7 +48,6 @@ require (
github.com/charithe/durationcheck v0.0.10 // indirect
github.com/chavacava/garif v0.1.0 // indirect
github.com/ckaznocha/intrange v0.2.0 // indirect
github.com/conduitio/conduit-commons v0.4.0 // indirect
github.com/conduitio/conduit-connector-protocol v0.8.0 // indirect
github.com/curioswitch/go-reassign v0.2.0 // indirect
github.com/daixiang0/gci v0.13.5 // indirect
Expand Down
32 changes: 15 additions & 17 deletions paramgen_src.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 23 additions & 21 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"net/http"
"time"

"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
"golang.org/x/time/rate"
)
Expand All @@ -39,22 +41,22 @@ func NewSource() sdk.Source {
return &Source{}
}

func (s *Source) Parameters() map[string]sdk.Parameter {
func (s *Source) Parameters() config.Parameters {
return s.config.Parameters()
}

func (s *Source) Configure(ctx context.Context, cfg map[string]string) error {
func (s *Source) Configure(ctx context.Context, cfg config.Config) error {
sdk.Logger(ctx).Info().Msg("Configuring Weather Source Connector...")
var config SourceConfig
err := sdk.Util.ParseConfig(cfg, &config)
err := sdk.Util.ParseConfig(ctx, cfg, &config, NewSource().Parameters())
if err != nil {
return err
}
s.config = config
return nil
}

func (s *Source) Open(ctx context.Context, _ sdk.Position) error {
func (s *Source) Open(ctx context.Context, _ opencdc.Position) error {
s.client = &http.Client{}
s.url = s.CreateRequestURL()
// try pinging the URL with APPID
Expand All @@ -76,19 +78,19 @@ func (s *Source) Open(ctx context.Context, _ sdk.Position) error {
return nil
}

func (s *Source) Read(ctx context.Context) (sdk.Record, error) {
func (s *Source) Read(ctx context.Context) (opencdc.Record, error) {
err := s.limiter.Wait(ctx)
if err != nil {
return sdk.Record{}, err
return opencdc.Record{}, err
}
rec, err := s.getRecord(ctx)
if err != nil {
return sdk.Record{}, fmt.Errorf("error getting the weather data: %w", err)
return opencdc.Record{}, fmt.Errorf("error getting the weather data: %w", err)
}
return rec, nil
}

func (s *Source) Ack(ctx context.Context, position sdk.Position) error {
func (s *Source) Ack(ctx context.Context, position opencdc.Position) error {
sdk.Logger(ctx).Debug().Str("position", string(position)).Msg("got ack")
return nil // no ack needed
}
Expand All @@ -104,47 +106,47 @@ func (s *Source) CreateRequestURL() string {
return s.config.URL + "?" + "q=" + s.config.City + "&" + "APPID=" + s.config.APPID + "&" + "units=" + s.config.Units
}

func (s *Source) getRecord(ctx context.Context) (sdk.Record, error) {
func (s *Source) getRecord(ctx context.Context) (opencdc.Record, error) {
// create GET request
req, err := http.NewRequestWithContext(ctx, http.MethodGet, s.url, nil)
if err != nil {
return sdk.Record{}, fmt.Errorf("error creating HTTP request: %w", err)
return opencdc.Record{}, fmt.Errorf("error creating HTTP request: %w", err)
}
// get response
resp, err := s.client.Do(req)
if err != nil {
return sdk.Record{}, fmt.Errorf("error getting data from URL: %w", err)
return opencdc.Record{}, fmt.Errorf("error getting data from URL: %w", err)
}
defer resp.Body.Close()
// check response status
if resp.StatusCode != http.StatusOK {
return sdk.Record{}, fmt.Errorf("response status should be %v, got status=%v", http.StatusOK, resp.StatusCode)
return opencdc.Record{}, fmt.Errorf("response status should be %v, got status=%v", http.StatusOK, resp.StatusCode)
}
// read body
body, err := io.ReadAll(resp.Body)
if err != nil {
return sdk.Record{}, fmt.Errorf("error reading body for response %v: %w", resp, err)
return opencdc.Record{}, fmt.Errorf("error reading body for response %v: %w", resp, err)
}
// parse json
var structData sdk.StructuredData
var structData opencdc.StructuredData
err = json.Unmarshal(body, &structData)
if err != nil {
return sdk.Record{}, fmt.Errorf("failed to unmarshal body as JSON: %w", err)
return opencdc.Record{}, fmt.Errorf("failed to unmarshal body as JSON: %w", err)
}
// create record
now := time.Now().Unix()
timestamp, ok := structData["dt"]
if !ok {
return sdk.Record{}, fmt.Errorf("dt field not found in record: %w", err)
return opencdc.Record{}, fmt.Errorf("dt field not found in record: %w", err)
}
rec := sdk.Record{
Payload: sdk.Change{
rec := opencdc.Record{
Payload: opencdc.Change{
Before: nil,
After: structData,
},
Operation: sdk.OperationCreate,
Position: sdk.Position(fmt.Sprintf("unix-%v", now)),
Key: sdk.RawData(fmt.Sprintf("%v", timestamp)),
Operation: opencdc.OperationCreate,
Position: opencdc.Position(fmt.Sprintf("unix-%v", now)),
Key: opencdc.RawData(fmt.Sprintf("%v", timestamp)),
}
return rec, nil
}
4 changes: 2 additions & 2 deletions source_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"time"

weather "github.com/conduitio-labs/conduit-connector-weather"

Check failure on line 23 in source_integration_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

could not import github.com/conduitio-labs/conduit-connector-weather (-: # github.com/conduitio-labs/conduit-connector-weather [github.com/conduitio-labs/conduit-connector-weather.test]
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/matryer/is"
)

Expand All @@ -42,7 +42,7 @@ func TestSource_GetWeather(t *testing.T) {
})
is.NoErr(err)
ctx := context.Background()
err = con.Open(ctx, sdk.Position{})
err = con.Open(ctx, opencdc.Position{})
is.NoErr(err)
// first read should succeed
timeBefore := time.Now()
Expand Down
4 changes: 2 additions & 2 deletions source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"testing"

weather "github.com/conduitio-labs/conduit-connector-weather"
sdk "github.com/conduitio/conduit-connector-sdk"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/matryer/is"
)

Expand All @@ -34,7 +34,7 @@ func TestSource_ConfigureAndOpen(t *testing.T) {
})
is.NoErr(err)
ctx := context.Background()
err = con.Open(ctx, sdk.Position{})
err = con.Open(ctx, opencdc.Position{})
is.True(err != nil) // auth failed
url := con.CreateRequestURL()
is.Equal(url, "https://api.openweathermap.org/data/2.5/weather?q=london&APPID=my-id&units=metric")
Expand Down
2 changes: 1 addition & 1 deletion tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package main

import (
_ "github.com/conduitio/conduit-connector-sdk/cmd/paramgen"
_ "github.com/conduitio/conduit-commons/paramgen"
_ "github.com/golangci/golangci-lint/cmd/golangci-lint"
_ "go.uber.org/mock/mockgen"
)

0 comments on commit 33aef92

Please sign in to comment.