[MDB-30666] Broken zero copy search and reattach (#228)
* Draft

* Exception handler

* From draft to PR

* Tests fix

* Fix tests again

* Review fix

* more method desc
MikhailBurdukov authored Sep 6, 2024
1 parent b3e694d commit eeb827f
Showing 5 changed files with 375 additions and 3 deletions.
1 change: 1 addition & 0 deletions ch_tools/chadmin/chadmin_cli.py
@@ -101,6 +101,7 @@ def cli(ctx, format_, settings, timeout, port, debug):
    commands: List[Any] = [
        config_command,
        diagnostics_command,
        data_store_group,
        list_async_metrics_command,
        list_events_command,
        list_functions_command,
187 changes: 184 additions & 3 deletions ch_tools/chadmin/cli/data_store_group.py
@@ -1,23 +1,36 @@
import os
import shutil
import subprocess
from typing import List, Optional
from pathlib import Path
from typing import Dict, List, Optional, Tuple

from click import group, option, pass_context
import boto3
from click import Context, group, option, pass_context

from ch_tools.chadmin.cli.chadmin_group import Chadmin
from ch_tools.chadmin.internal.clickhouse_disks import (
    CLICKHOUSE_METADATA_PATH,
    CLICKHOUSE_PATH,
    CLICKHOUSE_STORE_PATH,
    S3_METADATA_STORE_PATH,
    make_ch_disks_config,
    remove_from_ch_disk,
)
from ch_tools.chadmin.internal.utils import remove_from_disk
from ch_tools.chadmin.internal.object_storage.s3_object_metadata import (
    S3ObjectLocalInfo,
    S3ObjectLocalMetaData,
)
from ch_tools.chadmin.internal.utils import execute_query, remove_from_disk
from ch_tools.common import logging
from ch_tools.common.cli.formatting import print_response
from ch_tools.common.clickhouse.client import OutputFormat
from ch_tools.common.clickhouse.config import get_clickhouse_config
from ch_tools.common.clickhouse.config.storage_configuration import S3DiskConfiguration
from ch_tools.common.process_pool import WorkerTask, execute_tasks_in_parallel

ATTACH_DETTACH_TIMEOUT = 5000
ATTACH_DETACH_QUERY_RETRY = 10


@group("data-store", cls=Chadmin)
def data_store_group():
@@ -312,3 +325,171 @@ def collect_orphaned_sql_objects_recursive(
            max_depth,
            max_sql_objects,
        )


@data_store_group.command("detect-broken-partitions")
@option(
    "--root-path",
    "root_path",
    default=S3_METADATA_STORE_PATH,
    help="Set the store subdirectory path.",
)
@option(
    "--reattach",
    is_flag=True,
    default=False,
    help="Flag to reattach broken partitions.",
)
@pass_context
def detect_broken_partitions(ctx, root_path, reattach):
    parts_paths_with_lost_keys = find_paths_to_part_with_lost_keys(ctx, root_path)
    partition_list = get_partitions_by_path(ctx, parts_paths_with_lost_keys)

    print_response(ctx, partition_list, default_format="table")
    if reattach:
        for partition_info in partition_list:
            reattach_partition(
                ctx, partition_info["table"], partition_info["partition"]
            )


def find_paths_to_part_with_lost_keys(ctx: Context, root_path: str) -> List[str]:
    """
    Find paths of parts with keys that don't have corresponding objects in S3.
    """

    result = []

    ch_config = get_clickhouse_config(ctx)

    disk_conf: S3DiskConfiguration = (
        ch_config.storage_configuration.s3_disk_configuration(
            "object_storage", ctx.obj["config"]["object_storage"]["bucket_name_prefix"]
        )
    )
    s3_client = boto3.client(
        "s3",
        endpoint_url=disk_conf.endpoint_url,
        aws_access_key_id=disk_conf.access_key_id,
        aws_secret_access_key=disk_conf.secret_access_key,
    )

    for path, _, files in os.walk(root_path):
        objects: List[S3ObjectLocalInfo] = []
        logging.debug(f"Checking files from: {path}")
        for file in files:
            file_full_path = Path(os.path.join(path, file))
            objects.extend(S3ObjectLocalMetaData.from_file(file_full_path).objects)

        for s3_object in objects:
            full_key = os.path.join(disk_conf.prefix, s3_object.key)
            if not check_key_in_object_storage(
                s3_client, disk_conf.bucket_name, full_key
            ):
                logging.debug("Key {} not found", full_key)
                result.append(path)
                logging.debug("Add part with path {} to check", result[-1])
                break

    logging.debug("Found parts with missing s3 keys. Local paths of parts: {}", result)
    return result


def check_key_in_object_storage(s3_client: boto3.client, bucket: str, key: str) -> bool:
    """
    Check that an object with the specified key exists in the S3 bucket.
    """
    s3_resp = s3_client.list_objects_v2(
        Bucket=bucket,
        Prefix=key,
    )
    if "Contents" not in s3_resp:
        return False
    if len(s3_resp["Contents"]) != 1:
        return False
    res = s3_resp["Contents"][0]["Key"] == key
    return res


def get_partitions_by_path(
    ctx: Context, parts_paths: List[str]
) -> List[Dict[str, str]]:
    """
    For each part path, find the corresponding table and partition.
    """
    partitions = set()
    for path in parts_paths:
        query_string = (
            f"SELECT database, table, partition FROM system.parts WHERE path='{path}/'"
        )
        res = execute_query(ctx, query_string, format_=OutputFormat.JSONCompact)
        if "data" not in res:
            logging.warning("No data found for part with path {}", path)
            continue

        if len(res["data"]) != 1 or len(res["data"][0]) != 3:
            continue
        table = f"`{res['data'][0][0]}`.`{res['data'][0][1]}`"
        partition = res["data"][0][2]
        partitions.add((table, partition))

    # Not strictly necessary, just to make the output stable for tests.
    partitions_list: List[Tuple[str, str]] = list(partitions)
    partitions_list.sort()

    result = [
        {"table": partition[0], "partition": partition[1]}
        for partition in partitions_list
    ]
    return result


def query_with_retry(ctx: Context, query: str, timeout: int, retries: int) -> bool:
    """
    Execute a ClickHouse query with the given number of retries.
    """
    logging.debug("Execute query: {}", query)
    for retry in range(retries):
        try:
            res = execute_query(ctx, query, timeout=timeout)
            if res == "":
                break
        except Exception as e:
            if retry + 1 == retries:
                logging.warning("Query {} failed with: {!r}", query, e)
                return False
            continue

    logging.info("Query {} finished successfully", query)
    return True


def reattach_partition(ctx: Context, table: str, partition: str) -> bool:
    """
    Run DETACH and ATTACH for the given partition.
    """
    logging.debug(f"Going to reattach partition {partition} for table {table}")
    detach_query = f"ALTER TABLE {table} DETACH PARTITION '{partition}'"
    attach_query = f"ALTER TABLE {table} ATTACH PARTITION '{partition}'"

    res = query_with_retry(
        ctx,
        detach_query,
        timeout=ATTACH_DETTACH_TIMEOUT,
        retries=ATTACH_DETACH_QUERY_RETRY,
    )
    if not res:
        return res

    # To avoid leaving partitions detached, run the attach query with twice as many attempts.
    res = query_with_retry(
        ctx,
        attach_query,
        timeout=ATTACH_DETTACH_TIMEOUT,
        retries=2 * ATTACH_DETACH_QUERY_RETRY,
    )
    if not res:
        return res
    return True
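
For context, the exact-match check in check_key_in_object_storage matters because list_objects_v2 filters by prefix, so a sibling key sharing the same prefix would otherwise count as a hit. A minimal standalone sketch of the same check (bucket and key names are made up for illustration, and credentials/endpoint are assumed to be configured elsewhere):

import boto3

# Assumes credentials and endpoint are configured in the environment.
s3 = boto3.client("s3")

# A prefix listing for "store/abc" may also return "store/abc_backup",
# so the result must contain exactly one object whose key matches exactly.
resp = s3.list_objects_v2(Bucket="example-bucket", Prefix="store/abc")
contents = resp.get("Contents", [])
exists = len(contents) == 1 and contents[0]["Key"] == "store/abc"
print(exists)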
90 changes: 90 additions & 0 deletions ch_tools/chadmin/internal/object_storage/s3_object_metadata.py
@@ -0,0 +1,90 @@
import re
from dataclasses import dataclass
from pathlib import Path
from typing import List

MAX_METADATA_FILE_SIZE = 10 * 1024


@dataclass
class S3ObjectLocalInfo:
    """
    Information about the S3 object stored locally in the metadata file.
    """

    key: str
    size: int


@dataclass
class S3ObjectLocalMetaData:
    """
    Parsed content of a metadata file stored on the local disk.
    """

    version: int
    total_size: int
    objects: List[S3ObjectLocalInfo]
    ref_counter: int
    read_only: bool

    @classmethod
    def from_string(cls, value: str) -> "S3ObjectLocalMetaData":
        lines = value.splitlines()
        idx = 0

        matches = re.match(r"^[123]$", lines[idx])
        if not matches:
            raise ValueError(f"Incorrect metadata version. Line: `{lines[idx]}`")
        version = int(matches[0])
        idx += 1

        matches = re.match(r"^(\d+)\s+(\d+)$", lines[idx])
        if not matches:
            raise ValueError(
                f"Incorrect metadata about the objects count and total size. Line: `{lines[idx]}`"
            )
        object_count, total_size = int(matches[1]), int(matches[2])
        idx += 1

        objects: List[S3ObjectLocalInfo] = []
        for _ in range(object_count):
            matches = re.match(r"^(\d+)\s+(\S+)$", lines[idx])
            if not matches:
                raise ValueError(
                    f"Incorrect metadata about object size and name. Line: `{lines[idx]}`"
                )
            objects.append(S3ObjectLocalInfo(key=matches[2], size=int(matches[1])))
            idx += 1

        matches = re.match(r"^\d+$", lines[idx])
        if not matches:
            raise ValueError(
                f"Incorrect metadata about refcounter. Line: `{lines[idx]}`"
            )
        refcounter = int(lines[idx])
        idx += 1

        matches = re.match("^[01]$", lines[idx])
        if not matches:
            raise ValueError(
                f"Incorrect metadata about readonly flag. Line: `{lines[idx]}`"
            )
        read_only = bool(int(matches[0]))

        return cls(
            version=version,
            total_size=total_size,
            objects=objects,
            ref_counter=refcounter,
            read_only=read_only,
        )

    @classmethod
    def from_file(cls, path: Path) -> "S3ObjectLocalMetaData":
        if path.stat().st_size > MAX_METADATA_FILE_SIZE:
            raise ValueError(
                f"Metadata file is too large. Its size must not exceed {MAX_METADATA_FILE_SIZE} bytes"
            )
        with path.open(encoding="latin-1") as file:
            return cls.from_string(file.read())
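
To illustrate the metadata layout that from_string expects (inferred only from the regexes above; real files produced by ClickHouse may vary between versions), a hypothetical single-object file can be parsed like this:

from ch_tools.chadmin.internal.object_storage.s3_object_metadata import (
    S3ObjectLocalMetaData,
)

# Hypothetical file content: version, "<object_count> <total_size>",
# one "<size> <key>" line per object, then refcount and read-only flag.
sample = "3\n1 446\n446 xyz/abcdefghijklmn\n0\n0\n"

meta = S3ObjectLocalMetaData.from_string(sample)
assert meta.version == 3
assert meta.total_size == 446
assert meta.objects[0].key == "xyz/abcdefghijklmn"
assert meta.ref_counter == 0 and meta.read_only is False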
65 changes: 65 additions & 0 deletions tests/features/data_storage_group.feature
@@ -87,3 +87,68 @@ Feature: chadmin data-store commands
    - path: /var/lib/clickhouse/disks/object_storage/data/db1/test1
      deleted: 'Yes'
    """

  Scenario Outline: Reattach partitions with broken parts from zero copy
    When we execute queries on clickhouse01
    """
    SYSTEM STOP MERGES;
    DROP DATABASE IF EXISTS test_db;
    CREATE DATABASE test_db;
    CREATE TABLE test_db.table1 (a int, b int) ENGINE=ReplicatedMergeTree('/clickhouse/{database}/{table}', '{replica}') ORDER BY a PARTITION BY a
      SETTINGS storage_policy='object_storage', allow_remote_fs_zero_copy_replication=1 <additional_table_settings>;
    INSERT INTO test_db.table1 SELECT 1, 1;
    INSERT INTO test_db.table1 SELECT 1, 2;
    INSERT INTO test_db.table1 SELECT 2, 1;
    INSERT INTO test_db.table1 SELECT 3, 1;
    CREATE TABLE test_db.table2 (a int, b int) ENGINE=ReplicatedMergeTree('/clickhouse/{database}/{table}', '{replica}') ORDER BY a PARTITION BY a
      SETTINGS storage_policy='object_storage', allow_remote_fs_zero_copy_replication=1 <additional_table_settings>;
    INSERT INTO test_db.table2 SELECT 1, 1;
    INSERT INTO test_db.table2 SELECT 2, 1;
    INSERT INTO test_db.table2 SELECT 3, 1;
    """
    # Imitate the "bad" zero-copy behavior where s3 objects are removed earlier than needed.
    And we remove key from s3 for partitions database test_db on clickhouse01
    """
    test_db:
      table1: ['1', '2']
      table2: ['1', '2', '3']
    """
    When we execute command on clickhouse01
    """
    chadmin --format yaml data-store detect-broken-partitions
    """
    Then we get response contains
    """
    - table: '`test_db`.`table1`'
      partition: '1'
    - table: '`test_db`.`table1`'
      partition: '2'
    - table: '`test_db`.`table2`'
      partition: '1'
    - table: '`test_db`.`table2`'
      partition: '2'
    - table: '`test_db`.`table2`'
      partition: '3'
    """
    When we execute command on clickhouse01
    """
    chadmin --format yaml data-store detect-broken-partitions --reattach
    """
    When we execute queries on clickhouse01
    """
    SELECT * FROM test_db.table1;
    SELECT * FROM test_db.table2;
    """

    @require_version_24.4
    Examples:
      | additional_table_settings                                |
      | , disable_detach_partition_for_zero_copy_replication = 0 |

    @require_version_less_than_24.4
    Examples:
      | additional_table_settings |
      |                           |
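
The "we remove key from s3 for partitions" step above is what simulates the premature zero-copy deletion. Roughly, such a step boils down to deleting one of the objects referenced by a part's local metadata; the sketch below uses a made-up endpoint, bucket, and key, while the real step lives in the test harness and derives these values from the test environment:

import boto3

# Assumed test-only endpoint and bucket; actual values come from the test setup.
s3 = boto3.client("s3", endpoint_url="http://minio:9000")

# Delete one S3 object referenced by a part's metadata so that
# detect-broken-partitions sees a dangling reference (key is hypothetical).
s3.delete_object(Bucket="test-bucket", Key="data/xyz/abcdefghij")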