Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add SSL/TLS support for ClickHouse connections #6459

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions snuba/cli/cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,24 @@
type=int,
help="Clickhouse native port to write to.",
)
@click.option(
"--clickhouse-secure",
type=bool,
default=False,
help="If true, an encrypted connection will be used",
)
@click.option(
"--clickhouse-ca-certs",
type=str,
default=None,
help="An optional path to certificates directory.",
)
@click.option(
"--clickhouse-verify",
type=bool,
default=False,
help="Verify ClickHouse SSL cert.",
)
@click.option(
"--dry-run",
type=bool,
Expand All @@ -36,6 +54,9 @@ def cleanup(
*,
clickhouse_host: Optional[str],
clickhouse_port: Optional[int],
clickhouse_secure: bool,
clickhouse_ca_certs: Optional[str],
clickhouse_verify: Optional[bool],
dry_run: bool,
storage_name: str,
log_level: Optional[str] = None,
Expand Down Expand Up @@ -67,6 +88,9 @@ def cleanup(
clickhouse_user,
clickhouse_password,
database,
clickhouse_secure,
clickhouse_ca_certs,
clickhouse_verify,
)
elif not cluster.is_single_node():
raise click.ClickException("Provide ClickHouse host and port for cleanup")
Expand Down
24 changes: 24 additions & 0 deletions snuba/cli/migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,33 @@ def reverse_in_progress(
required=True,
default=os.environ.get("CLICKHOUSE_DATABASE", "default"),
)
@click.option(
"--secure",
type=bool,
default=False,
help="If true, an encrypted connection will be used",
)
@click.option(
"--ca-certs",
type=str,
default=None,
help="An optional path to certificates directory.",
)
@click.option(
"--verify",
type=bool,
default=False,
help="Verify ClickHouse SSL cert.",
)
def add_node(
node_type: str,
storage_set_names: Sequence[str],
host_name: str,
port: int,
database: str,
secure: bool,
ca_certs: Optional[str],
verify: Optional[bool],
) -> None:
"""
Runs all migrations on a brand new ClickHouse node. This should be performed
Expand Down Expand Up @@ -364,6 +385,9 @@ def add_node(
user=user,
password=password,
database=database,
secure=secure,
ca_certs=ca_certs,
verify=verify,
)


Expand Down
24 changes: 24 additions & 0 deletions snuba/cli/optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,24 @@
type=int,
help="Clickhouse native port to write to.",
)
@click.option(
"--clickhouse-secure",
type=bool,
default=False,
help="If true, an encrypted connection will be used",
)
@click.option(
"--clickhouse-ca-certs",
type=str,
default=None,
help="An optional path to certificates directory.",
)
@click.option(
"--clickhouse-verify",
type=bool,
default=False,
help="Verify ClickHouse SSL cert.",
)
@click.option(
"--storage",
"storage_name",
Expand All @@ -44,6 +62,9 @@ def optimize(
*,
clickhouse_host: Optional[str],
clickhouse_port: Optional[int],
clickhouse_secure: bool,
clickhouse_ca_certs: Optional[str],
clickhouse_verify: Optional[bool],
storage_name: str,
default_parallel_threads: int,
log_level: Optional[str] = None,
Expand Down Expand Up @@ -79,6 +100,9 @@ def optimize(
clickhouse_user,
clickhouse_password,
database,
clickhouse_secure,
clickhouse_ca_certs,
clickhouse_verify,
send_receive_timeout=ClickhouseClientSettings.OPTIMIZE.value.timeout,
)
elif not storage.get_cluster().is_single_node():
Expand Down
17 changes: 12 additions & 5 deletions snuba/clickhouse/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import rapidjson
import sentry_sdk
from urllib3.connectionpool import HTTPConnectionPool
from urllib3.connectionpool import HTTPConnectionPool, HTTPSConnectionPool
from urllib3.exceptions import HTTPError

from snuba import settings, state
Expand Down Expand Up @@ -161,7 +161,7 @@ class HTTPWriteBatch:
def __init__(
self,
executor: ThreadPoolExecutor,
pool: HTTPConnectionPool,
pool: HTTPConnectionPool | HTTPSConnectionPool,
metrics: MetricsBackend,
user: str,
password: str,
Expand Down Expand Up @@ -293,6 +293,9 @@ def __init__(
port: int,
user: str,
password: str,
secure: bool,
ca_certs: Optional[str],
verify: Optional[bool],
metrics: MetricsBackend,
statement: InsertStatement,
encoding: Optional[str],
Expand All @@ -302,9 +305,13 @@ def __init__(
max_connections: int = 1,
block_connections: bool = False,
):
self.__pool = HTTPConnectionPool(
host, port, maxsize=max_connections, block=block_connections
)
self.__pool: HTTPSConnectionPool | HTTPConnectionPool
if secure:
self.__pool = HTTPSConnectionPool(
host, port, ca_certs=ca_certs, verify=verify
)
else:
self.__pool = HTTPConnectionPool(host, port)
self.__executor = ThreadPoolExecutor()
self.__metrics = metrics

Expand Down
9 changes: 9 additions & 0 deletions snuba/clickhouse/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ def __init__(
user: str,
password: str,
database: str,
secure: bool = False,
ca_certs: Optional[str] = None,
verify: Optional[bool] = False,
connect_timeout: int = 1,
send_receive_timeout: Optional[int] = 300,
max_pool_size: int = settings.CLICKHOUSE_MAX_POOL_SIZE,
Expand All @@ -91,6 +94,9 @@ def __init__(
self.user = user
self.password = password
self.database = database
self.secure = secure
self.ca_certs = ca_certs
self.verify = verify
self.connect_timeout = connect_timeout
self.send_receive_timeout = send_receive_timeout
self.client_settings = client_settings
Expand Down Expand Up @@ -380,6 +386,9 @@ def _create_conn(self, use_fallback_host: bool = False) -> Client:
user=self.user,
password=self.password,
database=self.database,
secure=self.secure,
ca_certs=self.ca_certs,
verify=self.verify,
connect_timeout=self.connect_timeout,
send_receive_timeout=self.send_receive_timeout,
settings=self.client_settings,
Expand Down
46 changes: 44 additions & 2 deletions snuba/clusters/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,16 @@ def get_batch_writer(
ClickhouseWriterOptions = Optional[Mapping[str, Any]]


CacheKey = Tuple[ClickhouseNode, ClickhouseClientSettings, str, str, str]
CacheKey = Tuple[
ClickhouseNode,
ClickhouseClientSettings,
str,
str,
str,
bool,
Optional[str],
Optional[bool],
]


class ConnectionCache:
Expand All @@ -177,10 +186,22 @@ def get_node_connection(
user: str,
password: str,
database: str,
secure: bool,
ca_certs: Optional[str],
verify: Optional[bool],
) -> ClickhousePool:
with self.__lock:
settings, timeout = client_settings.value
cache_key = (node, client_settings, user, password, database)
cache_key = (
node,
client_settings,
user,
password,
database,
secure,
ca_certs,
verify,
)
if cache_key not in self.__cache:
self.__cache[cache_key] = ClickhousePool(
node.host_name,
Expand All @@ -190,6 +211,9 @@ def get_node_connection(
database,
client_settings=settings,
send_receive_timeout=timeout,
secure=secure,
ca_certs=ca_certs,
verify=verify,
)

return self.__cache[cache_key]
Expand Down Expand Up @@ -226,6 +250,9 @@ def __init__(
password: str,
database: str,
http_port: int,
secure: bool,
ca_certs: Optional[str],
verify: Optional[bool],
storage_sets: Set[str],
single_node: bool,
# The cluster name and distributed cluster name only apply if single_node is set to False
Expand All @@ -246,6 +273,9 @@ def __init__(
self.__password = password
self.__database = database
self.__http_port = http_port
self.__secure = secure
self.__ca_certs = ca_certs
self.__verify = verify
self.__single_node = single_node
self.__cluster_name = cluster_name
self.__distributed_cluster_name = distributed_cluster_name
Expand Down Expand Up @@ -290,6 +320,9 @@ def get_node_connection(
self.__user,
self.__password,
self.__database,
self.__secure,
self.__ca_certs,
self.__verify,
)

def get_deleter(self) -> Reader:
Expand Down Expand Up @@ -331,6 +364,9 @@ def get_batch_writer(
block_connections=self.__block_connections,
user=self.__user,
password=self.__password,
secure=self.__secure,
ca_certs=self.__ca_certs,
verify=self.__verify,
metrics=metrics,
statement=insert_statement.with_database(self.__database),
encoding=encoding,
Expand Down Expand Up @@ -413,6 +449,9 @@ def get_http_port(self) -> int:
password=cluster.get("password", ""),
database=cluster.get("database", "default"),
http_port=cluster["http_port"],
secure=cluster["secure"],
ca_certs=cluster["ca_certs"],
verify=cluster["verify"],
storage_sets=cluster["storage_sets"],
single_node=cluster["single_node"],
cluster_name=cluster["cluster_name"] if "cluster_name" in cluster else None,
Expand Down Expand Up @@ -459,6 +498,9 @@ def _build_sliced_cluster(cluster: Mapping[str, Any]) -> ClickhouseCluster:
password=cluster.get("password", ""),
database=cluster.get("database", "default"),
http_port=cluster["http_port"],
secure=cluster["secure"],
ca_certs=cluster["ca_certs"],
verify=cluster["verify"],
storage_sets={
storage_tuple[0] for storage_tuple in cluster["storage_set_slices"]
},
Expand Down
6 changes: 6 additions & 0 deletions snuba/migrations/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,9 @@ def add_node(
user: str,
password: str,
database: str,
secure: bool = False,
ca_certs: Optional[str] = None,
verify: Optional[bool] = False,
) -> None:
client_settings = ClickhouseClientSettings.MIGRATE.value
clickhouse = ClickhousePool(
Expand All @@ -580,6 +583,9 @@ def add_node(
user,
password,
database,
secure,
ca_certs,
verify,
client_settings=client_settings.settings,
send_receive_timeout=client_settings.timeout,
)
Expand Down
3 changes: 3 additions & 0 deletions snuba/settings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@
"password": os.environ.get("CLICKHOUSE_PASSWORD", ""),
"database": os.environ.get("CLICKHOUSE_DATABASE", "default"),
"http_port": int(os.environ.get("CLICKHOUSE_HTTP_PORT", 8123)),
"secure": os.environ.get("CLICKHOUSE_SECURE", "0") == "1",
"ca_certs": os.environ.get("CLICKHOUSE_CA_CERTS"),
"verify": os.environ.get("CLICKHOUSE_VERIFY"),
"storage_sets": {
"cdc",
"discover",
Expand Down
3 changes: 3 additions & 0 deletions snuba/settings/settings_distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
"password": os.environ.get("CLICKHOUSE_PASSWORD", ""),
"database": os.environ.get("CLICKHOUSE_DATABASE", "default"),
"http_port": int(os.environ.get("CLICKHOUSE_HTTP_PORT", 8123)),
"secure": os.environ.get("CLICKHOUSE_SECURE", "0") == "1",
"ca_certs": os.environ.get("CLICKHOUSE_CA_CERTS"),
"verify": os.environ.get("CLICKHOUSE_VERIFY"),
"storage_sets": {
"cdc",
"discover",
Expand Down
3 changes: 3 additions & 0 deletions test_distributed_migrations/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ def pytest_configure() -> None:
distributed_cluster_name=cluster_node["distributed_cluster_name"]
if "distributed_cluster_name" in cluster_node
else None,
secure=cluster_node.get("secure", False),
ca_certs=cluster_node.get("ca_certs", None),
verify=cluster_node.get("verify", False),
)

database_name = cluster_node["database"]
Expand Down
6 changes: 6 additions & 0 deletions tests/clusters/fake_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ def __init__(
password: str,
database: str,
http_port: int,
secure: bool,
ca_certs: Optional[str],
verify: Optional[bool],
storage_sets: Set[str],
single_node: bool,
# The cluster name and distributed cluster name only apply if single_node is set to False
Expand All @@ -83,6 +86,9 @@ def __init__(
password=password,
database=database,
http_port=http_port,
secure=secure,
ca_certs=ca_certs,
verify=verify,
storage_sets=storage_sets,
single_node=single_node,
cluster_name=cluster_name,
Expand Down
Loading
Loading