Skip to content

Commit

Permalink
Add some utils that have been duplicated in various code bases (#378)
Browse files Browse the repository at this point in the history
* Add utils to sort records with same syntax as server

* Add utils to compare records

* Add collection_diff util
  • Loading branch information
leplatrem authored Nov 20, 2024
1 parent cf0251e commit acd0571
Show file tree
Hide file tree
Showing 2 changed files with 237 additions and 0 deletions.
53 changes: 53 additions & 0 deletions src/kinto_http/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import functools
import json
import re
import sys
import unicodedata
from datetime import date, datetime

Expand All @@ -9,6 +10,9 @@
from kinto_http.constants import VALID_SLUG_REGEXP


MAX_LENGTH_INT = len(str(sys.maxsize * 2 + 1))


def slugify(value):
"""Normalizes string, converts to lowercase, removes non-alpha characters
and converts spaces to hyphens.
Expand Down Expand Up @@ -57,3 +61,52 @@ def json_iso_datetime(obj):


json_dumps = functools.partial(json.dumps, default=json_iso_datetime)


def sort_records(records, sort):
"""
Sort records following the same format as the server ``name,-last_modified``.
"""

def reversed(way, value):
if isinstance(value, (int, float)):
value = str(way * value).zfill(MAX_LENGTH_INT)
if isinstance(value, str):
return "".join(chr(255 - ord(c)) for c in value) if way < 0 else value
return str(value)

sort_fields = [
(-1, f.strip()[1:]) if f.startswith("-") else (1, f.strip()) for f in sort.split(",")
]
return sorted(
records, key=lambda r: tuple(reversed(way, r.get(field)) for way, field in sort_fields)
)


def records_equal(a, b):
"""
Compare records attributes, ignoring those assigned automatically
by the server.
"""
ignore_fields = ("last_modified", "schema")
ac = {k: v for k, v in a.items() if k not in ignore_fields}
bc = {k: v for k, v in b.items() if k not in ignore_fields}
return ac == bc


def collection_diff(src, dest):
"""
Compare two lists of records.
"""
dest_by_id = {r["id"]: r for r in dest}
to_create = []
to_update = []
for r in src:
record = dest_by_id.pop(r["id"], None)
if record is None:
to_create.append(r)
elif not records_equal(r, record):
r.pop("last_modified", None)
to_update.append((record, r))
to_delete = list(dest_by_id.values())
return to_create, to_update, to_delete
184 changes: 184 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,187 @@ def test_quote_strips_extra_quotes():

def test_quotes_can_take_integers():
assert utils.quote(1234) == '"1234"'


def test_sort_single_field_ascending():
records = [
{"name": "Charlie", "age": 25},
{"name": "Alice", "age": 30},
{"name": "Bob", "age": 20},
]
result = utils.sort_records(records, "name")
expected = [
{"name": "Alice", "age": 30},
{"name": "Bob", "age": 20},
{"name": "Charlie", "age": 25},
]
assert result == expected


def test_sort_single_field_descending():
records = [
{"name": "Charlie", "age": 25},
{"name": "Alice", "age": 30},
{"name": "Bob", "age": 20},
]
result = utils.sort_records(records, "-name")
expected = [
{"name": "Charlie", "age": 25},
{"name": "Bob", "age": 20},
{"name": "Alice", "age": 30},
]
assert result == expected


def test_sort_multiple_fields():
records = [
{"name": "Alice", "age": 30},
{"name": "Bob", "age": 25},
{"name": "Alice", "age": 20},
]
result = utils.sort_records(records, "name,-age")
expected = [
{"name": "Alice", "age": 30},
{"name": "Alice", "age": 20},
{"name": "Bob", "age": 25},
]
assert result == expected


def test_sort_missing_field():
records = [
{"name": "Charlie", "age": 25},
{"name": "Alice"},
{"name": "Bob", "age": 20},
]
result = utils.sort_records(records, "age")
expected = [
{"name": "Bob", "age": 20},
{"name": "Charlie", "age": 25},
{"name": "Alice"}, # Missing "age" is treated as default
]
assert result == expected


def test_sort_numeric_field_descending():
records = [
{"name": "Charlie", "score": 85},
{"name": "Alice", "score": 95},
{"name": "Bob", "score": 111},
]
result = utils.sort_records(records, "-score")
expected = [
{"name": "Bob", "score": 111},
{"name": "Alice", "score": 95},
{"name": "Charlie", "score": 85},
]
assert result == expected


def test_sort_mixed_numeric_and_string():
records = [
{"name": "Charlie", "age": 25},
{"name": "Alice", "age": 20},
{"name": "Bob", "age": 20},
]
result = utils.sort_records(records, "age,-name")
expected = [
{"name": "Bob", "age": 20},
{"name": "Alice", "age": 20},
{"name": "Charlie", "age": 25},
]
assert result == expected


def test_records_equal_identical_records():
a = {"id": 1, "name": "Alice", "last_modified": 123, "schema": "v1"}
b = {"id": 1, "name": "Alice", "last_modified": 456, "schema": "v2"}
assert utils.records_equal(a, b)


def test_records_equal_different_records():
a = {"id": 1, "name": "Alice", "last_modified": 123}
b = {"id": 2, "name": "Bob", "last_modified": 456}
assert not utils.records_equal(a, b)


def test_records_equal_missing_fields():
a = {"id": 1, "name": "Alice", "last_modified": 123}
b = {"id": 1, "name": "Alice"}
assert utils.records_equal(a, b)


def test_records_equal_extra_fields():
a = {"id": 1, "name": "Alice", "extra": "field"}
b = {"id": 1, "name": "Alice"}
assert not utils.records_equal(a, b)


def test_records_equal_empty_records():
a = {}
b = {}
assert utils.records_equal(a, b)


def test_records_equal_only_ignored_fields():
a = {"last_modified": 123, "schema": "v1"}
b = {"last_modified": 456, "schema": "v2"}
assert utils.records_equal(a, b)


def test_collection_diff_create():
src = [{"id": 1, "name": "Alice"}]
dest = []
to_create, to_update, to_delete = utils.collection_diff(src, dest)
assert to_create == [{"id": 1, "name": "Alice"}]
assert to_update == []
assert to_delete == []


def test_collection_diff_update():
src = [{"id": 1, "name": "Alice"}]
dest = [{"id": 1, "name": "Bob"}]
to_create, to_update, to_delete = utils.collection_diff(src, dest)
assert to_create == []
assert to_update == [({"id": 1, "name": "Bob"}, {"id": 1, "name": "Alice"})]
assert to_delete == []


def test_collection_diff_delete():
src = []
dest = [{"id": 1, "name": "Alice"}]
to_create, to_update, to_delete = utils.collection_diff(src, dest)
assert to_create == []
assert to_update == []
assert to_delete == [{"id": 1, "name": "Alice"}]


def test_collection_diff_mixed():
src = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}, {"id": 3, "name": "Charlie"}]
dest = [
{"id": 2, "name": "Bob"},
{"id": 3, "name": "CharlieUpdated"},
{"id": 4, "name": "Dave"},
]
to_create, to_update, to_delete = utils.collection_diff(src, dest)
assert to_create == [{"id": 1, "name": "Alice"}]
assert to_update == [({"id": 3, "name": "CharlieUpdated"}, {"id": 3, "name": "Charlie"})]
assert to_delete == [{"id": 4, "name": "Dave"}]


def test_collection_diff_no_changes():
src = [{"id": 1, "name": "Alice"}]
dest = [{"id": 1, "name": "Alice"}]
to_create, to_update, to_delete = utils.collection_diff(src, dest)
assert to_create == []
assert to_update == []
assert to_delete == []


def test_collection_diff_empty_collections():
src = []
dest = []
to_create, to_update, to_delete = utils.collection_diff(src, dest)
assert to_create == []
assert to_update == []
assert to_delete == []

0 comments on commit acd0571

Please sign in to comment.