Skip to content

Commit

Permalink
Merge pull request #5679 from nyaruka/flow_msg_counts
Browse files Browse the repository at this point in the history
Start tracking incoming message counts by flow
  • Loading branch information
rowanseymour authored Nov 22, 2024
2 parents 87deb12 + 5aef096 commit 0f1aaf7
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 29 deletions.
64 changes: 64 additions & 0 deletions temba/flows/migrations/0340_update_triggers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Generated by Django 5.1.2 on 2024-11-21 20:45

from django.db import migrations

# Raw SQL executed by this migration. It (re)defines the temba_msg_on_update()
# trigger function with CREATE OR REPLACE. Besides maintaining system label
# counts and label counts, the function inserts flows_flowactivitycount rows
# under the scopes msgsin:hour:<h>, msgsin:dow:<d> and msgsin:date:<date> for
# incoming messages (direction = 'I') whose flow_id changes from NULL to
# non-NULL. All three scope values are derived from NOW() in the database.
SQL = """
----------------------------------------------------------------------
-- Handles UPDATE statements on msg table
----------------------------------------------------------------------
CREATE OR REPLACE FUNCTION temba_msg_on_update() RETURNS TRIGGER AS $$
BEGIN
-- add negative counts for all old non-null system labels that don't match the new ones
INSERT INTO msgs_systemlabelcount("org_id", "label_type", "count", "is_squashed")
SELECT o.org_id, temba_msg_determine_system_label(o), -count(*), FALSE FROM oldtab o
INNER JOIN newtab n ON n.id = o.id
WHERE temba_msg_determine_system_label(o) IS DISTINCT FROM temba_msg_determine_system_label(n) AND temba_msg_determine_system_label(o) IS NOT NULL
GROUP BY 1, 2;
-- add counts for all new system labels that don't match the old ones
INSERT INTO msgs_systemlabelcount("org_id", "label_type", "count", "is_squashed")
SELECT n.org_id, temba_msg_determine_system_label(n), count(*), FALSE FROM newtab n
INNER JOIN oldtab o ON o.id = n.id
WHERE temba_msg_determine_system_label(o) IS DISTINCT FROM temba_msg_determine_system_label(n) AND temba_msg_determine_system_label(n) IS NOT NULL
GROUP BY 1, 2;
-- add negative old-state label counts for all messages being archived/restored
INSERT INTO msgs_labelcount("label_id", "is_archived", "count", "is_squashed")
SELECT ml.label_id, o.visibility != 'V', -count(*), FALSE FROM oldtab o
INNER JOIN newtab n ON n.id = o.id
INNER JOIN msgs_msg_labels ml ON ml.msg_id = o.id
WHERE (o.visibility = 'V' AND n.visibility != 'V') or (o.visibility != 'V' AND n.visibility = 'V')
GROUP BY 1, 2;
-- add new-state label counts for all messages being archived/restored
INSERT INTO msgs_labelcount("label_id", "is_archived", "count", "is_squashed")
SELECT ml.label_id, n.visibility != 'V', count(*), FALSE FROM newtab n
INNER JOIN oldtab o ON o.id = n.id
INNER JOIN msgs_msg_labels ml ON ml.msg_id = n.id
WHERE (o.visibility = 'V' AND n.visibility != 'V') or (o.visibility != 'V' AND n.visibility = 'V')
GROUP BY 1, 2;
-- add new flow activity counts for incoming messages now marked as handled by a flow
INSERT INTO flows_flowactivitycount("flow_id", "scope", "count", "is_squashed")
SELECT s.flow_id, unnest(ARRAY[
format('msgsin:hour:%s', extract(hour FROM NOW())),
format('msgsin:dow:%s', extract(isodow FROM NOW())),
format('msgsin:date:%s', NOW()::date)
]), s.msgs, FALSE
FROM (
SELECT n.flow_id, count(*) AS msgs FROM newtab n INNER JOIN oldtab o ON o.id = n.id
WHERE n.direction = 'I' AND o.flow_id IS NULL AND n.flow_id IS NOT NULL
GROUP BY 1
) s;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
"""


class Migration(migrations.Migration):
    """Runs the module-level SQL, which redefines temba_msg_on_update()."""

    # must be applied after the previous trigger update in the flows app
    dependencies = [
        ("flows", "0339_update_triggers"),
    ]

    # single forward-only operation: no reverse SQL is supplied
    operations = [
        migrations.RunSQL(SQL),
    ]
