Skip to content
This repository has been archived by the owner on Feb 7, 2019. It is now read-only.

Commit

Permalink
Merge pull request #75 from brki/handle-queryset-deletions
Browse files Browse the repository at this point in the history
Handle queryset deletions
  • Loading branch information
maennel committed May 22, 2015
2 parents d9cee1e + ca4e215 commit c45cb8f
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 3 deletions.
2 changes: 2 additions & 0 deletions versions/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class DeletionOfNonCurrentVersionError(ValueError):
pass
35 changes: 33 additions & 2 deletions versions/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from django.db import models, router

from versions import settings as versions_settings
from versions.exceptions import DeletionOfNonCurrentVersionError


def get_utc_now():
Expand Down Expand Up @@ -539,6 +540,34 @@ def as_of(self, qtime=None):
clone.querytime = QueryTime(time=qtime, active=True)
return clone

def delete(self):
"""
Deletes the records in the QuerySet.
"""
assert self.query.can_filter(), \
"Cannot use 'limit' or 'offset' with delete."

del_query = self._clone()

# The delete is actually 2 queries - one to find related objects,
# and one to delete. Make sure that the discovery of related
# objects is performed on the same database as the deletion.
del_query._for_write = True

# Disable non-supported fields.
del_query.query.select_for_update = False
del_query.query.select_related = False
del_query.query.clear_ordering(force_empty=True)

collector = versions_settings.VERSIONED_DELETE_COLLECTOR_CLASS(using=del_query.db)
collector.collect(del_query)
collector.delete(get_utc_now())

# Clear the result cache, in case this QuerySet gets reused.
self._result_cache = None
delete.alters_data = True
delete.queryset_only = True


class VersionedForeignKey(ForeignKey):
"""
Expand Down Expand Up @@ -1094,7 +1123,9 @@ def __init__(self, *args, **kwargs):

def delete(self, using=None):
using = using or router.db_for_write(self.__class__, instance=self)
assert self._get_pk_val() is not None, "%s object can't be deleted because its %s attribute is set to None." % (self._meta.object_name, self._meta.pk.attname)
assert self._get_pk_val() is not None, \
"{} object can't be deleted because its {} attribute is set to None.".format(
self._meta.object_name, self._meta.pk.attname)

collector = versions_settings.VERSIONED_DELETE_COLLECTOR_CLASS(using=using)
collector.collect([self])
Expand All @@ -1115,7 +1146,7 @@ def _delete_at(self, timestamp, using=None):
self.version_end_date = timestamp
self.save(force_update=True, using=using)
else:
raise Exception('Cannot delete anything else but the current version')
raise DeletionOfNonCurrentVersionError('Cannot delete anything else but the current version')

@property
def is_current(self):
Expand Down
37 changes: 36 additions & 1 deletion versions_tests/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from django.utils.timezone import utc
from django.utils import six

from versions.exceptions import DeletionOfNonCurrentVersionError
from versions.models import get_utc_now, ForeignKeyRequiresValueError, Versionable
from versions_tests.models import (
Award, B, C1, C2, C3, City, Classroom, Directory, Fan, Mascot, NonFan, Observer, Person, Player, Professor, Pupil,
Expand Down Expand Up @@ -139,10 +140,44 @@ def test_deleting_non_current_version(self):
current = B.objects.current.first()
previous = B.objects.previous_version(current)

self.assertRaises(Exception, previous.delete)
self.assertRaises(DeletionOfNonCurrentVersionError, previous.delete)

def test_delete_using_current_queryset(self):
B.objects.current.all().delete()
bs = list(B.objects.all())
self.assertEqual(3, len(bs))
for b in bs:
self.assertIsNotNone(b.version_end_date)

def test_delete_using_non_current_queryset(self):

B.objects.create(name='Buzz')

qs = B.objects.all().filter(version_end_date__isnull=True)
self.assertEqual(2, len(qs))
pks = [o.pk for o in qs]

qs.delete()
bs = list(B.objects.all().filter(pk__in=pks))
self.assertEqual(2, len(bs))
for b in bs:
self.assertIsNotNone(b.version_end_date)

def test_deleteing_non_current_version_with_queryset(self):
qs = B.objects.all().filter(version_end_date__isnull=False)
self.assertEqual(2, qs.count())

# Use transation.atomic here to avoid the subsequent query from failing because an Exception occurred.
# After all, we're expecting the exception.
with transaction.atomic():
self.assertRaises(DeletionOfNonCurrentVersionError, qs.delete)

self.assertEqual(2, qs.count())


class DeletionHandlerTest(TestCase):
"""Tests that the ForeignKey on_delete parameters have the expected effects"""

def setUp(self):
self.city = City.objects.create(name='c.v1')
self.team = Team.objects.create(name='t.v1', city=self.city)
Expand Down

0 comments on commit c45cb8f

Please sign in to comment.