Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat postgres: add order by to postgresql cache policy #580

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions postgresql/functional_tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,6 @@ add_dependencies(${PROJECT_NAME} ${PROJECT_NAME}-connlimit-max)

add_subdirectory(metrics)
add_dependencies(${PROJECT_NAME} ${PROJECT_NAME}-metrics)

add_subdirectory(cache_order_by)
add_dependencies(${PROJECT_NAME} ${PROJECT_NAME}-cache-order-by)
6 changes: 6 additions & 0 deletions postgresql/functional_tests/cache_order_by/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
project(userver-postgresql-tests-cache-order-by CXX)

add_executable(${PROJECT_NAME} "service.cpp")
target_link_libraries(${PROJECT_NAME} userver-postgresql)

userver_chaos_testsuite_add()
18 changes: 18 additions & 0 deletions postgresql/functional_tests/cache_order_by/config_vars.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# yaml
server-name: test-core-metrics 1.0
service-name: test_core_metrics
logger-level: info

config-server-url: http://localhost:8083/
server-port: 8185
monitor-server-port: 8186

testsuite-enabled: false

userver-dumps-root: /var/cache/test_core_metrics/userver-dumps/
access-log-path: /var/log/test_core_metrics/access.log
access-tskv-log-path: /var/log/test_core_metrics/access_tskv.log
default-log-path: /var/log/test_core_metrics/server.log
secdist-path: /etc/test_core_metrics/secure_data.json

config-cache: /var/cache/test_core_metrics/config_cache.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE IF NOT EXISTS key_value_table (
key VARCHAR,
value VARCHAR,
updated TIMESTAMPTZ NOT NULL DEFAULT NOW()
)
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
108 changes: 108 additions & 0 deletions postgresql/functional_tests/cache_order_by/service.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#include <userver/clients/dns/component.hpp>
#include <userver/testsuite/testsuite_support.hpp>

#include <userver/utest/using_namespace_userver.hpp>

#include <userver/clients/http/component.hpp>
#include <userver/components/minimal_server_component_list.hpp>
#include <userver/server/handlers/http_handler_base.hpp>
#include <userver/server/handlers/server_monitor.hpp>
#include <userver/server/handlers/tests_control.hpp>
#include <userver/testsuite/testpoint.hpp>
#include <userver/utils/daemon_run.hpp>

#include <userver/storages/postgres/cluster.hpp>
#include <userver/storages/postgres/component.hpp>
#include <userver/storages/postgres/dist_lock_component_base.hpp>

#include <userver/cache/base_postgres_cache.hpp>

namespace pg::cache_order_by {

struct KeyValue {
std::string key;
std::string value;
};

struct AscCachePolicy {
static constexpr std::string_view kName = "asc-pg-cache";

using ValueType = KeyValue;
static constexpr auto kKeyMember = &KeyValue::key;
static constexpr const char* kQuery =
"SELECT key, value FROM key_value_table";
static constexpr const char* kUpdatedField = "updated";
using UpdatedFieldType = storages::postgres::TimePointTz;
static constexpr const char* kOrderBy = "updated ASC";
};

using AscCache = components::PostgreCache<AscCachePolicy>;

struct DescCachePolicy {
static constexpr std::string_view kName = "desc-pg-cache";

using ValueType = KeyValue;
static constexpr auto kKeyMember = &KeyValue::key;
static constexpr const char* kQuery =
"SELECT key, value FROM key_value_table";
static constexpr const char* kUpdatedField = "updated";
using UpdatedFieldType = storages::postgres::TimePointTz;
static constexpr const char* kOrderBy = "updated DESC";
};

using DescCache = components::PostgreCache<DescCachePolicy>;

class CacheHandler final : public server::handlers::HttpHandlerJsonBase {
public:
static constexpr std::string_view kName = "handler-cache-order-by-postgres";

CacheHandler(const components::ComponentConfig& config,
const components::ComponentContext& context);

formats::json::Value HandleRequestJsonThrow(
const server::http::HttpRequest& request, const formats::json::Value&,
server::request::RequestContext&) const override;

private:
const AscCache& asc_cache_;
const DescCache& desc_cache_;
};

CacheHandler::CacheHandler(const components::ComponentConfig& config,
const components::ComponentContext& context)
: HttpHandlerJsonBase(config, context),
asc_cache_{context.FindComponent<AscCache>()},
desc_cache_{context.FindComponent<DescCache>()} {}

formats::json::Value CacheHandler::HandleRequestJsonThrow(
const server::http::HttpRequest& request, const formats::json::Value&,
server::request::RequestContext&) const {
const auto& key = request.GetArg("key");
const auto& order = request.GetArg("order");
if (order == "asc") {
const auto cache_data = asc_cache_.Get();
return formats::json::MakeObject("result", cache_data->at(key).value);
}
if (order == "desc") {
const auto cache_data = desc_cache_.Get();
return formats::json::MakeObject("result", cache_data->at(key).value);
}
return {};
}

} // namespace pg::cache_order_by

