Skip to content

Commit

Permalink
Merge pull request #4200 from freelawproject/612-recap-search-alerts-…
Browse files Browse the repository at this point in the history
…percolator

612 Introduced Percolator Recap Search Alerts
  • Loading branch information
mlissner authored Oct 24, 2024
2 parents a0aa171 + 363527c commit 0b10713
Show file tree
Hide file tree
Showing 29 changed files with 3,526 additions and 537 deletions.
10 changes: 8 additions & 2 deletions cl/alerts/management/commands/cl_send_recap_alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import pytz
from asgiref.sync import async_to_sync
from django.contrib.auth.models import User
from django.contrib.contenttypes.models import ContentType
from django.http import QueryDict
from django.utils import timezone
from elasticsearch import Elasticsearch
Expand All @@ -21,8 +22,8 @@
from cl.alerts.utils import (
TaskCompletionStatus,
add_document_hit_to_alert_set,
alert_hits_limit_reached,
has_document_alert_hit_been_triggered,
scheduled_alert_hits_limit_reached,
)
from cl.api.models import WebhookEventType
from cl.api.tasks import send_search_alert_webhook_es
Expand Down Expand Up @@ -625,6 +626,9 @@ def query_and_schedule_alerts(
"""

alert_users = User.objects.filter(alerts__rate=rate).distinct()
docket_content_type = ContentType.objects.get(
app_label="search", model="docket"
)
for user in alert_users:
alerts = user.alerts.filter(rate=rate, alert_type=SEARCH_TYPES.RECAP)
logger.info(f"Running '{rate}' alerts for user '{user}': {alerts}")
Expand All @@ -644,7 +648,7 @@ def query_and_schedule_alerts(
continue
for hit in results_to_send:
# Schedule DAILY, WEEKLY and MONTHLY Alerts
if alert_hits_limit_reached(alert.pk, user.pk):
if scheduled_alert_hits_limit_reached(alert.pk, user.pk):
# Skip storing hits for this alert-user combination because
# the SCHEDULED_ALERT_HITS_LIMIT has been reached.
continue
Expand All @@ -662,6 +666,8 @@ def query_and_schedule_alerts(
user=user,
alert=alert,
document_content=hit_copy.to_dict(),
content_type=docket_content_type,
object_id=hit_copy.docket_id,
)
)
# Send webhooks
Expand Down
36 changes: 36 additions & 0 deletions cl/alerts/management/commands/cl_send_rt_percolator_alerts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import time

from cl.alerts.management.commands.cl_send_scheduled_alerts import (
query_and_send_alerts_by_rate,
)
from cl.alerts.models import Alert
from cl.lib.command_utils import VerboseCommand


class Command(VerboseCommand):
help = """Send real-time alerts scheduled by the Percolator every 5 minutes.
This process is performed to accumulate alerts that can be grouped into a
single email if they belong to the same user. """

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.options = {}

def add_arguments(self, parser):
parser.add_argument(
"--testing-mode",
action="store_true",
help="Use this flag for testing purposes.",
)

def handle(self, *args, **options):
super().handle(*args, **options)
testing_mode = options.get("testing_mode", False)
while True:
query_and_send_alerts_by_rate(Alert.REAL_TIME)
if testing_mode:
# Perform only 1 iteration for testing purposes.
break

# Wait for 5 minutes.
time.sleep(300)
130 changes: 86 additions & 44 deletions cl/alerts/management/commands/cl_send_scheduled_alerts.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import datetime
from collections import defaultdict
from typing import DefaultDict
from typing import Any, DefaultDict

import waffle
from asgiref.sync import async_to_sync
from django.conf import settings

from cl.alerts.models import (
SCHEDULED_ALERT_HIT_STATUS,
Expand All @@ -13,6 +14,8 @@
from cl.alerts.tasks import send_search_alert_emails
from cl.alerts.utils import InvalidDateError, override_alert_query
from cl.lib.command_utils import VerboseCommand, logger
from cl.search.models import SEARCH_TYPES
from cl.search.types import ESDictDocument
from cl.stats.utils import tally_stat

DAYS_TO_DELETE = 90
Expand Down Expand Up @@ -50,6 +53,30 @@ def get_cut_off_date(rate: str, d: datetime.date) -> datetime.date | None:
return cut_off_date


def merge_alert_child_documents(
documents: list[ESDictDocument],
) -> ESDictDocument:
"""Merge multiple child hits within the same main document.
:param documents: A list of document hits.
:return: A document dictionary where documents has been merged.
"""

main_document = documents[0].copy()
child_docs: list[ESDictDocument] = []
for doc in documents:
if "child_docs" in doc:
child_docs.extend(doc["child_docs"])
if len(child_docs) >= settings.RECAP_CHILD_HITS_PER_RESULT:
# Nested child limits reached. Set child_remaining True to show
# the "Show the view additional hits button"
main_document["child_remaining"] = True
break

if child_docs:
main_document["child_docs"] = child_docs
return main_document


def query_and_send_alerts_by_rate(rate: str) -> None:
"""Query and send alerts per user.
Expand All @@ -59,58 +86,77 @@ def query_and_send_alerts_by_rate(rate: str) -> None:

alerts_sent_count = 0
now_time = datetime.datetime.now()
alerts_to_update = []
scheduled_hits_rate = ScheduledAlertHit.objects.filter(
alert__rate=rate, hit_status=SCHEDULED_ALERT_HIT_STATUS.SCHEDULED
).select_related("user", "alert")

# Create a nested dictionary structure to hold the groups.
grouped_hits: DefaultDict[
int, DefaultDict[Alert, list[ScheduledAlertHit]]
] = defaultdict(lambda: defaultdict(list))

# Group scheduled hits by User and Alert.
for hit in scheduled_hits_rate:
user_id = hit.user.pk
alert = hit.alert
grouped_hits[user_id][alert].append(hit)

for user_id, alerts in grouped_hits.items():
# Get unique alert users with scheduled alert hits
user_ids = (
ScheduledAlertHit.objects.filter(
alert__rate=rate, hit_status=SCHEDULED_ALERT_HIT_STATUS.SCHEDULED
)
.values_list("user", flat=True)
.distinct()
)

for user_id in user_ids:
# Query ScheduledAlertHits for every user.
scheduled_hits = ScheduledAlertHit.objects.filter(
user_id=user_id,
alert__rate=rate,
hit_status=SCHEDULED_ALERT_HIT_STATUS.SCHEDULED,
).select_related("user", "alert")

# Group scheduled hits by Alert and the main_doc_id
grouped_hits: DefaultDict[
Alert, DefaultDict[int, list[dict[str, Any]]]
] = defaultdict(lambda: defaultdict(list))
alerts_to_update = set()
for hit in scheduled_hits:
alert = hit.alert
doc_content = json_date_parser(hit.document_content)
match hit.alert.alert_type:
case SEARCH_TYPES.RECAP:
main_doc_id = doc_content.get("docket_id")
case SEARCH_TYPES.ORAL_ARGUMENT:
main_doc_id = doc_content.get("id")
case _:
# Not supported alert type.
continue
grouped_hits[alert][main_doc_id].append(doc_content)
alerts_to_update.add(alert.pk)

# Merge child documents with the same main_doc_id if the document dict
# contains the child_docs key.
merged_hits: DefaultDict[Alert, list[dict[str, Any]]] = defaultdict(
list
)
for alert, document_groups in grouped_hits.items():
for documents in document_groups.values():
merged_hits[alert].append(
merge_alert_child_documents(documents)
)

hits = []
for alert, results in alerts.items():
for alert, documents in merged_hits.items():
search_type = alert.alert_type
documents = []
for result in results:
documents.append(json_date_parser(result.document_content))

alerts_to_update.append(alert.pk)

# Override order_by to show the latest items when clicking the
# "View Full Results" button.
cut_off_date = get_cut_off_date(rate, now_time.date())
qd = override_alert_query(alert, cut_off_date)
alert.query_run = qd.urlencode() # type: ignore
hits.append(
(
alert,
search_type,
documents,
len(documents),
)
)
hits.append((alert, search_type, documents, len(documents)))

if hits:
send_search_alert_emails.delay(
[(user_id, hits)], scheduled_alert=True
)
alerts_sent_count += 1

# Update Alert's date_last_hit in bulk.
Alert.objects.filter(id__in=alerts_to_update).update(
date_last_hit=now_time
)
# Update Alert's date_last_hit in bulk for this user's alerts
Alert.objects.filter(id__in=alerts_to_update).update(
date_last_hit=now_time
)

# Update Scheduled alert hits status to "SENT".
scheduled_hits_rate.update(hit_status=SCHEDULED_ALERT_HIT_STATUS.SENT)
# Update Scheduled alert hits status to "SENT" for this user
scheduled_hits.update(hit_status=SCHEDULED_ALERT_HIT_STATUS.SENT)

# Remove old Scheduled alert hits sent, daily.
if rate == Alert.DAILY:
Expand All @@ -124,16 +170,12 @@ def query_and_send_alerts_by_rate(rate: str) -> None:


def send_scheduled_alerts(rate: str) -> None:
if rate == Alert.DAILY:
query_and_send_alerts_by_rate(Alert.DAILY)
elif rate == Alert.WEEKLY:
query_and_send_alerts_by_rate(Alert.WEEKLY)
elif rate == Alert.MONTHLY:
if rate == Alert.MONTHLY:
if datetime.date.today().day > 28:
raise InvalidDateError(
"Monthly alerts cannot be run on the 29th, 30th or 31st."
)
query_and_send_alerts_by_rate(Alert.MONTHLY)
query_and_send_alerts_by_rate(rate)


def delete_old_scheduled_alerts() -> int:
Expand Down
29 changes: 29 additions & 0 deletions cl/alerts/migrations/0011_add_schedule_alert_hit_content_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Generated by Django 5.1.2 on 2024-10-18 20:18

import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("alerts", "0010_pghistory_v3_4_0_trigger_update"),
("contenttypes", "0002_remove_content_type_name"),
]

