Skip to content

Commit

Permalink
add rate limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
orouz committed Apr 7, 2024
1 parent 61b21ec commit a67d308
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 28 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -474,13 +474,13 @@ require (
golang.org/x/sys v0.18.0 // indirect
golang.org/x/term v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/time v0.5.0
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto v0.0.0-20240228224816-df926f6c8641 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240311132316-a219d84964c2 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240311132316-a219d84964c2 // indirect
google.golang.org/grpc v1.62.1 // indirect
google.golang.org/grpc v1.62.1
google.golang.org/protobuf v1.33.0
gopkg.in/cheggaaa/pb.v1 v1.0.28 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
Expand Down
76 changes: 76 additions & 0 deletions internal/resources/providers/gcplib/inventory/grpc_rate_limiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package inventory

import (
"context"
"time"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/googleapis/gax-go/v2"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)

// a gax.CallOption that defines a retry strategy which retries the request on ResourceExhausted error.
var RetryOnResourceExhausted = gax.WithRetry(func() gax.Retryer {
return gax.OnCodes([]codes.Code{codes.ResourceExhausted}, gax.Backoff{
Initial: 1 * time.Second,
Max: 1 * time.Minute,
Multiplier: 1.2,
})
})

type AssetsInventoryRateLimiter struct {
methods map[string]*rate.Limiter
log *logp.Logger
}

// a map of asset inventory client methods and their quotas.
// see https://cloud.google.com/asset-inventory/docs/quota
var methods = map[string]*rate.Limiter{
// using per-project quota suffices for both single-account and organization-account, because it's more restrictive.
"/google.cloud.asset.v1.AssetService/ListAssets": rate.NewLimiter(rate.Every(time.Minute/100), 1),
}

func NewAssetsInventoryRateLimiter(log *logp.Logger) *AssetsInventoryRateLimiter {
return &AssetsInventoryRateLimiter{
log: log,
methods: methods,
}
}

// Limits the rate of the method calls defined in the methods map.
func (rl *AssetsInventoryRateLimiter) Wait(ctx context.Context, method string, req any) {
limiter := rl.methods[method]
if limiter != nil {
err := limiter.Wait(ctx)
if err != nil {
rl.log.Errorf("Failed to wait for project quota on method: %s, request: %v, error: %w", method, req, err)
}
}
}

// Returns a grpc.DialOption that intercepts the unary RPCs and limits the rate of the method calls.
func (rl *AssetsInventoryRateLimiter) GetInterceptorDialOption() grpc.DialOption {
return grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
rl.Wait(ctx, method, req)
return invoker(ctx, method, req, reply, cc, opts...)
})
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package inventory

import (
"context"
"fmt"
"testing"
"time"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/stretchr/testify/suite"
"golang.org/x/time/rate"
)

type RateLimiterTestSuite struct {
suite.Suite
logger *logp.Logger
rateLimiter *AssetsInventoryRateLimiter
}

func TestInventoryRateLimiterTestSuite(t *testing.T) {
suite.Run(t, new(RateLimiterTestSuite))
}

func (s *RateLimiterTestSuite) SetupTest() {
s.logger = logp.NewLogger("test")
s.rateLimiter = NewAssetsInventoryRateLimiter(s.logger)
}

func (s *RateLimiterTestSuite) TestRateLimiterWait() {
ctx := context.Background()
duration := time.Millisecond
s.rateLimiter.methods = map[string]*rate.Limiter{
"someMethod": rate.NewLimiter(rate.Every(duration/1), 1), // 1 request per duration
}

totalRequests := 5
startTime := time.Now()
for i := 0; i < totalRequests; i++ {
s.rateLimiter.Wait(ctx, "someMethod", nil)
fmt.Println("request", i)
}
endTime := time.Now()

actualDuration := endTime.Sub(startTime)
minDuration := duration * time.Duration((totalRequests - 1)) // 1st request is instant, 2nd and above wait 1duration each
s.Assert().True(actualDuration >= minDuration, fmt.Sprintf("expected %v to be greater or equal than %v", actualDuration, minDuration))
}
56 changes: 30 additions & 26 deletions internal/resources/providers/gcplib/inventory/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,17 @@ type ProviderInitializerAPI interface {
}

func (p *ProviderInitializer) Init(ctx context.Context, log *logp.Logger, gcpConfig auth.GcpFactoryConfig) (ServiceAPI, error) {
limiter := NewAssetsInventoryRateLimiter(log)
// initialize GCP assets inventory client
client, err := asset.NewClient(ctx, gcpConfig.ClientOpts...)
client, err := asset.NewClient(ctx, append(gcpConfig.ClientOpts, option.WithGRPCDialOption(limiter.GetInterceptorDialOption()))...)
if err != nil {
return nil, err
}
// wrap the assets inventory client for mocking
assetsInventoryWrapper := &AssetsInventoryWrapper{
Close: client.Close,
ListAssets: func(ctx context.Context, req *assetpb.ListAssetsRequest, opts ...gax.CallOption) Iterator {
return client.ListAssets(ctx, req, opts...)
return client.ListAssets(ctx, req, append(opts, RetryOnResourceExhausted)...)
},
}

Expand Down Expand Up @@ -180,30 +181,26 @@ func (p *ProviderInitializer) Init(ctx context.Context, log *logp.Logger, gcpCon
}

func (p *Provider) ListAllAssetTypesByName(ctx context.Context, assetTypes []string) ([]*ExtendedGcpAsset, error) {
p.log.Infof("Listing GCP asset types: %v in %v", assetTypes, p.config.Parent)

wg := sync.WaitGroup{}
var resourceAssets []*assetpb.Asset
var policyAssets []*assetpb.Asset

wg.Add(1)
go func() {
request := &assetpb.ListAssetsRequest{
resourceAssets = p.getAllAssets(ctx, &assetpb.ListAssetsRequest{
Parent: p.config.Parent,
AssetTypes: assetTypes,
ContentType: assetpb.ContentType_RESOURCE,
}
resourceAssets = getAllAssets(p.log, p.inventory.ListAssets(ctx, request))
})
wg.Done()
}()
wg.Add(1)
go func() {
request := &assetpb.ListAssetsRequest{
policyAssets = p.getAllAssets(ctx, &assetpb.ListAssetsRequest{
Parent: p.config.Parent,
AssetTypes: assetTypes,
ContentType: assetpb.ContentType_IAM_POLICY,
}
policyAssets = getAllAssets(p.log, p.inventory.ListAssets(ctx, request))
})
wg.Done()
}()

Expand Down Expand Up @@ -308,18 +305,16 @@ func (p *Provider) enrichNetworkAssets(ctx context.Context, assets []*ExtendedGc
p.log.Infof("no %s assets were listed", ComputeNetworkAssetType)
return
}

dnsPolicyAssets := getAllAssets(p.log, p.inventory.ListAssets(ctx, &assetpb.ListAssetsRequest{
dnsPolicyAssets := p.getAllAssets(ctx, &assetpb.ListAssetsRequest{
Parent: p.config.Parent,
AssetTypes: []string{DnsPolicyAssetType},
ContentType: assetpb.ContentType_RESOURCE,
}))
})

if len(dnsPolicyAssets) == 0 {
p.log.Infof("no %s assets were listed, return original assets", DnsPolicyAssetType)
return
}

dnsPolicies := decodeDnsPolicies(dnsPolicyAssets)

p.log.Infof("attempting to enrich %d %s assets with dns policy", len(assets), ComputeNetworkAssetType)
Expand Down Expand Up @@ -402,19 +397,21 @@ func getAssetsByProject[T any](assets []*ExtendedGcpAsset, log *logp.Logger, f T
return enrichedAssets
}

func getAllAssets(log *logp.Logger, it Iterator) []*assetpb.Asset {
func (p *Provider) getAllAssets(ctx context.Context, request *assetpb.ListAssetsRequest) []*assetpb.Asset {
p.log.Infof("Listing asset types: %v of type %v for %v", request.AssetTypes, request.ContentType, request.Parent)
results := make([]*assetpb.Asset, 0)
it := p.inventory.ListAssets(ctx, request)
for {
response, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
log.Errorf("Error fetching GCP Asset: %s", err)
return nil
p.log.Errorf("Error fetching GCP Asset: %s", err)
return results
}

log.Debugf("Fetched GCP Asset: %+v", response.Name)
p.log.Debugf("Fetched GCP Asset: %+v", response.Name)
results = append(results, response)
}
return results
Expand Down Expand Up @@ -466,35 +463,42 @@ func extendWithECS(ctx context.Context, crm *ResourceManagerWrapper, cache map[s
}

func (p *Provider) ListProjectsAncestorsPolicies(ctx context.Context) ([]*ProjectPoliciesAsset, error) {
projects := getAllAssets(p.log, p.inventory.ListAssets(ctx, &assetpb.ListAssetsRequest{
projects := p.getAllAssets(ctx, &assetpb.ListAssetsRequest{
ContentType: assetpb.ContentType_IAM_POLICY,
Parent: p.config.Parent,
AssetTypes: []string{CrmProjectAssetType},
}))

})
p.log.Infof("Listed %d GCP projects", len(projects))
ancestorsPolicies := map[string][]*ExtendedGcpAsset{}
return lo.Map(projects, func(project *assetpb.Asset, _ int) *ProjectPoliciesAsset {
projectAsset := extendWithECS(ctx, p.crm, p.crmCache, []*assetpb.Asset{project})[0]
// Skip first ancestor it as we already got it
policiesAssets := append([]*ExtendedGcpAsset{projectAsset}, getAncestorsAssets(ctx, p, project.Ancestors[1:])...)
policiesAssets := append([]*ExtendedGcpAsset{projectAsset}, getAncestorsAssets(ctx, ancestorsPolicies, p, project.Ancestors[1:])...)
return &ProjectPoliciesAsset{CloudAccount: projectAsset.CloudAccount, Policies: policiesAssets}
}), nil
}

func getAncestorsAssets(ctx context.Context, p *Provider, ancestors []string) []*ExtendedGcpAsset {
func getAncestorsAssets(ctx context.Context, ancestorsPolicies map[string][]*ExtendedGcpAsset, p *Provider, ancestors []string) []*ExtendedGcpAsset {
return lo.Flatten(lo.Map(ancestors, func(parent string, _ int) []*ExtendedGcpAsset {
if ancestorsPolicies[parent] != nil {
return ancestorsPolicies[parent]
}
var assetType string
if strings.HasPrefix(parent, "folders") {
assetType = CrmFolderAssetType
}
if strings.HasPrefix(parent, "organizations") {
assetType = CrmOrgAssetType
}
assets := getAllAssets(p.log, p.inventory.ListAssets(ctx, &assetpb.ListAssetsRequest{

assets := p.getAllAssets(ctx, &assetpb.ListAssetsRequest{
ContentType: assetpb.ContentType_IAM_POLICY,
Parent: parent,
AssetTypes: []string{assetType},
}))
return extendWithECS(ctx, p.crm, p.crmCache, assets)
})
extendedAssets := extendWithECS(ctx, p.crm, p.crmCache, assets)
ancestorsPolicies[parent] = extendedAssets
return extendedAssets
}))
}

Expand Down

0 comments on commit a67d308

Please sign in to comment.