Skip to content

Commit

Permalink
Start tracking incoming message counts by flow
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Nov 21, 2024
1 parent 87deb12 commit a8c434f
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 24 deletions.
68 changes: 68 additions & 0 deletions temba/flows/migrations/0340_update_triggers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Generated by Django 5.1.2 on 2024-11-21 20:45

from django.db import migrations

SQL = """
----------------------------------------------------------------------
-- Handles UPDATE statements on msg table
----------------------------------------------------------------------
CREATE OR REPLACE FUNCTION temba_msg_on_update() RETURNS TRIGGER AS $$
BEGIN
-- add negative counts for all old non-null system labels that don't match the new ones
INSERT INTO msgs_systemlabelcount("org_id", "label_type", "count", "is_squashed")
SELECT o.org_id, temba_msg_determine_system_label(o), -count(*), FALSE FROM oldtab o
INNER JOIN newtab n ON n.id = o.id
WHERE temba_msg_determine_system_label(o) IS DISTINCT FROM temba_msg_determine_system_label(n) AND temba_msg_determine_system_label(o) IS NOT NULL
GROUP BY 1, 2;
-- add counts for all new system labels that don't match the old ones
INSERT INTO msgs_systemlabelcount("org_id", "label_type", "count", "is_squashed")
SELECT n.org_id, temba_msg_determine_system_label(n), count(*), FALSE FROM newtab n
INNER JOIN oldtab o ON o.id = n.id
WHERE temba_msg_determine_system_label(o) IS DISTINCT FROM temba_msg_determine_system_label(n) AND temba_msg_determine_system_label(n) IS NOT NULL
GROUP BY 1, 2;
-- add negative old-state label counts for all messages being archived/restored
INSERT INTO msgs_labelcount("label_id", "is_archived", "count", "is_squashed")
SELECT ml.label_id, o.visibility != 'V', -count(*), FALSE FROM oldtab o
INNER JOIN newtab n ON n.id = o.id
INNER JOIN msgs_msg_labels ml ON ml.msg_id = o.id
WHERE (o.visibility = 'V' AND n.visibility != 'V') or (o.visibility != 'V' AND n.visibility = 'V')
GROUP BY 1, 2;
-- add new-state label counts for all messages being archived/restored
INSERT INTO msgs_labelcount("label_id", "is_archived", "count", "is_squashed")
SELECT ml.label_id, n.visibility != 'V', count(*), FALSE FROM newtab n
INNER JOIN oldtab o ON o.id = n.id
INNER JOIN msgs_msg_labels ml ON ml.msg_id = n.id
WHERE (o.visibility = 'V' AND n.visibility != 'V') or (o.visibility != 'V' AND n.visibility = 'V')
GROUP BY 1, 2;
-- add new flow activity counts for incoming messages now marked as handled by a flow
INSERT INTO flows_flowactivitycount("flow_id", "scope", "count", "is_squashed")
SELECT n.flow_id, format('msgsin:hour:%s', extract(hour FROM n.created_on)), count(*), FALSE FROM newtab n
INNER JOIN oldtab o ON o.id = n.id
WHERE o.flow_id IS NULL AND n.flow_id IS NOT NULL
GROUP BY 1, 2;
INSERT INTO flows_flowactivitycount("flow_id", "scope", "count", "is_squashed")
SELECT n.flow_id, format('msgsin:dow:%s', extract(isodow FROM n.created_on)), count(*), FALSE FROM newtab n
INNER JOIN oldtab o ON o.id = n.id
WHERE o.flow_id IS NULL AND n.flow_id IS NOT NULL
GROUP BY 1, 2;
INSERT INTO flows_flowactivitycount("flow_id", "scope", "count", "is_squashed")
SELECT n.flow_id, format('msgsin:date:%s', n.created_on::date), count(*), FALSE FROM newtab n
INNER JOIN oldtab o ON o.id = n.id
WHERE o.flow_id IS NULL AND n.flow_id IS NOT NULL
GROUP BY 1, 2;
RETURN NULL;
END;
$$ LANGUAGE plpgsql;
"""


class Migration(migrations.Migration):

dependencies = [("flows", "0339_update_triggers")]

operations = [migrations.RunSQL(SQL)]
5 changes: 5 additions & 0 deletions temba/flows/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,7 @@ def delete(self):
for start in self.starts.all():
start.delete()