operations = [
migrations.AddField(
model_name="scheduledalerthit",
name="content_type",
field=models.ForeignKey(
null=True,
on_delete=django.db.models.deletion.CASCADE,
to="contenttypes.contenttype",
),
),
migrations.AddField(
model_name="scheduledalerthit",
name="object_id",
field=models.PositiveIntegerField(null=True),
),
]
11 changes: 11 additions & 0 deletions cl/alerts/migrations/0011_add_schedule_alert_hit_content_type.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
BEGIN;
--
-- Add field content_type to scheduledalerthit
--
ALTER TABLE "alerts_scheduledalerthit" ADD COLUMN "content_type_id" integer NULL CONSTRAINT "alerts_scheduledaler_content_type_id_c1a4db47_fk_django_co" REFERENCES "django_content_type"("id") DEFERRABLE INITIALLY DEFERRED; SET CONSTRAINTS "alerts_scheduledaler_content_type_id_c1a4db47_fk_django_co" IMMEDIATE;
--
-- Add field object_id to scheduledalerthit
--
ALTER TABLE "alerts_scheduledalerthit" ADD COLUMN "object_id" integer NULL CHECK ("object_id" >= 0);
CREATE INDEX "alerts_scheduledalerthit_content_type_id_c1a4db47" ON "alerts_scheduledalerthit" ("content_type_id");
COMMIT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Generated by Django 5.1.2 on 2024-10-18 20:19