5 changes: 5 additions & 0 deletions temba/flows/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,7 @@ def delete(self):
for start in self.starts.all():
start.delete()

delete_in_batches(self.counts.all())
delete_in_batches(self.category_counts.all())
delete_in_batches(self.path_counts.all())
delete_in_batches(self.node_counts.all())
Expand Down Expand Up @@ -1384,6 +1385,10 @@ class QuerySet(models.QuerySet):
def sum(self) -> int:
    """Return the sum of `count` over the rows in this queryset, or 0 when the aggregate is empty."""
    aggregated = self.aggregate(count_sum=Sum("count"))
    total = aggregated["count_sum"]
    return total or 0

def scope_totals(self) -> dict:
    """Return a dict mapping each scope in this queryset to the sum of its `count` values."""
    # values_list + annotate yields one (scope, count_sum) pair per distinct scope
    rows = self.values_list("scope").annotate(count_sum=Sum("count"))
    return {scope: total for scope, total in rows}

objects = QuerySet.as_manager()

@classmethod
Expand Down
5 changes: 4 additions & 1 deletion temba/flows/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@ def update_session_wait_expires(flow_id):
@cron_task(lock_timeout=7200)
def squash_flow_counts():
    """Squash the flow activity counts and the flow start counts, in that order."""
    for count_model in (FlowActivityCount, FlowStartCount):
        count_model.squash()


@cron_task(lock_timeout=7200)
def squash_legacy_counts():
    """Squash the legacy per-flow count models (node, run status, category and path counts).

    FlowStartCount is intentionally not squashed here: squash_flow_counts already
    squashes it, and doing it from both tasks would repeat the same work under
    two independent cron locks.
    """
    FlowNodeCount.squash()
    FlowRunStatusCount.squash()
    FlowCategoryCount.squash()
    FlowPathCount.squash()


Expand Down
89 changes: 68 additions & 21 deletions temba/flows/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
from .tasks import (
interrupt_flow_sessions,
squash_flow_counts,
squash_legacy_counts,
trim_flow_revisions,
trim_flow_sessions,
update_session_wait_expires,
Expand Down Expand Up @@ -550,7 +551,7 @@ def test_activity(self, mr_mocks):
)

# check squashing doesn't change anything
squash_flow_counts()
squash_legacy_counts()

(active, visited) = flow.get_activity()

Expand Down Expand Up @@ -833,24 +834,6 @@ def test_activity(self, mr_mocks):
flow.get_run_stats(),
)

def test_activity_new(self):
    """Squashing leaves one row per scope and does not change the per-scope sums."""
    flow = self.create_flow("Test")
    for scope, count in (("foo:1", 1), ("foo:1", 2), ("foo:2", 4)):
        flow.counts.create(scope=scope, count=count)

    # foo:3 never gets a row, so its sum must be reported as zero
    expected_sums = (("foo:1", 3), ("foo:2", 4), ("foo:3", 0))

    for scope, expected in expected_sums:
        self.assertEqual(expected, flow.counts.filter(scope=scope).sum())

    squash_flow_counts()

    # the two foo:1 rows collapse into one, leaving a single row per scope
    self.assertEqual(2, flow.counts.count())
    for scope, expected in expected_sums:
        self.assertEqual(expected, flow.counts.filter(scope=scope).sum())

def test_category_counts(self):
def assertCount(counts, result_key, category_name, truth):
found = False
Expand Down Expand Up @@ -3403,7 +3386,7 @@ def create_runs(flow_status_pairs: tuple) -> list:
self.assertEqual({"W": 2}, FlowRunStatusCount.get_totals(flow2))

# no difference after squashing
squash_flow_counts()
squash_legacy_counts()

self.assertEqual({"A": 2, "W": 1, "C": 1}, FlowRunStatusCount.get_totals(flow1))
self.assertEqual({"W": 2}, FlowRunStatusCount.get_totals(flow2))
Expand All @@ -3428,7 +3411,7 @@ def create_runs(flow_status_pairs: tuple) -> list:
self.assertEqual({"W": 0, "X": 1, "I": 2}, FlowRunStatusCount.get_totals(flow2))

# no difference after squashing
squash_flow_counts()
squash_legacy_counts()

