Skip to content

Commit

Permalink
Add executor_threads parameter to async client (#408)
Browse files Browse the repository at this point in the history
* Add executor_threads parameter to async client

* Bump version
  • Loading branch information
genzgd authored Oct 7, 2024
1 parent 85f5b09 commit ffd01f3
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 23 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ release (0.9.0), unrecognized arguments/keywords for these methods of creating a
instead of being passed as ClickHouse server settings. This is in conjunction with some refactoring in Client construction.
The supported method of passing ClickHouse server settings is to prefix such arguments/query parameters with`ch_`.

## 0.8.3, 2024-10-07
### Improvement
- Add an optional `executor_threads` argument to the `get_async_client` method. This controls the maximum number of
worker threads that each AsyncClient has available for running queries concurrently. Defaults to the number of CPU
cores plus four, capped at 32. Closes https://github.com/ClickHouse/clickhouse-connect/issues/407

## 0.8.2, 2024-10-04
### Bug Fix
- Ensure lz4 compression does not exit on an empty block. May fix https://github.com/ClickHouse/clickhouse-connect/issues/403.
Expand Down
2 changes: 1 addition & 1 deletion clickhouse_connect/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = '0.8.2'
version = '0.8.3'
5 changes: 4 additions & 1 deletion clickhouse_connect/driver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ async def create_async_client(*,
dsn: Optional[str] = None,
settings: Optional[Dict[str, Any]] = None,
generic_args: Optional[Dict[str, Any]] = None,
executor_threads: Optional[int] = None,
**kwargs) -> AsyncClient:
"""
The preferred method to get an async ClickHouse Connect Client instance.
Expand All @@ -154,6 +155,8 @@ async def create_async_client(*,
:param settings: ClickHouse server settings to be used with the session/every request
:param generic_args: Used internally to parse DBAPI connection strings into keyword arguments and ClickHouse settings.
It is not recommended to use this parameter externally
:param executor_threads: 'max_workers' threads used by the client ThreadPoolExecutor. If not set, the default
of 4 + detected CPU cores (capped at 32) will be used
:param kwargs -- Recognized keyword arguments (used by the HTTP client), see below
:param compress: Enable compression for ClickHouse HTTP inserts and query results. True will select the preferred
Expand Down Expand Up @@ -194,4 +197,4 @@ def _create_client():

loop = asyncio.get_running_loop()
_client = await loop.run_in_executor(None, _create_client)
return AsyncClient(client=_client)
return AsyncClient(client=_client, executor_threads=executor_threads)
47 changes: 26 additions & 21 deletions clickhouse_connect/driver/asyncclient.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
import io
import os
from concurrent.futures.thread import ThreadPoolExecutor
from datetime import tzinfo
from typing import Optional, Union, Dict, Any, Sequence, Iterable, Generator, BinaryIO

Expand All @@ -20,10 +22,13 @@ class AsyncClient:
Internally, each of the methods that uses IO is wrapped in a call to EventLoop.run_in_executor.
"""

def __init__(self, *, client: Client):
def __init__(self, *, client: Client, executor_threads: int = 0):
if isinstance(client, HttpClient):
client.headers['User-Agent'] = client.headers['User-Agent'].replace('mode:sync;', 'mode:async;')
self.client = client
if executor_threads == 0:
executor_threads = min(32, (os.cpu_count() or 1) + 4) # Mimic the default behavior
self.executor = ThreadPoolExecutor(max_workers=executor_threads)


def set_client_setting(self, key, value):
Expand Down Expand Up @@ -88,7 +93,7 @@ def _query():
external_data=external_data)

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, _query)
result = await loop.run_in_executor(self.executor, _query)
return result

async def query_column_block_stream(self,
Expand Down Expand Up @@ -117,7 +122,7 @@ def _query_column_block_stream():
external_data=external_data)

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, _query_column_block_stream)
result = await loop.run_in_executor(self.executor, _query_column_block_stream)
return result

async def query_row_block_stream(self,
Expand Down Expand Up @@ -146,7 +151,7 @@ def _query_row_block_stream():
external_data=external_data)

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, _query_row_block_stream)
result = await loop.run_in_executor(self.executor, _query_row_block_stream)
return result