int main(int argc, char* argv[]) {
const auto component_list =
components::MinimalServerComponentList()
.Append<server::handlers::ServerMonitor>()
.Append<pg::cache_order_by::CacheHandler>()
.Append<pg::cache_order_by::AscCache>()
.Append<pg::cache_order_by::DescCache>()
.Append<components::HttpClient>()
.Append<components::Postgres>("key-value-database")
.Append<components::TestsuiteSupport>()
.Append<server::handlers::TestsControl>()
.Append<clients::dns::Component>();
return utils::DaemonMain(argc, argv, component_list);
}
74 changes: 74 additions & 0 deletions postgresql/functional_tests/cache_order_by/static_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# yaml
components_manager:
components:
handler-cache-order-by-postgres:
path: /cache/order-by
task_processor: main-task-processor
method: GET

key-value-database:
dbconnection: 'postgresql://testsuite@localhost:15433/pg_key_value'
blocking_task_processor: fs-task-processor
dns_resolver: async

asc-pg-cache:
pgcomponent: key-value-database
update-interval: 10s
desc-pg-cache:
pgcomponent: key-value-database
update-interval: 10s

testsuite-support:

http-client:
fs-task-processor: main-task-processor

tests-control:
method: POST
path: /tests/{action}
skip-unregistered-testpoints: true
task_processor: main-task-processor
testpoint-timeout: 10s
testpoint-url: $mockserver/testpoint
throttling_enabled: false

server:
listener:
port: 8187
task_processor: main-task-processor
listener-monitor:
port: $monitor-server-port
port#fallback: 8086
connection:
in_buffer_size: 32768
requests_queue_size_threshold: 100
task_processor: main-task-processor
logging:
fs-task-processor: fs-task-processor
loggers:
default:
file_path: '@stderr'
level: debug
overflow_behavior: discard

handler-server-monitor:
path: /service/monitor
method: GET
task_processor: main-task-processor

dynamic-config:
defaults:
POSTGRES_STATEMENT_METRICS_SETTINGS:
key-value-database:
max_statement_metrics: 5

dns-client:
fs-task-processor: fs-task-processor

task_processors:
main-task-processor:
worker_threads: 4
fs-task-processor:
worker_threads: 4

default_task_processor: main-task-processor
14 changes: 14 additions & 0 deletions postgresql/functional_tests/cache_order_by/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import pytest

from testsuite.databases.pgsql import discover


pytest_plugins = ['pytest_userver.plugins.postgresql']


@pytest.fixture(scope='session')
def pgsql_local(service_source_dir, pgsql_local_create):
databases = discover.find_schemas(
'pg', [service_source_dir.joinpath('schemas/postgresql')],
)
return pgsql_local_create(list(databases.values()))
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import pytest

