Skip to content

Commit

Permalink
Merge pull request #1 from snyk-labs/feat/component-relationships
Browse files Browse the repository at this point in the history
feat: build full tree with nested dep relationships
  • Loading branch information
scott-es authored Mar 25, 2024
2 parents 5bbab93 + b29baa7 commit b1ba25d
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 55 deletions.
33 changes: 23 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,22 +1,39 @@
![snyk-oss-category](https://github.com/snyk-labs/oss-images/blob/main/oss-example.jpg)

# sbom-to-snyk-depgraph
convert cyclone dx sbom to snyk depgraph and monitor/test/print
Convert [CycloneDX](https://cyclonedx.org/) SBOM to snyk depgraph and monitor/test/print

```
Usage: main.py [OPTIONS] COMMAND [ARGS]...
Options:
--sbom-file TEXT Full path to SBOM file [env var: SBOM_FILE]
--debug / --no-debug Set log level to debug [default: no-debug]
--help Show this message and exit.
--sbom-file TEXT Full path to SBOM file [env var: SBOM_FILE;
required]
--prune-repeated-subdependencies / --no-prune-repeated-subdependencies
Use if too many repeated sub dependencies
causes test or monitor to fail [default:
no-prune-repeated-subdependencies]
--debug / --no-debug Set log level to debug [default: no-debug]
--help Show this message and exit.
Commands:
monitor Monitor SBOM with Snyk
print-graph Print Snyk depGraph representation of SBOM
test Test SBOM with Snyk
```

### Exit Codes

- 2 = an error occurred during execution and did not complete successfully (see error message on stdout)

When using `test` command
- 0 = test completed successfully and no security issues were found
- 1 = test completed successfully and 1 more security issues were found

When using other commands (e.g. `monitor` or `print-graph`)
- 0 = command completed successfully


# Setup

Requires [Poetry](https://python-poetry.org/) for Python
Expand All @@ -26,7 +43,7 @@ Commands:

# Examples

### monitor sbom as snyk project
### Monitor SBOM as Snyk project
```
Usage: main.py monitor [OPTIONS]
Expand All @@ -37,15 +54,12 @@ Options:
SNYK_TOKEN]
--snyk-org-id TEXT Please specify the Snyk ORG ID to run commands
against [env var: SNYK_ORG_ID]
--snyk-project-name TEXT Specify a custom Snyk project name. By default
Snyk will use the name of the root node. [env
var: SNYK_PROJECT_NAME]
--help Show this message and exit.
```

poetry run python3 main.py --sbom-file="/path/to/bom.json" monitor --snyk-org-id=<org_id>

### test sbom against snyk database
### Test SBOM against Snyk database
```
Usage: main.py test [OPTIONS]
Expand All @@ -60,4 +74,3 @@ Options:

poetry run python3 main.py --sbom-file="/path/to/bom.json" test --snyk-org-id=<org_id>


157 changes: 112 additions & 45 deletions sbom_to_snyk_depgraph/main.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import typer
import json
import time
import sys
import logging
import requests
from uuid import UUID
from cyclonedx.model.bom import Bom
from snyk import SnykClient
from snyk_depgraph import DepGraph
from typing import Optional, List
from typing import Optional
from typing import List

# set up logging
logger = logging.getLogger(__name__)
Expand All @@ -21,26 +23,44 @@

# globals
g={}
g['DEPGRAPH_BASE_TEST_URL'] = "/test/dep-graph?org="
g['DEPGRAPH_BASE_MONITOR_URL'] = "/monitor/dep-graph?org="
g['package_source'] = "maven"
g['dep_graph']: DepGraph = DepGraph(g['package_source'], False)
# g['DEPGRAPH_BASE_TEST_URL'] = "/test/dep-graph?org="
# g['DEPGRAPH_BASE_MONITOR_URL'] = "/monitor/dep-graph?org="
# g['package_source'] = "maven"
# g['dep_graph'] = DepGraph(g['package_source'], False)

DEPGRAPH_BASE_TEST_URL = "/test/dep-graph?org="
DEPGRAPH_BASE_MONITOR_URL = "/monitor/dep-graph?org="

package_source = "maven"
dep_graph = DepGraph(package_source, False)

visited = []
visited_temp = []
dep_path_counts = {}

# future use
# transitive_closures = []

# ===== METHODS =====

@app.callback()
def main(
ctx: typer.Context,
sbom_file: str = typer.Option(
None,
...,
envvar="SBOM_FILE",
help="Full path to SBOM file"
),
prune_repeated_subdependencies: bool = typer.Option(
False,
help="Use if too many repeated sub dependencies causes test or monitor to fail"
),
debug: bool = typer.Option(
False,
help="Set log level to debug"
),
):
""" Entrypoint into typer CLI handling """
if debug:
logger.debug("*** DEBUG MODE ENABLED ***", file=sys.stderr)
logger.setLevel(logging.DEBUG)
Expand All @@ -52,17 +72,22 @@ def main(

root_component_ref = "unknown"

try:
root_component_ref = f"{str(g['sbom']._metadata.component.name)}@{str(g['sbom']._metadata.component.version)}"
except:
try:
root_component_ref = f"{str(g['sbom']._metadata.component.name)}"
except:
pass
if g['sbom']._metadata.component.purl:
root_component_ref = f"{str(g['sbom']._metadata.component.purl)}"
elif g['sbom']._metadata.component.name:
root_component_ref = f"{str(g['sbom']._metadata.component.name)}"

logger.debug(root_component_ref)
logger.debug(f"{root_component_ref=}")

cdx_sbom_to_depgraph(parent_node_ref=root_component_ref)
sbom_to_depgraph(
parent_component_ref=root_component_ref,
depth=0
)

if prune_repeated_subdependencies:
logger.info("Pruning graph ...")
time.sleep(2)
prune()

return

Expand All @@ -71,7 +96,7 @@ def print_graph():
"""
Print Snyk depGraph representation of SBOM
"""
dep_graph: DepGraph = g['dep_graph']
# dep_graph: DepGraph = g['dep_graph']

typer.echo(f"{json.dumps(dep_graph.graph(), indent=4)}")

Expand All @@ -93,9 +118,10 @@ def test(
"""
Test SBOM with Snyk
"""
dep_graph: DepGraph = g['dep_graph']
global dep_graph

snyk_client = SnykClient(snyk_token)
response: requests.Response = snyk_client.post(f"{g['DEPGRAPH_BASE_TEST_URL']}{snyk_org_id}", body=dep_graph.graph())
response: requests.Response = snyk_client.post(f"{DEPGRAPH_BASE_TEST_URL}{snyk_org_id}", body=dep_graph.graph())

json_response = response.json()
print(json.dumps(json_response, indent=4))
Expand All @@ -118,18 +144,17 @@ def monitor(
envvar="SNYK_ORG_ID",
help="Please specify the Snyk ORG ID to run commands against"
),
snyk_project_name: Optional[str] = typer.Option(
None,
envvar="SNYK_PROJECT_NAME",
help="Specify a custom Snyk project name. By default Snyk will use the name of the root node."
),
):
"""
Monitor SBOM with Snyk
"""
dep_graph: DepGraph = g['dep_graph']
global dep_graph

snyk_client = SnykClient(snyk_token)
response: requests.Response = snyk_client.post(f"{g['DEPGRAPH_BASE_MONITOR_URL']}{snyk_org_id}", body=dep_graph.graph())
response: requests.Response = snyk_client.post(
f"{DEPGRAPH_BASE_MONITOR_URL}{snyk_org_id}",
body=dep_graph.graph()
)

json_response = response.json()
print(json.dumps(json_response, indent=4))
Expand All @@ -143,44 +168,86 @@ def monitor(
# Utility Functions
# -----------------

def cdx_sbom_to_depgraph(
parent_node_ref: str
def sbom_to_depgraph(
parent_component_ref: str,
depth: int
) -> DepGraph:
"""
Convert the CDX SBOM components to snyk depgraph to find issues
"""
logger.debug(f"processing depgraph from root: {parent_node_ref}")
dep_graph: DepGraph = g['dep_graph']
global visited
global visited_temp

parent_dep_for_depgraph = purl_to_depgraph_dep(purl=parent_node_ref)
logger.debug(f"processing sbom deps for parent: {parent_component_ref}")
#dep_graph: DepGraph = g['dep_graph']

component_purls = get_sbom_component_purls()
parent_dep_for_depgraph = purl_to_depgraph_dep(purl=parent_component_ref)

dep_graph.set_root_node_package(f"{parent_dep_for_depgraph}")
# special entry for the root node of the dep graph
if depth == 0:
logger.debug(f"setting degraph root node to: {parent_dep_for_depgraph}")
dep_graph.set_root_node_package(f"{parent_dep_for_depgraph}")

for purl in component_purls:
depgraph_dep = purl_to_depgraph_dep(purl=purl)
children = get_dependencies_from_ref(parent_component_ref)
logger.debug(f"found child dependencies: {children=}")

for child in children:
logger.debug(f"{str(child)=}")
depgraph_dep = purl_to_depgraph_dep(purl=str(child))

logger.debug(f"adding pkg {depgraph_dep=} to depgraph")
dep_graph.add_pkg(depgraph_dep)

increment_dep_path_count(depgraph_dep)

logger.debug(f"adding dep {depgraph_dep=} for {parent_dep_for_depgraph=} to depgraph")
dep_graph.add_dep(depgraph_dep, parent_dep_for_depgraph)
dep_graph.add_dep(child_node_id = None, parent_node_id = depgraph_dep)
dep_graph.add_dep(child_node_id=depgraph_dep, parent_node_id=parent_dep_for_depgraph)

def get_sbom_component_purls() -> List[str]:
"""
Return the list of component Purl strings
visited_temp.append(parent_component_ref)

# if we've already processed this subtree, then just return
if child not in visited:
sbom_to_depgraph(str(child), depth=depth+1)
# else:
# future use for smarter pruning
# account for node in the subtree to count all paths

# we've reach a leaf node and just need to add an entry with empty deps array
if len(children) == 0:
dep_graph.add_dep(child_node_id=None, parent_node_id=parent_dep_for_depgraph)
visited.extend(visited_temp)

visited_temp = []

def get_dependencies_from_ref(dependency_ref) -> List:
children = []
for dependency in g['sbom'].dependencies:
if str(dependency.ref) == dependency_ref:
logger.debug(f"{dependency.dependencies=}")
children.extend([str(x.ref) for x in dependency.dependencies])

return children

def increment_dep_path_count(dep: str):
"""
Keep track of path counts in case we need to prune
"""
component_purls = []
global dep_path_counts

for component in g['sbom'].components:
component_purls.append(str(component._purl))
dep_path_counts[dep] = dep_path_counts.get(dep, 0) + 1

return component_purls

def prune():
global dep_graph

def purl_to_depgraph_dep(purl: str):
for dep, instances in dep_path_counts.items():
if instances > 2:
logger.info(f"pruning {dep} ({instances=})")
dep_graph.prune_dep(dep)

def purl_to_depgraph_dep(purl: str) -> str:
"""
Convert purl format string to package@version for snyk
"""
k = purl.rfind("@")
logger.debug(f"{purl[:k]=}")
logger.debug(f"{purl[k+1:]=}")
Expand Down

0 comments on commit b1ba25d

Please sign in to comment.