Skip to content

Commit

Permalink
Fix StorageURL forgetting headers on server restart
Browse files Browse the repository at this point in the history
  • Loading branch information
al13n321 committed Jan 19, 2024
1 parent 6a75641 commit fd36127
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 101 deletions.
1 change: 1 addition & 0 deletions src/Storages/StorageFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class StorageFactory : private boost::noncopyable, public IHints<>
struct Arguments
{
const String & engine_name;
/// Mutable to allow replacing constant expressions with literals, and other transformations.
ASTs & engine_args;
ASTStorage * storage_def;
const ASTCreateQuery & query;
Expand Down
32 changes: 10 additions & 22 deletions src/Storages/StorageS3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1488,33 +1488,21 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
/// S3('url', 'aws_access_key_id', 'aws_secret_access_key', 'session_token', 'format', 'compression')
/// with optional headers() function

if (engine_args.empty() || engine_args.size() > 6)
size_t count = StorageURL::evalArgsAndCollectHeaders(engine_args, configuration.headers_from_ast, local_context);

if (count == 0 || count > 6)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Storage S3 requires 1 to 5 arguments: "
"url, [NOSIGN | access_key_id, secret_access_key], name of used format and [compression_method]");

auto * header_it = StorageURL::collectHeaders(engine_args, configuration.headers_from_ast, local_context);
if (header_it != engine_args.end())
engine_args.erase(header_it);

for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, local_context);

/// Size -> argument indexes
static std::unordered_map<size_t, std::unordered_map<std::string_view, size_t>> size_to_engine_args
{
{1, {{}}},
{6, {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}, {"compression_method", 5}}}
};

std::unordered_map<std::string_view, size_t> engine_args_to_idx;
bool no_sign_request = false;

/// For 2 arguments we support 2 possible variants:
/// - s3(source, format)
/// - s3(source, NOSIGN)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN or not.
if (engine_args.size() == 2)
if (count == 2)
{
auto second_arg = checkAndGetLiteralArgument<String>(engine_args[1], "format/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
Expand All @@ -1524,10 +1512,10 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
}
/// For 3 arguments we support 3 possible variants:
/// - s3(source, format, compression_method)
/// - s3(source, access_key_id, access_key_id)
/// - s3(source, access_key_id, secret_access_key)
/// - s3(source, NOSIGN, format)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN or format name.
else if (engine_args.size() == 3)
else if (count == 3)
{
auto second_arg = checkAndGetLiteralArgument<String>(engine_args[1], "format/access_key_id/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
Expand All @@ -1545,7 +1533,7 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
/// - s3(source, access_key_id, secret_access_key, format)
/// - s3(source, NOSIGN, format, compression_method)
/// We can distinguish them by looking at the 2-nd argument: check if it's a NOSIGN or not.
else if (engine_args.size() == 4)
else if (count == 4)
{
auto second_arg = checkAndGetLiteralArgument<String>(engine_args[1], "access_key_id/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
Expand All @@ -1569,7 +1557,7 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
/// For 5 arguments we support 2 possible variants:
/// - s3(source, access_key_id, secret_access_key, session_token, format)
/// - s3(source, access_key_id, secret_access_key, format, compression)
else if (engine_args.size() == 5)
else if (count == 5)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "session_token/format");
if (fourth_arg == "auto" || FormatFactory::instance().getAllFormats().contains(fourth_arg))
Expand All @@ -1581,9 +1569,9 @@ StorageS3::Configuration StorageS3::getConfiguration(ASTs & engine_args, Context
engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}};
}
}
else
else if (count == 6)
{
engine_args_to_idx = size_to_engine_args[engine_args.size()];
engine_args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}, {"compression_method", 5}};
}

/// This argument is always the first
Expand Down
22 changes: 12 additions & 10 deletions src/Storages/StorageURL.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1324,7 +1324,7 @@ FormatSettings StorageURL::getFormatSettingsFromArgs(const StorageFactory::Argum
return format_settings;
}

ASTs::iterator StorageURL::collectHeaders(
size_t StorageURL::evalArgsAndCollectHeaders(
ASTs & url_function_args, HTTPHeaderEntries & header_entries, ContextPtr context)
{
ASTs::iterator headers_it = url_function_args.end();
Expand Down Expand Up @@ -1382,7 +1382,11 @@ ASTs::iterator StorageURL::collectHeaders(
(*arg_it) = evaluateConstantExpressionOrIdentifierAsLiteral((*arg_it), context);
}

return headers_it;
if (headers_it == url_function_args.end())
return url_function_args.size();

std::rotate(headers_it, std::next(headers_it), url_function_args.end());
return url_function_args.size() - 1;
}

void StorageURL::processNamedCollectionResult(Configuration & configuration, const NamedCollection & collection)
Expand Down Expand Up @@ -1412,21 +1416,19 @@ StorageURL::Configuration StorageURL::getConfiguration(ASTs & args, ContextPtr l
if (auto named_collection = tryGetNamedCollectionWithOverrides(args, local_context))
{
StorageURL::processNamedCollectionResult(configuration, *named_collection);
collectHeaders(args, configuration.headers, local_context);
evalArgsAndCollectHeaders(args, configuration.headers, local_context);
}
else
{
if (args.empty() || args.size() > 3)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, bad_arguments_error_message);
size_t count = evalArgsAndCollectHeaders(args, configuration.headers, local_context);

auto * header_it = collectHeaders(args, configuration.headers, local_context);
if (header_it != args.end())
args.erase(header_it);
if (count == 0 || count > 3)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, bad_arguments_error_message);

configuration.url = checkAndGetLiteralArgument<String>(args[0], "url");
if (args.size() > 1)
if (count > 1)
configuration.format = checkAndGetLiteralArgument<String>(args[1], "format");
if (args.size() == 3)
if (count == 3)
configuration.compression_method = checkAndGetLiteralArgument<String>(args[2], "compression_method");
}