self.assertEqual({"A": 2, "W": 0, "C": 0, "I": 4}, FlowRunStatusCount.get_totals(flow1))
self.assertEqual({"W": 0, "X": 1, "I": 2}, FlowRunStatusCount.get_totals(flow2))
Expand Down Expand Up @@ -5498,3 +5481,67 @@ def test_trim_revisions(self):
trim_flow_revisions()
self.assertEqual(2, FlowRevision.objects.filter(flow=flow2).count())
self.assertEqual(31, FlowRevision.objects.filter(flow=flow1).count())


class FlowActivityCountTest(TembaTest):
    """Tests for the scoped activity counts kept on flows (`flow.counts`)."""

    def test_msgsin_counts(self):
        """Incoming messages that get a flow assigned add msgsin:* counts to that flow."""
        flow1 = self.create_flow("Test 1")
        flow2 = self.create_flow("Test 2")

        def handle(msg, flow):
            # mimic a message being handled by a flow: set status and flow in a single UPDATE
            msg.status = "H"
            msg.flow = flow
            msg.save(update_fields=("status", "flow"))

        contact = self.create_contact("Bob", phone="+1234567890")
        self.create_outgoing_msg(contact, "Out")  # should be ignored
        in1 = self.create_incoming_msg(contact, "In 1", status="P")
        in2 = self.create_incoming_msg(contact, "In 2", status="P")
        in3 = self.create_incoming_msg(contact, "In 3", status="P")

        self.assertEqual(0, flow1.counts.count())
        self.assertEqual(0, flow2.counts.count())

        handle(in1, flow1)
        handle(in2, flow1)
        handle(in3, flow2)

        # each handled message adds one row for each of the hour, dow and date scopes
        self.assertEqual(6, flow1.counts.count())
        self.assertEqual(3, flow2.counts.count())

        # The trigger derives hour, day-of-week and date from a single NOW(), so take
        # all three expectations from one UTC timestamp. Mixing local date.today()
        # with a UTC hour breaks whenever the local date differs from the UTC date.
        now = timezone.now().astimezone(tzone.utc)
        today = now.date().isoformat()  # date as YYYY-MM-DD
        dow = now.isoweekday()  # weekday as 1(Mon)-7(Sun)
        hour = now.hour

        self.assertEqual(
            {f"msgsin:date:{today}": 2, f"msgsin:dow:{dow}": 2, f"msgsin:hour:{hour}": 2},
            flow1.counts.filter(scope__startswith="msgsin:").scope_totals(),
        )
        self.assertEqual(
            {f"msgsin:date:{today}": 1, f"msgsin:dow:{dow}": 1, f"msgsin:hour:{hour}": 1},
            flow2.counts.filter(scope__startswith="msgsin:").scope_totals(),
        )

        # other changes to msgs shouldn't create new counts
        in1.archive()
        in2.archive()

        self.assertEqual(6, flow1.counts.count())
        self.assertEqual(3, flow2.counts.count())

    def test_squashing(self):
        """Squashing leaves one row per scope and does not change the per-scope sums."""
        flow = self.create_flow("Test")
        flow.counts.create(scope="foo:1", count=1)
        flow.counts.create(scope="foo:1", count=2)
        flow.counts.create(scope="foo:2", count=4)

        self.assertEqual(3, flow.counts.filter(scope="foo:1").sum())
        self.assertEqual(4, flow.counts.filter(scope="foo:2").sum())
        self.assertEqual(0, flow.counts.filter(scope="foo:3").sum())

        squash_flow_counts()

        # the two foo:1 rows collapse into one
        self.assertEqual(2, flow.counts.count())
        self.assertEqual(3, flow.counts.filter(scope="foo:1").sum())
        self.assertEqual(4, flow.counts.filter(scope="foo:2").sum())
        self.assertEqual(0, flow.counts.filter(scope="foo:3").sum())
