Skip to content

Commit

Permalink
Merge pull request #23 from krakend/update_to_krakend_ratelimit
Browse files Browse the repository at this point in the history
Update to krakend ratelimit
  • Loading branch information
kpacha authored Oct 26, 2023
2 parents 4023533 + ba3ad3b commit 30596fe
Show file tree
Hide file tree
Showing 6 changed files with 275 additions and 173 deletions.
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
test:
docker-compose -f ./docker-compose.test.yml up -d
sleep 2
# since we use rabbitmq sending messages, we do not want parallel tests:
go test -p 1 --tags=integration ./...
docker-compose -f ./docker-compose.test.yml down & true

.SILENT: test
.PHONY: test
28 changes: 24 additions & 4 deletions async/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
"sync/atomic"
"time"

"github.com/juju/ratelimit"
"github.com/streadway/amqp"

ratelimit "github.com/krakendio/krakend-ratelimit/v3"

"github.com/luraproject/lura/v2/config"
"github.com/luraproject/lura/v2/logging"
"github.com/luraproject/lura/v2/proxy"
Expand Down Expand Up @@ -74,22 +75,40 @@ func New(ctx context.Context, cfg Subscriber, opts Options) error {
defer closeF()

// initial ping
if cap(opts.Ping) < 1 {
opts.Logger.Warning(
fmt.Sprintf("[SERVICE: AsyncAgent][AMQP][%s] Ping channel with 0 capacity might block this async agent",
cfg.Name))
}
opts.Ping <- cfg.Name

shouldAck := newProcessor(ctx, cfg, opts.Logger, opts.Proxy)
if cfg.Workers < 1 {
// If the number of workers is 0, this
// we probably need to check that the minimum amount of workers is 1
opts.Logger.Error(
fmt.Sprintf("[SERVICE: AsyncAgent][AMQP][%s] With less than 1 worker this agent does no work", cfg.Name))
}
sem := make(chan struct{}, cfg.Workers)
var shouldExit atomic.Value
shouldExit.Store(false)
defer opts.PingTicker.Stop()

waitIfRequired := func() {}
if cfg.MaxRate > 0 {
capacity := int64(cfg.MaxRate)
capacity := uint64(cfg.MaxRate)
if capacity == 0 {
capacity = 1
}
bucket := ratelimit.NewBucketWithRate(cfg.MaxRate, capacity)
waitIfRequired = func() { bucket.Wait(1) }
bucket := ratelimit.NewTokenBucket(cfg.MaxRate, capacity)
// We wait enough time to allow to have at least an extra token
// (even some other goroutine might have consumed while we wait)
pollingTime := time.Nanosecond * time.Duration(1e9/cfg.MaxRate)
waitIfRequired = func() {
for !bucket.Allow() {
time.Sleep(pollingTime)
}
}
}

recvLoop:
Expand Down Expand Up @@ -153,6 +172,7 @@ recvLoop:

opts.Logger.Warning(fmt.Sprintf("[SERVICE: AsyncAgent][AMQP][%s] Consumer stopped", cfg.Name))

// the error is not nil, only when the context is Done() and has an error:
return err
}

Expand Down
Loading

0 comments on commit 30596fe

Please sign in to comment.