Expand Down
5 changes: 4 additions & 1 deletion src/Storages/StorageURL.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,10 @@ class StorageURL : public IStorageURLBase

static Configuration getConfiguration(ASTs & args, ContextPtr context);

static ASTs::iterator collectHeaders(ASTs & url_function_args, HTTPHeaderEntries & header_entries, ContextPtr context);
/// Does evaluateConstantExpressionOrIdentifierAsLiteral() on all arguments.
/// If `headers(...)` argument is present, parses it and moves it to the end of the array.
/// Returns number of arguments excluding `headers(...)`.
static size_t evalArgsAndCollectHeaders(ASTs & url_function_args, HTTPHeaderEntries & header_entries, ContextPtr context);

static void processNamedCollectionResult(Configuration & configuration, const NamedCollection & collection);
};
Expand Down
93 changes: 35 additions & 58 deletions src/TableFunctions/TableFunctionS3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,23 +67,11 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
else
{

auto * header_it = StorageURL::collectHeaders(args, configuration.headers_from_ast, context);
if (header_it != args.end())
args.erase(header_it);
size_t count = StorageURL::evalArgsAndCollectHeaders(args, configuration.headers_from_ast, context);

if (args.empty() || args.size() > 7)
if (count == 0 || count > 7)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "The signature of table function {} shall be the following:\n{}", getName(), getSignature());

for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);

/// Size -> argument indexes
static std::unordered_map<size_t, std::unordered_map<std::string_view, size_t>> size_to_args
{
{1, {{}}},
{7, {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}, {"structure", 5}, {"compression_method", 6}}}
};

std::unordered_map<std::string_view, size_t> args_to_idx;

bool no_sign_request = false;
Expand All @@ -92,7 +80,7 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
/// - s3(source, format)
/// - s3(source, NOSIGN)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN or not.
if (args.size() == 2)
if (count == 2)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
Expand All @@ -102,10 +90,10 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
}
/// For 3 arguments we support 3 possible variants:
/// - s3(source, format, structure)
/// - s3(source, access_key_id, access_key_id)
/// - s3(source, access_key_id, secret_access_key)
/// - s3(source, NOSIGN, format)
/// We can distinguish them by looking at the 2-nd argument: check if it's a format name or not.
else if (args.size() == 3)
else if (count == 3)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/access_key_id/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
Expand All @@ -120,11 +108,11 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
}
/// For 4 arguments we support 4 possible variants:
/// - s3(source, format, structure, compression_method),
/// - s3(source, access_key_id, access_key_id, format),
/// - s3(source, access_key_id, access_key_id, session_token)
/// - s3(source, access_key_id, secret_access_key, format),
/// - s3(source, access_key_id, secret_access_key, session_token)
/// - s3(source, NOSIGN, format, structure)
/// We can distinguish them by looking at the 2-nd and 4-th argument: check if it's a format name or not.
else if (args.size() == 4)
else if (count == 4)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/access_key_id/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
Expand All @@ -150,12 +138,12 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
}
}
/// For 5 arguments we support 3 possible variants:
/// - s3(source, access_key_id, access_key_id, format, structure)
/// - s3(source, access_key_id, access_key_id, session_token, format)
/// - s3(source, access_key_id, secret_access_key, format, structure)
/// - s3(source, access_key_id, secret_access_key, session_token, format)
/// - s3(source, NOSIGN, format, structure, compression_method)
/// We can distinguish them by looking at the 2-nd argument: check if it's a NOSIGN keyword name or not,
/// and by the 4-th argument, check if it's a format name or not
else if (args.size() == 5)
else if (count == 5)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "NOSIGN/access_key_id");
if (boost::iequals(second_arg, "NOSIGN"))
Expand All @@ -177,10 +165,10 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
}
}
/// For 6 arguments we support 2 possible variants:
/// - s3(source, access_key_id, access_key_id, format, structure, compression_method)
/// - s3(source, access_key_id, access_key_id, session_token, format, structure)
/// - s3(source, access_key_id, secret_access_key, format, structure, compression_method)
/// - s3(source, access_key_id, secret_access_key, session_token, format, structure)
/// We can distinguish them by looking at the 4-th argument: check if it's a format name or not
else if (args.size() == 6)
else if (count == 6)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(args[3], "format/session_token");
if (fourth_arg == "auto" || FormatFactory::instance().getAllFormats().contains(fourth_arg))
Expand All @@ -192,9 +180,9 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}, {"structure", 5}};
}
}
else
else if (count == 7)
{
args_to_idx = size_to_args[args.size()];
args_to_idx = {{"access_key_id", 1}, {"secret_access_key", 2}, {"session_token", 3}, {"format", 4}, {"structure", 5}, {"compression_method", 6}};
}

