-
Notifications
You must be signed in to change notification settings - Fork 4
/
capec2technique.py
96 lines (75 loc) · 2.96 KB
/
capec2technique.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import sys
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
CAPEC_FILE = "resources/capec_db.json"
CVE_FILE = "results/new_cves.jsonl"
# Update the database with the new CVEs and save the results to a JSONL file
def save_jsonl(cve_capec_data):
# Write the results to a JSONL file
with open(CVE_FILE, 'w') as f:
for cve, data in cve_capec_data.items():
f.write(json.dumps({cve: data}) + "\n")
new_cves = {}
for cve, data in cve_capec_data.items():
year = cve.split('-')[1]
if year not in new_cves:
new_cves[year] = {}
new_cves[year][cve] = data
for year, cves in new_cves.items():
# Update the database with the new CVEs
cve_db = load_db_jsonl(year)
cve_db.update(cves)
with open(f'database/CVE-{year}.jsonl', 'w') as f:
for cve, data in cve_db.items():
f.write(json.dumps({cve: data}) + "\n")
# Load the database from a JSONL file
def load_db_jsonl(cve_year):
cve_db = {}
try:
with open(f'database/CVE-{cve_year}.jsonl', 'r') as f:
for line in f:
cve_entry = json.loads(line.strip())
cve_db.update(cve_entry)
except FileNotFoundError:
cve_db = {}
return cve_db
# Process CVE to extract the related CAPEC entries
def process_single_cve(cve, capec_list, cve_capec_data):
technics = set()
for capec in cve_capec_data[cve]["CAPEC"]:
lines = capec_list.get(capec, {}).get("techniques", "")
if lines:
entries = lines.split("NAME:ATTACK:ENTRY ")[1:]
for entry in entries:
infos = entry.split(":")
id = infos[1]
technics.add(id)
return list(sorted(technics))
# Multithreading process to extract CAPEC entries for each CVE
def process_capec(cve_capec_data, capec_list, cve_year):
with ThreadPoolExecutor() as executor:
futures = {executor.submit(process_single_cve, cve, capec_list, cve_capec_data): cve for cve in tqdm(cve_capec_data, desc=f"Processing CAPEC to TECHNIQUES for CVE-{cve_year}", unit="CVE")}
for future in as_completed(futures):
cve_result = future.result()
cve_capec_data[futures[future]]["TECHNIQUES"] = cve_result
if __name__ == "__main__":
if len(sys.argv) == 2:
file = sys.argv[1]
else:
file = CVE_FILE
# Load the JSONL file
cve_capec_data = {}
with open(file, 'r') as f:
for line in f:
cve_entry = json.loads(line.strip())
cve_capec_data.update(cve_entry)
if cve_capec_data:
# Load the CAPEC database
with open(CAPEC_FILE, 'r') as f:
capec_list = json.load(f)
cve_year = list(cve_capec_data.keys())[0].split('-')[1]
process_capec(cve_capec_data, capec_list, cve_year)
save_jsonl(cve_capec_data)
else:
print("[-]No new vulnerabilities found")