Skip to content

Commit

Permalink
Move setting to mergetree
Browse files Browse the repository at this point in the history
  • Loading branch information
MikhailBurdukov committed Feb 4, 2024
1 parent 778efb8 commit b90a5b9
Show file tree
Hide file tree
Showing 22 changed files with 53 additions and 105 deletions.
1 change: 0 additions & 1 deletion src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,6 @@ class IColumn;
M(Bool, mutations_execute_nondeterministic_on_initiator, false, "If true nondeterministic function are executed on initiator and replaced to literals in UPDATE and DELETE queries", 0) \
M(Bool, mutations_execute_subqueries_on_initiator, false, "If true scalar subqueries are executed on initiator and replaced to literals in UPDATE and DELETE queries", 0) \
M(UInt64, mutations_max_literal_size_to_replace, 16384, "The maximum size of serialized literal in bytes to replace in UPDATE and DELETE queries", 0) \
M(UInt64, max_postpone_time_for_failed_mutations, 0ul, "The maximum postpone time for failed mutations in ms.", 0) \
\
M(Float, create_replicated_merge_tree_fault_injection_probability, 0.0f, "The probability of a fault injection during table creation after creating metadata in ZooKeeper", 0) \
\
Expand Down
16 changes: 0 additions & 16 deletions src/IO/ReadHelpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,22 +104,6 @@ bool checkString(const char * s, ReadBuffer & buf)
return true;
}

bool checkStringWithPositionSaving(const char * s, ReadBuffer & buf)
{
auto initial_position = buf.position();
for (; *s; ++s)
{
if (buf.eof() || *buf.position() != *s)
{
buf.position() = initial_position;
return false;
}
++buf.position();
}
buf.position() = initial_position;
return true;
}


bool checkStringCaseInsensitive(const char * s, ReadBuffer & buf)
{
Expand Down
3 changes: 0 additions & 3 deletions src/IO/ReadHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,9 +216,6 @@ inline void assertString(const String & s, ReadBuffer & buf)
}

bool checkString(const char * s, ReadBuffer & buf);

bool checkStringWithPositionSaving(const char * s, ReadBuffer & buf);

inline bool checkString(const String & s, ReadBuffer & buf)
{
return checkString(s.c_str(), buf);
Expand Down
6 changes: 0 additions & 6 deletions src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2942,12 +2942,6 @@ BackgroundTaskSchedulingSettings Context::getBackgroundMoveTaskSchedulingSetting
return task_settings;
}

size_t Context::getMaxPostponeTimeForFailedMutations() const
{
const auto & config = getConfigRef();
return config.getUInt("max_postpone_time_for_failed_mutations", 0ull);
}