/// This argument is always the first
Expand Down Expand Up @@ -262,32 +250,24 @@ void TableFunctionS3::addColumnsStructureToArguments(ASTs & args, const String &
}
else
{
/// If arguments contain headers, just remove it and add to the end of arguments later
/// (header argument can be at any position).
HTTPHeaderEntries tmp_headers;
auto * headers_it = StorageURL::collectHeaders(args, tmp_headers, context);
ASTPtr headers_ast;
if (headers_it != args.end())
{
headers_ast = *headers_it;
args.erase(headers_it);
}
size_t count = StorageURL::evalArgsAndCollectHeaders(args, tmp_headers, context);

if (args.empty() || args.size() > getMaxNumberOfArguments())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 1 to {} arguments in table function, got {}", getMaxNumberOfArguments(), args.size());
if (count == 0 || count > getMaxNumberOfArguments())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected 1 to {} arguments in table function, got {}", getMaxNumberOfArguments(), count);

auto structure_literal = std::make_shared<ASTLiteral>(structure);

/// s3(s3_url)
if (args.size() == 1)
if (count == 1)
{
/// Add format=auto before structure argument.
args.push_back(std::make_shared<ASTLiteral>("auto"));
args.push_back(structure_literal);
}
/// s3(s3_url, format) or s3(s3_url, NOSIGN)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN or not.
else if (args.size() == 2)
else if (count == 2)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/NOSIGN");
/// If there is NOSIGN, add format=auto before structure.
Expand All @@ -296,10 +276,10 @@ void TableFunctionS3::addColumnsStructureToArguments(ASTs & args, const String &
args.push_back(structure_literal);
}
/// s3(source, format, structure) or
/// s3(source, access_key_id, access_key_id) or
/// s3(source, access_key_id, secret_access_key) or
/// s3(source, NOSIGN, format)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN, format name or neither.
else if (args.size() == 3)
else if (count == 3)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
Expand All @@ -308,7 +288,7 @@ void TableFunctionS3::addColumnsStructureToArguments(ASTs & args, const String &
}
else if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
{
args.back() = structure_literal;
args[count - 1] = structure_literal;
}
else
{
Expand All @@ -318,48 +298,45 @@ void TableFunctionS3::addColumnsStructureToArguments(ASTs & args, const String &
}
}
/// s3(source, format, structure, compression_method) or
/// s3(source, access_key_id, access_key_id, format) or
/// s3(source, access_key_id, secret_access_key, format) or
/// s3(source, NOSIGN, format, structure)
/// We can distinguish them by looking at the 2-nd argument: check if it's NOSIGN, format name or neither.
else if (args.size() == 4)
else if (count == 4)
{
auto second_arg = checkAndGetLiteralArgument<String>(args[1], "format/NOSIGN");
if (boost::iequals(second_arg, "NOSIGN"))
{
args.back() = structure_literal;
args[count - 1] = structure_literal;
}
else if (second_arg == "auto" || FormatFactory::instance().getAllFormats().contains(second_arg))
{
args[args.size() - 2] = structure_literal;
args[count - 2] = structure_literal;
}
else
{
args.push_back(structure_literal);
}
}
/// s3(source, access_key_id, access_key_id, format, structure) or
/// s3(source, access_key_id, secret_access_key, format, structure) or
/// s3(source, NOSIGN, format, structure, compression_method)
/// We can distinguish them by looking at the 2-nd argument: check if it's a NOSIGN keyword name or not.
else if (args.size() == 5)
else if (count == 5)
{
auto sedond_arg = checkAndGetLiteralArgument<String>(args[1], "format/NOSIGN");
if (boost::iequals(sedond_arg, "NOSIGN"))
{
args[args.size() - 2] = structure_literal;
args[count - 2] = structure_literal;
}
else
{
args.back() = structure_literal;
args[count - 1] = structure_literal;
}
}
/// s3(source, access_key_id, access_key_id, format, structure, compression)
else if (args.size() == 6)
/// s3(source, access_key_id, secret_access_key, format, structure, compression)
else if (count == 6)
{
args[args.size() - 2] = structure_literal;
args[count - 2] = structure_literal;
}

if (headers_ast)
args.push_back(headers_ast);
}
}

Expand Down
Loading

0 comments on commit fd36127

Please sign in to comment.