Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

470 - Tag Merging Service and Management Command #907

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions taggit/management/commands/merge_duplicate_tags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from django.core.management.base import BaseCommand, CommandError

from taggit.services.tag_merging import TagMergingService


class Command(BaseCommand):
    """Management command that merges tags differing from a given name only by case."""

    help = "Merges Tags with the same name but different case."

    def add_arguments(self, parser):
        """Register the single positional ``tag_name`` argument."""
        parser.add_argument(
            "tag_name", type=str, help="The name of the tag to merge duplicates for."
        )

    def handle(self, *args, **options):
        """
        Merge the case-insensitive duplicates of a tag into that tag.

        Delegates to ``TagMergingService.merge_case_insensitive_tags`` and
        writes a success message to stdout once the service returns without
        raising.

        Args:
            *args: Variable length argument list (unused).
            **options: Arbitrary keyword arguments. Expected to contain:
                - tag_name (str): The name of the tag to merge duplicates for.

        Raises:
            CommandError: If any exception is raised while merging the tags.
                The original exception is chained as the cause.
        """
        tag_name = options["tag_name"]
        service = TagMergingService()

        # Keep the try body limited to the merge itself so that a failure
        # while writing the success message is not reported as a merge error.
        try:
            service.merge_case_insensitive_tags(tag_name)
        except Exception as e:
            # Chain the cause so the original traceback is preserved.
            raise CommandError(f"Error occurred while merging tags: {e}") from e
        else:
            self.stdout.write(
                self.style.SUCCESS(
                    f'Successfully merged duplicates of the tag "{tag_name}"'
                )
            )
Empty file added taggit/services/__init__.py
Empty file.
183 changes: 183 additions & 0 deletions taggit/services/tag_merging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
import logging

from django.apps import apps
from django.db import transaction
from django.db.models import QuerySet

from taggit.managers import TaggableManager
from taggit.models import GenericTaggedItemBase, TagBase

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)