BackgroundSchedulePool & Context::getSchedulePool() const
{
callOnce(shared->schedule_pool_initialized, [&] {
Expand Down
3 changes: 0 additions & 3 deletions src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -1010,9 +1010,6 @@ class Context: public ContextData, public std::enable_shared_from_this<Context>
BackgroundTaskSchedulingSettings getBackgroundProcessingTaskSchedulingSettings() const;
BackgroundTaskSchedulingSettings getBackgroundMoveTaskSchedulingSettings() const;

// Setting for backoff policy for failed mutation tasks.
size_t getMaxPostponeTimeForFailedMutations() const;

BackgroundSchedulePool & getBufferFlushSchedulePool() const;
BackgroundSchedulePool & getSchedulePool() const;
BackgroundSchedulePool & getMessageBrokerSchedulePool() const;
Expand Down
1 change: 0 additions & 1 deletion src/Storages/MergeTree/MergeTreeData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,6 @@ MergeTreeData::MergeTreeData(
, parts_mover(this)
, background_operations_assignee(*this, BackgroundJobsAssignee::Type::DataProcessing, getContext())
, background_moves_assignee(*this, BackgroundJobsAssignee::Type::Moving, getContext())
, mutation_backoff_policy(getContext())
{
context_->getGlobalContext()->initializeBackgroundExecutorsIfNeeded();

Expand Down
4 changes: 0 additions & 4 deletions src/Storages/MergeTree/MergeTreeData.h
Original file line number Diff line number Diff line change
Expand Up @@ -1413,10 +1413,6 @@ class MergeTreeData : public IStorage, public WithMutableContext
mutable std::mutex parts_info_lock;

public:
explicit PartMutationBackoffPolicy(ContextPtr global_context_)
: WithContext(global_context_)
{
}

void removeFromFailedByVersion(UInt64 mutation_version)
{
Expand Down
12 changes: 1 addition & 11 deletions src/Storages/MergeTree/MergeTreeMutationEntry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,13 @@ UInt64 MergeTreeMutationEntry::parseFileName(const String & file_name_)
}

MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk_, const String & path_prefix_, UInt64 tmp_number,
const TransactionID & tid_, const WriteSettings & settings, size_t max_postpone_time_)
const TransactionID & tid_, const WriteSettings & settings)
: create_time(time(nullptr))
, commands(std::move(commands_))
, disk(std::move(disk_))
, path_prefix(path_prefix_)
, file_name("tmp_mutation_" + toString(tmp_number) + ".txt")
, is_temp(true)
, max_postpone_time(max_postpone_time_)
, tid(tid_)
{
try
Expand All @@ -66,10 +65,6 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskP
*out << "commands: ";
commands.writeText(*out, /* with_pure_metadata_commands = */ false);
*out << "\n";
*out << "max postpone time: ";
*out << max_postpone_time;
*out << "\n";

if (tid.isPrehistoric())
{
csn = Tx::PrehistoricCSN;
Expand Down Expand Up @@ -141,11 +136,6 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(DiskPtr disk_, const String & pat
commands.readText(*buf);
*buf >> "\n";

if (!buf->eof() && checkStringWithPositionSaving("max postpone time: ", *buf))
{
*buf >> "max postpone time: " >> max_postpone_time >> "\n";
}

if (buf->eof())
{
tid = Tx::PrehistoricTID;
Expand Down
3 changes: 1 addition & 2 deletions src/Storages/MergeTree/MergeTreeMutationEntry.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ struct MergeTreeMutationEntry
String file_name;
bool is_temp = false;

size_t max_postpone_time;
UInt64 block_number = 0;

String latest_failed_part;
Expand All @@ -39,7 +38,7 @@ struct MergeTreeMutationEntry

/// Create a new entry and write it to a temporary file.
MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk, const String & path_prefix_, UInt64 tmp_number,
const TransactionID & tid_, const WriteSettings & settings, size_t max_postpone_time);
const TransactionID & tid_, const WriteSettings & settings);
MergeTreeMutationEntry(const MergeTreeMutationEntry &) = delete;
MergeTreeMutationEntry(MergeTreeMutationEntry &&) = default;

Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTreeSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ struct Settings;
M(UInt64, vertical_merge_algorithm_min_rows_to_activate, 16 * 8192, "Minimal (approximate) sum of rows in merging parts to activate Vertical merge algorithm.", 0) \
M(UInt64, vertical_merge_algorithm_min_bytes_to_activate, 0, "Minimal (approximate) uncompressed size in bytes in merging parts to activate Vertical merge algorithm.", 0) \
M(UInt64, vertical_merge_algorithm_min_columns_to_activate, 11, "Minimal amount of non-PK columns to activate Vertical merge algorithm.", 0) \
M(UInt64, max_postpone_time_for_failed_mutations, 0ul, "The maximum postpone time for failed mutations in ms.", 0) \
\
/** Compatibility settings */ \
M(Bool, allow_suspicious_indices, false, "Reject primary/secondary indexes and sorting keys with identical expressions", 0) \
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/MergeTree/ReplicatedMergeMutateTaskBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ bool ReplicatedMergeMutateTaskBase::executeStep()
status.latest_fail_time = time(nullptr);
status.latest_fail_reason = getExceptionMessage(saved_exception, false);
if (result_data_version == it->first)
storage.mutation_backoff_policy.addPartMutationFailure(src_part, source_part_info.mutation + 1, log_entry->max_postpone_time);
storage.mutation_backoff_policy.addPartMutationFailure(src_part, result_data_version, storage.getSettings()->max_postpone_time_for_failed_mutations);
}
}
}
Expand Down
5 changes: 1 addition & 4 deletions src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const

if (isAlterMutation())
out << "\nalter_version\n" << alter_version;
out << "\nmax_postpone_time\n" << max_postpone_time;
break;

case ALTER_METADATA: /// Just make local /metadata and /columns consistent with global
Expand Down Expand Up @@ -319,9 +318,7 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in, MergeTreeDataFor
{
in >> "\n";

if (checkString("max_postpone_time\n", in))
in >> max_postpone_time;
else if (checkString("alter_version\n", in))
if (checkString("alter_version\n", in))
in >> alter_version;
else if (checkString("to_uuid\n", in))
in >> new_part_uuid;
Expand Down
1 change: 0 additions & 1 deletion src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ struct ReplicatedMergeTreeLogEntryData
size_t num_postponed = 0; /// The number of times the action was postponed.
String postpone_reason; /// The reason why the action was postponed, if it was postponed.
time_t last_postpone_time = 0; /// The time of the last time the action was postponed.
size_t max_postpone_time = 0;

/// Creation time or the time to copy from the general log to the queue of a particular replica.
time_t create_time = 0;
Expand Down
6 changes: 0 additions & 6 deletions src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@ void ReplicatedMergeTreeMutationEntry::writeText(WriteBuffer & out) const

out << "alter version: ";
out << alter_version;
out << "\n";

out << "max postpone time: ";
out << max_postpone_time;
}

void ReplicatedMergeTreeMutationEntry::readText(ReadBuffer & in)
Expand Down Expand Up @@ -61,9 +58,6 @@ void ReplicatedMergeTreeMutationEntry::readText(ReadBuffer & in)
commands.readText(in);
if (checkString("\nalter version: ", in))
in >> alter_version;
if (checkString("\nmax postpone time: ", in))
in >> max_postpone_time;

}

String ReplicatedMergeTreeMutationEntry::toString() const
Expand Down
2 changes: 0 additions & 2 deletions src/Storages/MergeTree/ReplicatedMergeTreeMutationEntry.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ struct ReplicatedMergeTreeMutationEntry
std::shared_ptr<const IBackupEntry> backup() const;

String getBlockNumbersForLogs() const;

size_t max_postpone_time = 0;
};

