From 6ae04447503784bc25795747a9d5e9afc2719a06 Mon Sep 17 00:00:00 2001 From: Scott Esbrandt Date: Mon, 25 Mar 2024 16:11:36 -0400 Subject: [PATCH 1/2] feat: buid full tree with nested dep relationships --- README.md | 30 +++++-- sbom_to_snyk_depgraph/main.py | 157 ++++++++++++++++++++++++---------- 2 files changed, 135 insertions(+), 52 deletions(-) diff --git a/README.md b/README.md index 7b5637b..e7bdd69 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,20 @@ ![snyk-oss-category](https://github.com/snyk-labs/oss-images/blob/main/oss-example.jpg) # sbom-to-snyk-depgraph -convert cyclone dx sbom to snyk depgraph and monitor/test/print +Convert [CycloneDX](https://cyclonedx.org/) SBOM to snyk depgraph and monitor/test/print ``` Usage: main.py [OPTIONS] COMMAND [ARGS]... Options: - --sbom-file TEXT Full path to SBOM file [env var: SBOM_FILE] - --debug / --no-debug Set log level to debug [default: no-debug] - --help Show this message and exit. + --sbom-file TEXT Full path to SBOM file [env var: SBOM_FILE; + required] + --prune-repeated-subdependencies / --no-prune-repeated-subdependencies + Use if too many repeated sub dependencies + causes test or monitor to fail [default: + no-prune-repeated-subdependencies] + --debug / --no-debug Set log level to debug [default: no-debug] + --help Show this message and exit. Commands: monitor Monitor SBOM with Snyk @@ -17,6 +22,18 @@ Commands: test Test SBOM with Snyk ``` + ### Exit Codes + + - 2 = an error occurred during execution and did not complete successfully (see error message on stdout) + + When using `test` command + - 0 = test completed successfully and no security issues were found + - 1 = test completed successfully and 1 more security issues were found + + When using other commands (e.g. `monitor` or `print-graph`) + - 0 = command completed successfully + + # Setup Requires [Poetry](https://python-poetry.org/) for Python @@ -26,7 +43,7 @@ Commands: # Examples - ### monitor sbom as snyk project + ### Monitor SBOM as Snyk project ``` Usage: main.py monitor [OPTIONS] @@ -45,7 +62,7 @@ Options: poetry run python3 main.py --sbom-file="/path/to/bom.json" monitor --snyk-org-id= - ### test sbom against snyk database + ### Test SBOM against Snyk database ``` Usage: main.py test [OPTIONS] @@ -60,4 +77,3 @@ Options: poetry run python3 main.py --sbom-file="/path/to/bom.json" test --snyk-org-id= - diff --git a/sbom_to_snyk_depgraph/main.py b/sbom_to_snyk_depgraph/main.py index da0446f..fe9648e 100644 --- a/sbom_to_snyk_depgraph/main.py +++ b/sbom_to_snyk_depgraph/main.py @@ -1,5 +1,6 @@ import typer import json +import time import sys import logging import requests @@ -7,7 +8,8 @@ from cyclonedx.model.bom import Bom from snyk import SnykClient from snyk_depgraph import DepGraph -from typing import Optional, List +from typing import Optional +from typing import List # set up logging logger = logging.getLogger(__name__) @@ -21,10 +23,23 @@ # globals g={} -g['DEPGRAPH_BASE_TEST_URL'] = "/test/dep-graph?org=" -g['DEPGRAPH_BASE_MONITOR_URL'] = "/monitor/dep-graph?org=" -g['package_source'] = "maven" -g['dep_graph']: DepGraph = DepGraph(g['package_source'], False) +# g['DEPGRAPH_BASE_TEST_URL'] = "/test/dep-graph?org=" +# g['DEPGRAPH_BASE_MONITOR_URL'] = "/monitor/dep-graph?org=" +# g['package_source'] = "maven" +# g['dep_graph'] = DepGraph(g['package_source'], False) + +DEPGRAPH_BASE_TEST_URL = "/test/dep-graph?org=" +DEPGRAPH_BASE_MONITOR_URL = "/monitor/dep-graph?org=" + +package_source = "maven" +dep_graph = DepGraph(package_source, False) + +visited = [] +visited_temp = [] +dep_path_counts = {} + +# future use +# transitive_closures = [] # ===== METHODS ===== @@ -32,15 +47,20 @@ def main( ctx: typer.Context, sbom_file: str = typer.Option( - None, + ..., envvar="SBOM_FILE", help="Full path to SBOM file" ), + prune_repeated_subdependencies: bool = typer.Option( + False, + help="Use if too many repeated sub dependencies causes test or monitor to fail" + ), debug: bool = typer.Option( False, help="Set log level to debug" ), ): + """ Entrypoint into typer CLI handling """ if debug: logger.debug("*** DEBUG MODE ENABLED ***", file=sys.stderr) logger.setLevel(logging.DEBUG) @@ -52,17 +72,22 @@ def main( root_component_ref = "unknown" - try: - root_component_ref = f"{str(g['sbom']._metadata.component.name)}@{str(g['sbom']._metadata.component.version)}" - except: - try: - root_component_ref = f"{str(g['sbom']._metadata.component.name)}" - except: - pass + if g['sbom']._metadata.component.purl: + root_component_ref = f"{str(g['sbom']._metadata.component.purl)}" + elif g['sbom']._metadata.component.name: + root_component_ref = f"{str(g['sbom']._metadata.component.name)}" - logger.debug(root_component_ref) + logger.debug(f"{root_component_ref=}") - cdx_sbom_to_depgraph(parent_node_ref=root_component_ref) + sbom_to_depgraph( + parent_component_ref=root_component_ref, + depth=0 + ) + + if prune_repeated_subdependencies: + logger.info("Pruning graph ...") + time.sleep(2) + prune() return @@ -71,7 +96,7 @@ def print_graph(): """ Print Snyk depGraph representation of SBOM """ - dep_graph: DepGraph = g['dep_graph'] + # dep_graph: DepGraph = g['dep_graph'] typer.echo(f"{json.dumps(dep_graph.graph(), indent=4)}") @@ -93,9 +118,10 @@ def test( """ Test SBOM with Snyk """ - dep_graph: DepGraph = g['dep_graph'] + global dep_graph + snyk_client = SnykClient(snyk_token) - response: requests.Response = snyk_client.post(f"{g['DEPGRAPH_BASE_TEST_URL']}{snyk_org_id}", body=dep_graph.graph()) + response: requests.Response = snyk_client.post(f"{DEPGRAPH_BASE_TEST_URL}{snyk_org_id}", body=dep_graph.graph()) json_response = response.json() print(json.dumps(json_response, indent=4)) @@ -118,18 +144,17 @@ def monitor( envvar="SNYK_ORG_ID", help="Please specify the Snyk ORG ID to run commands against" ), - snyk_project_name: Optional[str] = typer.Option( - None, - envvar="SNYK_PROJECT_NAME", - help="Specify a custom Snyk project name. By default Snyk will use the name of the root node." - ), ): """ Monitor SBOM with Snyk """ - dep_graph: DepGraph = g['dep_graph'] + global dep_graph + snyk_client = SnykClient(snyk_token) - response: requests.Response = snyk_client.post(f"{g['DEPGRAPH_BASE_MONITOR_URL']}{snyk_org_id}", body=dep_graph.graph()) + response: requests.Response = snyk_client.post( + f"{DEPGRAPH_BASE_MONITOR_URL}{snyk_org_id}", + body=dep_graph.graph() + ) json_response = response.json() print(json.dumps(json_response, indent=4)) @@ -143,44 +168,86 @@ def monitor( # Utility Functions # ----------------- -def cdx_sbom_to_depgraph( - parent_node_ref: str +def sbom_to_depgraph( + parent_component_ref: str, + depth: int ) -> DepGraph: """ Convert the CDX SBOM components to snyk depgraph to find issues """ - logger.debug(f"processing depgraph from root: {parent_node_ref}") - dep_graph: DepGraph = g['dep_graph'] + global visited + global visited_temp - parent_dep_for_depgraph = purl_to_depgraph_dep(purl=parent_node_ref) + logger.debug(f"processing sbom deps for parent: {parent_component_ref}") + #dep_graph: DepGraph = g['dep_graph'] - component_purls = get_sbom_component_purls() + parent_dep_for_depgraph = purl_to_depgraph_dep(purl=parent_component_ref) - dep_graph.set_root_node_package(f"{parent_dep_for_depgraph}") + # special entry for the root node of the dep graph + if depth == 0: + logger.debug(f"setting degraph root node to: {parent_dep_for_depgraph}") + dep_graph.set_root_node_package(f"{parent_dep_for_depgraph}") - for purl in component_purls: - depgraph_dep = purl_to_depgraph_dep(purl=purl) + children = get_dependencies_from_ref(parent_component_ref) + logger.debug(f"found child dependencies: {children=}") + + for child in children: + logger.debug(f"{str(child)=}") + depgraph_dep = purl_to_depgraph_dep(purl=str(child)) logger.debug(f"adding pkg {depgraph_dep=} to depgraph") dep_graph.add_pkg(depgraph_dep) + increment_dep_path_count(depgraph_dep) + logger.debug(f"adding dep {depgraph_dep=} for {parent_dep_for_depgraph=} to depgraph") - dep_graph.add_dep(depgraph_dep, parent_dep_for_depgraph) - dep_graph.add_dep(child_node_id = None, parent_node_id = depgraph_dep) + dep_graph.add_dep(child_node_id=depgraph_dep, parent_node_id=parent_dep_for_depgraph) -def get_sbom_component_purls() -> List[str]: - """ - Return the list of component Purl strings + visited_temp.append(parent_component_ref) + + # if we've already processed this subtree, then just return + if child not in visited: + sbom_to_depgraph(str(child), depth=depth+1) + # else: + # future use for smarter pruning + # account for node in the subtree to count all paths + + # we've reach a leaf node and just need to add an entry with empty deps array + if len(children) == 0: + dep_graph.add_dep(child_node_id=None, parent_node_id=parent_dep_for_depgraph) + visited.extend(visited_temp) + + visited_temp = [] + +def get_dependencies_from_ref(dependency_ref) -> List: + children = [] + for dependency in g['sbom'].dependencies: + if str(dependency.ref) == dependency_ref: + logger.debug(f"{dependency.dependencies=}") + children.extend([str(x.ref) for x in dependency.dependencies]) + + return children + +def increment_dep_path_count(dep: str): + """ + Keep track of path counts in case we need to prune """ - component_purls = [] + global dep_path_counts - for component in g['sbom'].components: - component_purls.append(str(component._purl)) + dep_path_counts[dep] = dep_path_counts.get(dep, 0) + 1 - return component_purls - +def prune(): + global dep_graph -def purl_to_depgraph_dep(purl: str): + for dep, instances in dep_path_counts.items(): + if instances > 2: + logger.info(f"pruning {dep} ({instances=})") + dep_graph.prune_dep(dep) + +def purl_to_depgraph_dep(purl: str) -> str: + """ + Convert purl format string to package@version for snyk + """ k = purl.rfind("@") logger.debug(f"{purl[:k]=}") logger.debug(f"{purl[k+1:]=}") From b29baa7bde34a5e3e872014083761cf550bd8952 Mon Sep 17 00:00:00 2001 From: scott-es <59706011+scott-es@users.noreply.github.com> Date: Mon, 25 Mar 2024 16:18:46 -0400 Subject: [PATCH 2/2] docs: Update README.md --- README.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/README.md b/README.md index e7bdd69..2bc9a66 100644 --- a/README.md +++ b/README.md @@ -54,9 +54,6 @@ Options: SNYK_TOKEN] --snyk-org-id TEXT Please specify the Snyk ORG ID to run commands against [env var: SNYK_ORG_ID] - --snyk-project-name TEXT Specify a custom Snyk project name. By default - Snyk will use the name of the root node. [env - var: SNYK_PROJECT_NAME] --help Show this message and exit. ```