-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[MDB-29707] Metatdata cleaner with system drop replica #241
Conversation
@@ -37,6 +37,7 @@ | |||
"unfreeze_timeout": 10 * 60, | |||
"restart_replica_timeout": 10 * 60, | |||
"restore_replica_timeout": 10 * 60, | |||
"drop_replica_timeout": 10 * 60, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
10 min is enough? or set it to 1 hour?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think its ok. We will adjust it if necessary
WDYT @Alex-Burmak @aalexfvk ? |
""" | ||
|
||
if block_until_finised_tasks: | ||
self._queue_active[0][1].wait() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self._queue_active
can be empty
self._max_active_tasks = max_parrallel_tasks | ||
|
||
self._queue_pending: Deque[str] = deque() | ||
self._queue_active: Deque[Tuple[str, IAsyncResult]] = deque( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make dataclass for the tuple to get rid of naked indices like [0][1]
|
||
if not self._exists_tasks_to_do(): | ||
return None | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like we can add waiting for the first ready result here and remove with_block
and block_until_finished_tasks
parameters from everywhere
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we can. But this will lead to unnecessary waiting every time we call the queue update.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if we remove waiting from _update_active_queue
at all and insert it here ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved implementation with wait to the separate function
replica_path = os.path.join(path, "replicas") | ||
if not zk.exists(replica_path): | ||
for replicated_object in replicated_objects: | ||
# Actually rn in the ch (10.24) there are no secure way to determine that node is the root of replicated table. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# Actually rn in the ch (10.24) there are no secure way to determine that node is the root of replicated table. | |
# Actually there is no a reliable way to determine that node is the root of replicated table in the CH (24.10) |
if not zk.exists(replica_path): | ||
for replicated_object in replicated_objects: | ||
# Actually rn in the ch (10.24) there are no secure way to determine that node is the root of replicated table. | ||
# So we are accuming that if object not database then it is table. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
# So we are accuming that if object not database then it is table. | |
# So we are assuming that if object is not a database then it is a table. |
is_database = bool( | ||
zk.get(replicated_object.path)[0] == REPLICATED_DATABASE_MARKER | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is casting to bool
needed here ?
if replicated_object.path not in databases_to_cleanup: | ||
databases_to_cleanup[replicated_object.path] = [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about using defaultdict
for the databases_to_cleanup
?
@@ -37,6 +37,7 @@ | |||
"unfreeze_timeout": 10 * 60, | |||
"restart_replica_timeout": 10 * 60, | |||
"restore_replica_timeout": 10 * 60, | |||
"drop_replica_timeout": 10 * 60, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think its ok. We will adjust it if necessary
This reverts commit 6502ec3.
A little performance test:
gen script: