Skip to content

Commit

Permalink
Bugfixes for search module queries and logic.
Browse files Browse the repository at this point in the history
Signed-off-by: Caroline Russell <caroline@appthreat.dev>
  • Loading branch information
cerrussell committed Jul 18, 2024
1 parent bfa1223 commit 8753764
Show file tree
Hide file tree
Showing 6 changed files with 103 additions and 103 deletions.
25 changes: 10 additions & 15 deletions INTEGRATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ A more robust approach:

Use the below SQL query to search by purl_prefix:

```
```sql
SELECT DISTINCT cve_id, type, namespace, name, vers, purl_prefix FROM cve_index where purl_prefix = ?;
```

Expand All @@ -54,26 +54,21 @@ CPE_FULL_REGEX = re.compile(
"cpe:?:[^:]+:(?P<cve_type>[^:]+):(?P<vendor>[^:]+):(?P<package>[^:]+):(?P<version>[^:]+):(?P<update>[^:]+):(?P<edition>[^:]+):(?P<lang>[^:]+):(?P<sw_edition>[^:]+):(?P<target_sw>[^:]+):(?P<target_hw>[^:]+):(?P<other>[^:]+)"
)
```

In the `cve_index` table, vendor maps to namespace and package maps to name. The SQL query is below:
Search the `cve_index` table in the index database first to retrieve any matching cve_id and purl_prefix values:

```sql
SELECT DISTINCT cve_id, type, namespace, name, vers, purl_prefix FROM cve_index where namespace = ? AND name = ?;
```

### Comparing version ranges using vers