delete_in_batches(self.counts.all())
delete_in_batches(self.category_counts.all())
delete_in_batches(self.path_counts.all())
delete_in_batches(self.node_counts.all())
Expand Down Expand Up @@ -1384,6 +1385,10 @@ class QuerySet(models.QuerySet):
def sum(self) -> int:
return self.aggregate(count_sum=Sum("count"))["count_sum"] or 0

def scope_totals(self) -> dict:
counts = self.values_list("scope").annotate(count_sum=Sum("count"))
return {c[0]: c[1] for c in counts}

objects = QuerySet.as_manager()

@classmethod
Expand Down
84 changes: 66 additions & 18 deletions temba/flows/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -833,24 +833,6 @@ def test_activity(self, mr_mocks):
flow.get_run_stats(),
)

def test_activity_new(self):
# for now just check squashing works
flow = self.create_flow("Test")
flow.counts.create(scope="foo:1", count=1)
flow.counts.create(scope="foo:1", count=2)
flow.counts.create(scope="foo:2", count=4)

self.assertEqual(3, flow.counts.filter(scope="foo:1").sum())
self.assertEqual(4, flow.counts.filter(scope="foo:2").sum())
self.assertEqual(0, flow.counts.filter(scope="foo:3").sum())

squash_flow_counts()

self.assertEqual(2, flow.counts.count())
self.assertEqual(3, flow.counts.filter(scope="foo:1").sum())
self.assertEqual(4, flow.counts.filter(scope="foo:2").sum())
self.assertEqual(0, flow.counts.filter(scope="foo:3").sum())

def test_category_counts(self):
def assertCount(counts, result_key, category_name, truth):
found = False
Expand Down Expand Up @@ -5498,3 +5480,69 @@ def test_trim_revisions(self):
trim_flow_revisions()
self.assertEqual(2, FlowRevision.objects.filter(flow=flow2).count())
self.assertEqual(31, FlowRevision.objects.filter(flow=flow1).count())


class FlowActivityCountTest(TembaTest):
def test_msgsin_counts(self):
flow1 = self.create_flow("Test 1")
flow2 = self.create_flow("Test 2")

def handle(msg, flow):
msg.status = "H"
msg.flow = flow
msg.save(update_fields=("status", "flow"))

contact = self.create_contact("Bob", phone="+1234567890")
self.create_outgoing_msg(contact, "Out") # should be ignored
in1 = self.create_incoming_msg(
contact, "In 1", status="P", created_on=datetime(2024, 11, 21, 16, 39, tzinfo=tzone.utc)
)
in2 = self.create_incoming_msg(
contact, "In 2", status="P", created_on=datetime(2024, 11, 21, 16, 40, tzinfo=tzone.utc)
)
in3 = self.create_incoming_msg(
contact, "In 3", status="P", created_on=datetime(2024, 11, 22, 17, 10, tzinfo=tzone.utc)
)

self.assertEqual(0, flow1.counts.count())
self.assertEqual(0, flow2.counts.count())

handle(in1, flow1)
handle(in2, flow1)
handle(in3, flow2)

self.assertEqual(6, flow1.counts.count())
self.assertEqual(3, flow2.counts.count())

self.assertEqual(
{"msgsin:date:2024-11-21": 2, "msgsin:dow:4": 2, "msgsin:hour:16": 2},
flow1.counts.filter(scope__startswith="msgsin:").scope_totals(),
)
self.assertEqual(
{"msgsin:date:2024-11-22": 1, "msgsin:dow:5": 1, "msgsin:hour:17": 1},
flow2.counts.filter(scope__startswith="msgsin:").scope_totals(),
)

# other changes to msgs shouldn't create new counts
in1.archive()
in2.archive()

self.assertEqual(6, flow1.counts.count())
self.assertEqual(3, flow2.counts.count())

def test_squashing(self):
flow = self.create_flow("Test")
flow.counts.create(scope="foo:1", count=1)
flow.counts.create(scope="foo:1", count=2)
flow.counts.create(scope="foo:2", count=4)

self.assertEqual(3, flow.counts.filter(scope="foo:1").sum())
self.assertEqual(4, flow.counts.filter(scope="foo:2").sum())
self.assertEqual(0, flow.counts.filter(scope="foo:3").sum())

squash_flow_counts()