class TagMergingService:
    """
    Merges duplicate tags into a single base tag.

    The merge is applied to every 'through' model used by a TaggableManager
    field on any registered model, so tagged items are re-pointed at the base
    tag before the duplicate tags are deleted.
    """

    @staticmethod
    def get_models_using_tag_through_models():
        """
        Retrieves a set of 'through' models used by TaggableManager fields across all registered models.

        Returns:
            set: A set of models that serve as 'through' models for TaggableManager fields.
        """
        return {
            field.through
            for model in apps.get_models()
            for field in model._meta.get_fields()
            if isinstance(field, TaggableManager)
        }

    def identify_duplicates(
        self, duplicate_query_set: QuerySet[TagBase], through_model
    ) -> QuerySet[GenericTaggedItemBase]:
        """
        Identifies tagged-item rows associated with tags in the provided queryset.

        Args:
            duplicate_query_set (QuerySet[TagBase]): A queryset containing the tags
                considered duplicates.
            through_model: The 'through' model whose rows are searched.

        Returns:
            QuerySet: The rows of `through_model` whose `tag` is one of the tags
                in `duplicate_query_set`.
        """
        return through_model.objects.filter(tag__in=duplicate_query_set)

    def tagged_item_exists(self, tagged_item, base_tag, through_model):
        """
        Checks whether the object behind `tagged_item` is already tagged with `base_tag`.

        Two lookups are supported:
        - If `through_model` has a `content_type` attribute, rows are matched on
          `content_type`, `object_id` and `tag`.
        - Otherwise rows are matched on `content_object` and `tag`.

        Args:
            tagged_item: The tagged-item row whose target object is checked.
            base_tag (TagBase): The base tag to look for on that object.
            through_model: The 'through' model that is queried.

        Returns:
            bool: True if a row linking the object to `base_tag` exists, False otherwise.
        """
        if hasattr(through_model, "content_type"):
            return through_model.objects.filter(
                content_type=tagged_item.content_type,
                object_id=tagged_item.object_id,
                tag=base_tag,
            ).exists()
        return through_model.objects.filter(
            content_object=tagged_item.content_object,
            tag=base_tag,
        ).exists()

    def _merge_tags(
        self, base_tag: TagBase, duplicate_query_set: QuerySet[TagBase], through_model
    ) -> None:
        """
        Re-points tagged items of `through_model` from the duplicate tags to `base_tag`.

        The `base_tag` is first excluded from `duplicate_query_set` so it is never
        treated as its own duplicate. Every row of `through_model` that references a
        duplicate tag is then switched to `base_tag`, unless the same object is
        already tagged with `base_tag` (in which case the row is left untouched).

        This method does not delete the duplicate tags; `merge_tags` does that
        once every 'through' model has been processed.

        Args:
            base_tag: The tag into which all duplicates will be merged.
            duplicate_query_set: A queryset of tags considered duplicates
                that should be merged into the `base_tag`.
            through_model: The 'through' model whose rows are updated.

        Raises:
            Exception: Any error raised while reading or saving rows is logged
                and re-raised unchanged.
        """
        try:
            duplicate_query_set = duplicate_query_set.exclude(pk=base_tag.pk)

            tags_to_be_merged_names = list(
                duplicate_query_set.values_list("name", flat=True)
            )
            for tagged_item in self.identify_duplicates(
                duplicate_query_set, through_model
            ):
                # Skip rows whose object already carries the base tag so that no
                # second (object, base_tag) row is created. The skipped row keeps
                # pointing at the duplicate tag; presumably it is removed when
                # that tag is deleted, depending on the through model's
                # on_delete behaviour.
                if not self.tagged_item_exists(tagged_item, base_tag, through_model):
                    tagged_item.tag = base_tag
                    tagged_item.save()
        except Exception as e:
            # logger.exception records the traceback; the bare raise keeps it intact.
            logger.exception("Error merging tags: %s", e)
            raise
        else:
            if tags_to_be_merged_names:
                logger.info(
                    "Merged tags %s into %s and deleted them.",
                    ", ".join(tags_to_be_merged_names),
                    base_tag.name,
                )
            else:
                logger.info(
                    "No tags were merged into %s as no duplicates were found.",
                    base_tag.name,
                )

    @staticmethod
    def case_insensitive_queryset(tag_model, base_tag_name):
        """Returns the tags of `tag_model` whose name matches `base_tag_name` ignoring case."""
        return tag_model.objects.filter(name__iexact=base_tag_name)

    def merge_case_insensitive_tags(self, base_tag_name: str):
        """
        Merges all tags that match the `base_tag_name` case-insensitively into a single tag.

        This method finds all tags that match the given `base_tag_name` without considering
        case and merges them into the tag whose name is exactly `base_tag_name`.
        This is useful for consolidating tags that are meant to be the same but were
        created with different casing.

        Tag models that contain no tag named exactly `base_tag_name` are skipped;
        no exception is raised for them.

        Args:
            base_tag_name (str): The name of the base tag into which all case-insensitive
                matches will be merged.
        """
        self.merge_tags(base_tag_name, self.case_insensitive_queryset)

    def merge_tags(self, base_tag_name, duplicate_query_function) -> None:
        """
        Merges all tags selected by `duplicate_query_function` into the tag named `base_tag_name`.

        For every 'through' model, the base tag is looked up by exact name in that
        model's tag model. If it does not exist there, that 'through' model is
        skipped and nothing is merged for it. Otherwise the tagged items of the
        duplicates are re-pointed at the base tag. After all 'through' models have
        been processed, the duplicate tags themselves are deleted.

        The `duplicate_query_function` should accept two arguments, the tag model and
        the `base_tag_name`, and return a queryset of tags to merge. Tags named
        exactly `base_tag_name` are always excluded from that queryset.

        Args:
            base_tag_name (str): The name of the base tag into which all duplicates will be merged.
            duplicate_query_function (Callable): A function that accepts the tag model and
                the `base_tag_name` and returns a queryset of tags to merge.

        Raises:
            ValueError: If the `duplicate_query_function` is not callable.
        """
        if not callable(duplicate_query_function):
            raise ValueError("duplicate_query_function must be callable")

        def find_duplicates(tag_model):
            # The base tag must never be part of the set that is merged or deleted.
            return duplicate_query_function(tag_model, base_tag_name).exclude(
                name=base_tag_name
            )

        tag_models = set()
        # Reassignment and deletion share one transaction so a failure in
        # either step leaves the database unchanged.
        with transaction.atomic():
            for through_model in self.get_models_using_tag_through_models():
                tag_model = through_model.tag_model()
                try:
                    base_tag = tag_model.objects.get(name=base_tag_name)
                except tag_model.DoesNotExist:
                    continue
                self._merge_tags(base_tag, find_duplicates(tag_model), through_model)
                tag_models.add(tag_model)

            # Delete only after every through model was processed: several
            # through models may share one tag model.
            for tag_model in tag_models:
                find_duplicates(tag_model).delete()