async def query_rows_stream(self,
Expand Down Expand Up @@ -175,7 +180,7 @@ def _query_rows_stream():
external_data=external_data)

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, _query_rows_stream)
result = await loop.run_in_executor(self.executor, _query_rows_stream)
return result

async def raw_query(self,
Expand All @@ -202,7 +207,7 @@ def _raw_query():
use_database=use_database, external_data=external_data)

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, _raw_query)
result = await loop.run_in_executor(self.executor, _raw_query)
return result

async def raw_stream(self, query: str,
Expand All @@ -228,7 +233,7 @@ def _raw_stream():
use_database=use_database, external_data=external_data)

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, _raw_stream)
result = await loop.run_in_executor(self.executor, _raw_stream)
return result

async def query_np(self,
Expand All @@ -255,7 +260,7 @@ def _query_np():
external_data=external_data)

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, _query_np)
result = await loop.run_in_executor(self.executor, _query_np)
return result

async def query_np_stream(self,
Expand All @@ -282,7 +287,7 @@ def _query_np_stream():
context=context, external_data=external_data)

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, _query_np_stream)
result = await loop.run_in_executor(self.executor, _query_np_stream)
return result

async def query_df(self,
Expand Down Expand Up @@ -314,7 +319,7 @@ def _query_df():
external_data=external_data, use_extended_dtypes=use_extended_dtypes)

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, _query_df)
result = await loop.run_in_executor(self.executor, _query_df)
return result

async def query_df_stream(self,
Expand Down Expand Up @@ -347,7 +352,7 @@ def _query_df_stream():
external_data=external_data, use_extended_dtypes=use_extended_dtypes)

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, _query_df_stream)
result = await loop.run_in_executor(self.executor, _query_df_stream)
return result

def create_query_context(self,
Expand Down Expand Up @@ -433,7 +438,7 @@ def _query_arrow():
use_strings=use_strings, external_data=external_data)

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, _query_arrow)
result = await loop.run_in_executor(self.executor, _query_arrow)
return result

async def query_arrow_stream(self,
Expand All @@ -457,7 +462,7 @@ def _query_arrow_stream():
use_strings=use_strings, external_data=external_data)

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, _query_arrow_stream)
result = await loop.run_in_executor(self.executor, _query_arrow_stream)
return result

async def command(self,
Expand Down Expand Up @@ -486,7 +491,7 @@ def _command():
use_database=use_database, external_data=external_data)

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, _command)
result = await loop.run_in_executor(self.executor, _command)
return result

async def ping(self) -> bool:
Expand All @@ -499,7 +504,7 @@ def _ping():
return self.client.ping()

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, _ping)
result = await loop.run_in_executor(self.executor, _ping)
return result

async def insert(self,
Expand Down Expand Up @@ -537,7 +542,7 @@ def _insert():
column_oriented=column_oriented, settings=settings, context=context)

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, _insert)
result = await loop.run_in_executor(self.executor, _insert)
return result

async def insert_df(self, table: str = None,
Expand Down Expand Up @@ -572,7 +577,7 @@ def _insert_df():
context=context)

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, _insert_df)
result = await loop.run_in_executor(self.executor, _insert_df)
return result

async def insert_arrow(self, table: str,
Expand All @@ -591,7 +596,7 @@ def _insert_arrow():
return self.client.insert_arrow(table=table, arrow_table=arrow_table, database=database, settings=settings)

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, _insert_arrow)
result = await loop.run_in_executor(self.executor, _insert_arrow)
return result

async def create_insert_context(self,
Expand Down Expand Up @@ -625,7 +630,7 @@ def _create_insert_context():
column_oriented=column_oriented, settings=settings, data=data)

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, _create_insert_context)
result = await loop.run_in_executor(self.executor, _create_insert_context)
return result

async def data_insert(self, context: InsertContext) -> QuerySummary:
Expand All @@ -639,7 +644,7 @@ def _data_insert():
return self.client.data_insert(context=context)

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, _data_insert)
result = await loop.run_in_executor(self.executor, _data_insert)
return result

async def raw_insert(self, table: str,
Expand All @@ -663,5 +668,5 @@ def _raw_insert():
settings=settings, fmt=fmt, compression=compression)

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, _raw_insert)
result = await loop.run_in_executor(self.executor, _raw_insert)
return result

0 comments on commit ffd01f3

Please sign in to comment.