using ReplicatedMergeTreeMutationEntryPtr = std::shared_ptr<const ReplicatedMergeTreeMutationEntry>;
Expand Down
6 changes: 2 additions & 4 deletions src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2481,7 +2481,7 @@ bool ReplicatedMergeTreeMergePredicate::partParticipatesInReplaceRange(const Mer
}


std::optional<ReplicatedMergeTreeMergePredicate::DesiredMutationDescription> ReplicatedMergeTreeMergePredicate::getDesiredMutationDescription(const MergeTreeData::DataPartPtr & part) const
std::optional<std::pair<Int64, int>> ReplicatedMergeTreeMergePredicate::getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const
{
/// Assigning mutations is easier than assigning merges because mutations appear in the same order as
/// the order of their version numbers (see StorageReplicatedMergeTree::mutate).
Expand Down Expand Up @@ -2509,7 +2509,6 @@ std::optional<ReplicatedMergeTreeMergePredicate::DesiredMutationDescription> Re

Int64 current_version = queue.getCurrentMutationVersion(part->info.partition_id, part->info.getDataVersion());
Int64 max_version = in_partition->second.begin()->first;
size_t mutation_postpone_time = 0ul;

int alter_version = -1;
bool barrier_found = false;
Expand All @@ -2528,7 +2527,6 @@ std::optional<ReplicatedMergeTreeMergePredicate::DesiredMutationDescription> Re
}

max_version = mutation_version;
mutation_postpone_time = mutation_status->entry->max_postpone_time;
if (current_version < max_version)
++mutations_count;

Expand Down Expand Up @@ -2562,7 +2560,7 @@ std::optional<ReplicatedMergeTreeMergePredicate::DesiredMutationDescription> Re
LOG_TRACE(queue.log, "Will apply {} mutations and mutate part {} to version {} (the last version is {})",
mutations_count, part->name, max_version, in_partition->second.rbegin()->first);

return DesiredMutationDescription({max_version, alter_version, mutation_postpone_time});
return std::make_pair(max_version, alter_version);
}


Expand Down
9 changes: 1 addition & 8 deletions src/Storages/MergeTree/ReplicatedMergeTreeQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -563,19 +563,12 @@ class ReplicatedMergeTreeMergePredicate : public BaseMergePredicate<ActiveDataPa
/// We should not drop part in this case, because replication queue may stuck without that part.
bool partParticipatesInReplaceRange(const MergeTreeData::DataPartPtr & part, String & out_reason) const;


struct DesiredMutationDescription
{
Int64 mutation_version;
int32_t alter_version;
size_t max_postpone_time;
};
/// Return nonempty optional of desired mutation version and alter version.
/// If we have no alter (modify/drop) mutations in mutations queue, than we return biggest possible
/// mutation version (and -1 as alter version). In other case, we return biggest mutation version with
/// smallest alter version. This required, because we have to execute alter mutations sequentially and
/// don't glue them together. Alter is rare operation, so it shouldn't affect performance.
std::optional<DesiredMutationDescription> getDesiredMutationDescription(const MergeTreeData::DataPartPtr & part) const;
std::optional<std::pair<Int64, int>> getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const;

