From a67d308b2227cdfee418243ba228ceaddf2ae0fb Mon Sep 17 00:00:00 2001 From: Or Ouziel Date: Tue, 2 Apr 2024 17:34:50 +0300 Subject: [PATCH] add rate limiter --- go.mod | 4 +- .../gcplib/inventory/grpc_rate_limiter.go | 76 +++++++++++++++++++ .../inventory/grpc_rate_limiter_test.go | 64 ++++++++++++++++ .../providers/gcplib/inventory/provider.go | 56 +++++++------- 4 files changed, 172 insertions(+), 28 deletions(-) create mode 100644 internal/resources/providers/gcplib/inventory/grpc_rate_limiter.go create mode 100644 internal/resources/providers/gcplib/inventory/grpc_rate_limiter_test.go diff --git a/go.mod b/go.mod index 4483fc29d3..2aabe1c125 100644 --- a/go.mod +++ b/go.mod @@ -474,13 +474,13 @@ require ( golang.org/x/sys v0.18.0 // indirect golang.org/x/term v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect - golang.org/x/time v0.5.0 // indirect + golang.org/x/time v0.5.0 golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/appengine v1.6.8 // indirect google.golang.org/genproto v0.0.0-20240228224816-df926f6c8641 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240311132316-a219d84964c2 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240311132316-a219d84964c2 // indirect - google.golang.org/grpc v1.62.1 // indirect + google.golang.org/grpc v1.62.1 google.golang.org/protobuf v1.33.0 gopkg.in/cheggaaa/pb.v1 v1.0.28 // indirect gopkg.in/inf.v0 v0.9.1 // indirect diff --git a/internal/resources/providers/gcplib/inventory/grpc_rate_limiter.go b/internal/resources/providers/gcplib/inventory/grpc_rate_limiter.go new file mode 100644 index 0000000000..1ec9f48753 --- /dev/null +++ b/internal/resources/providers/gcplib/inventory/grpc_rate_limiter.go @@ -0,0 +1,76 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. 
licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package inventory
+
+import (
+	"context"
+	"time"
+
+	"github.com/elastic/elastic-agent-libs/logp"
+	"github.com/googleapis/gax-go/v2"
+	"golang.org/x/time/rate"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+)
+
+// RetryOnResourceExhausted is a gax.CallOption that retries a request on a ResourceExhausted error, backing off from 1s up to 1m with a 1.2 multiplier.
+var RetryOnResourceExhausted = gax.WithRetry(func() gax.Retryer {
+	return gax.OnCodes([]codes.Code{codes.ResourceExhausted}, gax.Backoff{
+		Initial:    1 * time.Second,
+		Max:        1 * time.Minute,
+		Multiplier: 1.2,
+	})
+})
+
+type AssetsInventoryRateLimiter struct {
+	methods map[string]*rate.Limiter // limiters keyed by the method name that Wait receives
+	log     *logp.Logger             // receives an error entry when waiting on a limiter fails
+}
+
+// methods maps asset inventory client method names to the limiters that enforce their quotas.
+// see https://cloud.google.com/asset-inventory/docs/quota
+var methods = map[string]*rate.Limiter{
+	// using per-project quota suffices for both single-account and organization-account, because it's more restrictive.
+	"/google.cloud.asset.v1.AssetService/ListAssets": rate.NewLimiter(rate.Every(time.Minute/100), 1), // one token every time.Minute/100, burst of 1
+}
+
+func NewAssetsInventoryRateLimiter(log *logp.Logger) *AssetsInventoryRateLimiter {
+	return &AssetsInventoryRateLimiter{
+		log:     log,
+		methods: methods, // the package-level map is stored as-is, so its limiters are shared by every instance
+	}
+}
+
+// Wait blocks on the limiter registered for method, if any; methods without a limiter are not delayed.
+func (rl *AssetsInventoryRateLimiter) Wait(ctx context.Context, method string, req any) {
+	limiter := rl.methods[method]
+	if limiter == nil {
+		return // no limiter is registered for this method, so the call is not throttled
+	}
+	if err := limiter.Wait(ctx); err != nil { // best-effort: a failed wait is only logged and the caller proceeds
+		rl.log.Errorf("Failed to wait for project quota on method: %s, request: %v, error: %v", method, req, err) // %v, because %w is only understood by fmt.Errorf
+	}
+}
+
+// GetInterceptorDialOption returns a grpc.DialOption whose unary interceptor calls Wait for the method before invoking the RPC.
+func (rl *AssetsInventoryRateLimiter) GetInterceptorDialOption() grpc.DialOption {
+	return grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
+		rl.Wait(ctx, method, req) // Wait has no return value, so the RPC is invoked even when waiting failed
+		return invoker(ctx, method, req, reply, cc, opts...)
+	})
+}
diff --git a/internal/resources/providers/gcplib/inventory/grpc_rate_limiter_test.go b/internal/resources/providers/gcplib/inventory/grpc_rate_limiter_test.go
new file mode 100644
index 0000000000..b1cb58df78
--- /dev/null
+++ b/internal/resources/providers/gcplib/inventory/grpc_rate_limiter_test.go
@@ -0,0 +1,64 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package inventory
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/elastic/elastic-agent-libs/logp"
+	"github.com/stretchr/testify/suite"
+	"golang.org/x/time/rate"
+)
+
+type RateLimiterTestSuite struct {
+	suite.Suite
+	logger      *logp.Logger
+	rateLimiter *AssetsInventoryRateLimiter // rebuilt by SetupTest
+}
+
+func TestInventoryRateLimiterTestSuite(t *testing.T) {
+	suite.Run(t, new(RateLimiterTestSuite))
+}
+
+func (s *RateLimiterTestSuite) SetupTest() {
+	s.logger = logp.NewLogger("test")
+	s.rateLimiter = NewAssetsInventoryRateLimiter(s.logger)
+}
+
+func (s *RateLimiterTestSuite) TestRateLimiterWait() {
+	ctx := context.Background()
+	duration := time.Millisecond
+	s.rateLimiter.methods = map[string]*rate.Limiter{ // replaces the instance's map; the package-level one is untouched
+		"someMethod": rate.NewLimiter(rate.Every(duration), 1), // 1 request per duration
+	}
+
+	totalRequests := 5
+	startTime := time.Now()
+	for i := 0; i < totalRequests; i++ {
+		s.rateLimiter.Wait(ctx, "someMethod", nil)
+		s.T().Logf("request %d", i) // goes through the test log instead of writing to stdout directly
+	}
+	actualDuration := time.Since(startTime)
+
+	// only a lower bound is asserted, so a slow machine cannot make this check fail
+	minDuration := duration * time.Duration(totalRequests-1) // 1st request is instant, 2nd and above wait 1duration each
+	s.Assert().True(actualDuration >= minDuration, fmt.Sprintf("expected %v to be greater or equal than %v", actualDuration, minDuration))
+}
diff --git a/internal/resources/providers/gcplib/inventory/provider.go b/internal/resources/providers/gcplib/inventory/provider.go
index 071f2668be..e2eaa5a9c1 100644
--- a/internal/resources/providers/gcplib/inventory/provider.go
+++ b/internal/resources/providers/gcplib/inventory/provider.go
@@ -130,8 +130,9 @@ type ProviderInitializerAPI interface {
 }
 
 func (p *ProviderInitializer) Init(ctx context.Context, log *logp.Logger, gcpConfig auth.GcpFactoryConfig) (ServiceAPI, error) {
+	limiter := NewAssetsInventoryRateLimiter(log)
 	// initialize GCP assets inventory client
-	client, err := asset.NewClient(ctx, gcpConfig.ClientOpts...)
+ client, err := asset.NewClient(ctx, append(gcpConfig.ClientOpts, option.WithGRPCDialOption(limiter.GetInterceptorDialOption()))...) if err != nil { return nil, err } @@ -139,7 +140,7 @@ func (p *ProviderInitializer) Init(ctx context.Context, log *logp.Logger, gcpCon assetsInventoryWrapper := &AssetsInventoryWrapper{ Close: client.Close, ListAssets: func(ctx context.Context, req *assetpb.ListAssetsRequest, opts ...gax.CallOption) Iterator { - return client.ListAssets(ctx, req, opts...) + return client.ListAssets(ctx, req, append(opts, RetryOnResourceExhausted)...) }, } @@ -180,30 +181,26 @@ func (p *ProviderInitializer) Init(ctx context.Context, log *logp.Logger, gcpCon } func (p *Provider) ListAllAssetTypesByName(ctx context.Context, assetTypes []string) ([]*ExtendedGcpAsset, error) { - p.log.Infof("Listing GCP asset types: %v in %v", assetTypes, p.config.Parent) - wg := sync.WaitGroup{} var resourceAssets []*assetpb.Asset var policyAssets []*assetpb.Asset wg.Add(1) go func() { - request := &assetpb.ListAssetsRequest{ + resourceAssets = p.getAllAssets(ctx, &assetpb.ListAssetsRequest{ Parent: p.config.Parent, AssetTypes: assetTypes, ContentType: assetpb.ContentType_RESOURCE, - } - resourceAssets = getAllAssets(p.log, p.inventory.ListAssets(ctx, request)) + }) wg.Done() }() wg.Add(1) go func() { - request := &assetpb.ListAssetsRequest{ + policyAssets = p.getAllAssets(ctx, &assetpb.ListAssetsRequest{ Parent: p.config.Parent, AssetTypes: assetTypes, ContentType: assetpb.ContentType_IAM_POLICY, - } - policyAssets = getAllAssets(p.log, p.inventory.ListAssets(ctx, request)) + }) wg.Done() }() @@ -308,18 +305,16 @@ func (p *Provider) enrichNetworkAssets(ctx context.Context, assets []*ExtendedGc p.log.Infof("no %s assets were listed", ComputeNetworkAssetType) return } - - dnsPolicyAssets := getAllAssets(p.log, p.inventory.ListAssets(ctx, &assetpb.ListAssetsRequest{ + dnsPolicyAssets := p.getAllAssets(ctx, &assetpb.ListAssetsRequest{ Parent: p.config.Parent, AssetTypes: 
[]string{DnsPolicyAssetType}, ContentType: assetpb.ContentType_RESOURCE, - })) + }) if len(dnsPolicyAssets) == 0 { p.log.Infof("no %s assets were listed, return original assets", DnsPolicyAssetType) return } - dnsPolicies := decodeDnsPolicies(dnsPolicyAssets) p.log.Infof("attempting to enrich %d %s assets with dns policy", len(assets), ComputeNetworkAssetType) @@ -402,19 +397,21 @@ func getAssetsByProject[T any](assets []*ExtendedGcpAsset, log *logp.Logger, f T return enrichedAssets } -func getAllAssets(log *logp.Logger, it Iterator) []*assetpb.Asset { +func (p *Provider) getAllAssets(ctx context.Context, request *assetpb.ListAssetsRequest) []*assetpb.Asset { + p.log.Infof("Listing asset types: %v of type %v for %v", request.AssetTypes, request.ContentType, request.Parent) results := make([]*assetpb.Asset, 0) + it := p.inventory.ListAssets(ctx, request) for { response, err := it.Next() if err == iterator.Done { break } if err != nil { - log.Errorf("Error fetching GCP Asset: %s", err) - return nil + p.log.Errorf("Error fetching GCP Asset: %s", err) + return results } - log.Debugf("Fetched GCP Asset: %+v", response.Name) + p.log.Debugf("Fetched GCP Asset: %+v", response.Name) results = append(results, response) } return results @@ -466,22 +463,26 @@ func extendWithECS(ctx context.Context, crm *ResourceManagerWrapper, cache map[s } func (p *Provider) ListProjectsAncestorsPolicies(ctx context.Context) ([]*ProjectPoliciesAsset, error) { - projects := getAllAssets(p.log, p.inventory.ListAssets(ctx, &assetpb.ListAssetsRequest{ + projects := p.getAllAssets(ctx, &assetpb.ListAssetsRequest{ ContentType: assetpb.ContentType_IAM_POLICY, Parent: p.config.Parent, AssetTypes: []string{CrmProjectAssetType}, - })) - + }) + p.log.Infof("Listed %d GCP projects", len(projects)) + ancestorsPolicies := map[string][]*ExtendedGcpAsset{} return lo.Map(projects, func(project *assetpb.Asset, _ int) *ProjectPoliciesAsset { projectAsset := extendWithECS(ctx, p.crm, p.crmCache, 
[]*assetpb.Asset{project})[0] // Skip first ancestor it as we already got it - policiesAssets := append([]*ExtendedGcpAsset{projectAsset}, getAncestorsAssets(ctx, p, project.Ancestors[1:])...) + policiesAssets := append([]*ExtendedGcpAsset{projectAsset}, getAncestorsAssets(ctx, ancestorsPolicies, p, project.Ancestors[1:])...) return &ProjectPoliciesAsset{CloudAccount: projectAsset.CloudAccount, Policies: policiesAssets} }), nil } -func getAncestorsAssets(ctx context.Context, p *Provider, ancestors []string) []*ExtendedGcpAsset { +func getAncestorsAssets(ctx context.Context, ancestorsPolicies map[string][]*ExtendedGcpAsset, p *Provider, ancestors []string) []*ExtendedGcpAsset { return lo.Flatten(lo.Map(ancestors, func(parent string, _ int) []*ExtendedGcpAsset { + if ancestorsPolicies[parent] != nil { + return ancestorsPolicies[parent] + } var assetType string if strings.HasPrefix(parent, "folders") { assetType = CrmFolderAssetType @@ -489,12 +490,15 @@ func getAncestorsAssets(ctx context.Context, p *Provider, ancestors []string) [] if strings.HasPrefix(parent, "organizations") { assetType = CrmOrgAssetType } - assets := getAllAssets(p.log, p.inventory.ListAssets(ctx, &assetpb.ListAssetsRequest{ + + assets := p.getAllAssets(ctx, &assetpb.ListAssetsRequest{ ContentType: assetpb.ContentType_IAM_POLICY, Parent: parent, AssetTypes: []string{assetType}, - })) - return extendWithECS(ctx, p.crm, p.crmCache, assets) + }) + extendedAssets := extendWithECS(ctx, p.crm, p.crmCache, assets) + ancestorsPolicies[parent] = extendedAssets + return extendedAssets })) }