Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1alpha sample to bulk update alerts #175

Merged
merged 1 commit into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions detect/v1alpha/bulk_update_alerts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#!/usr/bin/env python3

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
r"""Executable and reusable sample for bulk updating alerts.

The file provided to the --alert_ids_file parameter should have one alert
ID per line like so:
```
de_ad9d2771-a567-49ee-6452-1b2db13c1d33
de_3c2e2556-aba1-a253-7518-b4ddb666cc32
```
Usage:
python -m alerts.v1alpha.bulk_update_alerts \
--project_id=<PROJECT_ID> \
--project_instance=<PROJECT_INSTANCE> \
--alert_ids_file=<PATH_TO_FILE> \
--confidence_score=<CONFIDENCE_SCORE> \
--priority=<PRIORITY> \
--reason=<REASON> \
--reputation=<REPUTATION> \
--priority=<PRIORITY> \
--status=<STATUS> \
--verdict=<VERDICT> \
--risk_score=<RISK_SCORE> \
--disregarded=<DISREGARDED> \
--severity=<SEVERITY> \
--comment=<COMMENT> \
--root_cause=<ROOT_CAUSE> \
--severity_display=<SEVERITY_DISPLAY>

# pylint: disable=line-too-long
API reference:
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/projects.locations.instances.legacy/legacyUpdateAlert
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/Noun#Priority
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/Noun#Reason
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/Noun#Reputation
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/Noun#Priority
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/Noun#Status
https://cloud.google.com/chronicle/docs/reference/rest/v1alpha/Noun#Verdict
"""
# pylint: enable=line-too-long

import json

from common import chronicle_auth

from . import update_alert


CHRONICLE_API_BASE_URL = "https://chronicle.googleapis.com"
SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
]
DEFAULT_FEEDBACK = {
"comment": "automated cleanup",
"reason": "REASON_MAINTENANCE",
"reputation": "REPUTATION_UNSPECIFIED",
"root_cause": "Other",
"status": "CLOSED",
"verdict": "VERDICT_UNSPECIFIED",
}


if __name__ == "__main__":
parser = update_alert.get_update_parser()
parser.add_argument(
"--alert_ids_file", type=str, required=True,
help="File with one alert ID per line."
)
parser.set_defaults(
comment=DEFAULT_FEEDBACK["comment"],
reason=DEFAULT_FEEDBACK["reason"],
reputation=DEFAULT_FEEDBACK["reputation"],
root_cause=DEFAULT_FEEDBACK["root_cause"],
status=DEFAULT_FEEDBACK["status"],
verdict=DEFAULT_FEEDBACK["verdict"],
)
args = parser.parse_args()

# raise error if required args are not present
update_alert.check_args(parser, args)

auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
SCOPES,
)
with open(args.alert_ids_file) as fh:
for alert_id in fh:
a_list = update_alert.update_alert(
auth_session,
args.project_id,
args.project_instance,
args.region,
alert_id.strip(),
args.confidence_score,
args.reason,
args.reputation,
args.priority,
args.status,
args.verdict,
args.risk_score,
args.disregarded,
args.severity,
args.comment,
args.root_cause,
)
print(json.dumps(a_list, indent=2))
231 changes: 128 additions & 103 deletions detect/v1alpha/update_alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,130 @@
)


def get_update_parser():
"""Returns an argparse.ArgumentParser for the update_alert command."""
parser = argparse.ArgumentParser()
chronicle_auth.add_argument_credentials_file(parser)
project_instance.add_argument_project_instance(parser)
project_id.add_argument_project_id(parser)
regions.add_argument_region(parser)
parser.add_argument(
"--comment",
type=str,
required=False,
default=None,
help="Analyst comment.",
)
parser.add_argument(
"--confidence_score",
type=int,
required=False,
default=None,
help="confidence score [1-100] of the finding",
)
parser.add_argument(
"--disregarded",
type=bool,
required=False,
default=None,
help="Analyst disregard (or un-disregard) the event",
)
parser.add_argument(
"--priority",
choices=PRIORITY_ENUM,
required=False,
default=None,
help="alert priority.",
)
parser.add_argument(
"--reason",
choices=REASON_ENUM,
required=False,
default=None,
help="reason for closing an Alert",
)
parser.add_argument(
"--reputation",
choices=REPUTATION_ENUM,
required=False,
default=None,
help="A categorization of the finding as useful or not useful",
)
parser.add_argument(
"--risk_score",
type=int,
required=False,
default=None,
help="risk score [0-100] of the finding",
)
parser.add_argument(
"--root_cause",
type=str,
required=False,
default=None,
help="Alert root cause.",
)
parser.add_argument(
"--status",
choices=STATUS_ENUM,
required=False,
default=None,
help="alert status",
)
parser.add_argument(
"--verdict",
choices=VERDICT_ENUM,
required=False,
default=None,
help="a verdict on whether the finding reflects a security incident",
)
parser.add_argument(
"--severity",
type=int,
required=False,
default=None,
help="severity score [0-100] of the finding",
)
return parser


