diff --git a/orchestrator/lambdas/layers/heimdall_repos/heimdall_repos/bitbucket_utils.py b/orchestrator/lambdas/layers/heimdall_repos/heimdall_repos/bitbucket_utils.py
index ea41bbe3..e8a35b73 100644
--- a/orchestrator/lambdas/layers/heimdall_repos/heimdall_repos/bitbucket_utils.py
+++ b/orchestrator/lambdas/layers/heimdall_repos/heimdall_repos/bitbucket_utils.py
@@ -1,5 +1,5 @@
 # pylint: disable=no-name-in-module,no-member
-from typing import Tuple
+from typing import Tuple, Optional
 
 import requests
 import time
@@ -13,6 +13,7 @@
 from heimdall_utils.env import APPLICATION
 from heimdall_utils.utils import JSONUtils, ScanOptions, ServiceInfo, parse_timestamp
 from heimdall_utils.variables import REV_PROXY_DOMAIN_SUBSTRING, REV_PROXY_SECRET_HEADER
+from heimdall_utils.utils import HeimdallException
 
 log = Logger(service=APPLICATION, name="ProcessBitbucketRepos", child=True)
 
@@ -73,32 +74,59 @@ def query(self) -> list:
             self.service_info.url, self.service_info.org, self.service_info.repo_cursor
         )
 
-        response_text = self._query_bitbucket_api(query)
-        resp = self.json_utils.get_json_from_response(response_text)
-        if not resp:
-            return []
+        try:
+            response_text = self._query_bitbucket_api(query)
 
-        nodes = resp.get("values", [])
-        if not nodes and "slug" in resp:
-            nodes = [resp]
+            resp = self.json_utils.get_json_from_response(response_text)
+            if not resp:
+                return []
 
-        repos = self._process_repos(nodes)
+            nodes = resp.get("values", [])
+            if not nodes and "slug" in resp:
+                nodes = [resp]
 
-        if self.service_helper.has_next_page(resp):
-            cursor = self.service_helper.get_cursor(resp)
-            # Re-queue this org, setting the cursor for the next page of the query
-            log.info("Queueing next page of repos %s to re-start at cursor %s", self.service_info.org, cursor)
-            queue_service_and_org(
-                self.queue,
-                self.service_info.service,
-                self.service_info.org,
-                {"cursor": cursor},
-                self.scan_options.default_branch_only,
-                self.scan_options.plugins,
-                self.scan_options.batch_id,
-                self.redundant_scan_query,
-            )
-        return repos
+            repos = self._process_repos(nodes)
+
+            if self.service_helper.has_next_page(resp):
+                cursor = self.service_helper.get_cursor(resp)
+                # Re-queue this org, setting the cursor for the next page of the query
+                log.info("Queueing next page of repos %s to re-start at cursor %s", self.service_info.org, cursor)
+                queue_service_and_org(
+                    self.queue,
+                    self.service_info.service,
+                    self.service_info.org,
+                    {"cursor": cursor},
+                    self.scan_options.default_branch_only,
+                    self.scan_options.plugins,
+                    self.scan_options.batch_id,
+                    self.redundant_scan_query,
+                )
+            return repos
+        except HeimdallException:
+            log.info("Rate limit hit. Requeuing task")
+            if self.scan_options.repo:
+                queue_branch_and_repo(
+                    queue=self.queue,
+                    service=self.service_info.service,
+                    org_name=self.service_info.org,
+                    branch_cursor=self.service_info.branch_cursor,
+                    repo=self.scan_options.repo,
+                    plugins=self.scan_options.plugins,
+                    batch_id=self.scan_options.batch_id,
+                    redundant_scan_query=self.redundant_scan_query,
+                )
+            else:
+                queue_service_and_org(
+                    queue=self.queue,
+                    service=self.service_info.service,
+                    org_name=self.service_info.org,
+                    page=self.service_info.repo_cursor,
+                    default_branch_only=self.scan_options.default_branch_only,
+                    plugins=self.scan_options.plugins,
+                    batch_id=self.scan_options.batch_id,
+                    redundant_scan_query=self.redundant_scan_query,
+                )
+            return []
 
     def _process_repos(self, nodes: list) -> list:
         """
@@ -239,7 +267,6 @@ def _get_default_branch(self, repo_name: str, repo_dict: dict) -> str:
             url=self.service_info.url, org=self.service_info.org, repo=repo_name
         )
         response = self._query_bitbucket_api(url)
-        response = self.json_utils.get_json_from_response(response)
         if not response:
             log.warning(
                 "Unable to retrieve Default Branch for repo: %s/%s",
@@ -250,6 +277,7 @@
                 self.service_info.org,
             )
             default_branch = "HEAD"
             return default_branch
+        response = self.json_utils.get_json_from_response(response)
         default_branch = self.service_helper.get_default_branch_name(response)
         return default_branch
@@ -279,7 +307,7 @@ def _get_branch_timestamp(self, repo: str, ref: dict):
 
         return timestamp
 
-    def _query_bitbucket_api(self, url: str) -> str or None:
+    def _query_bitbucket_api(self, url: str) -> Optional[str]:
         with requests.session() as session:
             headers = {
                 "Authorization": f"Basic {self.service_info.api_key}",
@@ -289,8 +317,11 @@
             headers[REV_PROXY_SECRET_HEADER] = GetProxySecret()
             response = session.get(url=url, headers=headers)
             if response.status_code == 429:
-                log.error("Error retrieving Bitbucket query. Rate Limit Reached")
-                raise requests.HTTPError("Bitbucket Rate Limit Reached")
+                log.warning("Error retrieving Bitbucket query. Rate Limit Reached")
+                raise HeimdallException("Bitbucket Rate Limit Reached")
+            if response.status_code == 204:
+                log.warning("No data was returned for the Bitbucket Query: %s", url)
+                return None
             if response.status_code != 200:
                 log.error("Error retrieving Bitbucket query: %s, Error Code: %s", url, response.status_code)
                 return None
diff --git a/orchestrator/lambdas/repo_scan/repo_scan/repo_scan.py b/orchestrator/lambdas/repo_scan/repo_scan/repo_scan.py
index 92cdad7f..784c6cbf 100644
--- a/orchestrator/lambdas/repo_scan/repo_scan/repo_scan.py
+++ b/orchestrator/lambdas/repo_scan/repo_scan/repo_scan.py
@@ -1,7 +1,6 @@
 # pylint: disable=no-name-in-module, no-member
 import json
 import os
-import warnings
 from collections import namedtuple
 from datetime import datetime
 from typing import Any, Optional
@@ -11,7 +10,6 @@
 from heimdall_utils.aws_utils import get_analyzer_api_key, send_analyzer_request
 from heimdall_utils.utils import JSONUtils, get_ttl_expiration
 from heimdall_utils.env import APPLICATION
-from heimdall_utils.metrics.factory import get_metrics
 from repo_scan.aws_connect import (
     batch_update_db,
     delete_processed_messages,
@@ -20,25 +18,19 @@
     send_sqs_message,
 )
 
-# At the end of the lambda execution, metrics will be published
-# Adding this filter to Ignore warnings for empty metrics
-warnings.filterwarnings("ignore", "No application metrics to publish*")
-
 ARTEMIS_API = os.environ.get("ARTEMIS_API")
 API_KEY_LOC = os.environ.get("ARTEMIS_API_KEY")
 DEFAULT_PLUGINS = ["gitsecrets", "trufflehog", "base_images"]
 PROCESSED_MESSAGES = namedtuple("processed_messages", ["repos", "receipt_handles"])
-REPO_QUEUE = os.environ.get("REPO_QUEUE")
-REPO_DLQ = os.environ.get("REPO_DEAD_LETTER_QUEUE")
+REPO_QUEUE = os.environ.get("REPO_QUEUE", "")
+REPO_DLQ = os.environ.get("REPO_DEAD_LETTER_QUEUE", "")
 SCAN_DEPTH = int(os.environ.get("SCAN_DEPTH", 1))
 SCAN_TABLE_NAME = os.environ.get("SCAN_TABLE") or ""
 
 log = Logger(service=APPLICATION, name="repo_scan")
 json_utils = JSONUtils(log)
-metrics = get_metrics()
 
 
-@metrics.log_metrics()
 @log.inject_lambda_context
 def run(event: dict[str, Any] = None, context: LambdaContext = None, size: int = 20) -> Optional[list[dict[str, Any]]]:
     # Get the size of the REPO_QUEUE
@@ -121,13 +113,11 @@ def submit_repos(repos: list, analyzer_url: str, api_key: str) -> list:
 
 def requeue_failed_repos(service: str, repo_lookup: dict[str, Any], failed_repos: list):
     """
-    Send failed repos to the repo-dead-letter SQS queue
+    Send failed repos to the repo-dead-letter SQS queue or the repo SQS queue
     """
-    repos_to_queue = []
-    index = 0
+    failed_repo_lst = []
+    rate_limit_lst = []
     count = 0
-    if failed_repos != None:
-        metrics.add_metric(name="failed_repositories.count", value=len(failed_repos), version_control_service=service)
     for failed_repo in failed_repos:
         error_msg = failed_repo.get("error", "")
         if error_msg.startswith("Could not resolve to a Repository with the name"):
@@ -137,20 +127,28 @@
             repo_info["service"] = service
         if not repo_info:
             continue
-        repos_to_queue.append({"Id": str(index), "MessageBody": json.dumps(repo_info)})
-        index += 1
-        if index >= 10:
-            if not send_sqs_message(REPO_DLQ, repos_to_queue):
-                log.error("There was an error queueing the repos, aborting.")
-                return
-            count += index
-            index = 0
-            repos_to_queue = []
-    if index > 0:
-        count += index
-        send_sqs_message(REPO_DLQ, repos_to_queue)
+
+        sqs_message = {"Id": str(count), "MessageBody": json.dumps(repo_info)}
+        if error_msg.startswith("Rate limit") and service in ["bitbucket"]:
+            rate_limit_lst.append(sqs_message)
+        else:
+            failed_repo_lst.append(sqs_message)
+
+        count += 1
 
-    log.info(f"Sending {count} repos to the repo-deadletter Queue")
+    log.info(f"Requeuing {count} failed repos to the repo and repo-dead-letter queues")
+    send_to_sqs(REPO_DLQ, failed_repo_lst)
+    send_to_sqs(REPO_QUEUE, rate_limit_lst)
+
+
+def send_to_sqs(queue: str, repos: list):
+    chunk_size = 10  # SQS SendMessageBatch accepts at most 10 messages per call
+    if len(repos) == 0:
+        return
+
+    for i in range(0, len(repos), chunk_size):
+        repo_subset = repos[i : i + chunk_size]
+        send_sqs_message(queue, repo_subset)
 
 
 def get_repo_scan_items(service, response, date=None):
@@ -193,14 +191,7 @@ def construct_repo_requests(repos: list) -> dict:
             "batch_id": repo.get("batch_id"),
             "depth": SCAN_DEPTH,
         }
-        metrics.add_metric(
-            name="queued_repositories.count",
-            value=1,
-            repository=repo["repo"],
-            batch_id=repo.get("batch_id"),
-            organization_name=repo.get("org"),
-            version_control_service=service,
-        )
+
         if "branch" in repo and repo["branch"] != "HEAD":
            req["branch"] = repo["branch"]
         reqs[service].append(req)
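Below, a minimal runnable sketch of the rate-limit flow these changes introduce, with the HTTP call and queue helpers stubbed out. HeimdallException here is a stand-in for the class imported from heimdall_utils.utils, requeue() is a hypothetical stand-in for queue_branch_and_repo / queue_service_and_org, and the status handling mirrors the new _query_bitbucket_api: 429 raises, 204 and other non-200 codes return None, 200 returns the body.

from typing import Optional


class HeimdallException(Exception):
    """Stand-in for heimdall_utils.utils.HeimdallException."""


def query_bitbucket_api(status_code: int) -> Optional[str]:
    # Mirrors the new status handling in _query_bitbucket_api.
    if status_code == 429:
        raise HeimdallException("Bitbucket Rate Limit Reached")
    if status_code == 204:
        return None  # no content for this query
    if status_code != 200:
        return None  # logged and swallowed in the real code
    return '{"values": []}'


def requeue() -> None:
    # Hypothetical stand-in for queue_branch_and_repo / queue_service_and_org.
    print("Rate limit hit. Requeuing task")


def query(status_code: int) -> list:
    # On a rate limit the work item is requeued and an empty result is
    # returned, so the exception never propagates out of the lambda.
    try:
        response = query_bitbucket_api(status_code)
        return [] if response is None else ["<parsed repos>"]
    except HeimdallException:
        requeue()
        return []


print(query(200))  # ['<parsed repos>']
print(query(429))  # prints the requeue message, then []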
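And a sketch of the batching behavior of the new send_to_sqs helper, under the assumption that send_sqs_message(queue, entries) wraps SQS SendMessageBatch, which accepts at most 10 entries per call (hence the chunk size of 10); the stub here just prints each batch size.

import json


def send_sqs_message(queue: str, entries: list) -> bool:
    # Stub for repo_scan.aws_connect.send_sqs_message; assumed to wrap
    # sqs.send_message_batch, which caps each batch at 10 entries.
    print(f"{queue}: batch of {len(entries)} messages")
    return True


def send_to_sqs(queue: str, repos: list) -> None:
    chunk_size = 10  # SendMessageBatch limit
    for i in range(0, len(repos), chunk_size):
        send_sqs_message(queue, repos[i : i + chunk_size])


entries = [{"Id": str(i), "MessageBody": json.dumps({"repo": f"r{i}"})} for i in range(23)]
send_to_sqs("repo-queue", entries)  # batches of 10, 10, and 3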