81 changes: 81 additions & 0 deletions tests/test_tag_merging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
from django.test import TestCase

from taggit.models import Tag
from taggit.services.tag_merging import TagMergingService
from tests.models import DirectFood, Food, HousePet


class TagMergingServiceTests(TestCase):
    """Tests for TagMergingService against generic and direct 'through' models."""

    def setUp(self):
        # Three case variants of "python" plus two tags that must survive a
        # case-insensitive merge.
        self.service = TagMergingService()
        self.tag0 = Tag.objects.create(name="PyThon")
        self.tag1 = Tag.objects.create(name="Python")
        self.tag2 = Tag.objects.create(name="python")
        self.tag3 = Tag.objects.create(name="Django")
        self.tag4 = Tag.objects.create(name="PythonFundamentals")

    def test_merging_identical_tags_does_nothing(self):
        def filter_same_tag(tag, base_tag_name):
            return tag.objects.filter(name=base_tag_name)

        self.service.merge_tags(self.tag1.name, filter_same_tag)
        self.assertEqual(Tag.objects.count(), 5)

    def test_merging_case_insensitive_tags_merges_correctly(self):
        self.service.merge_case_insensitive_tags("python")
        self.assertEqual(Tag.objects.count(), 3)
        self.assertFalse(Tag.objects.filter(name="Python").exists())
        self.assertTrue(Tag.objects.filter(name="python").exists())

    def test_merging_tags_with_itself_does_not_delete_it(self):
        def filter_python(tag, base_tag_name):
            return tag.objects.filter(name=base_tag_name)

        self.service.merge_tags(self.tag1.name, filter_python)
        self.assertTrue(Tag.objects.filter(name="Python").exists())

    def test_merging_tags_deletes_duplicates(self):
        def filter_starts_with(tag, _):
            return tag.objects.filter(name__istartswith="python")

        self.service.merge_tags(self.tag1.name, filter_starts_with)
        self.assertEqual(Tag.objects.count(), 2)
        self.assertFalse(Tag.objects.filter(name="PythonFundamentals").exists())
        self.assertTrue(Tag.objects.filter(name="Python").exists())

    def test_merging_tags_updates_tagged_items_correctly(self):
        # Create instances of Food and HousePet
        food_item = Food.objects.create(name="Apple")
        pet_item = HousePet.objects.create(name="Fido")

        # Tag each instance with a different case variant
        food_item.tags.add(self.tag0)
        pet_item.tags.add(self.tag1)

        self.service.merge_case_insensitive_tags("python")

        # Refresh the instances from the database
        food_item.refresh_from_db()
        pet_item.refresh_from_db()

        # Assert that the tags have been updated to the merged tag
        self.assertTrue(food_item.tags.filter(name="python").exists())
        self.assertTrue(pet_item.tags.filter(name="python").exists())

        # The variants the items were originally tagged with must be gone
        self.assertFalse(food_item.tags.filter(name="PyThon").exists())
        self.assertFalse(pet_item.tags.filter(name="Python").exists())

    def test_merging_tags_direct_updates_tagged_items_correctly(self):
        # Create an instance of DirectFood
        food_item = DirectFood.objects.create(name="Apple")

        # Tag the instance with two case variants of the same tag
        food_item.tags.add(self.tag0)
        food_item.tags.add(self.tag1)

        # Merge the tags
        self.service.merge_case_insensitive_tags("python")

        # Refresh the instance from the database
        food_item.refresh_from_db()

        # Assert that the tags have been updated to the merged tag
        self.assertTrue(food_item.tags.filter(name="python").exists())

        # Both variants collapse into a single tag on the item
        self.assertEqual(food_item.tags.filter(name__iexact="python").count(), 1)
Loading