3 changes: 2 additions & 1 deletion temba/settings_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,8 @@
"send-notification-emails": {"task": "send_notification_emails", "schedule": timedelta(seconds=60)},
"squash-channel-counts": {"task": "squash_channel_counts", "schedule": timedelta(seconds=60)},
"squash-group-counts": {"task": "squash_group_counts", "schedule": timedelta(seconds=60)},
"squash-flow-counts": {"task": "squash_flow_counts", "schedule": timedelta(seconds=60)},
"squash-legacy-counts": {"task": "squash_legacy_counts", "schedule": timedelta(seconds=60)},
"squash-flow-counts": {"task": "squash_flow_counts", "schedule": timedelta(seconds=30)},
"squash-item-counts": {"task": "squash_item_counts", "schedule": timedelta(seconds=30)},
"squash-msg-counts": {"task": "squash_msg_counts", "schedule": timedelta(seconds=60)},
"squash-ticket-counts": {"task": "squash_ticket_counts", "schedule": timedelta(seconds=60)},
Expand Down
23 changes: 18 additions & 5 deletions temba/sql/current_functions.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-- Generated by collect_sql on 2024-11-21 17:56 UTC
-- Generated by collect_sql on 2024-11-22 14:18 UTC

----------------------------------------------------------------------
-- Convenience method to call contact_toggle_system_group with a row
Expand Down Expand Up @@ -531,30 +531,43 @@ BEGIN
SELECT o.org_id, temba_msg_determine_system_label(o), -count(*), FALSE FROM oldtab o
INNER JOIN newtab n ON n.id = o.id
WHERE temba_msg_determine_system_label(o) IS DISTINCT FROM temba_msg_determine_system_label(n) AND temba_msg_determine_system_label(o) IS NOT NULL
GROUP BY o.org_id, temba_msg_determine_system_label(o);
GROUP BY 1, 2;

-- add counts for all new system labels that don't match the old ones
INSERT INTO msgs_systemlabelcount("org_id", "label_type", "count", "is_squashed")
SELECT n.org_id, temba_msg_determine_system_label(n), count(*), FALSE FROM newtab n
INNER JOIN oldtab o ON o.id = n.id
WHERE temba_msg_determine_system_label(o) IS DISTINCT FROM temba_msg_determine_system_label(n) AND temba_msg_determine_system_label(n) IS NOT NULL
GROUP BY n.org_id, temba_msg_determine_system_label(n);
GROUP BY 1, 2;

-- add negative old-state label counts for all messages being archived/restored
INSERT INTO msgs_labelcount("label_id", "is_archived", "count", "is_squashed")
SELECT ml.label_id, o.visibility != 'V', -count(*), FALSE FROM oldtab o
INNER JOIN newtab n ON n.id = o.id
INNER JOIN msgs_msg_labels ml ON ml.msg_id = o.id
WHERE (o.visibility = 'V' AND n.visibility != 'V') or (o.visibility != 'V' AND n.visibility = 'V')
GROUP BY ml.label_id, o.visibility != 'V';
GROUP BY 1, 2;

-- add new-state label counts for all messages being archived/restored
INSERT INTO msgs_labelcount("label_id", "is_archived", "count", "is_squashed")
SELECT ml.label_id, n.visibility != 'V', count(*), FALSE FROM newtab n
INNER JOIN oldtab o ON o.id = n.id
INNER JOIN msgs_msg_labels ml ON ml.msg_id = n.id
WHERE (o.visibility = 'V' AND n.visibility != 'V') or (o.visibility != 'V' AND n.visibility = 'V')
GROUP BY ml.label_id, n.visibility != 'V';
GROUP BY 1, 2;

-- add new flow activity counts for incoming messages now marked as handled by a flow
INSERT INTO flows_flowactivitycount("flow_id", "scope", "count", "is_squashed")
SELECT s.flow_id, unnest(ARRAY[
format('msgsin:hour:%s', extract(hour FROM NOW())),
format('msgsin:dow:%s', extract(isodow FROM NOW())),
format('msgsin:date:%s', NOW()::date)
]), s.msgs, FALSE
FROM (
SELECT n.flow_id, count(*) AS msgs FROM newtab n INNER JOIN oldtab o ON o.id = n.id
WHERE n.direction = 'I' AND o.flow_id IS NULL AND n.flow_id IS NOT NULL
GROUP BY 1
) s;

RETURN NULL;
END;
Expand Down
2 changes: 1 addition & 1 deletion temba/sql/current_triggers.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-- Generated by collect_sql on 2024-11-21 17:56 UTC
-- Generated by collect_sql on 2024-11-22 14:18 UTC

CREATE TRIGGER temba_broadcast_on_delete
AFTER DELETE ON msgs_broadcast REFERENCING OLD TABLE AS oldtab
Expand Down

0 comments on commit 0f1aaf7

Please sign in to comment.