def check_args(
parser: argparse.ArgumentParser,
args_to_check: argparse.Namespace):
"""Checks if at least one of the required arguments is provided.

Args:
parser: instance of argparse.ArgumentParser (to raise error if needed).
args_to_check: instance of argparse.Namespace with the arguments to check.
"""
if not any(
[
args_to_check.comment or args.comment == "", # pylint: disable=g-explicit-bool-comparison
args_to_check.disregarded,
args_to_check.priority,
args_to_check.reason,
args_to_check.reputation,
args_to_check.risk_score or args.risk_score == 0,
args_to_check.root_cause or args.root_cause == "", # pylint: disable=g-explicit-bool-comparison
args_to_check.severity or args.severity == 0,
args_to_check.status,
args_to_check.verdict,
]
):
parser.error("At least one of the arguments "
"--comment, "
"--disregarded, "
"--priority, "
"--reason, "
"--reputation, "
"--risk_score, "
"--root_cause, "
"--severity, "
"--status, "
"or --verdict "
"is required.")


def update_alert(
http_session: requests.AuthorizedSession,
proj_id: str,
Expand Down Expand Up @@ -189,114 +313,15 @@ def update_alert(


if __name__ == "__main__":
parser = argparse.ArgumentParser()
chronicle_auth.add_argument_credentials_file(parser)
project_instance.add_argument_project_instance(parser)
project_id.add_argument_project_id(parser)
regions.add_argument_region(parser)
parser.add_argument(
main_parser = get_update_parser()
main_parser.add_argument(
"--alert_id", type=str, required=True,
help="identifier for the alert"
)
parser.add_argument(
"--confidence_score",
type=int,
required=False,
default=None,
help="confidence score [1-100] of the finding",
)
parser.add_argument(
"--priority",
choices=PRIORITY_ENUM,
required=False,
default=None,
help="alert priority.",
)
parser.add_argument(
"--reason",
choices=REASON_ENUM,
required=False,
default=None,
help="reason for closing an Alert",
)
parser.add_argument(
"--reputation",
choices=REPUTATION_ENUM,
required=False,
default=None,
help="A categorization of the finding as useful or not useful",
)
parser.add_argument(
"--status",
choices=STATUS_ENUM,
required=False,
default=None,
help="alert status",
)
parser.add_argument(
"--verdict",
choices=VERDICT_ENUM,
required=False,
default=None,
help="a verdict on whether the finding reflects a security incident",
)
parser.add_argument(
"--risk_score",
type=int,
required=False,
default=None,
help="risk score [0-100] of the finding",
)
parser.add_argument(
"--disregarded",
type=bool,
required=False,
default=None,
help="Analyst disregard (or un-disregard) the event",
)
parser.add_argument(
"--severity",
type=int,
required=False,
default=None,
help="severity score [0-100] of the finding",
)
parser.add_argument(
"--comment",
type=str,
required=False,
default=None,
help="Analyst comment.",
)
parser.add_argument(
"--root_cause",
type=str,
required=False,
default=None,
help="Alert root cause.",
)

args = parser.parse_args()
args = main_parser.parse_args()

# Check if at least one of the specific arguments is provided
if not any(
[
args.reason,
args.reputation,
args.priority,
args.status,
args.verdict,
args.risk_score or args.risk_score == 0,
args.disregarded,
args.severity or args.severity == 0,
args.comment or args.comment == "", # pylint: disable=g-explicit-bool-comparison
args.root_cause or args.root_cause == "", # pylint: disable=g-explicit-bool-comparison
]
):
parser.error("At least one of the arguments --reputation, --reason, "
"--priority, --status, --verdict, --risk_score, "
"--disregarded, --severity, --comment, "
"or --root_cause is required.")
check_args(main_parser, args)

auth_session = chronicle_auth.initialize_http_session(
args.credentials_file,
Expand Down
Loading