Refer to the vers [documentation](https://github.com/package-url/purl-spec/blob/version-range-spec/VERSION-RANGE-SPEC.rst) for information regarding vers and the logic to parse a range and check whether a version falls within it. To simplify the logic, a value from the vers column in `cve_index` contains at most two constraints (one greater-than and one less-than).

## Combining data

Search the `cve_index` table in the index database first to retrieve any matching cve_id and purl_prefix values. Use these two column values to retrieve the full CVE source information from the `cve_data` table. An example query is shown below:
In the `cve_index` table, vendor maps to namespace and package maps to name. The SQL query is below:

```sql
SELECT DISTINCT cve_id, type, namespace, name, source_data_hash, json(source_data), json(override_data), purl_prefix FROM cve_data
WHERE cve_id = ? AND purl_prefix = ?
GROUP BY purl_prefix
ORDER BY cve_id DESC;
SELECT DISTINCT cve_id, type, namespace, name, source_data_hash, json(source_data), json(override_data) FROM cve_data
WHERE cve_id = ? AND namespace = ? AND name = ?
GROUP BY namespace, name
ORDER BY cve_id DESC;
```

Use the `source_data_hash` values to filter out any duplicate results for the same CVE. Duplicate results are possible when multiple vers match the same CVE and purl prefixes.
### Comparing version ranges using vers

Refer to the vers [documentation](https://github.com/package-url/purl-spec/blob/version-range-spec/VERSION-RANGE-SPEC.rst) for information regarding vers and the logic to parse a range and check whether a version falls within it. To simplify the logic, a value from the vers column in `cve_index` contains at most two constraints (one greater-than and one less-than).
21 changes: 9 additions & 12 deletions test/test_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,30 +521,27 @@ def test_nvd_api_convert(
cve_data_count, cve_index_count = db6.stats()
assert cve_data_count == 4
assert cve_index_count == 20
results_count = len(list(search.search_by_any("CVE-2020-8022")))
results_count = len(list(search.search_by_any("CVE-2020-8022", with_data=True, to_dict=True)))
assert results_count == 0
results_count = len(list(search.search_by_any("CVE-2024-0057")))
assert results_count == 10
results_count = len(
list(search.search_by_any("cpe:2.3:a:microsoft:.net:*:*:*:*:*:*:*:*"))
)
assert results_count == 1
assert results_count == 20
results_count = len(list(search.search_by_any("cpe:2.3:a:microsoft:.net:*:*:*:*:*:*:*:*")))
assert results_count == 3

# json2
vulnerabilities = nvdlatest.convert(test_nvd_api_json2)
assert len(vulnerabilities) == 1
cvesource = CVESource()
cve = cvesource.convert5(vulnerabilities)
assert len(cve) == 1

db6.clear_all()
nvdlatest.store(vulnerabilities)
cve_data_count, cve_index_count = db6.stats()
assert cve_data_count == 1
assert cve_index_count == 7
results_count = len(list(search.search_by_any("CVE-2020-8022")))
assert results_count == 0
results_count = len(list(search.search_by_any("CVE-2024-21312")))
results_count = len(list(search.search_by_any("CVE-2024-21312", with_data=True, to_dict=True)))
assert results_count == 7

# json3
Expand Down Expand Up @@ -693,7 +690,7 @@ def test_osv_convert(
results_count = len(list(search.search_by_any("CVE-2020-8022")))
assert results_count == 0
results_count = len(list(search.search_by_any("CVE-2019-0647")))
assert results_count == 2
assert results_count == 5
results_count = len(list(search.search_by_any("pkg:maven/org.springframework/spring-web")))
assert results_count == 0
results_count = len(list(search.search_by_any("pkg:apk/alpine/mariadb?arch=source")))
Expand All @@ -714,7 +711,7 @@ def test_osv_convert(
results_count = len(list(search.search_by_any("CVE-2020-8022")))
assert results_count == 0
results_count = len(list(search.search_by_any("CVE-2019-3192")))
assert results_count == 1
assert results_count == 4
results_count = len(list(search.search_by_any("pkg:apk/alpine/mariadb?arch=source")))
assert results_count == 0
results_count = len(list(search.search_by_any("pkg:maven/org.springframework/spring-web")))
Expand Down Expand Up @@ -784,7 +781,7 @@ def test_osv_convert(
results_count = len(list(search.search_by_any("CVE-2020-8022")))
assert results_count == 0
results_count = len(list(search.search_by_any("CVE-2021-23440")))
assert results_count == 2
assert results_count == 3

# go
cve_data = osvlatest.convert(test_osv_go_json)
Expand Down Expand Up @@ -907,7 +904,7 @@ def test_aqua_convert(
results_count = len(list(search.search_by_any("CVE-2020-8022")))
assert results_count == 0
results_count = len(list(search.search_by_any("CVE-2022-45406")))
assert results_count == 26
assert results_count == 27

# ubuntu1
cve_data = aqualatest.convert(test_aqua_ubuntu1_json)
Expand Down
11 changes: 7 additions & 4 deletions vdb/lib/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
{"cve":{"data_type":"CVE","data_format":"MITRE","data_version":"4.0","CVE_data_meta":{"ID":"%(cve_id)s","ASSIGNER":"%(assigner)s"},"problemtype":{"problemtype_data":[{"description":[{"lang":"en","value":"%(cwe_id)s"}]}]},"references":{"reference_data": %(references)s},"description":{"description_data":[{"lang":"en","value":"%(description)s"}]}},"configurations":{"CVE_data_version":"4.0","nodes":[{"operator":"OR","cpe_match":[{"vulnerable":true,"cpe23Uri":"cpe:2.3:a:%(vendor)s:%(product)s:%(version)s:*:%(edition)s:*:*:*:*:*","versionStartExcluding":"%(version_start_excluding)s","versionEndExcluding":"%(version_end_excluding)s","versionStartIncluding":"%(version_start_including)s","versionEndIncluding":"%(version_end_including)s"}, {"vulnerable":false,"cpe23Uri":"cpe:2.3:a:%(vendor)s:%(product)s:%(fix_version_start_including)s:*:%(edition)s:*:*:*:*:*","versionStartExcluding":"%(fix_version_start_excluding)s","versionEndExcluding":"%(fix_version_end_excluding)s","versionStartIncluding":"%(fix_version_start_including)s","versionEndIncluding":"%(fix_version_end_including)s"}]}]},"impact":{"baseMetricV3":{"cvssV3":{"version":"3.1","vectorString":"%(vectorString)s","attackVector":"NETWORK","attackComplexity":"%(attackComplexity)s","privilegesRequired":"NONE","userInteraction":"%(userInteraction)s","scope":"UNCHANGED","confidentialityImpact":"%(severity)s","integrityImpact":"%(severity)s","availabilityImpact":"%(severity)s","baseScore":%(score).1f,"baseSeverity":"%(severity)s"},"exploitabilityScore":%(exploitabilityScore).1f,"impactScore":%(score).1f},"baseMetricV2":{"cvssV2":{"version":"2.0","vectorString":"AV:N/AC:M/Au:N/C:P/I:P/A:P","accessVector":"NETWORK","accessComplexity":"MEDIUM","authentication":"NONE","confidentialityImpact":"PARTIAL","integrityImpact":"PARTIAL","availabilityImpact":"PARTIAL","baseScore":%(score).1f},"severity":"%(severity)s","exploitabilityScore":%(exploitabilityScore).1f,"impactScore":%(score).1f,"acInsufInfo":false,"obtainAllPrivilege":false,"o
btainUserPrivilege":false,"obtainOtherPrivilege":false,"userInteractionRequired":false}},"publishedDate":"%(publishedDate)s","lastModifiedDate":"%(lastModifiedDate)s"}
"""

osv_url_dict = {
OSV_URL_DICT = {
"javascript": "https://osv-vulnerabilities.storage.googleapis.com/npm/all.zip",
"python": "https://osv-vulnerabilities.storage.googleapis.com/PyPI/all.zip",
"go": "https://osv-vulnerabilities.storage.googleapis.com/Go/all.zip",
Expand All @@ -71,13 +71,13 @@

# These feeds introduce too many false positives
if os.getenv("OSV_INCLUDE_FUZZ"):
osv_url_dict["linux"] = (
OSV_URL_DICT["linux"] = (
"https://osv-vulnerabilities.storage.googleapis.com/Linux/all.zip"
)
osv_url_dict["oss-fuzz"] = (
OSV_URL_DICT["oss-fuzz"] = (
"https://osv-vulnerabilities.storage.googleapis.com/OSS-Fuzz/all.zip"
)
osv_url_dict["android"] = (
# Store a plain URL string, like the "linux" and "oss-fuzz" entries. A trailing
# comma inside the parentheses would make this value a 1-tuple instead of a str.
OSV_URL_DICT["android"] = (
    "https://osv-vulnerabilities.storage.googleapis.com/Android/all.zip"
)

Expand Down Expand Up @@ -158,3 +158,6 @@
else "ghcr.io/appthreat/vdbxz:v6"
),
)

# Advisory ID prefixes checked by search_by_any, ordered from the most common
# to the least common.
ADVISORY_PREFIXES = [
    "CVE-",
    "ALSA-",
    "MAL-",
    "DSA-",
    "ALBA-",
    "DLA-",
    "RLSA-",
    "ALEA-",
    "GHSA-",
    "RUSTSEC-",
    "DTSA-",
    "PSF-",
    "GO-",
    "PYSEC-",
    "OSV-",
    "RSEC-",
    "RXSA-",
]
2 changes: 1 addition & 1 deletion vdb/lib/osv.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def download_all(self):
# For performance do not retain the whole data in-memory
# See: https://github.com/AppThreat/vulnerability-db/issues/27
data_list = []
for _, url in config.osv_url_dict.items():
for _, url in config.OSV_URL_DICT.items():
data = self.fetch(url)
if data:
self.store(data)
Expand Down
145 changes: 75 additions & 70 deletions vdb/lib/search.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
from typing import Any, Generator
import json
from typing import Generator, List, Tuple

import apsw
import orjson

from vdb.lib.config import ADVISORY_PREFIXES
from vdb.lib import db6, utils
from vdb.lib.cve_model import CVE, CVE1


def _filter_hits(raw_hits: list, compare_ver: str) -> list:
def exec_query(conn, query: str, args: Tuple[str, ...]) -> List:
    """Run a parameterised query and return every resulting row.

    Args:
        conn: Connection object exposing ``execute(query, args)``.
        query: SQL text with ``?`` placeholders.
        args: Values bound to the placeholders.

    Returns:
        All rows produced by the query, as returned by ``fetchall()``.
    """
    return conn.execute(query, args).fetchall()


def filter_hits(raw_hits: List, compare_ver: str) -> List:
filtered_list = []
for ahit in raw_hits:
cve_id = ahit[0]
Expand All @@ -25,15 +33,14 @@ def _filter_hits(raw_hits: list, compare_ver: str) -> list:
return filtered_list


def get_cve_data(
db_conn, index_hits: list[dict, Any], search_str: str
) -> Generator | list[dict[str, str | CVE | None]]:
def get_cve_data(db_conn: apsw.Connection | None, index_hits: List, search_str: str, to_dict=False) -> Generator:
"""Get CVE data for the index results
Args:
db_conn: DB Connection or None to create a new one
index_hits: Hits from one of the search methods
search_str: Original search string used
to_dict: Convert source_data to dict
Returns:
generator: generator for CVE data with original source data as a pydantic model
Expand All @@ -43,8 +50,8 @@ def get_cve_data(
for ahit in index_hits:
results = exec_query(
db_conn,
"SELECT DISTINCT cve_id, type, namespace, name, source_data_hash, json(source_data), json(override_data), purl_prefix FROM cve_data WHERE cve_id = ? AND purl_prefix = ? GROUP BY purl_prefix ORDER BY cve_id DESC;",
(ahit["cve_id"], ahit["purl_prefix"]),
"SELECT DISTINCT cve_id, type, namespace, name, source_data_hash, json(source_data), json(override_data) FROM cve_data WHERE cve_id = ? AND type = ? AND namespace = ? AND name = ? GROUP BY namespace, name ORDER BY cve_id DESC;",
(ahit["cve_id"], ahit["type"], ahit["namespace"], ahit["name"]),
)
for res in results:
yield {
Expand All @@ -56,52 +63,90 @@ def get_cve_data(
"matched_by": search_str,
"source_data_hash": res[4],
"source_data": (
CVE(root=CVE1.model_validate(orjson.loads(res[5]), strict=False))
get_source_data(res, to_dict=to_dict)
if res[5]
else None
),
"override_data": (orjson.loads(res[6]) if res[6] else None),
"purl_prefix": res[7],
}


def search_by_any(any_str: str, with_data: bool = False) -> list | None:
def get_source_data(res, to_dict=False):
    """Return the source data of a result row as a model or a dict.

    Args:
        res: Result row whose element at index 5 holds the JSON source data.
        to_dict: When truthy, serialise the model to JSON and parse it back
            into a plain dict instead of returning the model itself.

    Returns:
        The validated CVE model, or its dict form when ``to_dict`` is set.
    """
    cve_model = CVE(root=CVE1.model_validate(orjson.loads(res[5]), strict=False))
    if not to_dict:
        return cve_model
    return json.loads(cve_model.json())


def latest_malware(with_limit=20, with_data=False, to_dict=False) -> Generator:
    """Yield one result list of "MAL-" advisories.

    Delegates to search_by_cve with the "MAL-%" pattern. The generator
    produces exactly one item: whatever that single call returns.

    Args:
        with_limit: Forwarded to search_by_cve as the row limit.
        with_data: Forwarded to search_by_cve.
        to_dict: Forwarded to search_by_cve.
    """
    malware_hits = search_by_cve(
        "MAL-%", with_data=with_data, with_limit=with_limit, to_dict=to_dict
    )
    yield malware_hits


def search_by_any(any_str: str, with_data=False, to_dict=False) -> List:
"""Convenient method to search by a string"""
if any_str.startswith("pkg:"):
return search_by_purl_like(any_str, with_data)
if (
any_str.startswith("CVE-")
or any_str.startswith("GHSA-")
or any_str.startswith("MAL-")
):
return search_by_cve(any_str, with_data)
return search_by_purl_like(any_str, with_data, to_dict)
if any(any_str.startswith(ap) for ap in ADVISORY_PREFIXES):
return search_by_cve(any_str, with_data, to_dict=to_dict)
if any_str.startswith("http"):
return search_by_url(any_str, with_data)
return search_by_cpe_like(any_str, with_data)
return search_by_url(any_str, with_data, to_dict)
return search_by_cpe_like(any_str, with_data, to_dict)


def search_by_cpe_like(cpe: str, with_data=False) -> list | None:
def search_by_cdx_bom(bom_file: str, with_data=False, to_dict=False) -> Generator:
    """Search by CycloneDX BOM file

    Args:
        bom_file: Path to a CycloneDX BOM in JSON format.
        with_data: Forwarded to the purl and CPE searches.
        to_dict: Forwarded to the purl and CPE searches.

    Yields:
        One search result per ``purl`` and one per ``cpe`` found on the BOM's
        components, in component order (purl before cpe).
    """
    # Read and close the file up front so no handle stays open while the
    # generator is suspended between yields.
    with open(bom_file, encoding="utf-8", mode="r") as fp:
        cdx_obj = orjson.loads(fp.read())
    # A BOM without a "components" key (or with a null value) yields nothing
    # rather than raising TypeError when iterating over None.
    for component in cdx_obj.get("components") or []:
        purl = component.get("purl")
        if purl:
            yield search_by_purl_like(purl, with_data, to_dict)
        cpe = component.get("cpe")
        if cpe:
            yield search_by_cpe_like(cpe, with_data, to_dict)


def search_by_cpe_like(cpe: str, with_data=False, to_dict=False) -> List:
"""Search by CPE or colon-separate strings"""
db_conn, index_conn = db6.get(read_only=True)
if cpe.startswith("cpe:"):
vendor, package, version, _ = utils.parse_cpe(cpe)
elif cpe.count(":") == 2:
vendor, package, version = cpe.split(":")
else:
return None
return []
# check for vendor name in both namespace and type
raw_hits = exec_query(
index_conn,
"SELECT DISTINCT cve_id, type, namespace, name, vers, purl_prefix FROM cve_index where (namespace = ? OR type = ?) AND name = ?;",
(vendor, vendor, package),
)
filtered_list = _filter_hits(raw_hits, version)
filtered_list = filter_hits(raw_hits, version)
if with_data:
return list(get_cve_data(db_conn,filtered_list, cpe, to_dict))
return filtered_list


def search_by_cve(cve_id: str, with_data=False, with_limit=None, to_dict=False) -> List:
"""Search by CVE"""
db_conn, index_conn = db6.get(read_only=True)
filter_part = "cve_id LIKE ?" if "%" in cve_id else "cve_id = ?"
filter_part = f"{filter_part} ORDER BY cve_id DESC"
args = [cve_id]
if with_limit:
filter_part = f"{filter_part} LIMIT ?"
args.append(with_limit)
args = tuple(args)
raw_hits = exec_query(
index_conn,
f"SELECT DISTINCT cve_id, type, namespace, name, vers, purl_prefix FROM cve_index where {filter_part}",
args,
)
filtered_list = filter_hits(raw_hits, "*")
if with_data:
return get_cve_data(db_conn, filtered_list, cpe)
return list(get_cve_data(db_conn, filtered_list, cve_id, to_dict))
return filtered_list


def search_by_purl_like(purl: str, with_data=False) -> list | None:
def search_by_purl_like(purl: str, with_data=False, to_dict=False) -> List:
"""Search by purl like string"""
db_conn, index_conn = db6.get(read_only=True)
purl_obj = utils.parse_purl(purl)
Expand All @@ -124,38 +169,18 @@ def search_by_purl_like(purl: str, with_data=False) -> list | None:
"SELECT DISTINCT cve_id, type, namespace, name, vers, purl_prefix FROM cve_index where purl_prefix = ?;",
args,
)
filtered_list = _filter_hits(raw_hits, version)
filtered_list = filter_hits(raw_hits, version)
if with_data:
return get_cve_data(db_conn, filtered_list, purl)
return list(get_cve_data(db_conn, filtered_list, purl, to_dict))
return filtered_list
return None
return []


def search_by_cve(cve_id: str, with_data=False, with_limit=None) -> list | None:
    """Look up index rows for a CVE or advisory id.

    A "%" anywhere in ``cve_id`` switches the match from equality to SQL
    LIKE. Rows are ordered by cve_id descending and, when ``with_limit`` is
    truthy, capped through a bound LIMIT parameter.

    Args:
        cve_id: Exact id, or a LIKE pattern containing "%".
        with_data: When truthy, expand the hits through get_cve_data.
        with_limit: Optional maximum number of index rows.

    Returns:
        The filtered index hits, or the get_cve_data result when
        ``with_data`` is set.
    """
    db_conn, index_conn = db6.get(read_only=True)
    params = [cve_id]
    if "%" in cve_id:
        where_clause = "cve_id LIKE ?"
    else:
        where_clause = "cve_id = ?"
    where_clause = f"{where_clause} ORDER BY cve_id DESC"
    if with_limit:
        where_clause = f"{where_clause} LIMIT ?"
        params.append(with_limit)
    index_rows = exec_query(
        index_conn,
        f"SELECT DISTINCT cve_id, type, namespace, name, vers, purl_prefix FROM cve_index where {where_clause}",
        params,
    )
    # "*" as the compare version is passed through unchanged from the original call.
    hits = _filter_hits(index_rows, "*")
    return get_cve_data(db_conn, hits, cve_id) if with_data else hits


def search_by_url(url: str, with_data=False) -> list | None:
def search_by_url(url: str, with_data=False, to_dict=False) -> List:
"""Search by URL"""
purl_obj = utils.url_to_purl(url)
if not purl_obj:
return None
return []
name = purl_obj["name"]
purl_str = (
f"pkg:{purl_obj['type']}/{purl_obj['namespace']}/{name}"
Expand All @@ -164,24 +189,4 @@ def search_by_url(url: str, with_data=False) -> list | None:
)
if purl_obj["version"]:
purl_str = f"{purl_str}@{purl_obj['version']}"
return search_by_purl_like(purl_str, with_data)


def search_by_cdx_bom(bom_file: str, with_data=False) -> Generator:
    """Search by CycloneDX BOM file

    Args:
        bom_file: Path to a CycloneDX BOM in JSON format.
        with_data: Forwarded to the purl and CPE searches.

    Yields:
        One search result per ``purl`` and one per ``cpe`` found on the BOM's
        components, in component order (purl before cpe).
    """
    # Read and close the file up front so no handle stays open while the
    # generator is suspended between yields.
    with open(bom_file, encoding="utf-8", mode="r") as fp:
        cdx_obj = orjson.loads(fp.read())
    # A BOM without a "components" key (or with a null value) yields nothing
    # rather than raising TypeError when iterating over None.
    for component in cdx_obj.get("components") or []:
        purl = component.get("purl")
        if purl:
            yield search_by_purl_like(purl, with_data)
        cpe = component.get("cpe")
        if cpe:
            yield search_by_cpe_like(cpe, with_data)


def latest_malware(with_limit=20, with_data=False) -> Generator:
    """Yield one result list of "MAL-" advisories.

    Delegates to search_by_cve with the "MAL-%" pattern. The generator
    produces exactly one item: whatever that single call returns.

    Args:
        with_limit: Forwarded to search_by_cve as the row limit.
        with_data: Forwarded to search_by_cve.
    """
    malware_hits = search_by_cve("MAL-%", with_data=with_data, with_limit=with_limit)
    yield malware_hits


def exec_query(conn, query: str, args: tuple[str, ...]) -> list:
    """Run a parameterised query and return every resulting row.

    Args:
        conn: Connection object exposing ``execute(query, args)``.
        query: SQL text with ``?`` placeholders.
        args: Values bound to the placeholders.

    Returns:
        All rows produced by the query, as returned by ``fetchall()``.
    """
    return conn.execute(query, args).fetchall()
return search_by_purl_like(purl_str, with_data, to_dict)
2 changes: 1 addition & 1 deletion vdb/lib/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ def trim_epoch(
def vers_compare(compare_ver: str | int | float, vers: str) -> bool:
"""Purl vers based version comparison"""
min_version, max_version, min_excluding, max_excluding = None, None, None, None
if vers == "*" or compare_ver is None:
if vers == "*" or compare_ver == "*" or not compare_ver:
return True
if vers.startswith("vers:"):
vers_parts = vers.split("/")[-1].split("|")
Expand Down

0 comments on commit 8753764

Please sign in to comment.