Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

secure clickhouse connections 2 #6575

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
24 changes: 24 additions & 0 deletions snuba/cli/cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,24 @@
type=int,
help="Clickhouse native port to write to.",
)
@click.option(
"--clickhouse-secure",
type=bool,
default=False,
help="If true, an encrypted connection will be used",
)
@click.option(
"--clickhouse-ca-certs",
type=str,
default=None,
help="An optional path to certificates directory.",
)
@click.option(
"--clickhouse-verify",
type=bool,
default=False,
help="Verify ClickHouse SSL cert.",
)
@click.option(
"--dry-run",
type=bool,
Expand All @@ -36,6 +54,9 @@ def cleanup(
*,
clickhouse_host: Optional[str],
clickhouse_port: Optional[int],
clickhouse_secure: bool,
clickhouse_ca_certs: Optional[str],
clickhouse_verify: Optional[bool],
dry_run: bool,
storage_name: str,
log_level: Optional[str] = None,
Expand Down Expand Up @@ -67,6 +88,9 @@ def cleanup(
clickhouse_user,
clickhouse_password,
database,
clickhouse_secure,
clickhouse_ca_certs,
clickhouse_verify,
)
elif not cluster.is_single_node():
raise click.ClickException("Provide ClickHouse host and port for cleanup")
Expand Down
24 changes: 24 additions & 0 deletions snuba/cli/migrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,12 +318,33 @@ def reverse_in_progress(
required=True,
default=os.environ.get("CLICKHOUSE_DATABASE", "default"),
)
@click.option(
"--secure",
type=bool,
default=False,
help="If true, an encrypted connection will be used",
)
@click.option(
"--ca-certs",
type=str,
default=None,
help="An optional path to certificates directory.",
)
@click.option(
"--verify",
type=bool,
default=False,
help="Verify ClickHouse SSL cert.",
)
def add_node(
node_type: str,
storage_set_names: Sequence[str],
host_name: str,
port: int,
database: str,
secure: bool,
ca_certs: Optional[str],
verify: Optional[bool],
) -> None:
"""
Runs all migrations on a brand new ClickHouse node. This should be performed
Expand Down Expand Up @@ -364,6 +385,9 @@ def add_node(
user=user,
password=password,
database=database,
secure=secure,
ca_certs=ca_certs,
verify=verify,
)


Expand Down
24 changes: 24 additions & 0 deletions snuba/cli/optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,24 @@
type=int,
help="Clickhouse native port to write to.",
)
@click.option(
"--clickhouse-secure",
type=bool,
default=False,
help="If true, an encrypted connection will be used",
)
@click.option(
"--clickhouse-ca-certs",
type=str,
default=None,
help="An optional path to certificates directory.",
)
@click.option(
"--clickhouse-verify",
type=bool,
default=False,
help="Verify ClickHouse SSL cert.",
)
@click.option(
"--storage",
"storage_name",
Expand All @@ -44,6 +62,9 @@ def optimize(
*,
clickhouse_host: Optional[str],
clickhouse_port: Optional[int],
clickhouse_secure: bool,
clickhouse_ca_certs: Optional[str],
clickhouse_verify: Optional[bool],
storage_name: str,
default_parallel_threads: int,
log_level: Optional[str] = None,
Expand Down Expand Up @@ -79,6 +100,9 @@ def optimize(
clickhouse_user,
clickhouse_password,
database,
clickhouse_secure,
clickhouse_ca_certs,
clickhouse_verify,
send_receive_timeout=ClickhouseClientSettings.OPTIMIZE.value.timeout,
)
elif not storage.get_cluster().is_single_node():
Expand Down
17 changes: 12 additions & 5 deletions snuba/clickhouse/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import rapidjson
import sentry_sdk
from urllib3.connectionpool import HTTPConnectionPool
from urllib3.connectionpool import HTTPConnectionPool, HTTPSConnectionPool
from urllib3.exceptions import HTTPError

from snuba import settings, state
Expand Down Expand Up @@ -161,7 +161,7 @@ class HTTPWriteBatch:
def __init__(
self,
executor: ThreadPoolExecutor,
pool: HTTPConnectionPool,
pool: HTTPConnectionPool | HTTPSConnectionPool,
metrics: MetricsBackend,
user: str,
password: str,
Expand Down Expand Up @@ -293,6 +293,9 @@ def __init__(
port: int,
user: str,
password: str,
secure: bool,
ca_certs: Optional[str],
verify: Optional[bool],
metrics: MetricsBackend,
statement: InsertStatement,
encoding: Optional[str],
Expand All @@ -302,9 +305,13 @@ def __init__(
max_connections: int = 1,
block_connections: bool = False,
):
self.__pool = HTTPConnectionPool(
host, port, maxsize=max_connections, block=block_connections
)
self.__pool: HTTPSConnectionPool | HTTPConnectionPool
if secure:
self.__pool = HTTPSConnectionPool(
host, port, ca_certs=ca_certs, verify=verify
)
else:
self.__pool = HTTPConnectionPool(host, port)
self.__executor = ThreadPoolExecutor()
self.__metrics = metrics

Expand Down
9 changes: 9 additions & 0 deletions snuba/clickhouse/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ def __init__(
user: str,
password: str,
database: str,
secure: bool = False,
ca_certs: Optional[str] = None,
verify: Optional[bool] = False,
connect_timeout: int = 1,
send_receive_timeout: Optional[int] = 300,
max_pool_size: int = settings.CLICKHOUSE_MAX_POOL_SIZE,
Expand All @@ -91,6 +94,9 @@ def __init__(
self.user = user
self.password = password
self.database = database
self.secure = secure
self.ca_certs = ca_certs
self.verify = verify
self.connect_timeout = connect_timeout
self.send_receive_timeout = send_receive_timeout
self.client_settings = client_settings
Expand Down Expand Up @@ -380,6 +386,9 @@ def _create_conn(self, use_fallback_host: bool = False) -> Client:
user=self.user,
password=self.password,
database=self.database,
secure=self.secure,
ca_certs=self.ca_certs,
verify=self.verify,
connect_timeout=self.connect_timeout,
send_receive_timeout=self.send_receive_timeout,
settings=self.client_settings,
Expand Down
46 changes: 44 additions & 2 deletions snuba/clusters/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,16 @@ def get_batch_writer(
ClickhouseWriterOptions = Optional[Mapping[str, Any]]


CacheKey = Tuple[ClickhouseNode, ClickhouseClientSettings, str, str, str]
CacheKey = Tuple[
ClickhouseNode,
ClickhouseClientSettings,
str,
str,
str,
bool,
Optional[str],
Optional[bool],
]


class ConnectionCache:
Expand All @@ -177,10 +186,22 @@ def get_node_connection(
user: str,
password: str,
database: str,
secure: bool,
ca_certs: Optional[str],
verify: Optional[bool],
) -> ClickhousePool:
with self.__lock:
settings, timeout = client_settings.value
cache_key = (node, client_settings, user, password, database)
cache_key = (
node,
client_settings,
user,
password,
database,
secure,
ca_certs,
verify,
)
if cache_key not in self.__cache:
self.__cache[cache_key] = ClickhousePool(
node.host_name,
Expand All @@ -190,6 +211,9 @@ def get_node_connection(
database,
client_settings=settings,
send_receive_timeout=timeout,
secure=secure,
ca_certs=ca_certs,
verify=verify,
)

return self.__cache[cache_key]
Expand Down Expand Up @@ -226,6 +250,9 @@ def __init__(
password: str,
database: str,
http_port: int,
secure: bool,
ca_certs: Optional[str],
verify: Optional[bool],
storage_sets: Set[str],
single_node: bool,
# The cluster name and distributed cluster name only apply if single_node is set to False
Expand All @@ -246,6 +273,9 @@ def __init__(
self.__password = password
self.__database = database
self.__http_port = http_port
self.__secure = secure
self.__ca_certs = ca_certs
self.__verify = verify
self.__single_node = single_node
self.__cluster_name = cluster_name
self.__distributed_cluster_name = distributed_cluster_name
Expand Down Expand Up @@ -290,6 +320,9 @@ def get_node_connection(
self.__user,
self.__password,
self.__database,
self.__secure,
self.__ca_certs,
self.__verify,
)

def get_deleter(self) -> Reader:
Expand Down Expand Up @@ -331,6 +364,9 @@ def get_batch_writer(
block_connections=self.__block_connections,
user=self.__user,
password=self.__password,
secure=self.__secure,
ca_certs=self.__ca_certs,
verify=self.__verify,
metrics=metrics,
statement=insert_statement.with_database(self.__database),
encoding=encoding,
Expand Down Expand Up @@ -413,6 +449,9 @@ def get_http_port(self) -> int:
password=cluster.get("password", ""),
database=cluster.get("database", "default"),
http_port=cluster["http_port"],
secure=cluster["secure"],
ca_certs=cluster["ca_certs"],
verify=cluster["verify"],
storage_sets=cluster["storage_sets"],
single_node=cluster["single_node"],
cluster_name=cluster["cluster_name"] if "cluster_name" in cluster else None,
Expand Down Expand Up @@ -459,6 +498,9 @@ def _build_sliced_cluster(cluster: Mapping[str, Any]) -> ClickhouseCluster:
password=cluster.get("password", ""),
database=cluster.get("database", "default"),
http_port=cluster["http_port"],
secure=cluster["secure"],
ca_certs=cluster["ca_certs"],
verify=cluster["verify"],
storage_sets={
storage_tuple[0] for storage_tuple in cluster["storage_set_slices"]
},
Expand Down
6 changes: 6 additions & 0 deletions snuba/migrations/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,9 @@ def add_node(
user: str,
password: str,
database: str,
secure: bool = False,
ca_certs: Optional[str] = None,
verify: Optional[bool] = False,
) -> None:
client_settings = ClickhouseClientSettings.MIGRATE.value
clickhouse = ClickhousePool(
Expand All @@ -580,6 +583,9 @@ def add_node(
user,
password,
database,
secure,
ca_certs,
verify,
client_settings=client_settings.settings,
send_receive_timeout=client_settings.timeout,
)
Expand Down
3 changes: 3 additions & 0 deletions snuba/settings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@
"password": os.environ.get("CLICKHOUSE_PASSWORD", ""),
"database": os.environ.get("CLICKHOUSE_DATABASE", "default"),
"http_port": int(os.environ.get("CLICKHOUSE_HTTP_PORT", 8123)),
"secure": os.environ.get("CLICKHOUSE_SECURE", "0") == "1",
"ca_certs": os.environ.get("CLICKHOUSE_CA_CERTS"),
"verify": os.environ.get("CLICKHOUSE_VERIFY"),
"storage_sets": {
"cdc",
"discover",
Expand Down
3 changes: 3 additions & 0 deletions snuba/settings/settings_distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
"password": os.environ.get("CLICKHOUSE_PASSWORD", ""),
"database": os.environ.get("CLICKHOUSE_DATABASE", "default"),
"http_port": int(os.environ.get("CLICKHOUSE_HTTP_PORT", 8123)),
"secure": os.environ.get("CLICKHOUSE_SECURE", "0") == "1",
"ca_certs": os.environ.get("CLICKHOUSE_CA_CERTS"),
"verify": os.environ.get("CLICKHOUSE_VERIFY"),
"storage_sets": {
"cdc",
"discover",
Expand Down
6 changes: 6 additions & 0 deletions tests/clusters/fake_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ def __init__(
password: str,
database: str,
http_port: int,
secure: bool,
ca_certs: Optional[str],
verify: Optional[bool],
storage_sets: Set[str],
single_node: bool,
# The cluster name and distributed cluster name only apply if single_node is set to False
Expand All @@ -83,6 +86,9 @@ def __init__(
password=password,
database=database,
http_port=http_port,
secure=secure,
ca_certs=ca_certs,
verify=verify,
storage_sets=storage_sets,
single_node=single_node,
cluster_name=cluster_name,
Expand Down
Loading
Loading