self.assertEqual(2, flow.counts.count())
self.assertEqual(3, flow.counts.filter(scope="foo:1").sum())
self.assertEqual(4, flow.counts.filter(scope="foo:2").sum())
self.assertEqual(0, flow.counts.filter(scope="foo:3").sum())
27 changes: 22 additions & 5 deletions temba/sql/current_functions.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-- Generated by collect_sql on 2024-11-21 17:56 UTC
-- Generated by collect_sql on 2024-11-21 21:57 UTC

----------------------------------------------------------------------
-- Convenience method to call contact_toggle_system_group with a row
Expand Down Expand Up @@ -531,30 +531,47 @@ BEGIN
SELECT o.org_id, temba_msg_determine_system_label(o), -count(*), FALSE FROM oldtab o
INNER JOIN newtab n ON n.id = o.id
WHERE temba_msg_determine_system_label(o) IS DISTINCT FROM temba_msg_determine_system_label(n) AND temba_msg_determine_system_label(o) IS NOT NULL
GROUP BY o.org_id, temba_msg_determine_system_label(o);
GROUP BY 1, 2;

-- add counts for all new system labels that don't match the old ones
INSERT INTO msgs_systemlabelcount("org_id", "label_type", "count", "is_squashed")
SELECT n.org_id, temba_msg_determine_system_label(n), count(*), FALSE FROM newtab n
INNER JOIN oldtab o ON o.id = n.id
WHERE temba_msg_determine_system_label(o) IS DISTINCT FROM temba_msg_determine_system_label(n) AND temba_msg_determine_system_label(n) IS NOT NULL
GROUP BY n.org_id, temba_msg_determine_system_label(n);
GROUP BY 1, 2;

-- add negative old-state label counts for all messages being archived/restored
INSERT INTO msgs_labelcount("label_id", "is_archived", "count", "is_squashed")
SELECT ml.label_id, o.visibility != 'V', -count(*), FALSE FROM oldtab o
INNER JOIN newtab n ON n.id = o.id
INNER JOIN msgs_msg_labels ml ON ml.msg_id = o.id
WHERE (o.visibility = 'V' AND n.visibility != 'V') or (o.visibility != 'V' AND n.visibility = 'V')
GROUP BY ml.label_id, o.visibility != 'V';
GROUP BY 1, 2;

-- add new-state label counts for all messages being archived/restored
INSERT INTO msgs_labelcount("label_id", "is_archived", "count", "is_squashed")
SELECT ml.label_id, n.visibility != 'V', count(*), FALSE FROM newtab n
INNER JOIN oldtab o ON o.id = n.id
INNER JOIN msgs_msg_labels ml ON ml.msg_id = n.id
WHERE (o.visibility = 'V' AND n.visibility != 'V') or (o.visibility != 'V' AND n.visibility = 'V')
GROUP BY ml.label_id, n.visibility != 'V';
GROUP BY 1, 2;

-- add new flow activity counts for incoming messages now marked as handled by a flow
INSERT INTO flows_flowactivitycount("flow_id", "scope", "count", "is_squashed")
SELECT n.flow_id, format('msgsin:hour:%s', extract(hour FROM n.created_on)), count(*), FALSE FROM newtab n
INNER JOIN oldtab o ON o.id = n.id
WHERE o.flow_id IS NULL AND n.flow_id IS NOT NULL
GROUP BY 1, 2;
INSERT INTO flows_flowactivitycount("flow_id", "scope", "count", "is_squashed")
SELECT n.flow_id, format('msgsin:dow:%s', extract(isodow FROM n.created_on)), count(*), FALSE FROM newtab n
INNER JOIN oldtab o ON o.id = n.id
WHERE o.flow_id IS NULL AND n.flow_id IS NOT NULL
GROUP BY 1, 2;
INSERT INTO flows_flowactivitycount("flow_id", "scope", "count", "is_squashed")
SELECT n.flow_id, format('msgsin:date:%s', n.created_on::date), count(*), FALSE FROM newtab n
INNER JOIN oldtab o ON o.id = n.id
WHERE o.flow_id IS NULL AND n.flow_id IS NOT NULL
GROUP BY 1, 2;

RETURN NULL;
END;
Expand Down
2 changes: 1 addition & 1 deletion temba/sql/current_triggers.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-- Generated by collect_sql on 2024-11-21 17:56 UTC
-- Generated by collect_sql on 2024-11-21 21:57 UTC

CREATE TRIGGER temba_broadcast_on_delete
AFTER DELETE ON msgs_broadcast REFERENCING OLD TABLE AS oldtab
Expand Down

0 comments on commit a8c434f

Please sign in to comment.