bool isMutationFinished(const std::string & znode_name, const std::map<String, int64_t> & block_numbers,
std::unordered_set<String> & checked_partitions_cache) const;
Expand Down
5 changes: 2 additions & 3 deletions src/Storages/StorageMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,7 @@ Int64 StorageMergeTree::startMutation(const MutationCommands & commands, Context
{
std::lock_guard lock(currently_processing_in_background_mutex);

MergeTreeMutationEntry entry(commands, disk, relative_data_path, insert_increment.get(), current_tid, getContext()->getWriteSettings(),
query_context->getSettings().max_postpone_time_for_failed_mutations);
MergeTreeMutationEntry entry(commands, disk, relative_data_path, insert_increment.get(), current_tid, getContext()->getWriteSettings());
version = increment.get();
entry.commit(version);
String mutation_id = entry.file_name;
Expand Down Expand Up @@ -574,7 +573,7 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re

if (static_cast<UInt64>(result_part->part_info.mutation) == it->first)
{
mutation_backoff_policy.addPartMutationFailure(failed_part->name, it->first, entry.max_postpone_time);
mutation_backoff_policy.addPartMutationFailure(failed_part->name, it->first, getSettings()->max_postpone_time_for_failed_mutations);
}
}
}
Expand Down
19 changes: 8 additions & 11 deletions src/Storages/StorageReplicatedMergeTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3811,17 +3811,16 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
if (part->getBytesOnDisk() > max_source_part_size_for_mutation)
continue;

std::optional<ReplicatedMergeTreeMergePredicate::DesiredMutationDescription> desired_mutation_description = merge_pred->getDesiredMutationDescription(part);
if (!desired_mutation_description)
std::optional<std::pair<Int64, int>> desired_mutation_version = merge_pred->getDesiredMutationVersion(part);
if (!desired_mutation_version)
continue;

create_result = createLogEntryToMutatePart(
*part,
future_merged_part->uuid,
desired_mutation_description->mutation_version,
desired_mutation_description->alter_version,
merge_pred->getVersion(),
desired_mutation_description->max_postpone_time);
desired_mutation_version->first,
desired_mutation_version->second,
merge_pred->getVersion());

if (create_result == CreateMergeEntryResult::Ok)
return AttemptStatus::EntryCreated;
Expand Down Expand Up @@ -3990,7 +3989,7 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c


StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::createLogEntryToMutatePart(
const IMergeTreeDataPart & part, const UUID & new_part_uuid, Int64 mutation_version, int32_t alter_version, int32_t log_version, size_t max_postpone_time)
const IMergeTreeDataPart & part, const UUID & new_part_uuid, Int64 mutation_version, int32_t alter_version, int32_t log_version)
{
auto zookeeper = getZooKeeper();

Expand Down Expand Up @@ -4020,7 +4019,7 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
entry.new_part_uuid = new_part_uuid;
entry.create_time = time(nullptr);
entry.alter_version = alter_version;
entry.max_postpone_time = max_postpone_time;

Coordination::Requests ops;
Coordination::Responses responses;

Expand Down Expand Up @@ -7373,7 +7372,7 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, Conte
ReplicatedMergeTreeMutationEntry mutation_entry;
mutation_entry.source_replica = replica_name;
mutation_entry.commands = commands;
mutation_entry.max_postpone_time = query_context->getSettings().max_postpone_time_for_failed_mutations;

const String mutations_path = fs::path(zookeeper_path) / "mutations";
const auto zookeeper = getZooKeeper();

Expand Down Expand Up @@ -7482,8 +7481,6 @@ CancellationCode StorageReplicatedMergeTree::killMutation(const String & mutatio
if (!mutation_entry)
return CancellationCode::NotFound;

mutation_backoff_policy.removeFromFailedByVersion(static_cast<UInt64>(mutation_entry->alter_version));

/// After this point no new part mutations will start and part mutations that still exist
/// in the queue will be skipped.

Expand Down
4 changes: 1 addition & 3 deletions src/Storages/StorageReplicatedMergeTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -754,9 +754,7 @@ class StorageReplicatedMergeTree final : public MergeTreeData
const UUID & new_part_uuid,
Int64 mutation_version,
int32_t alter_version,
int32_t log_version,
size_t max_postpone_time = 0
);
int32_t log_version);

/** Returns an empty string if no one has a part.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
<clickhouse>
<merge_tree>
<max_postpone_time_for_failed_mutations>60000</max_postpone_time_for_failed_mutations>
</merge_tree>
</clickhouse>
Loading

0 comments on commit b90a5b9

Please sign in to comment.