Skip to content

Commit

Permalink
New commands: "shodan alert export" and "shodan alert import" to help…
Browse files Browse the repository at this point in the history
… backup/ restore network monitoring configurations
  • Loading branch information
achillean committed Feb 22, 2022
1 parent 1d6e91d commit 99756e2
Showing 1 changed file with 83 additions and 0 deletions.
83 changes: 83 additions & 0 deletions shodan/cli/alert.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import click
import csv
import gzip
import json
import shodan

from collections import defaultdict
Expand Down Expand Up @@ -221,6 +223,87 @@ def batch(iterable, size=1):
click.secho('Successfully downloaded results into: {}'.format(filename), fg='green')


@alert.command(name='export')
@click.option('--filename', help='Name of the output file', default='shodan-alerts.json.gz', type=str)
def alert_export(filename):
"""Export the configuration of monitored networks/ IPs to be used by ``shodan alert import``."""
# Setup the API wrapper
key = get_api_key()
api = shodan.Shodan(key)

try:
# Get the list of alerts for the user
click.echo('Looking up alert information...')
alerts = api.alerts()

# Create the output file
click.echo('Writing alerts to file: {}'.format(filename))
with gzip.open(filename, 'wt', encoding='utf-8') as fout:
json.dump(alerts, fout)
except Exception as e:
raise click.ClickException(e.value)

click.secho('Successfully exported monitored networks', fg='green')


@alert.command(name='import')
@click.argument('filename', metavar='<export file>')
def alert_import(filename):
"""Export the configuration of monitored networks/ IPs to be used by ``shodan alert import``."""
# Setup the API wrapper
key = get_api_key()
api = shodan.Shodan(key)

# A mapping of the old notifier IDs to the new ones
notifier_map = {}

try:
# Loading the alerts
click.echo('Loading alerts from: {}'.format(filename))
with gzip.open(filename, 'rt', encoding='utf-8') as fin:
alerts = json.load(fin)

for item in alerts:
# Create the alert
click.echo('Creating: {}'.format(item['name']))
alert = api.create_alert(item['name'], item['filters']['ip'])

# Enable any triggers
if item.get('triggers', {}):
triggers = ','.join(item['triggers'].keys())

api.enable_alert_trigger(alert['id'], triggers)

# Add any whitelisted services for this trigger
for trigger, info in item['triggers'].items():
if info.get('ignore', []):
for whitelist in info['ignore']:
api.ignore_alert_trigger_notification(alert['id'], trigger, whitelist['ip'], whitelist['port'])

# Enable the notifiers
for prev_notifier in item.get('notifiers', []):
# We don't need to do anything for the default notifier as that
# uses the account's email address automatically.
if prev_notifier['id'] == 'default':
continue

# Get the new notifier based on the ID of the old one
notifier = notifier_map.get(prev_notifier['id'])

# Create the notifier if it doesn't yet exist
if notifier is None:
notifier = api.notifier.create(prev_notifier['provider'], prev_notifier['args'], description=prev_notifier['description'])

# Add it to our map of old notifier IDs to new notifiers
notifier_map[prev_notifier['id']] = notifier

api.add_alert_notifier(alert['id'], notifier['id'])
except Exception as e:
raise click.ClickException(e.value)

click.secho('Successfully imported monitored networks', fg='green')


@alert.command(name='info')
@click.argument('alert', metavar='<alert id>')
def alert_info(alert):
Expand Down

0 comments on commit 99756e2

Please sign in to comment.