Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for copy, avoid overlapping file events #441

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions tests/unit/test_eventmonitor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
unmanic.test_eventmonitor.py

Written by: Josh.5 <[email protected]>
Date: 30 Dec 2023, (6:59 PM)

Copyright:
Copyright (C) Josh Sunnex - All Rights Reserved

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
OR OTHER DEALINGS IN THE SOFTWARE.

"""
import pytest
from unittest.mock import patch
from unmanic.libs.eventmonitor import EventHandler

@pytest.mark.unittest
class TestEventMonitor(object):
def test_wait_for_file_stabilization_file_stable(self):
"""
Test that confirms that True is returned when the file does not change size.
It should only check the file size twice and sleep once.
"""
file_path = "/path/to/file.txt"
event_monitor = EventHandler([], 1, set())

with patch("os.path.getsize") as mock_getsize, \
patch("time.sleep") as mock_sleep:
mock_getsize.return_value = 100

result = event_monitor._wait_for_file_stabilization(file_path)

assert result is True
mock_getsize.assert_called_with(file_path)
assert mock_getsize.call_count == 2
assert mock_sleep.call_count == 1

def test_wait_for_file_stabilization_file_not_stable(self):
"""
Test that the method wait waits for the file to stabilize if the size changes.
It should check the file size 4 times and sleep 3 times.
"""
file_path = "/path/to/file.txt"
event_monitor = EventHandler([], 1, set())

with patch("os.path.getsize") as mock_getsize, \
patch("time.sleep") as mock_sleep:
mock_getsize.side_effect = [50, 100, 150, 150]

result = event_monitor._wait_for_file_stabilization(file_path)

assert result is True
assert mock_getsize.call_count == 4
assert mock_sleep.call_count == 3
mock_getsize.assert_called_with(file_path)

def test_wait_for_file_stabilization_file_deleted(self):
"""
Test that the method raises an exception if the file is moved or deleted while being checked for stabilization.
"""
file_path = "/path/to/file.txt"
event_monitor = EventHandler([], 1, set())

with patch("os.path.getsize") as mock_getsize, \
patch("time.sleep") as mock_sleep:
mock_getsize.side_effect = OSError

with pytest.raises(OSError):
event_monitor._wait_for_file_stabilization(file_path)

mock_getsize.assert_called_once_with(file_path)
mock_sleep.assert_not_called()

def test_wait_for_file_stabilization_timeout_reached(self):
"""
Test that ensures the method will throw a TimeoutError if the timeout is reached.
Since each iteration requires a sleep of 1 second, the method should sleep once before raising the exception.
"""
file_path = "/path/to/file.txt"
event_monitor = EventHandler([], 1, set())

with patch("os.path.getsize") as mock_getsize:
mock_getsize.side_effect = [100, 200, 300, 400]

with pytest.raises(TimeoutError):
event_monitor._wait_for_file_stabilization(file_path, timeout_seconds=0.1)

mock_getsize.assert_called_with(file_path)
assert mock_getsize.call_count == 1
65 changes: 56 additions & 9 deletions unmanic/libs/eventmonitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ class EventHandler(FileSystemEventHandler):
From this, the only event we really need to monitor is the "created" and "closed" events.
"""

def __init__(self, files_to_test, library_id):
def __init__(self, files_to_test, library_id, active_files):
self.active_files = active_files # This is a shared set
self.name = "EventProcessor"
self.files_to_test = files_to_test
self.library_id = library_id
Expand All @@ -93,16 +94,58 @@ def _log(self, message, message2='', level="info"):
getattr(self.logger, level)(message)

def on_any_event(self, event):
if event.event_type == "closed":
if event.event_type in ["created", "modified"]:
# Ensure event was not for a directory
if event.is_directory:
self._log("Detected event is for a directory. Ignoring...", level="debug")
else:
self._log("Detected '{}' event on file path '{}'".format(event.event_type, event.src_path))
self.files_to_test.put({
'src_path': event.src_path,
'library_id': self.library_id,
})
return

# Check and update the active_files set
with threading.Lock(): # Ensure thread safety
if event.src_path in self.active_files:
# File is already being processed, so ignore this event
self._log("Ignoring '{}' event on file path '{}' due to other events".format(event.event_type, event.src_path), level="debug")
return
self.active_files.add(event.src_path)

self._log("Detected '{}' event on file path '{}'".format(event.event_type, event.src_path))

try:
# Wait for file to be fully written to disk
self._wait_for_file_stabilization(event.src_path)
except Exception as e:
self._log(str(e), level="error")
with threading.Lock():
self.active_files.discard(event.src_path) # Remove the file from the set
return

self.files_to_test.put({
'src_path': event.src_path,
'library_id': self.library_id,
})

def _wait_for_file_stabilization(self, file_path, timeout_seconds=600):
"""
Wait for the file to be fully written to disk (i.e., file size becomes stable).

:param file_path: Path to the file to check.
:param timeout_seconds: Timeout in seconds.
:raises: TimeoutError if file stabilization times out.
"""
start_time = time.time()
last_size = -1

while time.time() - start_time < timeout_seconds:
current_size = os.path.getsize(file_path)

if last_size == current_size:
return True # File size is stable

last_size = current_size
self._log("Monitoring file path '{}' for changes, waiting.".format(file_path), level="debug")
time.sleep(1)

raise TimeoutError("Timeout reached while waiting for file stabilization.")


class EventMonitorManager(threading.Thread):
Expand All @@ -126,6 +169,7 @@ def __init__(self, data_queues, event):
# Create an event queue
self.files_to_test = queue.Queue()

self.active_files = set()
self.abort_flag = threading.Event()
self.abort_flag.clear()

Expand Down Expand Up @@ -229,7 +273,7 @@ def start_event_processor(self):
if not os.path.exists(library_path):
continue
self._log("Adding library path to monitor '{}'".format(library_path))
event_handler = EventHandler(self.files_to_test, library.get_id())
event_handler = EventHandler(self.files_to_test, library.get_id(), self.active_files)
self.event_observer_thread.schedule(event_handler, library_path, recursive=True)
monitoring_path = True
# Only start observer if a path was added to be monitored
Expand Down Expand Up @@ -287,6 +331,9 @@ def manage_event_queue(self, pathname, library_id):
self._log("File contains Unicode characters that cannot be processed. Ignoring.", level="warning")
except Exception as e:
self._log("Exception testing file path in {}. Ignoring.".format(self.name), message2=str(e), level="exception")
finally:
with threading.Lock():
self.active_files.discard(pathname) # Remove the file from the set after processing

def __add_path_to_queue(self, pathname, library_id, priority_score):
"""
Expand Down
Loading