from django.contrib.postgres.operations import AddIndexConcurrently
from django.conf import settings
from django.db import migrations, models


class Migration(migrations.Migration):
atomic = False

dependencies = [
("alerts", "0011_add_schedule_alert_hit_content_type"),
("contenttypes", "0002_remove_content_type_name"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]

operations = [
AddIndexConcurrently(
model_name="scheduledalerthit",
index=models.Index(
fields=["content_type", "object_id"],
name="alerts_sche_content_c5e627_idx",
),
),
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
--
-- Concurrently create index alerts_sche_content_c5e627_idx on field(s) content_type, object_id of model scheduledalerthit
--
CREATE INDEX CONCURRENTLY "alerts_sche_content_c5e627_idx" ON "alerts_scheduledalerthit" ("content_type_id", "object_id");
12 changes: 12 additions & 0 deletions cl/alerts/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import pghistory
from django.contrib.auth.models import User
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.core.serializers.json import DjangoJSONEncoder
from django.db import models
from django.http import QueryDict
Expand Down Expand Up @@ -201,3 +203,13 @@ class ScheduledAlertHit(AbstractDateTimeModel):
default=SCHEDULED_ALERT_HIT_STATUS.SCHEDULED,
choices=SCHEDULED_ALERT_HIT_STATUS.STATUS,
)
content_type = models.ForeignKey(
ContentType, on_delete=models.CASCADE, null=True
)
object_id = models.PositiveIntegerField(null=True)
content_object = GenericForeignKey("content_type", "object_id")

class Meta:
indexes = [
models.Index(fields=["content_type", "object_id"]),
]
Loading

0 comments on commit 0b10713

Please sign in to comment.