Requeue bitbucket cloud repositories that hit rate limits #313

Draft: wants to merge 1 commit into main
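At a glance, the change follows a catch-and-requeue pattern: the low-level API helper raises a typed exception when Bitbucket answers with HTTP 429, and the caller catches it and puts the same work item back on the queue instead of dropping it. A minimal, self-contained sketch of that pattern (HeimdallException mirrors the name used in the diff below; the simplified function bodies and URL are illustrative, not the project's real code):

from typing import Optional

import requests


class HeimdallException(Exception):
    """Raised when Bitbucket responds with HTTP 429 (rate limited)."""


def query_bitbucket(url: str) -> Optional[str]:
    response = requests.get(url, timeout=30)
    if response.status_code == 429:
        # Surface the rate limit as a typed exception so callers can requeue
        raise HeimdallException("Bitbucket Rate Limit Reached")
    if response.status_code != 200:
        return None
    return response.text


def process_org(queue: list, org: str, cursor: str) -> list:
    try:
        body = query_bitbucket(f"https://api.bitbucket.org/2.0/repositories/{org}?page={cursor}")
        return [] if body is None else [body]
    except HeimdallException:
        # Re-queue the same org/cursor so it is retried after the
        # rate-limit window passes, rather than losing the work item
        queue.append({"org": org, "cursor": cursor})
        return []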
@@ -1,5 +1,5 @@
 # pylint: disable=no-name-in-module,no-member
-from typing import Tuple
+from typing import Tuple, Optional

 import requests
 import time
@@ -13,6 +13,7 @@
 from heimdall_utils.env import APPLICATION
 from heimdall_utils.utils import JSONUtils, ScanOptions, ServiceInfo, parse_timestamp
 from heimdall_utils.variables import REV_PROXY_DOMAIN_SUBSTRING, REV_PROXY_SECRET_HEADER
+from heimdall_utils.utils import HeimdallException

 log = Logger(service=APPLICATION, name="ProcessBitbucketRepos", child=True)

@@ -73,32 +74,59 @@ def query(self) -> list:
             self.service_info.url, self.service_info.org, self.service_info.repo_cursor
         )

-        response_text = self._query_bitbucket_api(query)
-        resp = self.json_utils.get_json_from_response(response_text)
-        if not resp:
-            return []
+        try:
+            response_text = self._query_bitbucket_api(query)

-        nodes = resp.get("values", [])
-        if not nodes and "slug" in resp:
-            nodes = [resp]
+            resp = self.json_utils.get_json_from_response(response_text)
+            if not resp:
+                return []

-        repos = self._process_repos(nodes)
+            nodes = resp.get("values", [])
+            if not nodes and "slug" in resp:
+                nodes = [resp]

-        if self.service_helper.has_next_page(resp):
-            cursor = self.service_helper.get_cursor(resp)
-            # Re-queue this org, setting the cursor for the next page of the query
-            log.info("Queueing next page of repos %s to re-start at cursor %s", self.service_info.org, cursor)
-            queue_service_and_org(
-                self.queue,
-                self.service_info.service,
-                self.service_info.org,
-                {"cursor": cursor},
-                self.scan_options.default_branch_only,
-                self.scan_options.plugins,
-                self.scan_options.batch_id,
-                self.redundant_scan_query,
-            )
-        return repos
+            repos = self._process_repos(nodes)
+
+            if self.service_helper.has_next_page(resp):
+                cursor = self.service_helper.get_cursor(resp)
+                # Re-queue this org, setting the cursor for the next page of the query
+                log.info("Queueing next page of repos %s to re-start at cursor %s", self.service_info.org, cursor)
+                queue_service_and_org(
+                    self.queue,
+                    self.service_info.service,
+                    self.service_info.org,
+                    {"cursor": cursor},
+                    self.scan_options.default_branch_only,
+                    self.scan_options.plugins,
+                    self.scan_options.batch_id,
+                    self.redundant_scan_query,
+                )
+            return repos
+        except HeimdallException:
+            log.info("Rate limit hit. Requeuing task")
+            if self.scan_options.repo:
+                queue_branch_and_repo(
+                    queue=self.queue,
+                    service=self.service_info.service,
+                    org_name=self.service_info.org,
+                    branch_cursor=self.service_info.branch_cursor,
+                    repo=self.scan_options.repo,
+                    plugins=self.scan_options.plugins,
+                    batch_id=self.scan_options.batch_id,
+                    redundant_scan_query=self.redundant_scan_query,
+                )
+            else:
+                queue_service_and_org(
+                    queue=self.queue,
+                    service=self.service_info.service,
+                    org_name=self.service_info.org,
+                    page=self.service_info.repo_cursor,
+                    default_branch_only=self.scan_options.default_branch_only,
+                    plugins=self.scan_options.plugins,
+                    batch_id=self.scan_options.batch_id,
+                    redundant_scan_query=self.redundant_scan_query,
+                )
+            return []

     def _process_repos(self, nodes: list) -> list:
         """
@@ -239,7 +267,6 @@ def _get_default_branch(self, repo_name: str, repo_dict: dict) -> str:
             url=self.service_info.url, org=self.service_info.org, repo=repo_name
         )
         response = self._query_bitbucket_api(url)
-        response = self.json_utils.get_json_from_response(response)
         if not response:
             log.warning(
                 "Unable to retrieve Default Branch for repo: %s/%s",
@@ -250,6 +277,7 @@ def _get_default_branch(self, repo_name: str, repo_dict: dict) -> str:
             default_branch = "HEAD"
             return default_branch

+        response = self.json_utils.get_json_from_response(response)
         default_branch = self.service_helper.get_default_branch_name(response)
         return default_branch

@@ -279,7 +307,7 @@ def _get_branch_timestamp(self, repo: str, ref: dict):

         return timestamp

-    def _query_bitbucket_api(self, url: str) -> str or None:
+    def _query_bitbucket_api(self, url: str) -> Optional[str]:
         with requests.session() as session:
             headers = {
                 "Authorization": f"Basic {self.service_info.api_key}",
@@ -289,8 +317,11 @@ def _query_bitbucket_api(self, url: str) -> str or None:
                 headers[REV_PROXY_SECRET_HEADER] = GetProxySecret()
             response = session.get(url=url, headers=headers)
             if response.status_code == 429:
-                log.error("Error retrieving Bitbucket query. Rate Limit Reached")
-                raise requests.HTTPError("Bitbucket Rate Limit Reached")
+                log.warning("Error retrieving Bitbucket query. Rate Limit Reached")
+                raise HeimdallException("Bitbucket Rate Limit Reached")
+            if response.status_code == 204:
+                log.warning("No data was returned for the Bitbucket Query: %s", url)
+                return None
             if response.status_code != 200:
                 log.error("Error retrieving Bitbucket query: %s, Error Code: %s", url, response.status_code)
                 return None
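A note on the signature fix above: "str or None" is not a union type. It is an ordinary boolean expression evaluated when the function is defined, and because the class object str is truthy it evaluates to plain str, silently dropping the None alternative. Optional[str] states the intent correctly:

from typing import Optional

print(str or None)  # prints <class 'str'>: the None branch never reaches the annotation


def query(url: str) -> Optional[str]:  # equivalent to Union[str, None]
    ...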
61 changes: 26 additions & 35 deletions orchestrator/lambdas/repo_scan/repo_scan/repo_scan.py
@@ -1,7 +1,6 @@
 # pylint: disable=no-name-in-module, no-member
 import json
 import os
-import warnings
 from collections import namedtuple
 from datetime import datetime
 from typing import Any, Optional
@@ -11,7 +10,6 @@
 from heimdall_utils.aws_utils import get_analyzer_api_key, send_analyzer_request
 from heimdall_utils.utils import JSONUtils, get_ttl_expiration
 from heimdall_utils.env import APPLICATION
-from heimdall_utils.metrics.factory import get_metrics
 from repo_scan.aws_connect import (
     batch_update_db,
     delete_processed_messages,
@@ -20,25 +18,19 @@
     send_sqs_message,
 )

-# At the end of the lambda execution, metrics will be published
-# Adding this filter to Ignore warnings for empty metrics
-warnings.filterwarnings("ignore", "No application metrics to publish*")
-
 ARTEMIS_API = os.environ.get("ARTEMIS_API")
 API_KEY_LOC = os.environ.get("ARTEMIS_API_KEY")
 DEFAULT_PLUGINS = ["gitsecrets", "trufflehog", "base_images"]
 PROCESSED_MESSAGES = namedtuple("processed_messages", ["repos", "receipt_handles"])
-REPO_QUEUE = os.environ.get("REPO_QUEUE")
-REPO_DLQ = os.environ.get("REPO_DEAD_LETTER_QUEUE")
+REPO_QUEUE = os.environ.get("REPO_QUEUE", "")
+REPO_DLQ = os.environ.get("REPO_DEAD_LETTER_QUEUE", "")
 SCAN_DEPTH = int(os.environ.get("SCAN_DEPTH", 1))
 SCAN_TABLE_NAME = os.environ.get("SCAN_TABLE") or ""

 log = Logger(service=APPLICATION, name="repo_scan")
 json_utils = JSONUtils(log)
-metrics = get_metrics()


-@metrics.log_metrics()
 @log.inject_lambda_context
 def run(event: dict[str, Any] = None, context: LambdaContext = None, size: int = 20) -> Optional[list[dict[str, Any]]]:
     # Get the size of the REPO_QUEUE
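The new empty-string defaults for REPO_QUEUE and REPO_DLQ appear to be a typing fix: os.environ.get(key) returns Optional[str] (None when the variable is unset), while os.environ.get(key, "") always returns str, matching how the queue names are passed around later. A quick illustration:

import os

queue = os.environ.get("REPO_QUEUE")      # Optional[str]: None when the variable is unset
queue = os.environ.get("REPO_QUEUE", "")  # str: falls back to "" when unset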
@@ -121,13 +113,11 @@ def submit_repos(repos: list, analyzer_url: str, api_key: str) -> list:

 def requeue_failed_repos(service: str, repo_lookup: dict[str, Any], failed_repos: list):
     """
-    Send failed repos to the repo-dead-letter SQS queue
+    Send failed repos to the repo-dead-letter SQS queue or the repo SQS queue
     """
-    repos_to_queue = []
-    index = 0
-    count = 0
-    if failed_repos != None:
-        metrics.add_metric(name="failed_repositories.count", value=len(failed_repos), version_control_service=service)
+    failed_repo_lst = []
+    rate_limit_lst = []
+    count = 0
     for failed_repo in failed_repos:
         error_msg = failed_repo.get("error", "")
         if error_msg.startswith("Could not resolve to a Repository with the name"):
@@ -137,20 +127,28 @@ def requeue_failed_repos(service: str, repo_lookup: dict[str, Any], failed_repos: list):
             repo_info["service"] = service
         if not repo_info:
             continue
-        repos_to_queue.append({"Id": str(index), "MessageBody": json.dumps(repo_info)})
-        index += 1
-        if index >= 10:
-            if not send_sqs_message(REPO_DLQ, repos_to_queue):
-                log.error("There was an error queueing the repos, aborting.")
-                return
-            count += index
-            index = 0
-            repos_to_queue = []
-    if index > 0:
-        count += index
-        send_sqs_message(REPO_DLQ, repos_to_queue)
+        sqs_message = {"Id": str(count), "MessageBody": json.dumps(repo_info)}
+        if error_msg.startswith("Rate limit") and service in ["bitbucket"]:
+            rate_limit_lst.append(sqs_message)
+        else:
+            failed_repo_lst.append(sqs_message)
+
+        count += 1

-    log.info(f"Sending {count} repos to the repo-deadletter Queue")
+    log.info(f"Requeuing {count} failed repos")
+    # Rate-limited repos go back onto the repo queue for retry; all other
+    # failures are dead-lettered, as before.
+    send_to_sqs(REPO_QUEUE, rate_limit_lst)
+    send_to_sqs(REPO_DLQ, failed_repo_lst)
+
+
+def send_to_sqs(queue: str, repos: list):
+    chunk_size = 10  # SQS batch sends accept at most 10 messages per call
+    if len(repos) == 0:
+        return
+
+    for i in range(0, len(repos), chunk_size):
+        repo_subset = repos[i : i + chunk_size]
+        send_sqs_message(queue, repo_subset)


 def get_repo_scan_items(service, response, date=None):
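send_to_sqs submits messages in chunks of 10 because SQS's SendMessageBatch API accepts at most 10 entries per call. The send_sqs_message helper it delegates to is imported from repo_scan.aws_connect and is not part of this diff; a minimal sketch of what such a helper might look like with boto3, assuming each message is already shaped as {"Id": ..., "MessageBody": ...}:

import boto3

sqs = boto3.client("sqs")


def send_sqs_message(queue_url: str, entries: list) -> bool:
    # SendMessageBatch rejects calls with more than 10 entries, which is
    # why send_to_sqs above chunks the list before calling this helper
    response = sqs.send_message_batch(QueueUrl=queue_url, Entries=entries)
    # Per-message failures are reported in the "Failed" list, not raised
    return not response.get("Failed")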
@@ -193,14 +191,7 @@ def construct_repo_requests(repos: list) -> dict:
             "batch_id": repo.get("batch_id"),
             "depth": SCAN_DEPTH,
         }
-        metrics.add_metric(
-            name="queued_repositories.count",
-            value=1,
-            repository=repo["repo"],
-            batch_id=repo.get("batch_id"),
-            organization_name=repo.get("org"),
-            version_control_service=service,
-        )
+
         if "branch" in repo and repo["branch"] != "HEAD":
             req["branch"] = repo["branch"]
         reqs[service].append(req)