Skip to content

Commit

Permalink
refactor: Add Lifecycle service (#1835)
Browse files Browse the repository at this point in the history
* wip

* update method calls

* use configuration

* wip refactor

* wip

* move stream

* add lifecycleService to runtime

* fix

* fix imports

* no errors on lifecycle service

* fix wait

* update status through its method

* it compiles

* update interfaces

* more test fixes

* update mocks

* add mocks and update tests

* fix lint

* remove GetInstances

* delete key instead

Co-Authored-By: Lovro Mažgon <lovro.mazgon@gmail.com>

* ignore test files

* delete test files

* ensures runningPipeline is locked

* fix not initialized map

Co-Authored-By: Lovro Mažgon <lovro.mazgon@gmail.com>

* fix tests

* add back the needed test file

* update comment

* add comment

* locks while retrieving the pipeline

* update comment

* unsure about the need for this

* remove backoff from pipeline service

* update method name on lifecycle service

- changes run to init

* lock running pipelines and optimize stopAll

* remove redundant call to update status

* fix typo

* pr feedback

* uses csync Map

Uses ConduitIO/conduit-commons#116

Co-Authored-By: Lovro Mažgon <lovro.mazgon@gmail.com>

* go mod tidy

* update conduit-commons

* go mod tidy

---------

Co-authored-by: Lovro Mažgon <lovro.mazgon@gmail.com>
  • Loading branch information
raulb and lovromazgon authored Sep 13, 2024
1 parent 357be13 commit 0072ae1
Show file tree
Hide file tree
Showing 54 changed files with 949 additions and 746 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,9 @@ escape_analysis.txt

# Compiled test wasm processors
pkg/plugin/processor/standalone/test/wasm_processors/*/processor.wasm

# Test data
**/test/*.txt

# this one is needed for integration tests
!pkg/provisioning/test/source-file.txt
15 changes: 7 additions & 8 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/Masterminds/sprig/v3 v3.3.0
github.com/NYTimes/gziphandler v1.1.1
github.com/bufbuild/buf v1.41.0
github.com/conduitio/conduit-commons v0.3.0
github.com/conduitio/conduit-commons v0.3.1-0.20240913141354-f6dd6c835674
github.com/conduitio/conduit-connector-file v0.7.0
github.com/conduitio/conduit-connector-generator v0.7.0
github.com/conduitio/conduit-connector-kafka v0.9.0
Expand Down Expand Up @@ -141,8 +141,8 @@ require (
github.com/daixiang0/gci v0.13.5 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/denis-tingaikin/go-header v0.5.0 // indirect
github.com/dgraph-io/badger/v4 v4.2.0 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dgraph-io/badger/v4 v4.3.0 // indirect
github.com/dgraph-io/ristretto v0.1.2-0.20240116140435-c67e07994f91 // indirect
github.com/distribution/reference v0.6.0 // indirect
github.com/dlclark/regexp2 v1.11.4 // indirect
github.com/docker/cli v27.2.1+incompatible // indirect
Expand Down Expand Up @@ -180,7 +180,6 @@ require (
github.com/gofrs/flock v0.12.1 // indirect
github.com/gofrs/uuid/v5 v5.3.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.2.1 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
Expand All @@ -200,7 +199,7 @@ require (
github.com/gostaticanalysis/comment v1.4.2 // indirect
github.com/gostaticanalysis/forcetypeassert v0.1.0 // indirect
github.com/gostaticanalysis/nilerr v0.1.1 // indirect
github.com/hamba/avro/v2 v2.24.0 // indirect
github.com/hamba/avro/v2 v2.25.2 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand All @@ -213,8 +212,8 @@ require (
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/pgx/v5 v5.6.0 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jackc/pgx/v5 v5.7.1 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/jdx/go-netrc v1.0.0 // indirect
github.com/jgautheron/goconst v1.7.1 // indirect
github.com/jingyugao/rowserrcheck v1.1.1 // indirect
Expand Down Expand Up @@ -375,7 +374,7 @@ require (
modernc.org/libc v1.55.3 // indirect
modernc.org/mathutil v1.6.0 // indirect
modernc.org/memory v1.8.0 // indirect
modernc.org/sqlite v1.31.1 // indirect
modernc.org/sqlite v1.33.1 // indirect
modernc.org/strutil v1.2.0 // indirect
modernc.org/token v1.1.0 // indirect
mvdan.cc/unparam v0.0.0-20240528143540-8a5130ca722f // indirect
Expand Down
37 changes: 16 additions & 21 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ github.com/ccojocar/zxcvbn-go v1.0.2/go.mod h1:g1qkXtUSvHP8lhHp5GrSmTz6uWALGRMQd
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/charithe/durationcheck v0.0.10 h1:wgw73BiocdBDQPik+zcEoBG/ob8uyBHf2iyoHGPf5w4=
Expand All @@ -219,8 +218,8 @@ github.com/ckaznocha/intrange v0.2.0/go.mod h1:r5I7nUlAAG56xmkOpw4XVr16BXhwYTUdc
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c=
github.com/conduitio/conduit-commons v0.3.0 h1:nxQ++O4dK1p717upkyzbCQu0FLIFyP3OrgHZ9Zxvzvg=
github.com/conduitio/conduit-commons v0.3.0/go.mod h1:roxZ88dv+fpbEjjTzkdGwwbmcpunSuiD8he43y0lAoo=
github.com/conduitio/conduit-commons v0.3.1-0.20240913141354-f6dd6c835674 h1:coVk6aVsbP2u2vheM55GaCtu3+JYdIvOimQ0abUrmLA=
github.com/conduitio/conduit-commons v0.3.1-0.20240913141354-f6dd6c835674/go.mod h1:R/GNsw7iAUy5g2mFUVupAcrfWlGCUACwMvkDDrF39RI=
github.com/conduitio/conduit-connector-file v0.7.0 h1:lUfDdpRZleJ/DDXX3NCzHN6VUYKORU/b443mJH6PJU4=
github.com/conduitio/conduit-connector-file v0.7.0/go.mod h1:OXmcc1eAXmqmn9XoS/C3TdgZn0W1GMyqfNzUZRFmHNU=
github.com/conduitio/conduit-connector-generator v0.7.0 h1:Bqsh/ak7gw6k5E8m0PxXOib0zhNlKbrJcIoLLQ0+S08=
Expand Down Expand Up @@ -276,12 +275,12 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/denis-tingaikin/go-header v0.5.0 h1:SRdnP5ZKvcO9KKRP1KJrhFR3RrlGuD+42t4429eC9k8=
github.com/denis-tingaikin/go-header v0.5.0/go.mod h1:mMenU5bWrok6Wl2UsZjy+1okegmwQ3UgWl4V1D8gjlY=
github.com/dgraph-io/badger/v4 v4.2.0 h1:kJrlajbXXL9DFTNuhhu9yCx7JJa4qpYWxtE8BzuWsEs=
github.com/dgraph-io/badger/v4 v4.2.0/go.mod h1:qfCqhPoWDFJRx1gp5QwwyGo8xk1lbHUxvK9nK0OGAak=
github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8=
github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgraph-io/badger/v4 v4.3.0 h1:lcsCE1/1qrRhqP+zYx6xDZb8n7U+QlwNicpc676Ub40=
github.com/dgraph-io/badger/v4 v4.3.0/go.mod h1:Sc0T595g8zqAQRDf44n+z3wG4BOqLwceaFntt8KPxUM=
github.com/dgraph-io/ristretto v0.1.2-0.20240116140435-c67e07994f91 h1:Pux6+xANi0I7RRo5E1gflI4EZ2yx3BGZ75JkAIvGEOA=
github.com/dgraph-io/ristretto v0.1.2-0.20240116140435-c67e07994f91/go.mod h1:swkazRqnUf1N62d0Nutz7KIj2UKqsm/H8tD0nBJAXqM=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
github.com/dlclark/regexp2 v1.11.4 h1:rPYF9/LECdNymJufQKmri9gV604RvvABwgOA8un7yAo=
Expand All @@ -302,7 +301,6 @@ github.com/dop251/goja v0.0.0-20240806095544-3491d4a58fbe h1:jwFJkgsdelB87ohlXaA
github.com/dop251/goja v0.0.0-20240806095544-3491d4a58fbe/go.mod h1:DF+w/nLMIkvRpyhd/0K+Okbh3fVZBtXLwRtS/ccAa5w=
github.com/dop251/goja_nodejs v0.0.0-20231122114759-e84d9a924c5c h1:hLoodLRD4KLWIH8eyAQCLcH8EqIrjac7fCkp/fHnvuQ=
github.com/dop251/goja_nodejs v0.0.0-20231122114759-e84d9a924c5c/go.mod h1:bhGPmCgCCTSRfiMYWjpS46IDo9EUZXlsuUaPXSWGbv0=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
Expand Down Expand Up @@ -394,8 +392,6 @@ github.com/gofrs/uuid/v5 v5.3.0/go.mod h1:CDOjlDMVAtN56jqyRUZh58JT31Tiw7/oQyEXZV
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v1.2.1 h1:OptwRhECazUx5ix5TTWC3EZhsZEHWcYWY4FQHTIubm4=
github.com/golang/glog v1.2.1/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down Expand Up @@ -497,8 +493,8 @@ github.com/gostaticanalysis/testutil v0.4.0 h1:nhdCmubdmDF6VEatUNjgUZBJKWRqugoIS
github.com/gostaticanalysis/testutil v0.4.0/go.mod h1:bLIoPefWXrRi/ssLFWX1dx7Repi5x3CuviD3dgAZaBU=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0 h1:asbCHRVmodnJTuQ3qamDwqVOIjwqUPTYmYuemVOx+Ys=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.22.0/go.mod h1:ggCgvZ2r7uOoQjOyu2Y1NhHmEPPzzuhWgcza5M1Ji1I=
github.com/hamba/avro/v2 v2.24.0 h1:axTlaYDkcSY0dVekRSy8cdrsj5MG86WqosUQacKCids=
github.com/hamba/avro/v2 v2.24.0/go.mod h1:7vDfy/2+kYCE8WUHoj2et59GTv0ap7ptktMXu0QHePI=
github.com/hamba/avro/v2 v2.25.2 h1:28dqbOCB7wA/3+J1ZN4GQ40tzsFtbtItkTPWgl97el0=
github.com/hamba/avro/v2 v2.25.2/go.mod h1:I8glyswHnpED3Nlx2ZdUe+4LJnCOOyiCzLMno9i/Uu0=
github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k=
github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
github.com/hashicorp/go-plugin v1.6.1 h1:P7MR2UP6gNKGPp+y7EZw2kOiq4IR9WiqLvp0XOsVdwI=
Expand Down Expand Up @@ -534,10 +530,10 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
github.com/jackc/pgx/v5 v5.6.0 h1:SWJzexBzPL5jb0GEsrPMLIsi/3jOo7RHlzTjcAeDrPY=
github.com/jackc/pgx/v5 v5.6.0/go.mod h1:DNZ/vlrUnhWCoFGxHAG8U2ljioxukquj7utPDgtQdTw=
github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk=
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jackc/pgx/v5 v5.7.1 h1:x7SYsPBYDkHDksogeSmZZ5xzThcTgRz++I5E+ePFUcs=
github.com/jackc/pgx/v5 v5.7.1/go.mod h1:e7O26IywZZ+naJtWWos6i6fvWK+29etgITqrqHLfoZA=
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jcmturner/gofork v0.0.0-20180107083740-2aebee971930/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/jdx/go-netrc v1.0.0 h1:QbLMLyCZGj0NA8glAhxUpf1zDg6cxnWgMBbjq40W0gQ=
github.com/jdx/go-netrc v1.0.0/go.mod h1:Gh9eFQJnoTNIRHXl2j5bJXA1u84hQWJWgGh569zF3v8=
Expand Down Expand Up @@ -1107,7 +1103,6 @@ golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down Expand Up @@ -1317,8 +1312,8 @@ modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4=
modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0=
modernc.org/sortutil v1.2.0 h1:jQiD3PfS2REGJNzNCMMaLSp/wdMNieTbKX920Cqdgqc=
modernc.org/sortutil v1.2.0/go.mod h1:TKU2s7kJMf1AE84OoiGppNHJwvB753OYfNl2WRb++Ss=
modernc.org/sqlite v1.31.1 h1:XVU0VyzxrYHlBhIs1DiEgSl0ZtdnPtbLVy8hSkzxGrs=
modernc.org/sqlite v1.31.1/go.mod h1:UqoylwmTb9F+IqXERT8bW9zzOWN8qwAIcLdzeBZs4hA=
modernc.org/sqlite v1.33.1 h1:trb6Z3YYoeM9eDL1O8do81kP+0ejv+YzgyFo+Gwy0nM=
modernc.org/sqlite v1.33.1/go.mod h1:pXV2xHxhzXZsgT/RtTFAPY6JJDEvOTcTdwADQCCWD4k=
modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA=
modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0=
modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y=
Expand Down
28 changes: 20 additions & 8 deletions pkg/conduit/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/conduitio/conduit/pkg/foundation/metrics"
"github.com/conduitio/conduit/pkg/foundation/metrics/measure"
"github.com/conduitio/conduit/pkg/foundation/metrics/prometheus"
"github.com/conduitio/conduit/pkg/lifecycle"
"github.com/conduitio/conduit/pkg/orchestrator"
"github.com/conduitio/conduit/pkg/pipeline"
conn_plugin "github.com/conduitio/conduit/pkg/plugin/connector"
Expand All @@ -63,6 +64,7 @@ import (
"github.com/conduitio/conduit/pkg/web/ui"
apiv1 "github.com/conduitio/conduit/proto/api/v1"
grpcruntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/jpillora/backoff"
"github.com/piotrkowalczuk/promgrpc/v4"
promclient "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -94,6 +96,7 @@ type Runtime struct {
pipelineService *pipeline.Service
connectorService *connector.Service
processorService *processor.Service
lifecycleService *lifecycle.Service

connectorPluginService *conn_plugin.PluginService
processorPluginService *proc_plugin.PluginService
Expand Down Expand Up @@ -201,13 +204,21 @@ func createServices(r *Runtime) error {
tokenService,
)

errRecovery := r.Config.Pipelines.ErrorRecovery
backoffCfg := &backoff.Backoff{
Min: errRecovery.MinDelay,
Max: errRecovery.MaxDelay,
Factor: float64(errRecovery.BackoffFactor),
Jitter: true,
}

plService := pipeline.NewService(r.logger, r.DB)
connService := connector.NewService(r.logger, r.DB, r.connectorPersister)
procService := processor.NewService(r.logger, r.DB, procPluginService)
lifecycleService := lifecycle.NewService(r.logger, backoffCfg, connService, procService, connPluginService, plService)
provisionService := provisioning.NewService(r.DB, r.logger, plService, connService, procService, connPluginService, lifecycleService, r.Config.Pipelines.Path)

provisionService := provisioning.NewService(r.DB, r.logger, plService, connService, procService, connPluginService, r.Config.Pipelines.Path)

orc := orchestrator.NewOrchestrator(r.DB, r.logger, plService, connService, procService, connPluginService, procPluginService)
orc := orchestrator.NewOrchestrator(r.DB, r.logger, plService, connService, procService, connPluginService, procPluginService, lifecycleService)

r.Orchestrator = orc
r.ProvisionService = provisionService
Expand All @@ -220,6 +231,7 @@ func createServices(r *Runtime) error {
r.processorPluginService = procPluginService
r.connSchemaService = connSchemaService
r.procSchemaService = procSchemaService
r.lifecycleService = lifecycleService

return nil
}
Expand Down Expand Up @@ -411,12 +423,12 @@ func (r *Runtime) registerCleanup(t *tomb.Tomb) {
// t.Err() can be nil, when we had a call: t.Kill(nil)
// t.Err() will be context.Canceled, if the tomb's context was canceled
if t.Err() == nil || cerrors.Is(t.Err(), context.Canceled) {
r.pipelineService.StopAll(ctx, pipeline.ErrGracefulShutdown)
r.lifecycleService.StopAll(ctx, pipeline.ErrGracefulShutdown)
} else {
// tomb died due to a real error
r.pipelineService.StopAll(ctx, cerrors.Errorf("conduit experienced an error: %w", t.Err()))
r.lifecycleService.StopAll(ctx, cerrors.Errorf("conduit experienced an error: %w", t.Err()))
}
err := r.pipelineService.Wait(exitTimeout)
err := r.lifecycleService.Wait(exitTimeout)
t.Go(func() error {
r.connectorPersister.Wait()
return r.DB.Close()
Expand Down Expand Up @@ -758,7 +770,7 @@ func (r *Runtime) initServices(ctx context.Context, t *tomb.Tomb) error {
}

if r.Config.Pipelines.ExitOnError {
r.pipelineService.OnFailure(func(e pipeline.FailureEvent) {
r.lifecycleService.OnFailure(func(e lifecycle.FailureEvent) {
r.logger.Warn(ctx).
Err(e.Error).
Str(log.PipelineIDField, e.ID).
Expand All @@ -785,7 +797,7 @@ func (r *Runtime) initServices(ctx context.Context, t *tomb.Tomb) error {
}
}

err = r.pipelineService.Run(ctx, r.connectorService, r.processorService, r.connectorPluginService)
err = r.lifecycleService.Init(ctx)
if err != nil {
cerrors.ForEach(err, func(err error) {
r.logger.Err(ctx, err).Msg("pipeline failed to be started")
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/dlq.go → pkg/lifecycle/dlq.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package pipeline
package lifecycle

import (
"bytes"
Expand All @@ -21,7 +21,7 @@ import (
"github.com/conduitio/conduit-commons/opencdc"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/pipeline/stream"
"github.com/conduitio/conduit/pkg/lifecycle/stream"
)

// DLQDestination is a DLQ handler that forwards DLQ records to a destination
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/dlq_test.go → pkg/lifecycle/dlq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package pipeline
package lifecycle

import (
"context"
Expand All @@ -22,7 +22,7 @@ import (
"github.com/conduitio/conduit/pkg/connector"
"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
streammock "github.com/conduitio/conduit/pkg/pipeline/stream/mock"
streammock "github.com/conduitio/conduit/pkg/lifecycle/stream/mock"
"github.com/matryer/is"
"go.uber.org/mock/gomock"
)
Expand Down
Loading

0 comments on commit 0072ae1

Please sign in to comment.