Skip to content

Commit

Permalink
Handle write conflicts in transactions #422
Browse files Browse the repository at this point in the history
  • Loading branch information
joelvdavies committed Nov 28, 2024
1 parent 552046b commit 7891b97
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 39 deletions.
27 changes: 26 additions & 1 deletion inventory_management_system_api/core/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@
Module for connecting to a MongoDB database.
"""

from typing import Annotated
from contextlib import contextmanager
from typing import Annotated, Generator

from fastapi import Depends
from pymongo import MongoClient
from pymongo.client_session import ClientSession
from pymongo.database import Database
from pymongo.errors import OperationFailure

from inventory_management_system_api.core.config import config
from inventory_management_system_api.core.exceptions import WriteConflictError

db_config = config.database
mongodb_client = MongoClient(
Expand All @@ -28,4 +32,25 @@ def get_database() -> Database:
return mongodb_client[db_config.name.get_secret_value()]


@contextmanager
def start_session_transaction(action_description: str) -> Generator[ClientSession, None, None]:
"""
Starts a MongoDB session followed by a transaction and returns the session to use.
Also handles write conflicts.
:param action_description: Description of what the transaction is doing so it can be used in any raised errors.
:raises WriteConflictError: If there a write conflict during the transaction.
"""

with mongodb_client.start_session() as session:
with session.start_transaction():
try:
yield session
except OperationFailure as exc:
if "write conflict" in str(exc).lower():
raise WriteConflictError(f"Write conflict while {action_description}") from exc
raise exc


DatabaseDep = Annotated[Database, Depends(get_database)]
6 changes: 6 additions & 0 deletions inventory_management_system_api/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,9 @@ class InvalidActionError(DatabaseError):
"""
Exception raised when trying to update an item's catalogue item ID
"""


class WriteConflictError(DatabaseError):
"""
Exception raised when a transaction has a write conflict
"""
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from fastapi import Depends

from inventory_management_system_api.core.database import mongodb_client
from inventory_management_system_api.core.database import start_session_transaction
from inventory_management_system_api.core.exceptions import InvalidActionError, MissingRecordError
from inventory_management_system_api.models.catalogue_category import (
AllowedValues,
Expand Down Expand Up @@ -105,32 +105,31 @@ def create(
)

# Run all subsequent edits within a transaction to ensure they will all succeed or fail together
with mongodb_client.start_session() as session:
with session.start_transaction():
# Firstly update the catalogue category
catalogue_category_property_out = self._catalogue_category_repository.create_property(
catalogue_category_id, catalogue_category_property_in, session=session
)
with start_session_transaction("performing catalogue category property migration create") as session:
# Firstly update the catalogue category
catalogue_category_property_out = self._catalogue_category_repository.create_property(
catalogue_category_id, catalogue_category_property_in, session=session
)

property_in = PropertyIn(
id=str(catalogue_category_property_in.id),
name=catalogue_category_property_in.name,
value=catalogue_category_property.default_value,
unit=unit_value,
unit_id=catalogue_category_property.unit_id,
)
property_in = PropertyIn(
id=str(catalogue_category_property_in.id),
name=catalogue_category_property_in.name,
value=catalogue_category_property.default_value,
unit=unit_value,
unit_id=catalogue_category_property.unit_id,
)

# Add property to all catalogue items of the catalogue category
self._catalogue_item_repository.insert_property_to_all_matching(
catalogue_category_id, property_in, session=session
)
# Add property to all catalogue items of the catalogue category
self._catalogue_item_repository.insert_property_to_all_matching(
catalogue_category_id, property_in, session=session
)

# Add property to all items of the catalogue items
# Obtain a list of ids to do this rather than iterate one by one as its faster. Limiting factor
# would be memory to store these ids and the network bandwidth it takes to send the request to the
# database but for 10000 items being updated this only takes 4.92 KB
catalogue_item_ids = self._catalogue_item_repository.list_ids(catalogue_category_id, session=session)
self._item_repository.insert_property_to_all_in(catalogue_item_ids, property_in, session=session)
# Add property to all items of the catalogue items
# Obtain a list of ids to do this rather than iterate one by one as its faster. Limiting factor
# would be memory to store these ids and the network bandwidth it takes to send the request to the
# database but for 10000 items being updated this only takes 4.92 KB
catalogue_item_ids = self._catalogue_item_repository.list_ids(catalogue_category_id, session=session)
self._item_repository.insert_property_to_all_in(catalogue_item_ids, property_in, session=session)

return catalogue_category_property_out

Expand Down Expand Up @@ -228,20 +227,19 @@ def update(
property_in = CatalogueCategoryPropertyIn(**{**existing_property_out.model_dump(), **update_data})

# Run all subsequent edits within a transaction to ensure they will all succeed or fail together
with mongodb_client.start_session() as session:
with session.start_transaction():
# Firstly update the catalogue category
property_out = self._catalogue_category_repository.update_property(
catalogue_category_id, catalogue_category_property_id, property_in, session=session
)
with start_session_transaction("performing catalogue category property migration update") as session:
# Firstly update the catalogue category
property_out = self._catalogue_category_repository.update_property(
catalogue_category_id, catalogue_category_property_id, property_in, session=session
)

# Avoid propagating changes unless absolutely necessary
if updating_name:
self._catalogue_item_repository.update_names_of_all_properties_with_id(
catalogue_category_property_id, catalogue_category_property.name, session=session
)
self._item_repository.update_names_of_all_properties_with_id(
catalogue_category_property_id, catalogue_category_property.name, session=session
)
# Avoid propagating changes unless absolutely necessary
if updating_name:
self._catalogue_item_repository.update_names_of_all_properties_with_id(
catalogue_category_property_id, catalogue_category_property.name, session=session
)
self._item_repository.update_names_of_all_properties_with_id(
catalogue_category_property_id, catalogue_category_property.name, session=session
)

return property_out

0 comments on commit 7891b97

Please sign in to comment.