Skip to content

Commit

Permalink
BREAKING: Store large descriptions in supporting media (#139)
Browse files Browse the repository at this point in the history
* BREAKING: Store large descriptions in supporting media

Signed-off-by: Prabhu Subramanian <prabhu@appthreat.com>

---------

Signed-off-by: Prabhu Subramanian <prabhu@appthreat.com>
  • Loading branch information
prabhu authored May 30, 2024
1 parent bfa1223 commit 8c647be
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 77 deletions.
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ dependencies = [
"httpx[http2]",
"appdirs",
"orjson",
"semver>=3.0.0",
"semver",
"packageurl-python",
"cvss",
"pydantic[email]",
"rich",
"apsw>=3.45.2.0"
"apsw"
]
requires-python = ">=3.10"
readme = "README.md"
Expand Down
34 changes: 25 additions & 9 deletions vdb/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-

import argparse
import base64
import logging
import os
import shutil
Expand All @@ -22,6 +23,7 @@
# oras is an optional dependency
try:
from vdb.lib.orasclient import download_image

ORAS_AVAILABLE = True
except ImportError:
pass
Expand Down Expand Up @@ -126,21 +128,29 @@ def add_table_row(table: Table, res: dict, added_row_keys: dict):
if added_row_keys.get(row_key):
return
source_data: CVE = res.get("source_data")
description = ""
descriptions = []
if (
source_data.root.containers.cna
and source_data.root.containers.cna.descriptions
and source_data.root.containers.cna.descriptions.root
):
description = (
source_data.root.containers.cna.descriptions.root[0]
.value.replace("\\n", "\n")
.replace("\\t", " ")
)
for adesc in source_data.root.containers.cna.descriptions.root:
description = (
"\n".join(
[
base64.b64decode(sm.value).decode("utf-8")
for sm in adesc.supportingMedia
]
)
if adesc.supportingMedia
else adesc.value
)
description = description.replace("\\n", "\n").replace("\\t", " ")
descriptions.append(description)
table.add_row(
res.get("cve_id"),
res.get("matched_by"),
Markdown(description, justify="left", hyperlinks=True),
Markdown("\n".join(descriptions), justify="left", hyperlinks=True),
)
added_row_keys[row_key] = True

Expand Down Expand Up @@ -176,10 +186,16 @@ def main():
shutil.rmtree(config.DATA_DIR, ignore_errors=True)
if args.download_image:
if ORAS_AVAILABLE:
LOG.info("Downloading vdb image from %s to %s", config.VDB_DATABASE_URL, config.DATA_DIR)
LOG.info(
"Downloading vdb image from %s to %s",
config.VDB_DATABASE_URL,
config.DATA_DIR,
)
download_image(config.VDB_DATABASE_URL, config.DATA_DIR)
else:
console.print("Oras library is not available. Install using pip install appthreat-vulnerability-db[oras] and then re-run this command.")
console.print(
"Oras library is not available. Install using pip install appthreat-vulnerability-db[oras] and then re-run this command."
)
elif args.cache or args.cache_os:
db_lib.get()
db_lib.clear_all()
Expand Down
33 changes: 30 additions & 3 deletions vdb/lib/cve.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import base64
import os
import uuid

Expand Down Expand Up @@ -41,6 +42,7 @@
References,
State,
Status,
SupportingMediaItem,
UuidType,
Version,
Versions,
Expand Down Expand Up @@ -179,14 +181,17 @@ def to_cve_affected(avuln: Vulnerability) -> Affected | None:
tmp_a = product.split("/")
# ubuntu/upstream/virtualbox should become
# product=ubuntu and package_name=upstream/virtualbox
if vendor in config.OS_PKG_TYPES or config.VENDOR_TO_VERS_SCHEME.get(vendor):
if (
vendor in config.OS_PKG_TYPES
or config.VENDOR_TO_VERS_SCHEME.get(vendor)
):
product = tmp_a[0]
package_name = "/".join(tmp_a[1:])
elif len(tmp_a) != 2:
if len(tmp_a) > 2 and vendor in ("generic", "swift"):
product = os.path.dirname(product)
package_name = os.path.basename(package_name)
# If we get an empty package_name then fallback to using the full string as package_name
# For an empty package_name, fall back to using the full string
if not package_name:
product = None
package_name = parts.group("package")
Expand Down Expand Up @@ -329,7 +334,29 @@ def to_cve_containers(avuln: Vulnerability) -> CnaPublishedContainer | None:
return None
cont = CnaPublishedContainer(
providerMetadata=provier_meta,
descriptions=[Description(lang=Language("en"), value=avuln.description)],
descriptions=[
Description(
lang=Language("en"),
value=(
avuln.description
if len(avuln.description) <= 4096
else "Refer to the supporting media"
),
supportingMedia=(
[
SupportingMediaItem(
type="text/markdown",
base64=True,
value=(
(base64.b64encode(bytes(avuln.description, "utf-8")))
),
)
]
if len(avuln.description) > 4096
else None
),
)
],
affected=affected,
metrics=to_metrics(avuln),
)
Expand Down
10 changes: 4 additions & 6 deletions vdb/lib/cve_model/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
# generated by datamodel-codegen:
# filename: CVE_JSON_5.0_schema.json
# timestamp: 2024-02-07T18:22:42+00:00
# pylint: disable=C0115, C0103, C0301

from __future__ import annotations

Expand Down Expand Up @@ -646,11 +644,11 @@ class SupportingMediaItem(BaseModel):
title="Encoding",
),
]
# Removed max_length
value: Annotated[
str,
Field(
description="Supporting media content, up to 16K. If base64 is true, this field stores base64 encoded data.",
max_length=16384,
description="Supporting media content. If base64 is true, this field stores base64 encoded data.",
min_length=1,
),
]
Expand Down Expand Up @@ -797,7 +795,7 @@ class Description(BaseModel):
)
lang: Language
value: Annotated[
str, Field(description="Plain text description.", min_length=1)
str, Field(description="Plain text description.", min_length=1, max_length=4096)
]
supportingMedia: Annotated[
Optional[List[SupportingMediaItem]],
Expand Down
2 changes: 2 additions & 0 deletions vdb/lib/cve_model/common.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# pylint: disable=C0115, C0103, C0301

from enum import Enum
from typing import Annotated

Expand Down
4 changes: 1 addition & 3 deletions vdb/lib/cve_model/cvss_v2.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
# generated by datamodel-codegen:
# filename: CVE_JSON_5.0_schema.json
# timestamp: 2024-02-07T18:22:42+00:00
# pylint: disable=C0115, C0103, C0301

from __future__ import annotations

Expand Down
4 changes: 1 addition & 3 deletions vdb/lib/cve_model/cvss_v3.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
# generated by datamodel-codegen:
# filename: CVE_JSON_5.0_schema.json
# timestamp: 2024-02-07T18:22:42+00:00
# pylint: disable=C0115, C0103, C0301

from __future__ import annotations

Expand Down
92 changes: 51 additions & 41 deletions vdb/lib/osv.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
This module fetches vulnerability data from osv.dev and stores it in NVD CVE 1.1 JSON format.
"""

from zipfile import ZipFile

import httpx
Expand All @@ -28,7 +29,7 @@
"go": "golang",
"crates.io": "cargo",
"swifturl": "swift",
"github actions": "github"
"github actions": "github",
}


Expand Down Expand Up @@ -89,20 +90,14 @@ def to_vuln(cve_data):
aliases = cve_data.get("aliases", [])
aliases_block = ""
if aliases and len(aliases) > 1:
aliases_block = """
aliases_block = f"""
## Related CVE(s)
{}
""".format(
", ".join(aliases)
)
description = """# {}
{}
{}
""".format(
cve_data.get("summary", "Summary"),
cve_data.get("details", ""),
aliases_block,
)
{", ".join(aliases)}
"""
description = f"""# {cve_data.get("summary", "Summary")}
{cve_data.get("details", "")}
{aliases_block}
"""
if "** DISPUTED **" in description or "** REJECT **" in description:
return ret_data
references = []
Expand Down Expand Up @@ -211,7 +206,12 @@ def to_vuln(cve_data):
pkg_name = f'{purl["namespace"]}/{purl["name"]}'
elif purl.get("name"):
pkg_name = purl["name"]
if ":" in pkg_name and vendor.lower() not in ("swift", "swifturl", "github", "github actions"):
if ":" in pkg_name and vendor.lower() not in (
"swift",
"swifturl",
"github",
"github actions",
):
# Example: commons-fileupload:commons-fileupload
# org.apache.tomcat:tomcat
tmp_a = pkg_name.split(":")
Expand All @@ -226,17 +226,22 @@ def to_vuln(cve_data):
if vendor_overrides.get(vendor):
vendor = vendor_overrides.get(vendor)
# Since swift allows both url and local based lookups, we store both the variations
if vendor in ("swift", "swifturl", "github", "github actions") and pkg_name.startswith("github.com"):
if vendor in (
"swift",
"swifturl",
"github",
"github actions",
) and pkg_name.startswith("github.com"):
pkg_name_list.append(pkg_name.removeprefix("github.com/"))
# For OS packages such as alpine, OSV appends the OS version to the vendor
# Let's remove it and add it to the package name
if ":" in vendor_ecosystem and (
"alpine" in vendor
or "apk" in vendor
or "deb" in vendor
or "debian" in vendor
or "almalinux" in vendor
or "rocky" in vendor
"alpine" in vendor
or "apk" in vendor
or "deb" in vendor
or "debian" in vendor
or "almalinux" in vendor
or "rocky" in vendor
):
tmp_v = vendor_ecosystem.split(":")
vendor = tmp_v[0].lower().replace(" ", "").replace("-", "")
Expand All @@ -249,14 +254,16 @@ def to_vuln(cve_data):
edition = f"{vendor}-{vdistro}"
# Only use the precise version for os packages
if (
"debian" in vendor
or "deb" in vendor
or "alpine" in vendor
or "apk" in vendor
or "almalinux" in vendor
or "rocky" in vendor
"debian" in vendor
or "deb" in vendor
or "alpine" in vendor
or "apk" in vendor
or "almalinux" in vendor
or "rocky" in vendor
):
pkg_name_list = [f"{vendor}/{edition}/{pkg_name.removeprefix(vendor + '/')}"]
pkg_name_list = [
f"{vendor}/{edition}/{pkg_name.removeprefix(vendor + '/')}"
]
else:
pkg_name_list.append(f"{edition}/{pkg_name}")
# For some ecosystems, OSV provides a full list of versions with partial events. See osv-pypi2.json for an example
Expand Down Expand Up @@ -317,12 +324,15 @@ def to_vuln(cve_data):
for r in ranges:
if r.get("type") == "GIT" and r.get("repo"):
vendor = "generic"
repo_name = (r.get("repo").removeprefix("http://")
.removeprefix("https://")
.removeprefix("git://")
.removesuffix("/")
.removesuffix(".git")
.lower())
repo_name = (
r.get("repo")
.removeprefix("http://")
.removeprefix("https://")
.removeprefix("git://")
.removesuffix("/")
.removesuffix(".git")
.lower()
)
# See #112
for special_type in ("github.com", "gitlab.com"):
if repo_name.startswith(special_type):
Expand All @@ -342,8 +352,8 @@ def to_vuln(cve_data):
if rversions_list:
version_start_including = rversions_list[0]
if (
len(rversions_list) > 1
and version_start_including != rversions_list[-1]
len(rversions_list) > 1
and version_start_including != rversions_list[-1]
):
version_end_including = rversions_list[-1]
for ev in events:
Expand Down Expand Up @@ -372,10 +382,10 @@ def to_vuln(cve_data):
version_end_excluding = ev.get("limit").split(":")[-1]
# Create an entry for each introduced + fixed/limit event
if version_start_including and (
fix_version_start_including
or version_end_including
or version_end_excluding
or (len(events) == 1 and not versions_list)
fix_version_start_including
or version_end_including
or version_end_excluding
or (len(events) == 1 and not versions_list)
):
for full_pkg in pkg_name_list:
tdata = config.CVE_TPL % dict(
Expand Down
Loading

0 comments on commit 8c647be

Please sign in to comment.