add rate limit to asset inventory #2055

Merged
merged 6 commits into from
May 7, 2024
4 changes: 2 additions & 2 deletions go.mod
@@ -476,13 +476,13 @@ require (
golang.org/x/sys v0.18.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
-golang.org/x/time v0.5.0 // indirect
+golang.org/x/time v0.5.0
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20240228224816-df926f6c8641 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240311132316-a219d84964c2 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240311132316-a219d84964c2 // indirect
-google.golang.org/grpc v1.62.1 // indirect
+google.golang.org/grpc v1.62.1
google.golang.org/protobuf v1.33.0
gopkg.in/cheggaaa/pb.v1 v1.0.28 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
1 change: 1 addition & 0 deletions internal/flavors/benchmark/gcp.go
@@ -49,6 +49,7 @@ func (g *GCP) NewBenchmark(ctx context.Context, log *logp.Logger, cfg *config.Co

return builder.New(
builder.WithBenchmarkDataProvider(bdp),
+builder.WithManagerTimeout(cfg.Period),
Collaborator Author

I'm not sure why we need the manager timeout at all, rather than letting the work run for as long as the cycle lasts. After this PR the GCP fetchers will be slower: they run at 100 requests per minute, so 1,000 requests take about 10 minutes. That could conflict with the manager timeout, which defaults to 10m, because the context would be cancelled. Given that, I've changed the manager timeout for GCP to match the CSPM cycle period, which is 24h.

Contributor

It makes sense.
Once we update the rest of the cloud providers to have a rate limiter, we should consider removing the manager timeout option and limiting all of them to the interval period (24h).

Contributor

@jeniawhite Apr 16, 2024

I think we should still consider a scale scenario where the number of resources, combined with the rate limiting, exceeds the cycle time, and make sure we carry the work through to the end instead of sending partial cycles. We will still need some sort of upper bound to make sure we avoid "infinite cycles" (not part of this PR).

Member

@jeniawhite I agree, but this looks like a new feature we would need to build. Without significant effort, the upper bound could work like this: a cycle that is still running can postpone at most N new cycles, after which it gets canceled.

However, we should implement it as a new feature and consider the edge scenarios.

Collaborator Author

Good point. I've opened an issue to handle this scenario: #2180

).Build(ctx, log, cfg, resourceCh, reg)
}

@@ -0,0 +1,76 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package inventory

import (
"context"
"time"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/googleapis/gax-go/v2"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)

// RetryOnResourceExhausted is a gax.CallOption that retries a request when it fails with a ResourceExhausted error,
// backing off exponentially from 1 second up to a maximum of 1 minute.
var RetryOnResourceExhausted = gax.WithRetry(func() gax.Retryer {
return gax.OnCodes([]codes.Code{codes.ResourceExhausted}, gax.Backoff{
Initial: 1 * time.Second,
Max: 1 * time.Minute,
Multiplier: 1.2,
})
})

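// AssetsInventoryRateLimiter throttles asset inventory client calls, with one limiter per gRPC method.
type AssetsInventoryRateLimiter struct {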
methods map[string]*rate.Limiter
log *logp.Logger
}

// methods maps asset inventory client methods to the rate limiters that enforce their quotas.
// See https://cloud.google.com/asset-inventory/docs/quota
var methods = map[string]*rate.Limiter{
Collaborator Author

If at some point we use more methods from the asset inventory, we can add them here.

It might be better to add the rate limiting directly to the ListAssets method, instead of adding it to the whole asset inventory client and limiting only the methods we pre-define, but I didn't find a way to do this (the grpc.CallOption interface does not export the relevant types).

Contributor

So what happens if we call a method that is not ListAssets? Is the interceptor still active?

Contributor

The interceptor will be called but we won't wait, right?

Collaborator Author

> the interceptor will be called but we'll not wait, right?

Yeah, the interceptor will just be a pass-through function.

// The per-project quota is used for both single-account and organization-account setups because it is the more restrictive of the two.
"/google.cloud.asset.v1.AssetService/ListAssets": rate.NewLimiter(rate.Every(time.Minute/100), 1),
}

func NewAssetsInventoryRateLimiter(log *logp.Logger) *AssetsInventoryRateLimiter {
return &AssetsInventoryRateLimiter{
log: log,
methods: methods,
}
}

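// Wait blocks until the limiter registered for the method allows another call.
// Methods without a limiter return immediately. A failed wait (for example, a cancelled context) is logged, not returned.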
func (rl *AssetsInventoryRateLimiter) Wait(ctx context.Context, method string, req any) {
limiter := rl.methods[method]
if limiter != nil {
err := limiter.Wait(ctx)
if err != nil {
rl.log.Errorf("Failed to wait for project quota on method: %s, request: %v, error: %v", method, req, err)
}
}
}

// GetInterceptorDialOption returns a grpc.DialOption that intercepts unary RPCs and rate-limits the methods defined in the methods map.
func (rl *AssetsInventoryRateLimiter) GetInterceptorDialOption() grpc.DialOption {
return grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
rl.Wait(ctx, method, req)
return invoker(ctx, method, req, reply, cc, opts...)
})
}
@@ -0,0 +1,62 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package inventory

import (
"context"
"testing"
"time"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/stretchr/testify/suite"
"golang.org/x/time/rate"
)

type RateLimiterTestSuite struct {
suite.Suite
logger *logp.Logger
rateLimiter *AssetsInventoryRateLimiter
}

func TestInventoryRateLimiterTestSuite(t *testing.T) {
suite.Run(t, new(RateLimiterTestSuite))
}

func (s *RateLimiterTestSuite) SetupTest() {
s.logger = logp.NewLogger("test")
s.rateLimiter = NewAssetsInventoryRateLimiter(s.logger)
}

func (s *RateLimiterTestSuite) TestRateLimiterWait() {
ctx := context.Background()
duration := time.Millisecond
s.rateLimiter.methods = map[string]*rate.Limiter{
"someMethod": rate.NewLimiter(rate.Every(duration), 1), // 1 request per duration
}

totalRequests := 5
startTime := time.Now()
for i := 0; i < totalRequests; i++ {
s.rateLimiter.Wait(ctx, "someMethod", nil)
}
endTime := time.Now()

actualDuration := endTime.Sub(startTime)
minDuration := duration * time.Duration(totalRequests-1) // the 1st request is instant; each later request waits one duration
s.GreaterOrEqual(actualDuration, minDuration)
}
40 changes: 40 additions & 0 deletions internal/resources/providers/gcplib/inventory/map_cache.go
Collaborator Author

We already have a cache utility, but it is used for single values and is cycle-aware.

This cache just abstracts away the repetitive read/write that a plain map would require. Instead, it takes a function that produces the value, which is used for the initial read and assignment.

@@ -0,0 +1,40 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package inventory

import (
"sync"
)

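// MapCache caches values of type T by string key. It is safe for concurrent use.
type MapCache[T any] struct {
results sync.Map
}

// Get returns the cached value for key. On a miss it calls fn, stores the result, and returns it.
// The load and the store are not atomic, so concurrent callers that miss on the same key may each call fn.
func (c *MapCache[T]) Get(fn func() T, key string) T {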
if value, ok := c.results.Load(key); ok {
return value.(T)
}

value := fn()
c.results.Store(key, value)
return value
}

// NewMapCache returns an empty MapCache.
func NewMapCache[T any]() *MapCache[T] {
return &MapCache[T]{}
}
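A short usage sketch of the cache follows. The `MapCache` type and `Get` method are copied from the diff so the example compiles on its own; `GetOrStore` is a hypothetical variant that is not part of the PR. It shows one way to make concurrent callers agree on a single value via `sync.Map.LoadOrStore`, although fn may still run more than once under contention. The key and value strings are made up.

```go
package main

import (
	"fmt"
	"sync"
)

// MapCache is copied from the diff so the example is self-contained.
type MapCache[T any] struct {
	results sync.Map
}

// Get returns the cached value for key, calling fn only on a miss.
func (c *MapCache[T]) Get(fn func() T, key string) T {
	if value, ok := c.results.Load(key); ok {
		return value.(T)
	}
	value := fn()
	c.results.Store(key, value)
	return value
}

// GetOrStore is a hypothetical variant (not in the PR). LoadOrStore keeps the
// first value stored for a key, so every caller receives the same value.
func (c *MapCache[T]) GetOrStore(fn func() T, key string) T {
	if value, ok := c.results.Load(key); ok {
		return value.(T)
	}
	actual, _ := c.results.LoadOrStore(key, fn())
	return actual.(T)
}

func main() {
	cache := &MapCache[string]{}
	calls := 0
	lookup := func() string {
		calls++ // stands in for an expensive API call
		return "example-display-name"
	}

	first := cache.Get(lookup, "projects/example")
	second := cache.Get(lookup, "projects/example") // served from the cache
	fmt.Println(first == second, calls)
}
```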
63 changes: 63 additions & 0 deletions internal/resources/providers/gcplib/inventory/map_cache_test.go
@@ -0,0 +1,63 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package inventory

import (
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

type MockFunction struct {
mock.Mock
}

func (m *MockFunction) GetSomeValue() int {
m.Called()
return 0
}

func TestMapCacheGet(t *testing.T) {
cache := NewMapCache[int]()

// Test getting existing value from cache
cache.results.Store("key1", 42)
mockFunction := new(MockFunction)
result := cache.Get(mockFunction.GetSomeValue, "key1")
mockFunction.AssertNotCalled(t, "GetSomeValue")
assert.Equal(t, 42, result)

// Test getting non-existing value from cache
mockFunction.On("GetSomeValue").Return(0)
result = cache.Get(mockFunction.GetSomeValue, "key2")
mockFunction.AssertNumberOfCalls(t, "GetSomeValue", 1) // called once, by cache.Get()
assert.Equal(t, 0, result)

// Test concurrent accesses
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
cache.Get(func() int { return 1 }, "concurrent_key")
}()
}
wg.Wait()
}