@pytest.mark.parametrize(
'order, expected',
(
pytest.param(
'asc',
'one3',
id='asc',
),
pytest.param(
'desc',
'one1',
id='desc',
),
)
)
async def test_cache_order_by(service_client, pgsql, order, expected):
cursor = pgsql['key_value'].cursor()
cursor.execute('''
INSERT INTO key_value_table (
key,
value,
updated
) VALUES (
'one',
'one1',
'2024-01-01'
), (
'one',
'one2',
'2024-01-02'
), (
'one',
'one3',
'2024-01-03'
), (
'two',
'two1',
'2024-01-04'
)
''')
response = await service_client.get('/cache/order-by', params={
'key': 'one',
'order': order,
})
assert response.status == 200
assert response.json() == {
"result": expected,
}
63 changes: 46 additions & 17 deletions postgresql/include/userver/cache/base_postgres_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ using HasWhere = decltype(T::kWhere);
template <typename T>
inline constexpr bool kHasWhere = meta::kIsDetected<HasWhere, T>;

// Component kOrderBy in policy
template <typename T>
using HasOrderBy = decltype(T::kOrderBy);
template <typename T>
inline constexpr bool kHasOrderBy = meta::kIsDetected<HasOrderBy, T>;

// Update field
template <typename T>
using HasUpdatedField = decltype(T::kUpdatedField);
Expand Down Expand Up @@ -422,6 +428,9 @@ class PostgreCache final

static storages::postgres::Query GetAllQuery();
static storages::postgres::Query GetDeltaQuery();
static std::string GetWhereClause();
static std::string GetDeltaWhereClause();
static std::string GetOrderByClause();

std::chrono::milliseconds ParseCorrection(const ComponentConfig& config);

Expand Down Expand Up @@ -496,32 +505,52 @@ PostgreCache<PostgreCachePolicy>::~PostgreCache() {
}

template <typename PostgreCachePolicy>
storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetAllQuery() {
storages::postgres::Query query = PolicyCheckerType::GetQuery();
std::string PostgreCache<PostgreCachePolicy>::GetWhereClause() {
if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
return fmt::format(FMT_COMPILE("where {}"), PostgreCachePolicy::kWhere);
} else {
return "";
}
}

template <typename PostgreCachePolicy>
std::string PostgreCache<PostgreCachePolicy>::GetDeltaWhereClause() {
if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
return {fmt::format("{} where {}", query.Statement(),
PostgreCachePolicy::kWhere),
query.GetName()};
return fmt::format(FMT_COMPILE("where ({}) and {} >= $1"),
PostgreCachePolicy::kWhere,
PostgreCachePolicy::kUpdatedField);
} else {
return fmt::format(FMT_COMPILE("where {} >= $1"),
PostgreCachePolicy::kUpdatedField);
}
}

template <typename PostgreCachePolicy>
std::string PostgreCache<PostgreCachePolicy>::GetOrderByClause() {
if constexpr (pg_cache::detail::kHasOrderBy<PostgreCachePolicy>) {
return fmt::format(FMT_COMPILE("order by {}"),
PostgreCachePolicy::kOrderBy);
} else {
return query;
return "";
}
}

template <typename PostgreCachePolicy>
storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetAllQuery() {
storages::postgres::Query query = PolicyCheckerType::GetQuery();
return fmt::format("{} {} {}", query.Statement(), GetWhereClause(),
GetOrderByClause());
}

template <typename PostgreCachePolicy>
storages::postgres::Query PostgreCache<PostgreCachePolicy>::GetDeltaQuery() {
if constexpr (kIncrementalUpdates) {
storages::postgres::Query query = PolicyCheckerType::GetQuery();

if constexpr (pg_cache::detail::kHasWhere<PostgreCachePolicy>) {
return {
fmt::format("{} where ({}) and {} >= $1", query.Statement(),
PostgreCachePolicy::kWhere, PolicyType::kUpdatedField),
query.GetName()};
} else {
return {fmt::format("{} where {} >= $1", query.Statement(),
PolicyType::kUpdatedField),
query.GetName()};
}
return storages::postgres::Query{
fmt::format("{} {} {}", query.Statement(), GetDeltaWhereClause(),
GetOrderByClause()),
query.GetName(),
};
} else {
return GetAllQuery();
}
Expand Down
5 changes: 5 additions & 0 deletions postgresql/src/cache/postgres_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ struct PostgresExamplePolicy {
//
// Required: no
static constexpr bool kMayReturnNull = false;

// Order by clause of the query.
//
// Required: no
static constexpr const char* kOrderBy = "updated asc";
};

} // namespace example
Expand Down
Loading