From b8088505b1c7a0c8c75f72447a31c91ca1179e72 Mon Sep 17 00:00:00 2001 From: Sean Whalen <44679+seanthegeek@users.noreply.github.com> Date: Mon, 19 Feb 2024 18:45:38 -0500 Subject: [PATCH] Add support for SMTP TLS reports (#453) --- .gitignore | 3 + CHANGELOG.md | 11 +- docs/source/output.md | 43 +++++ docs/source/usage.md | 1 + parsedmarc/__init__.py | 343 ++++++++++++++++++++++++++++----- parsedmarc/cli.py | 106 +++++++++- parsedmarc/elastic.py | 190 ++++++++++++++++++ parsedmarc/kafkaclient.py | 33 ++++ parsedmarc/loganalytics.py | 35 +++- parsedmarc/s3.py | 15 +- parsedmarc/splunk.py | 43 ++++- parsedmarc/syslog.py | 7 +- parsedmarc/utils.py | 2 +- pyproject.toml | 2 +- requirements.txt | 3 +- samples/smtp_tls/smtp_tls.json | 43 +++++ tests.py | 2 +- 17 files changed, 810 insertions(+), 72 deletions(-) create mode 100644 samples/smtp_tls/smtp_tls.json diff --git a/.gitignore b/.gitignore index 18149441..b86356a5 100644 --- a/.gitignore +++ b/.gitignore @@ -105,6 +105,9 @@ ENV/ # PyCharm Project settings .idea/ +# VS Code launch config +.vscode/launch.json + # Visual Studio Code settings #.vscode/ diff --git a/CHANGELOG.md b/CHANGELOG.md index f1c26792..541dac61 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,17 @@ Changelog ========= +8.7.0 +----- + +- Add support for SMTP TLS reports (PR #453 closes issue #71) +- Do not replace content in forensic samples (fix #403) +- Pin `msgraph-core` dependency at version `0.2.2` until Microsoft provides better documentation +- Properly handle base64-encoded email attachments +- Do not crash when attempting to parse invalid email content + 8.6.4 ----- +----- - Properly process aggregate reports that incorrectly call `identifiers` `identities` - Ignore SPF results in aggregate report records if the domain is not provided diff --git a/docs/source/output.md b/docs/source/output.md index f6768911..61b82737 100644 --- a/docs/source/output.md +++ b/docs/source/output.md @@ -187,3 +187,46 @@ Thanks to GitHub user [xennn](https://github.com/xennn) for the anonymized feedback_type,user_agent,version,original_envelope_id,original_mail_from,original_rcpt_to,arrival_date,arrival_date_utc,subject,message_id,authentication_results,dkim_domain,source_ip_address,source_country,source_reverse_dns,source_base_domain,delivery_result,auth_failure,reported_domain,authentication_mechanisms,sample_headers_only auth-failure,Lua/1.0,1.0,,sharepoint@domain.de,peter.pan@domain.de,"Mon, 01 Oct 2018 11:20:27 +0200",2018-10-01 09:20:27,Subject,<38.E7.30937.BD6E1BB5@ mailrelay.de>,"dmarc=fail (p=none, dis=none) header.from=domain.de",,10.10.10.10,,,,policy,dmarc,domain.de,,False ``` + +### JSON SMTP TLS report + +```json +[ + { + "organization_name": "Example Inc.", + "begin_date": "2024-01-09T00:00:00Z", + "end_date": "2024-01-09T23:59:59Z", + "report_id": "2024-01-09T00:00:00Z_example.com", + "policies": [ + { + "policy_domain": "example.com", + "policy_type": "sts", + "policy_strings": [ + "version: STSv1", + "mode: testing", + "mx: example.com", + "max_age: 86400" + ], + "successful_session_count": 0, + "failed_session_count": 3, + "failure_details": [ + { + "result_type": "validation-failure", + "failed_session_count": 2, + "sending_mta_ip": "209.85.222.201", + "receiving_ip": "173.212.201.41", + "receiving_mx_hostname": "example.com" + }, + { + "result_type": "validation-failure", + "failed_session_count": 1, + "sending_mta_ip": "209.85.208.176", + "receiving_ip": "173.212.201.41", + "receiving_mx_hostname": "example.com" + } + ] + } + ] + } +] +``` \ No newline 
at end of file diff --git a/docs/source/usage.md b/docs/source/usage.md index 6a665e40..b1154f65 100644 --- a/docs/source/usage.md +++ b/docs/source/usage.md @@ -300,6 +300,7 @@ The full set of configuration options are: - `dcr_immutable_id` - str: The immutable ID of the Data Collection Rule (DCR) - `dcr_aggregate_stream` - str: The stream name for aggregate reports in the DCR - `dcr_forensic_stream` - str: The stream name for the forensic reports in the DCR + - `dcr_smtp_tls_stream` - str: The stream name for the SMTP TLS reports in the DCR :::{note} Information regarding the setup of the Data Collection Rule can be found [here](https://learn.microsoft.com/en-us/azure/azure-monitor/logs/tutorial-logs-ingestion-portal). diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index 8584def9..291c0084 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -34,7 +34,7 @@ from parsedmarc.utils import parse_email from parsedmarc.utils import timestamp_to_human, human_timestamp_to_datetime -__version__ = "8.6.4" +__version__ = "8.7.0" logger.debug("parsedmarc v{0}".format(__version__)) @@ -46,6 +46,7 @@ MAGIC_ZIP = b"\x50\x4B\x03\x04" MAGIC_GZIP = b"\x1F\x8B" MAGIC_XML = b"\x3c\x3f\x78\x6d\x6c\x20" +MAGIC_JSON = b"\7b" IP_ADDRESS_CACHE = ExpiringDict(max_len=10000, max_age_seconds=1800) @@ -58,6 +59,10 @@ class InvalidDMARCReport(ParserError): """Raised when an invalid DMARC report is encountered""" +class InvalidSMTPTLSReport(ParserError): + """Raised when an invalid SMTP TLS report is encountered""" + + class InvalidAggregateReport(InvalidDMARCReport): """Raised when an invalid DMARC aggregate report is encountered""" @@ -204,6 +209,179 @@ def _parse_report_record(record, ip_db_path=None, offline=False, return new_record +def _parse_smtp_tls_failure_details(failure_details): + try: + new_failure_details = OrderedDict( + result_type=failure_details["result-type"], + failed_session_count=failure_details["failed-session-count"], + sending_mta_ip=failure_details["sending-mta-ip"], + receiving_ip=failure_details["receiving-ip"] + ) + + if "receiving-mx-hostname" in failure_details: + new_failure_details["receiving_mx_hostname"] = failure_details[ + "receiving-mx-hostname"] + if "receiving-mx-helo" in failure_details: + new_failure_details["receiving_mx_helo"] = failure_details[ + "receiving-mx-helo"] + if "additional-info-uri" in failure_details: + new_failure_details["additional_info_uri"] = failure_details[ + "additional-info-uri"] + if "failure-reason-code" in failure_details: + new_failure_details["failure_reason_code"] = failure_details[ + "failure-reason-code"] + + return new_failure_details + + except KeyError as e: + raise InvalidSMTPTLSReport(f"Missing required failure details field:" + f" {e}") + except Exception as e: + raise InvalidSMTPTLSReport(str(e)) + + +def _parse_smtp_tls_report_policy(policy): + policy_types = ["tlsa", "sts", "no-policy-found"] + try: + policy_domain = policy["policy"]["policy-domain"] + policy_type = policy["policy"]["policy-type"] + failure_details = [] + if policy_type not in policy_types: + raise InvalidSMTPTLSReport(f"Invalid policy type " + f"{policy_type}") + new_policy = OrderedDict(policy_domain=policy_domain, + policy_type=policy_type) + if "policy-string" in policy["policy"]: + if isinstance(policy["policy"]["policy-string"], list): + if len(policy["policy"]["policy-string"]) > 0: + new_policy["policy_strings"] = policy["policy"][ + "policy-string"] + + if "mx-host-pattern" in policy["policy"]: + if 
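For orientation, the parsed output documented above keeps RFC 8460's field names but converts them to snake_case. Below is a minimal consumer sketch (not part of parsedmarc; the output path is an assumption) that reads the `smtp_tls.json` file written by the CLI and tallies sessions per policy domain:

```python
# Assumes parsedmarc has written JSON output; "output/smtp_tls.json" is a
# placeholder path. The structure matches the documented example above.
import json
from collections import Counter

successes = Counter()
failures = Counter()

with open("output/smtp_tls.json") as f:
    reports = json.load(f)  # a list of parsed SMTP TLS reports

for report in reports:
    for policy in report["policies"]:
        domain = policy["policy_domain"]
        successes[domain] += policy.get("successful_session_count", 0)
        failures[domain] += policy.get("failed_session_count", 0)
        for detail in policy.get("failure_details", []):
            print(f'{domain}: {detail["result_type"]} from '
                  f'{detail["sending_mta_ip"]} '
                  f'({detail["failed_session_count"]} sessions)')

for domain in failures:
    print(f'{domain}: {successes[domain]} ok, {failures[domain]} failed')
```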
isinstance(policy["policy"]["mx-host-pattern"], list): + if len(policy["policy"]["mx-host-pattern"]) > 0: + new_policy["mx_host_patterns"] = policy["policy"][ + "mx-host-pattern"] + new_policy["successful_session_count"] = policy["summary"][ + "total-successful-session-count"] + new_policy["failed_session_count"] = policy["summary"][ + "total-failure-session-count"] + if "failure-details" in policy: + for details in policy["failure-details"]: + failure_details.append(_parse_smtp_tls_failure_details( + details)) + new_policy["failure_details"] = failure_details + + return new_policy + + except KeyError as e: + raise InvalidSMTPTLSReport(f"Missing required policy field: {e}") + except Exception as e: + raise InvalidSMTPTLSReport(str(e)) + + +def parse_smtp_tls_report_json(report): + """Parses and validates an SMTP TLS report""" + required_fields = ["organization-name", "date-range", + "contact-info", "report-id", + "policies"] + + try: + policies = [] + report = json.loads(report) + for required_field in required_fields: + if required_field not in report: + raise Exception(f"Missing required field: {required_field}]") + if not isinstance(report["policies"], list): + policies_type = type(report["policies"]) + raise InvalidSMTPTLSReport(f"policies must be a list, " + f"not {policies_type}") + for policy in report["policies"]: + policies.append(_parse_smtp_tls_report_policy(policy)) + + new_report = OrderedDict( + organization_name=report["organization-name"], + begin_date=report["date-range"]["start-datetime"], + end_date=report["date-range"]["end-datetime"], + report_id=report["report-id"], + policies=policies + ) + + return new_report + + except KeyError as e: + InvalidSMTPTLSReport(f"Missing required field: {e}") + except Exception as e: + raise InvalidSMTPTLSReport(str(e)) + + +def parsed_smtp_tls_reports_to_csv_rows(reports): + """Converts one oor more parsed SMTP TLS reports into a list of single + layer OrderedDict objects suitable for use in a CSV""" + if type(reports) is OrderedDict: + reports = [reports] + + rows = [] + for report in reports: + common_fields = OrderedDict( + organization_name=report["organization_name"], + begin_date=report["begin_date"], + end_date=report["end_date"], + report_id=report["report_id"] + ) + record = common_fields.copy() + for policy in report["policies"]: + if "policy_strings" in policy: + record["policy_strings"] = "|".join(policy["policy_strings"]) + if "mx_host_patterns" in policy: + record["mx_host_patterns"] = "|".join( + policy["mx_host_patterns"]) + successful_record = record.copy() + successful_record["successful_session_count"] = policy[ + "successful_session_count"] + rows.append(successful_record) + if "failure_details" in policy: + for failure_details in policy["failure_details"]: + failure_record = record.copy() + for key in failure_details.keys(): + failure_record[key] = failure_details[key] + rows.append(failure_record) + + return rows + + +def parsed_smtp_tls_reports_to_csv(reports): + """ + Converts one or more parsed SMTP TLS reports to flat CSV format, including + headers + + Args: + reports: A parsed aggregate report or list of parsed aggregate reports + + Returns: + str: Parsed aggregate report data in flat CSV format, including headers + """ + + fields = ["organization_name", "begin_date", "end_date", "report_id", + "successful_session_count", "failed_session_count", + "policy_domain", "policy_type", "policy_strings", + "mx_host_patterns", "sending_mta_ip", "receiving_ip", + "receiving_mx_hostname", "receiving_mx_helo", + 
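A hedged usage sketch for the two new public helpers defined above, assuming this branch is installed; the sample path matches the file added under `samples/smtp_tls/` in this patch:

```python
from parsedmarc import (parse_smtp_tls_report_json,
                        parsed_smtp_tls_reports_to_csv)

with open("samples/smtp_tls/smtp_tls.json") as f:
    report = parse_smtp_tls_report_json(f.read())

print(report["organization_name"])            # Example Inc.
print(report["policies"][0]["policy_type"])   # sts

# One flat row per policy summary plus one per failure-details entry
print(parsed_smtp_tls_reports_to_csv(report))
```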
"additional_info_uri", "failure_reason_code"] + + csv_file_object = StringIO(newline="\n") + writer = DictWriter(csv_file_object, fields) + writer.writeheader() + + rows = parsed_smtp_tls_reports_to_csv_rows(reports) + + for row in rows: + writer.writerow(row) + csv_file_object.flush() + + return csv_file_object.getvalue() + + def parse_aggregate_report_xml(xml, ip_db_path=None, offline=False, nameservers=None, timeout=2.0, parallel=False, keep_alive=None): @@ -224,6 +402,8 @@ def parse_aggregate_report_xml(xml, ip_db_path=None, offline=False, """ errors = [] # Parse XML and recover from errors + if isinstance(xml, bytes): + xml = xml.decode(errors='ignore') try: xmltodict.parse(xml)["feedback"] except Exception as e: @@ -368,21 +548,27 @@ def parse_aggregate_report_xml(xml, ip_db_path=None, offline=False, "Unexpected error: {0}".format(error.__str__())) -def extract_xml(input_): +def extract_report(input_): """ - Extracts xml from a zip or gzip file at the given path, file-like object, + Extracts text from a zip or gzip file at the given path, file-like object, or bytes. Args: input_: A path to a file, a file like object, or bytes Returns: - str: The extracted XML + str: The extracted text """ try: + file_object = BytesIO() if type(input_) is str: - file_object = open(input_, "rb") + try: + file_object = BytesIO(b64decode(input_)) + except binascii.Error: + pass + if file_object is None: + file_object = open(input_, "rb") elif type(input_) is bytes: file_object = BytesIO(input_) else: @@ -392,30 +578,31 @@ def extract_xml(input_): file_object.seek(0) if header.startswith(MAGIC_ZIP): _zip = zipfile.ZipFile(file_object) - xml = _zip.open(_zip.namelist()[0]).read().decode(errors='ignore') + report = _zip.open(_zip.namelist()[0]).read().decode( + errors='ignore') elif header.startswith(MAGIC_GZIP): - xml = zlib.decompress(file_object.getvalue(), - zlib.MAX_WBITS | 16).decode(errors='ignore') - elif header.startswith(MAGIC_XML): - xml = file_object.read().decode(errors='ignore') + report = zlib.decompress( + file_object.getvalue(), + zlib.MAX_WBITS | 16).decode(errors='ignore') + elif header.startswith(MAGIC_XML) or header.startswith(MAGIC_JSON): + report = file_object.read().decode(errors='ignore') else: file_object.close() - raise InvalidAggregateReport("Not a valid zip, gzip, or xml file") + raise ParserError("Not a valid zip, gzip, json, or xml file") file_object.close() except FileNotFoundError: - raise InvalidAggregateReport("File was not found") + raise ParserError("File was not found") except UnicodeDecodeError: file_object.close() - raise InvalidAggregateReport("File objects must be opened in binary " - "(rb) mode") + raise ParserError("File objects must be opened in binary (rb) mode") except Exception as error: file_object.close() - raise InvalidAggregateReport( + raise ParserError( "Invalid archive file: {0}".format(error.__str__())) - return xml + return report def parse_aggregate_report_file(_input, offline=False, ip_db_path=None, @@ -439,7 +626,11 @@ def parse_aggregate_report_file(_input, offline=False, ip_db_path=None, Returns: OrderedDict: The parsed DMARC aggregate report """ - xml = extract_xml(_input) + + try: + xml = extract_report(_input) + except Exception as e: + raise InvalidAggregateReport(e) return parse_aggregate_report_xml(xml, ip_db_path=ip_db_path, @@ -819,9 +1010,10 @@ def parse_report_email(input_, offline=False, ip_db_path=None, msg = email.message_from_string(input_) except Exception as e: - raise InvalidDMARCReport(e.__str__()) + raise 
ParserError(e.__str__()) subject = None feedback_report = None + smtp_tls_report = None sample = None if "From" in msg_headers: logger.info("Parsing mail from {0}".format(msg_headers["From"])) @@ -850,6 +1042,18 @@ def parse_report_email(input_, offline=False, ip_db_path=None, sample = payload elif content_type == "message/rfc822": sample = payload + elif content_type == "application/tlsrpt+json": + if "{" not in payload: + payload = str(b64decode(payload)) + smtp_tls_report = parse_smtp_tls_report_json(payload) + return OrderedDict([("report_type", "smtp_tls"), + ("report", smtp_tls_report)]) + elif content_type == "application/tlsrpt+gzip": + payload = extract_report(payload) + smtp_tls_report = parse_smtp_tls_report_json(payload) + return OrderedDict([("report_type", "smtp_tls"), + ("report", smtp_tls_report)]) + elif content_type == "text/plain": if "A message claiming to be from you has failed" in payload: parts = payload.split("detected.") @@ -863,21 +1067,25 @@ def parse_report_email(input_, offline=False, ip_db_path=None, "".format(fields["received-date"], fields["sender-ip-address"]) sample = parts[1].lstrip() - sample = sample.replace("=\r\n", "") logger.debug(sample) else: try: payload = b64decode(payload) if payload.startswith(MAGIC_ZIP) or \ - payload.startswith(MAGIC_GZIP) or \ - payload.startswith(MAGIC_XML): + payload.startswith(MAGIC_GZIP): + payload = extract_report(payload) ns = nameservers - aggregate_report = parse_aggregate_report_file( + if payload.startswith("{"): + smtp_tls_report = parse_smtp_tls_report_json(payload) + result = OrderedDict([("report_type", "smtp_tls"), + ("report", smtp_tls_report)]) + return result + aggregate_report = parse_aggregate_report_xml( payload, ip_db_path=ip_db_path, offline=offline, nameservers=ns, - dns_timeout=dns_timeout, + timeout=dns_timeout, parallel=parallel, keep_alive=keep_alive) result = OrderedDict([("report_type", "aggregate"), @@ -891,12 +1099,12 @@ def parse_report_email(input_, offline=False, ip_db_path=None, error = 'Message with subject "{0}" ' \ 'is not a valid ' \ 'aggregate DMARC report: {1}'.format(subject, e) - raise InvalidAggregateReport(error) + raise ParserError(error) except Exception as e: error = 'Unable to parse message with ' \ 'subject "{0}": {1}'.format(subject, e) - raise InvalidDMARCReport(error) + raise ParserError(error) if feedback_report and sample: try: @@ -923,7 +1131,7 @@ def parse_report_email(input_, offline=False, ip_db_path=None, if result is None: error = 'Message with subject "{0}" is ' \ - 'not a valid DMARC report'.format(subject) + 'not a valid report'.format(subject) raise InvalidDMARCReport(error) @@ -970,18 +1178,22 @@ def parse_report_file(input_, nameservers=None, dns_timeout=2.0, ("report", report)]) except InvalidAggregateReport: try: - sa = strip_attachment_payloads - results = parse_report_email(content, - ip_db_path=ip_db_path, - offline=offline, - nameservers=nameservers, - dns_timeout=dns_timeout, - strip_attachment_payloads=sa, - parallel=parallel, - keep_alive=keep_alive) - except InvalidDMARCReport: - raise InvalidDMARCReport("Not a valid aggregate or forensic " - "report") + report = parse_smtp_tls_report_json(content) + results = OrderedDict([("report_type", "smtp_tls"), + ("report", report)]) + except InvalidSMTPTLSReport: + try: + sa = strip_attachment_payloads + results = parse_report_email(content, + ip_db_path=ip_db_path, + offline=offline, + nameservers=nameservers, + dns_timeout=dns_timeout, + strip_attachment_payloads=sa, + parallel=parallel, + 
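Downstream code can branch on the `report_type` value that `parse_report_file` and `parse_report_email` now return; a minimal dispatch sketch (the input path is an assumption):

```python
from parsedmarc import parse_report_file

result = parse_report_file("samples/smtp_tls/smtp_tls.json", offline=True)

if result["report_type"] == "smtp_tls":
    print("SMTP TLS report from", result["report"]["organization_name"])
elif result["report_type"] == "aggregate":
    print("DMARC aggregate report")
else:
    print("DMARC forensic report")
```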
keep_alive=keep_alive) + except InvalidDMARCReport: + raise ParserError("Not a valid report") return results @@ -1010,6 +1222,7 @@ def get_dmarc_reports_from_mbox(input_, nameservers=None, dns_timeout=2.0, """ aggregate_reports = [] forensic_reports = [] + smtp_tls_reports = [] try: mbox = mailbox.mbox(input_) message_keys = mbox.keys() @@ -1035,12 +1248,15 @@ def get_dmarc_reports_from_mbox(input_, nameservers=None, dns_timeout=2.0, aggregate_reports.append(parsed_email["report"]) elif parsed_email["report_type"] == "forensic": forensic_reports.append(parsed_email["report"]) + elif parsed_email["report_type"] == "smtp_tls": + smtp_tls_reports.append(parsed_email["report"]) except InvalidDMARCReport as error: logger.warning(error.__str__()) except mailbox.NoSuchMailboxError: raise InvalidDMARCReport("Mailbox {0} does not exist".format(input_)) return OrderedDict([("aggregate_reports", aggregate_reports), - ("forensic_reports", forensic_reports)]) + ("forensic_reports", forensic_reports), + ("smtp_tls_reports", smtp_tls_reports)]) def get_dmarc_reports_from_mailbox(connection: MailboxConnection, @@ -1088,20 +1304,25 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection, aggregate_reports = [] forensic_reports = [] + smtp_tls_reports = [] aggregate_report_msg_uids = [] forensic_report_msg_uids = [] + smtp_tls_msg_uids = [] aggregate_reports_folder = "{0}/Aggregate".format(archive_folder) forensic_reports_folder = "{0}/Forensic".format(archive_folder) + smtp_tls_reports_folder = "{0}/SMTP-TLS".format(archive_folder) invalid_reports_folder = "{0}/Invalid".format(archive_folder) if results: aggregate_reports = results["aggregate_reports"].copy() forensic_reports = results["forensic_reports"].copy() + smtp_tls_reports = results["smtp_tls_reports"].copy() if not test and create_folders: connection.create_folder(archive_folder) connection.create_folder(aggregate_reports_folder) connection.create_folder(forensic_reports_folder) + connection.create_folder(smtp_tls_reports_folder) connection.create_folder(invalid_reports_folder) messages = connection.fetch_messages(reports_folder, batch_size=batch_size) @@ -1137,7 +1358,10 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection, elif parsed_email["report_type"] == "forensic": forensic_reports.append(parsed_email["report"]) forensic_report_msg_uids.append(msg_uid) - except InvalidDMARCReport as error: + elif parsed_email["report_type"] == "smtp_tls": + smtp_tls_reports.append(parsed_email["report"]) + smtp_tls_msg_uids.append(msg_uid) + except ParserError as error: logger.warning(error.__str__()) if not test: if delete: @@ -1153,7 +1377,8 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection, if not test: if delete: processed_messages = aggregate_report_msg_uids + \ - forensic_report_msg_uids + forensic_report_msg_uids + \ + smtp_tls_msg_uids number_of_processed_msgs = len(processed_messages) for i in range(number_of_processed_msgs): @@ -1208,8 +1433,29 @@ def get_dmarc_reports_from_mailbox(connection: MailboxConnection, e = "Error moving message UID {0}: {1}".format( msg_uid, e) logger.error("Mailbox error: {0}".format(e)) + if len(smtp_tls_msg_uids) > 0: + message = "Moving SMTP TLS report messages from" + logger.debug( + "{0} {1} to {2}".format(message, + reports_folder, + smtp_tls_reports_folder)) + number_of_smtp_tls_uids = len(smtp_tls_msg_uids) + for i in range(number_of_smtp_tls_uids): + msg_uid = smtp_tls_msg_uids[i] + message = "Moving message" + logger.debug("{0} {1} of {2}: UID {3}".format( + 
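The mbox and mailbox helpers now return a third list alongside the aggregate and forensic ones; a short sketch against an mbox export (the path is a placeholder):

```python
from parsedmarc import get_dmarc_reports_from_mbox

reports = get_dmarc_reports_from_mbox("reports.mbox", offline=True)

print(len(reports["aggregate_reports"]), "aggregate")
print(len(reports["forensic_reports"]), "forensic")
print(len(reports["smtp_tls_reports"]), "SMTP TLS")
```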
message, + i + 1, smtp_tls_msg_uids, msg_uid)) + try: + connection.move_message(msg_uid, + smtp_tls_reports_folder) + except Exception as e: + e = "Error moving message UID {0}: {1}".format( + msg_uid, e) + logger.error("Mailbox error: {0}".format(e)) results = OrderedDict([("aggregate_reports", aggregate_reports), - ("forensic_reports", forensic_reports)]) + ("forensic_reports", forensic_reports), + ("smtp_tls_reports", smtp_tls_reports)]) total_messages = len(connection.fetch_messages(reports_folder)) @@ -1321,8 +1567,10 @@ def append_csv(filename, csv): def save_output(results, output_directory="output", aggregate_json_filename="aggregate.json", forensic_json_filename="forensic.json", + smtp_tls_json_filename="smtp_tls.json", aggregate_csv_filename="aggregate.csv", - forensic_csv_filename="forensic.csv"): + forensic_csv_filename="forensic.csv", + smtp_tls_csv_filename="smtp_tls.csv"): """ Save report data in the given directory @@ -1331,12 +1579,15 @@ def save_output(results, output_directory="output", output_directory (str): The path to the directory to save in aggregate_json_filename (str): Filename for the aggregate JSON file forensic_json_filename (str): Filename for the forensic JSON file + smtp_tls_json_filename (str): Filename for the SMTP TLS JSON file aggregate_csv_filename (str): Filename for the aggregate CSV file forensic_csv_filename (str): Filename for the forensic CSV file + smtp_tls_csv_filename (str): Filename for the SMTP TLS CSV file """ aggregate_reports = results["aggregate_reports"] forensic_reports = results["forensic_reports"] + smtp_tls_reports = results["smtp_tls_reports"] if os.path.exists(output_directory): if not os.path.isdir(output_directory): @@ -1356,6 +1607,12 @@ def save_output(results, output_directory="output", append_csv(os.path.join(output_directory, forensic_csv_filename), parsed_forensic_reports_to_csv(forensic_reports)) + append_json(os.path.join(output_directory, smtp_tls_json_filename), + smtp_tls_reports) + + append_csv(os.path.join(output_directory, smtp_tls_csv_filename), + parsed_smtp_tls_reports_to_csv(smtp_tls_reports)) + samples_directory = os.path.join(output_directory, "samples") if not os.path.exists(samples_directory): os.makedirs(samples_directory) diff --git a/parsedmarc/cli.py b/parsedmarc/cli.py index 445b3e40..c77ce367 100644 --- a/parsedmarc/cli.py +++ b/parsedmarc/cli.py @@ -81,8 +81,10 @@ def process_reports(reports_): save_output(results, output_directory=opts.output, aggregate_json_filename=opts.aggregate_json_filename, forensic_json_filename=opts.forensic_json_filename, + smtp_tls_json_filename=opts.smtp_tls_json_filename, aggregate_csv_filename=opts.aggregate_csv_filename, - forensic_csv_filename=opts.forensic_csv_filename) + forensic_csv_filename=opts.forensic_csv_filename, + smtp_tls_csv_filename=opts.smtp_tls_csv_filename) if opts.save_aggregate: for report in reports_["aggregate_reports"]: try: @@ -173,6 +175,50 @@ def process_reports(reports_): forensic_reports_) except splunk.SplunkError as e: logger.error("Splunk HEC error: {0}".format(e.__str__())) + if opts.save_smtp_tls: + for report in reports_["smtp_tls_reports"]: + try: + shards = opts.elasticsearch_number_of_shards + replicas = opts.elasticsearch_number_of_replicas + if opts.elasticsearch_hosts: + elastic.save_smtp_tls_report_to_elasticsearch( + report, + index_suffix=opts.elasticsearch_index_suffix, + monthly_indexes=opts.elasticsearch_monthly_indexes, + number_of_shards=shards, + number_of_replicas=replicas) + except elastic.AlreadySaved as warning: + 
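A rough sketch of the extended `save_output` signature; the results dict is built by hand here, but in practice it comes from the mailbox or mbox helpers above:

```python
from collections import OrderedDict

from parsedmarc import parse_report_file, save_output

parsed = parse_report_file("samples/smtp_tls/smtp_tls.json")
results = OrderedDict([("aggregate_reports", []),
                       ("forensic_reports", []),
                       ("smtp_tls_reports", [parsed["report"]])])

# Writes output/smtp_tls.json and output/smtp_tls.csv alongside the
# existing aggregate and forensic files.
save_output(results, output_directory="output",
            smtp_tls_json_filename="smtp_tls.json",
            smtp_tls_csv_filename="smtp_tls.csv")
```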
logger.warning(warning.__str__()) + except elastic.ElasticsearchError as error_: + logger.error("Elasticsearch Error: {0}".format( + error_.__str__())) + except InvalidDMARCReport as error_: + logger.error(error_.__str__()) + try: + if opts.kafka_hosts: + kafka_client.save_smtp_tls_reports_to_kafka( + smtp_tls_reports, kafka_smtp_tls_topic) + except Exception as error_: + logger.error("Kafka Error: {0}".format( + error_.__str__())) + try: + if opts.s3_bucket: + s3_client.save_smtp_tls_report_to_s3(report) + except Exception as error_: + logger.error("S3 Error: {0}".format(error_.__str__())) + try: + if opts.syslog_server: + syslog_client.save_smtp_tls_report_to_syslog(report) + except Exception as error_: + logger.error("Syslog Error: {0}".format(error_.__str__())) + if opts.hec: + try: + smtp_tls_reports_ = reports_["smtp_tls_reports"] + if len(smtp_tls_reports_) > 0: + hec_client.save_smtp_tls_reports_to_splunk( + smtp_tls_reports_) + except splunk.SplunkError as e: + logger.error("Splunk HEC error: {0}".format(e.__str__())) if opts.la_dce: try: la_client = loganalytics.LogAnalyticsClient( @@ -182,19 +228,22 @@ def process_reports(reports_): dce=opts.la_dce, dcr_immutable_id=opts.la_dcr_immutable_id, dcr_aggregate_stream=opts.la_dcr_aggregate_stream, - dcr_forensic_stream=opts.la_dcr_forensic_stream + dcr_forensic_stream=opts.la_dcr_forensic_stream, + dcr_smtp_tls_stream=opts.la_dcr_smtp_tls_stream ) la_client.publish_results( reports_, opts.save_aggregate, - opts.save_forensic) + opts.save_forensic, + opts.save_smtp_tls) except loganalytics.LogAnalyticsException as e: - logger.error("Log Analytics error: {0}".format(e.__str__())) + logger.error( + "Log Analytics error: {0}".format(e.__str__())) except Exception as e: logger.error( - "Unknown error occured" + + "Unknown error occurred" + " during the publishing" + - " to Log Analitics: " + + " to Log Analytics: " + e.__str__()) arg_parser = ArgumentParser(description="Parses DMARC reports") @@ -216,12 +265,18 @@ def process_reports(reports_): arg_parser.add_argument("--forensic-json-filename", help="filename for the forensic JSON output file", default="forensic.json") + arg_parser.add_argument("--smtp-tls-json-filename", + help="filename for the SMTP TLS JSON output file", + default="smtp_tls.json") arg_parser.add_argument("--aggregate-csv-filename", help="filename for the aggregate CSV output file", default="aggregate.csv") arg_parser.add_argument("--forensic-csv-filename", help="filename for the forensic CSV output file", default="forensic.csv") + arg_parser.add_argument("--smtp-tls-csv-filename", + help="filename for the SMTP TLS CSV output file", + default="smtp_tls.csv") arg_parser.add_argument("-n", "--nameservers", nargs="+", help="nameservers to query") arg_parser.add_argument("-t", "--dns_timeout", @@ -247,6 +302,7 @@ def process_reports(reports_): aggregate_reports = [] forensic_reports = [] + smtp_tls_reports = [] args = arg_parser.parse_args() @@ -261,6 +317,8 @@ def process_reports(reports_): aggregate_json_filename=args.aggregate_json_filename, forensic_csv_filename=args.forensic_csv_filename, forensic_json_filename=args.forensic_json_filename, + smtp_tls_json_filename=args.smtp_tls_json_filename, + smtp_tls_csv_filename=args.smtp_tls_csv_filename, nameservers=args.nameservers, silent=args.silent, warnings=args.warnings, @@ -269,6 +327,7 @@ def process_reports(reports_): verbose=args.verbose, save_aggregate=False, save_forensic=False, + save_smtp_tls=False, mailbox_reports_folder="INBOX", mailbox_archive_folder="Archive", 
mailbox_watch=False, @@ -312,6 +371,7 @@ def process_reports(reports_): kafka_password=None, kafka_aggregate_topic=None, kafka_forensic_topic=None, + kafka_smtp_tls_topic=None, kafka_ssl=False, kafka_skip_certificate_verification=False, smtp_host=None, @@ -347,7 +407,8 @@ def process_reports(reports_): la_dce=None, la_dcr_immutable_id=None, la_dcr_aggregate_stream=None, - la_dcr_forensic_stream=None + la_dcr_forensic_stream=None, + la_dcr_smtp_tls_stream=None ) args = arg_parser.parse_args() @@ -374,12 +435,18 @@ def process_reports(reports_): if "forensic_json_filename" in general_config: opts.forensic_json_filename = general_config[ "forensic_json_filename"] + if "smtp_tls_json_filename" in general_config: + opts.smtp_tls_json_filename = general_config[ + "smtp_tls_json_filename"] if "aggregate_csv_filename" in general_config: opts.aggregate_csv_filename = general_config[ "aggregate_csv_filename"] if "forensic_csv_filename" in general_config: opts.forensic_csv_filename = general_config[ "forensic_csv_filename"] + if "smtp_tls_csv_filename" in general_config: + opts.smtp_tls_csv_filename = general_config[ + "smtp_tls_csv_filename"] if "nameservers" in general_config: opts.nameservers = _str_to_list(general_config["nameservers"]) if "dns_timeout" in general_config: @@ -388,6 +455,8 @@ def process_reports(reports_): opts.save_aggregate = general_config["save_aggregate"] if "save_forensic" in general_config: opts.save_forensic = general_config["save_forensic"] + if "save_smtp_tls" in general_config: + opts.save_smtp_tls = general_config["save_smtp_tls"] if "debug" in general_config: opts.debug = general_config.getboolean("debug") if "verbose" in general_config: @@ -656,6 +725,11 @@ def process_reports(reports_): exit(-1) if "forensic_topic" in kafka_config: opts.kafka_username = kafka_config["forensic_topic"] + else: + logger.critical("forensic_topic setting missing from the " + "kafka config section") + if "smtp_tls_topic" in kafka_config: + opts.kafka_username = kafka_config["smtp_tls_topic"] else: logger.critical("forensic_topic setting missing from the " "splunk_hec config section") @@ -774,6 +848,8 @@ def process_reports(reports_): log_analytics_config.get("dcr_aggregate_stream") opts.la_dcr_forensic_stream = \ log_analytics_config.get("dcr_forensic_stream") + opts.la_dcr_smtp_tls_stream = \ + log_analytics_config.get("dcr_smtp_tls_stream") logger.setLevel(logging.ERROR) @@ -805,17 +881,21 @@ def process_reports(reports_): logger.info("Starting parsedmarc") - if opts.save_aggregate or opts.save_forensic: + if opts.save_aggregate or opts.save_forensic or opts.save_smtp_tls: try: if opts.elasticsearch_hosts: es_aggregate_index = "dmarc_aggregate" es_forensic_index = "dmarc_forensic" + es_smtp_tls_index = "smtp_tls" if opts.elasticsearch_index_suffix: suffix = opts.elasticsearch_index_suffix es_aggregate_index = "{0}_{1}".format( es_aggregate_index, suffix) es_forensic_index = "{0}_{1}".format( es_forensic_index, suffix) + es_smtp_tls_index = "{0}_{1}".format( + es_smtp_tls_index, suffix + ) elastic.set_hosts(opts.elasticsearch_hosts, opts.elasticsearch_ssl, opts.elasticsearch_ssl_cert_path, @@ -883,6 +963,7 @@ def process_reports(reports_): kafka_aggregate_topic = opts.kafka_aggregate_topic kafka_forensic_topic = opts.kafka_forensic_topic + kafka_smtp_tls_topic = opts.kafka_smtp_tls_topic file_paths = [] mbox_paths = [] @@ -924,7 +1005,7 @@ def process_reports(reports_): pool.join() for result in results: - if type(result[0]) is InvalidDMARCReport: + if type(result[0]) is ParserError: 
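The new configuration keys parsed above would appear in `parsedmarc.ini` roughly as follows (shown through `configparser` to stay in Python; every value is a placeholder, and unrelated required keys such as the Log Analytics credentials are omitted):

```python
import configparser

example_ini = """
[general]
save_aggregate = True
save_forensic = True
save_smtp_tls = True
smtp_tls_json_filename = smtp_tls.json
smtp_tls_csv_filename = smtp_tls.csv

[kafka]
hosts = kafka-1.example.com:9092
aggregate_topic = dmarc_aggregate
forensic_topic = dmarc_forensic
smtp_tls_topic = smtp_tls

[log_analytics]
dcr_immutable_id = dcr-00000000000000000000000000000000
dcr_aggregate_stream = Custom-DMARC-Aggregate
dcr_forensic_stream = Custom-DMARC-Forensic
dcr_smtp_tls_stream = Custom-SMTP-TLS
"""

config = configparser.ConfigParser()
config.read_string(example_ini)
print(config["general"].getboolean("save_smtp_tls"))  # True
```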
logger.error("Failed to parse {0} - {1}".format(result[1], result[0])) else: @@ -932,6 +1013,8 @@ def process_reports(reports_): aggregate_reports.append(result[0]["report"]) elif result[0]["report_type"] == "forensic": forensic_reports.append(result[0]["report"]) + elif result[0]["report_type"] == "smtp_tls": + smtp_tls_reports.append(result[0]["report"]) for mbox_path in mbox_paths: strip = opts.strip_attachment_payloads @@ -944,6 +1027,7 @@ def process_reports(reports_): parallel=False) aggregate_reports += reports["aggregate_reports"] forensic_reports += reports["forensic_reports"] + smtp_tls_reports += reports["smtp_tls_reports"] mailbox_connection = None if opts.imap_host: @@ -1034,13 +1118,15 @@ def process_reports(reports_): aggregate_reports += reports["aggregate_reports"] forensic_reports += reports["forensic_reports"] + smtp_tls_reports += reports["smtp_tls_reports"] except Exception: logger.exception("Mailbox Error") exit(1) results = OrderedDict([("aggregate_reports", aggregate_reports), - ("forensic_reports", forensic_reports)]) + ("forensic_reports", forensic_reports), + ("smtp_tls_reports", smtp_tls_reports)]) process_reports(results) diff --git a/parsedmarc/elastic.py b/parsedmarc/elastic.py index 02198c9b..977531e4 100644 --- a/parsedmarc/elastic.py +++ b/parsedmarc/elastic.py @@ -164,6 +164,72 @@ class Index: sample = Object(_ForensicSampleDoc) +class _SMTPTLSFailureDetailsDoc(InnerDoc): + result_type = Text() + sending_mta_ip = Ip() + receiving_mx_helo = Text() + receiving_ip = Ip() + failed_session_count = Integer() + additional_information_uri = Text() + failure_reason_code = Text() + + +class _SMTPTLSPolicyDoc(InnerDoc): + policy_domain = Text() + policy_type = Text() + policy_strings = Text() + mx_host_patterns = Text() + successful_session_count = Integer() + failed_session_count = Integer() + failure_details = Nested(_SMTPTLSFailureDetailsDoc) + + def add_failure_details(self, result_type, ip_address, + receiving_ip, + receiving_mx_helo, + failed_session_count, + receiving_mx_hostname=None, + additional_information_uri=None, + failure_reason_code=None): + self.failure_details.append( + result_type=result_type, + ip_address=ip_address, + receiving_mx_hostname=receiving_mx_hostname, + receiving_mx_helo=receiving_mx_helo, + receiving_ip=receiving_ip, + failed_session_count=failed_session_count, + additional_information=additional_information_uri, + failure_reason_code=failure_reason_code + ) + + +class _SMTPTLSFailureReportDoc(Document): + + class Index: + name = "smtp_tls" + + organization_name = Text() + date_range = Date() + date_begin = Date() + date_end = Date() + contact_info = Text() + report_id = Text() + policies = Nested(_SMTPTLSPolicyDoc) + + def add_policy(self, policy_type, policy_domain, + successful_session_count, + failed_session_count, + policy_string=None, + mx_host_patterns=None, + failure_details=None): + self.policies.append(policy_type=policy_type, + policy_domain=policy_domain, + successful_session_count=successful_session_count, + failed_session_count=failed_session_count, + policy_string=policy_string, + mx_host_patterns=mx_host_patterns, + failure_details=failure_details) + + class AlreadySaved(ValueError): """Raised when a report to be saved matches an existing report""" @@ -553,3 +619,127 @@ def save_forensic_report_to_elasticsearch(forensic_report, except KeyError as e: raise InvalidForensicReport( "Forensic report missing required field: {0}".format(e.__str__())) + + +def save_smtp_tls_report_to_elasticsearch(report, + index_suffix=None, 
+ monthly_indexes=False, + number_of_shards=1, + number_of_replicas=0): + """ + Saves a parsed SMTP TLS report to elasticSearch + + Args: + report (OrderedDict): A parsed SMTP TLS report + index_suffix (str): The suffix of the name of the index to save to + monthly_indexes (bool): Use monthly indexes instead of daily indexes + number_of_shards (int): The number of shards to use in the index + number_of_replicas (int): The number of replicas to use in the index + + Raises: + AlreadySaved + """ + logger.info("Saving aggregate report to Elasticsearch") + org_name = report["org_name"] + report_id = report["report_id"] + begin_date = human_timestamp_to_datetime(report["begin_date"], + to_utc=True) + end_date = human_timestamp_to_datetime(report["end_date"], + to_utc=True) + begin_date_human = begin_date.strftime("%Y-%m-%d %H:%M:%SZ") + end_date_human = end_date.strftime("%Y-%m-%d %H:%M:%SZ") + if monthly_indexes: + index_date = begin_date.strftime("%Y-%m") + else: + index_date = begin_date.strftime("%Y-%m-%d") + report["begin_date"] = begin_date + report["end_date"] = end_date + + org_name_query = Q(dict(match_phrase=dict(org_name=org_name))) + report_id_query = Q(dict(match_phrase=dict(report_id=report_id))) + begin_date_query = Q(dict(match=dict(date_begin=begin_date))) + end_date_query = Q(dict(match=dict(date_end=end_date))) + + if index_suffix is not None: + search = Search(index="smtp_tls_{0}*".format(index_suffix)) + else: + search = Search(index="smtp_tls") + query = org_name_query & report_id_query + query = query & begin_date_query & end_date_query + search.query = query + + try: + existing = search.execute() + except Exception as error_: + raise ElasticsearchError("Elasticsearch's search for existing report \ + error: {}".format(error_.__str__())) + + if len(existing) > 0: + raise AlreadySaved(f"An SMTP TLS report ID {report_id} from " + f" {org_name} with a date range of " + f"{begin_date_human} UTC to " + f"{end_date_human} UTC already " + "exists in Elasticsearch") + + index = "smtp_tls" + if index_suffix: + index = "{0}_{1}".format(index, index_suffix) + index = "{0}-{1}".format(index, index_date) + index_settings = dict(number_of_shards=number_of_shards, + number_of_replicas=number_of_replicas) + + smtp_tls_doc = _SMTPTLSFailureReportDoc( + organization_name=report["organization_name"], + date_range=[report["date_begin"], report["date_end"]], + date_begin=report["date_begin"], + date_end=report["date_end"], + contact_info=report["contact_info"], + report_id=report["report_id"] + ) + + for policy in report['policies']: + policy_strings = None + mx_host_patterns = None + if "policy_strings" in policy: + policy_strings = policy["policy_strings"] + if "mx_host_patterns" in policy: + mx_host_patterns = policy["mx_host_patterns"] + policy_doc = _SMTPTLSPolicyDoc( + policy_domain=policy["policy_domain"], + policy_type=policy["policy_type"], + policy_string=policy_strings, + mx_host_patterns=mx_host_patterns + ) + if "failure_details" in policy: + failure_details = policy["failure_details"] + receiving_mx_hostname = None + additional_information_uri = None + failure_reason_code = None + if "receiving_mx_hostname" in failure_details: + receiving_mx_hostname = failure_details[ + "receiving_mx_hostname"] + if "additional_information_uri" in failure_details: + additional_information_uri = failure_details[ + "additional_information_uri"] + if "failure_reason_code" in failure_details: + failure_reason_code = failure_details["failure_reason_code"] + policy_doc.add_failure_details( + 
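For clarity, the index naming used by `save_smtp_tls_report_to_elasticsearch` follows the same scheme as the existing DMARC indexes; the helper below only restates that logic and is not part of parsedmarc:

```python
from datetime import datetime

def smtp_tls_index_name(begin_date, index_suffix=None, monthly_indexes=False):
    index = "smtp_tls"
    if index_suffix:
        index = "{0}_{1}".format(index, index_suffix)
    date_format = "%Y-%m" if monthly_indexes else "%Y-%m-%d"
    return "{0}-{1}".format(index, begin_date.strftime(date_format))

print(smtp_tls_index_name(datetime(2024, 1, 9)))                # smtp_tls-2024-01-09
print(smtp_tls_index_name(datetime(2024, 1, 9), "prod", True))  # smtp_tls_prod-2024-01
```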
result_type=failure_details["result_type"], + ip_address=failure_details["ip_address"], + receiving_ip=failure_details["receiving_ip"], + receiving_mx_helo=failure_details["receiving_mx_helo"], + failed_session_count=failure_details["failed_session_count"], + receiving_mx_hostname=receiving_mx_hostname, + additional_information_uri=additional_information_uri, + failure_reason_code=failure_reason_code + ) + smtp_tls_doc.policies.append(policy_doc) + + create_indexes([index], index_settings) + smtp_tls_doc.meta.index = index + + try: + smtp_tls_doc.save() + except Exception as e: + raise ElasticsearchError( + "Elasticsearch error: {0}".format(e.__str__())) diff --git a/parsedmarc/kafkaclient.py b/parsedmarc/kafkaclient.py index 02bf833a..68eef1f4 100644 --- a/parsedmarc/kafkaclient.py +++ b/parsedmarc/kafkaclient.py @@ -161,3 +161,36 @@ def save_forensic_reports_to_kafka(self, forensic_reports, forensic_topic): except Exception as e: raise KafkaError( "Kafka error: {0}".format(e.__str__())) + + def save_smtp_tls_reports_to_kafka(self, smtp_tls_reports, smtp_tls_topic): + """ + Saves SMTP TLS reports to Kafka, sends individual + records (slices) since Kafka requires messages to be <= 1MB + by default. + + Args: + smtp_tls_reports (list): A list of forensic report dicts + to save to Kafka + smtp_tls_topic (str): The name of the Kafka topic + + """ + if isinstance(smtp_tls_reports, dict): + smtp_tls_reports = [smtp_tls_reports] + + if len(smtp_tls_reports) < 1: + return + + try: + logger.debug("Saving forensic reports to Kafka") + self.producer.send(smtp_tls_topic, smtp_tls_reports) + except UnknownTopicOrPartitionError: + raise KafkaError( + "Kafka error: Unknown topic or partition on broker") + except Exception as e: + raise KafkaError( + "Kafka error: {0}".format(e.__str__())) + try: + self.producer.flush() + except Exception as e: + raise KafkaError( + "Kafka error: {0}".format(e.__str__())) diff --git a/parsedmarc/loganalytics.py b/parsedmarc/loganalytics.py index 9ca1496c..13996132 100644 --- a/parsedmarc/loganalytics.py +++ b/parsedmarc/loganalytics.py @@ -36,6 +36,10 @@ class LogAnalyticsConfig(): The Stream name where the Forensic DMARC reports need to be pushed. + dcr_smtp_tls_stream (str): + The Stream name where + the SMTP TLS Reports + need to be pushed. """ def __init__( self, @@ -45,7 +49,8 @@ def __init__( dce: str, dcr_immutable_id: str, dcr_aggregate_stream: str, - dcr_forensic_stream: str): + dcr_forensic_stream: str, + dcr_smtp_tls_stream: str): self.client_id = client_id self.client_secret = client_secret self.tenant_id = tenant_id @@ -53,6 +58,7 @@ def __init__( self.dcr_immutable_id = dcr_immutable_id self.dcr_aggregate_stream = dcr_aggregate_stream self.dcr_forensic_stream = dcr_forensic_stream + self.dcr_smtp_tls_stream = dcr_smtp_tls_stream class LogAnalyticsClient(object): @@ -69,7 +75,8 @@ def __init__( dce: str, dcr_immutable_id: str, dcr_aggregate_stream: str, - dcr_forensic_stream: str): + dcr_forensic_stream: str, + dcr_smtp_tls_stream: str): self.conf = LogAnalyticsConfig( client_id=client_id, client_secret=client_secret, @@ -77,7 +84,8 @@ def __init__( dce=dce, dcr_immutable_id=dcr_immutable_id, dcr_aggregate_stream=dcr_aggregate_stream, - dcr_forensic_stream=dcr_forensic_stream + dcr_forensic_stream=dcr_forensic_stream, + dcr_smtp_tls_stream=dcr_smtp_tls_stream ) if ( not self.conf.client_id or @@ -96,7 +104,7 @@ def publish_json( dcr_stream: str): """ Background function to publish given - DMARC reprot to specific Data Collection Rule. 
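Wiring the new SMTP TLS stream into the Log Analytics client looks roughly like the sketch below; all IDs, endpoints, and stream names are placeholders, and publishing requires valid Azure credentials:

```python
from parsedmarc import loganalytics, parse_smtp_tls_report_json

with open("samples/smtp_tls/smtp_tls.json") as f:
    results = {"aggregate_reports": [],
               "forensic_reports": [],
               "smtp_tls_reports": [parse_smtp_tls_report_json(f.read())]}

la_client = loganalytics.LogAnalyticsClient(
    client_id="00000000-0000-0000-0000-000000000000",      # placeholder
    client_secret="placeholder-secret",
    tenant_id="00000000-0000-0000-0000-000000000000",      # placeholder
    dce="https://example-dce.eastus-1.ingest.monitor.azure.com",
    dcr_immutable_id="dcr-00000000000000000000000000000000",
    dcr_aggregate_stream="Custom-DMARC-Aggregate",
    dcr_forensic_stream="Custom-DMARC-Forensic",
    dcr_smtp_tls_stream="Custom-SMTP-TLS")

la_client.publish_results(results, save_aggregate=True, save_forensic=True,
                          save_smtp_tls=True)
```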
+ DMARC report to specific Data Collection Rule. Args: results (list): @@ -117,9 +125,11 @@ def publish_results( self, results, save_aggregate: bool, - save_forensic: bool): + save_forensic: bool, + save_smtp_tls: bool + ): """ - Function to publish DMARC reports to Log Analytics + Function to publish DMARC and/or SMTP TLS reports to Log Analytics via Data Collection Rules (DCR). Look below for docs: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview @@ -131,6 +141,8 @@ def publish_results( Whether Aggregate reports can be saved into Log Analytics save_forensic (bool): Whether Forensic reports can be saved into Log Analytics + save_smtp_tls (bool): + Whether Forensic reports can be saved into Log Analytics """ conf = self.conf credential = ClientSecretCredential( @@ -161,3 +173,14 @@ def publish_results( logs_client, conf.dcr_forensic_stream) logger.info("Successfully pushed forensic reports.") + if ( + results['smtp_tls_reports'] and + conf.dcr_smtp_tls_stream and + len(results['smtp_tls_reports']) > 0 and + save_smtp_tls): + logger.info("Publishing SMTP TLS reports.") + self.publish_json( + results['smtp_tls_reports'], + logs_client, + conf.dcr_smtp_tls_stream) + logger.info("Successfully pushed SMTP TLS reports.") diff --git a/parsedmarc/s3.py b/parsedmarc/s3.py index e8269abc..d7060467 100644 --- a/parsedmarc/s3.py +++ b/parsedmarc/s3.py @@ -48,11 +48,18 @@ def save_aggregate_report_to_s3(self, report): def save_forensic_report_to_s3(self, report): self.save_report_to_s3(report, 'forensic') + def save_smtp_tls_report_to_s3(self, report): + self.save_report_to_s3(report, "smtp_tls") + def save_report_to_s3(self, report, report_type): - report_date = human_timestamp_to_datetime( - report["report_metadata"]["begin_date"] - ) - report_id = report["report_metadata"]["report_id"] + if report_type == "smtp_tls": + report_date = report["begin_date"] + report_id = report["report_id"] + else: + report_date = human_timestamp_to_datetime( + report["report_metadata"]["begin_date"] + ) + report_id = report["report_metadata"]["report_id"] path_template = "{0}/{1}/year={2}/month={3:02d}/day={4:02d}/{5}.json" object_path = path_template.format( self.bucket_path, diff --git a/parsedmarc/splunk.py b/parsedmarc/splunk.py index 36285238..0e0ca61e 100644 --- a/parsedmarc/splunk.py +++ b/parsedmarc/splunk.py @@ -7,7 +7,7 @@ from parsedmarc import __version__ from parsedmarc.log import logger -from parsedmarc.utils import human_timestamp_to_timestamp +from parsedmarc.utils import human_timestamp_to_unix_timestamp urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) @@ -104,7 +104,7 @@ def save_aggregate_reports_to_splunk(self, aggregate_reports): "spf"] data["sourcetype"] = "dmarc:aggregate" - timestamp = human_timestamp_to_timestamp( + timestamp = human_timestamp_to_unix_timestamp( new_report["begin_date"]) data["time"] = timestamp data["event"] = new_report.copy() @@ -140,7 +140,7 @@ def save_forensic_reports_to_splunk(self, forensic_reports): for report in forensic_reports: data = self._common_data.copy() data["sourcetype"] = "dmarc:forensic" - timestamp = human_timestamp_to_timestamp( + timestamp = human_timestamp_to_unix_timestamp( report["arrival_date_utc"]) data["time"] = timestamp data["event"] = report.copy() @@ -156,3 +156,40 @@ def save_forensic_reports_to_splunk(self, forensic_reports): raise SplunkError(e.__str__()) if response["code"] != 0: raise SplunkError(response["text"]) + + def save_smtp_tls_reports_to_splunk(self, reports): + """ + Saves 
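The S3 client reuses the existing Hive-style key layout for the new report type; the snippet below only restates the `path_template` shown above with placeholder values:

```python
from datetime import datetime

bucket_path = "parsedmarc"              # placeholder bucket prefix
report_type = "smtp_tls"
report_date = datetime(2024, 1, 9)
report_id = "2024-01-09T00:00:00Z_example.com"

path_template = "{0}/{1}/year={2}/month={3:02d}/day={4:02d}/{5}.json"
print(path_template.format(bucket_path, report_type, report_date.year,
                           report_date.month, report_date.day, report_id))
# parsedmarc/smtp_tls/year=2024/month=01/day=09/2024-01-09T00:00:00Z_example.com.json
```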
aggregate DMARC reports to Splunk + + Args: + reports: A list of SMTP TLS report dictionaries + to save in Splunk + + """ + logger.debug("Saving SMTP TLS reports to Splunk") + if isinstance(reports, dict): + reports = [reports] + + if len(reports) < 1: + return + + data = self._common_data.copy() + json_str = "" + for report in reports: + data["sourcetype"] = "smtp:tls" + timestamp = human_timestamp_to_unix_timestamp( + report["begin_date"]) + data["time"] = timestamp + data["event"] = report.copy() + json_str += "{0}\n".format(json.dumps(data)) + + if not self.session.verify: + logger.debug("Skipping certificate verification for Splunk HEC") + try: + response = self.session.post(self.url, data=json_str, + timeout=self.timeout) + response = response.json() + except Exception as e: + raise SplunkError(e.__str__()) + if response["code"] != 0: + raise SplunkError(response["text"]) diff --git a/parsedmarc/syslog.py b/parsedmarc/syslog.py index 0aa7e1c5..0fc47002 100644 --- a/parsedmarc/syslog.py +++ b/parsedmarc/syslog.py @@ -5,7 +5,7 @@ import json from parsedmarc import parsed_aggregate_reports_to_csv_rows, \ - parsed_forensic_reports_to_csv_rows + parsed_forensic_reports_to_csv_rows, parsed_smtp_tls_reports_to_csv_rows class SyslogClient(object): @@ -35,3 +35,8 @@ def save_forensic_report_to_syslog(self, forensic_reports): rows = parsed_forensic_reports_to_csv_rows(forensic_reports) for row in rows: self.logger.info(json.dumps(row)) + + def save_smtp_tls_report_to_syslog(self, smtp_tls_reports): + rows = parsed_smtp_tls_reports_to_csv_rows(smtp_tls_reports) + for row in rows: + self.logger.info(json.dumps(row)) diff --git a/parsedmarc/utils.py b/parsedmarc/utils.py index e2728487..313d3a81 100644 --- a/parsedmarc/utils.py +++ b/parsedmarc/utils.py @@ -218,7 +218,7 @@ def human_timestamp_to_datetime(human_timestamp, to_utc=False): return dt.astimezone(timezone.utc) if to_utc else dt -def human_timestamp_to_timestamp(human_timestamp): +def human_timestamp_to_unix_timestamp(human_timestamp): """ Converts a human-readable timestamp into a UNIX timestamp diff --git a/pyproject.toml b/pyproject.toml index d6a098ff..e5095971 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -47,7 +47,7 @@ dependencies = [ "kafka-python>=1.4.4", "lxml>=4.4.0", "mailsuite>=1.6.1", - "msgraph-core>=0.2.2", + "msgraph-core==0.2.2", "publicsuffixlist>=0.10.0", "requests>=2.22.0", "tqdm>=4.31.1", diff --git a/requirements.txt b/requirements.txt index 30dfa3f1..7ebb6701 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,7 +27,7 @@ sphinx_rtd_theme>=0.4.3 codecov>=2.0.15 lxml>=4.4.0 boto3>=1.16.63 -msgraph-core>=0.2.2 +msgraph-core==0.2.2 azure-identity>=1.8.0 azure-monitor-ingestion>=1.0.0 google-api-core>=2.4.0 @@ -40,3 +40,4 @@ myst-parser>=0.18.0 myst-parser[linkify] requests bs4 +pytest diff --git a/samples/smtp_tls/smtp_tls.json b/samples/smtp_tls/smtp_tls.json new file mode 100644 index 00000000..704f2934 --- /dev/null +++ b/samples/smtp_tls/smtp_tls.json @@ -0,0 +1,43 @@ +{ + "organization-name":"Example Inc.", + "date-range":{ + "start-datetime":"2024-01-09T00:00:00Z", + "end-datetime":"2024-01-09T23:59:59Z" + }, + "contact-info":"smtp-tls-reporting@example.com", + "report-id":"2024-01-09T00:00:00Z_example.com", + "policies":[ + { + "policy":{ + "policy-type":"sts", + "policy-string":[ + "version: STSv1", + "mode: testing", + "mx: example.com", + "max_age: 86400" + ], + "policy-domain":"example.com" + }, + "summary":{ + "total-successful-session-count":0, + "total-failure-session-count":3 + }, + 
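A sketch of a single Splunk HEC event as assembled by `save_smtp_tls_reports_to_splunk`: sourcetype `smtp:tls`, the event time derived from the report's `begin_date`, and the parsed report as the event body. The host, source, and index values come from the `HECClient` configuration and are placeholders here; events are concatenated newline-delimited before being POSTed:

```python
import json

event = {
    "host": "mail.example.com",   # placeholder; set from HECClient config
    "source": "parsedmarc",       # placeholder
    "index": "email",             # placeholder
    "sourcetype": "smtp:tls",
    "time": 1704758400,           # epoch seconds for 2024-01-09T00:00:00Z
    "event": {
        "organization_name": "Example Inc.",
        "begin_date": "2024-01-09T00:00:00Z",
        "end_date": "2024-01-09T23:59:59Z",
        "report_id": "2024-01-09T00:00:00Z_example.com",
        "policies": []            # truncated; see the JSON output example above
    },
}
print(json.dumps(event))
```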
"failure-details":[ + { + "result-type":"validation-failure", + "sending-mta-ip":"209.85.222.201", + "receiving-ip":"173.212.201.41", + "receiving-mx-hostname":"example.com", + "failed-session-count":2 + }, + { + "result-type":"validation-failure", + "sending-mta-ip":"209.85.208.176", + "receiving-ip":"173.212.201.41", + "receiving-mx-hostname":"example.com", + "failed-session-count":1 + } + ] + } + ] +} \ No newline at end of file diff --git a/tests.py b/tests.py index 1f311ecf..b02c55f3 100644 --- a/tests.py +++ b/tests.py @@ -41,7 +41,7 @@ def testAggregateSamples(self): def testEmptySample(self): """Test empty/unparasable report""" - with self.assertRaises(parsedmarc.InvalidDMARCReport): + with self.assertRaises(parsedmarc.ParserError): parsedmarc.parse_report_file('samples/empty.xml') def testForensicSamples(self):