Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(tests): sync client verification tests #1046

Draft
wants to merge 2 commits into
base: cross_sync2_pr3_generated_sync
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .cross_sync/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ Generation can be initiated using `nox -s generate_sync`
from the root of the project. This will find all classes with the `__CROSS_SYNC_OUTPUT__ = "path/to/output"`
annotation, and generate a sync version of classes marked with `@CrossSync.convert_sync` at the output path.

There is a unit test at `tests/unit/data/test_sync_up_to_date.py` that verifies that the generated code is up to date

## Architecture

CrossSync is made up of two parts:
Expand Down
12 changes: 11 additions & 1 deletion .github/workflows/conformance.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,15 @@ jobs:
matrix:
test-version: [ "v0.0.2" ]
py-version: [ 3.8 ]
client-type: [ "async", "legacy" ]
client-type: [ "async", "sync", "legacy" ]
include:
- client-type: "sync"
# sync client does not support concurrent streams
test_args: "-skip _Generic_MultiStream"
- client-type: "legacy"
# legacy client is synchtonous and does not support concurrent streams
# legacy client does not expose mutate_row. Disable those tests
test_args: "-skip _Generic_MultiStream -skip TestMutateRow_"
fail-fast: false
name: "${{ matrix.client-type }} client / python ${{ matrix.py-version }} / test tag ${{ matrix.test-version }}"
steps:
Expand All @@ -53,4 +61,6 @@ jobs:
env:
CLIENT_TYPE: ${{ matrix.client-type }}
PYTHONUNBUFFERED: 1
TEST_ARGS: ${{ matrix.test_args }}
PROXY_PORT: 9999

10 changes: 1 addition & 9 deletions .kokoro/conformance.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,7 @@ set -eo pipefail
## cd to the parent directory, i.e. the root of the git repo
cd $(dirname $0)/..

PROXY_ARGS=""
TEST_ARGS=""
if [[ "${CLIENT_TYPE^^}" == "LEGACY" ]]; then
echo "Using legacy client"
# legacy client does not expose mutate_row. Disable those tests
TEST_ARGS="-skip TestMutateRow_"
fi

# Build and start the proxy in a separate process
PROXY_PORT=9999
pushd test_proxy
nohup python test_proxy.py --port $PROXY_PORT --client_type=$CLIENT_TYPE &
proxyPID=$!
Expand All @@ -42,6 +33,7 @@ function cleanup() {
trap cleanup EXIT

# Run the conformance test
echo "running tests with args: $TEST_ARGS"
pushd cloud-bigtable-clients-test/tests
eval "go test -v -proxy_addr=:$PROXY_PORT $TEST_ARGS"
RETURN_CODE=$?
Expand Down
2 changes: 1 addition & 1 deletion noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ def system_emulated(session):


@nox.session(python=SYSTEM_TEST_PYTHON_VERSIONS)
@nox.parametrize("client_type", ["async"])
@nox.parametrize("client_type", ["async", "sync", "legacy"])
def conformance(session, client_type):
# install dependencies
constraints_path = str(
Expand Down
2 changes: 1 addition & 1 deletion test_proxy/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ python test_proxy.py --port 8080
```

By default, the test_proxy targets the async client. You can change this by passing in the `--client_type` flag.
Valid options are `async` and `legacy`.
Valid options are `async`, `sync`, and `legacy`.

```
python test_proxy.py --client_type=legacy
Expand Down
185 changes: 185 additions & 0 deletions test_proxy/handlers/client_handler_data_sync_autogen.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This file is automatically generated by CrossSync. Do not edit manually.

"""
This module contains the client handler process for proxy_server.py.
"""
import os
from google.cloud.environment_vars import BIGTABLE_EMULATOR
from google.cloud.bigtable.data._cross_sync import CrossSync
from client_handler_data_async import error_safe


class TestProxyClientHandler:
"""
Implements the same methods as the grpc server, but handles the client
library side of the request.

Requests received in TestProxyGrpcServer are converted to a dictionary,
and supplied to the TestProxyClientHandler methods as kwargs.
The client response is then returned back to the TestProxyGrpcServer
"""

def __init__(
self,
data_target=None,
project_id=None,
instance_id=None,
app_profile_id=None,
per_operation_timeout=None,
**kwargs
):
self.closed = False
os.environ[BIGTABLE_EMULATOR] = data_target
self.client = CrossSync._Sync_Impl.DataClient(project=project_id)
self.instance_id = instance_id
self.app_profile_id = app_profile_id
self.per_operation_timeout = per_operation_timeout

def close(self):
self.closed = True

@error_safe
async def ReadRows(self, request, **kwargs):
table_id = request.pop("table_name").split("/")[-1]
app_profile_id = self.app_profile_id or request.get("app_profile_id", None)
table = self.client.get_table(self.instance_id, table_id, app_profile_id)
kwargs["operation_timeout"] = (
kwargs.get("operation_timeout", self.per_operation_timeout) or 20
)
result_list = table.read_rows(request, **kwargs)
serialized_response = [row._to_dict() for row in result_list]
return serialized_response

@error_safe
async def ReadRow(self, row_key, **kwargs):
table_id = kwargs.pop("table_name").split("/")[-1]
app_profile_id = self.app_profile_id or kwargs.get("app_profile_id", None)
table = self.client.get_table(self.instance_id, table_id, app_profile_id)
kwargs["operation_timeout"] = (
kwargs.get("operation_timeout", self.per_operation_timeout) or 20
)
result_row = table.read_row(row_key, **kwargs)
if result_row:
return result_row._to_dict()
else:
return "None"

@error_safe
async def MutateRow(self, request, **kwargs):
from google.cloud.bigtable.data.mutations import Mutation

table_id = request["table_name"].split("/")[-1]
app_profile_id = self.app_profile_id or request.get("app_profile_id", None)
table = self.client.get_table(self.instance_id, table_id, app_profile_id)
kwargs["operation_timeout"] = (
kwargs.get("operation_timeout", self.per_operation_timeout) or 20
)
row_key = request["row_key"]
mutations = [Mutation._from_dict(d) for d in request["mutations"]]
table.mutate_row(row_key, mutations, **kwargs)
return "OK"

@error_safe
async def BulkMutateRows(self, request, **kwargs):
from google.cloud.bigtable.data.mutations import RowMutationEntry

table_id = request["table_name"].split("/")[-1]
app_profile_id = self.app_profile_id or request.get("app_profile_id", None)
table = self.client.get_table(self.instance_id, table_id, app_profile_id)
kwargs["operation_timeout"] = (
kwargs.get("operation_timeout", self.per_operation_timeout) or 20
)
entry_list = [
RowMutationEntry._from_dict(entry) for entry in request["entries"]
]
table.bulk_mutate_rows(entry_list, **kwargs)
return "OK"

@error_safe
async def CheckAndMutateRow(self, request, **kwargs):
from google.cloud.bigtable.data.mutations import Mutation, SetCell

table_id = request["table_name"].split("/")[-1]
app_profile_id = self.app_profile_id or request.get("app_profile_id", None)
table = self.client.get_table(self.instance_id, table_id, app_profile_id)
kwargs["operation_timeout"] = (
kwargs.get("operation_timeout", self.per_operation_timeout) or 20
)
row_key = request["row_key"]
true_mutations = []
for mut_dict in request.get("true_mutations", []):
try:
true_mutations.append(Mutation._from_dict(mut_dict))
except ValueError:
mutation = SetCell("", "", "", 0)
true_mutations.append(mutation)
false_mutations = []
for mut_dict in request.get("false_mutations", []):
try:
false_mutations.append(Mutation._from_dict(mut_dict))
except ValueError:
false_mutations.append(SetCell("", "", "", 0))
predicate_filter = request.get("predicate_filter", None)
result = table.check_and_mutate_row(
row_key,
predicate_filter,
true_case_mutations=true_mutations,
false_case_mutations=false_mutations,
**kwargs
)
return result

@error_safe
async def ReadModifyWriteRow(self, request, **kwargs):
from google.cloud.bigtable.data.read_modify_write_rules import IncrementRule
from google.cloud.bigtable.data.read_modify_write_rules import AppendValueRule

table_id = request["table_name"].split("/")[-1]
app_profile_id = self.app_profile_id or request.get("app_profile_id", None)
table = self.client.get_table(self.instance_id, table_id, app_profile_id)
kwargs["operation_timeout"] = (
kwargs.get("operation_timeout", self.per_operation_timeout) or 20
)
row_key = request["row_key"]
rules = []
for rule_dict in request.get("rules", []):
qualifier = rule_dict["column_qualifier"]
if "append_value" in rule_dict:
new_rule = AppendValueRule(
rule_dict["family_name"], qualifier, rule_dict["append_value"]
)
else:
new_rule = IncrementRule(
rule_dict["family_name"], qualifier, rule_dict["increment_amount"]
)
rules.append(new_rule)
result = table.read_modify_write_row(row_key, rules, **kwargs)
if result:
return result._to_dict()
else:
return "None"

@error_safe
async def SampleRowKeys(self, request, **kwargs):
table_id = request["table_name"].split("/")[-1]
app_profile_id = self.app_profile_id or request.get("app_profile_id", None)
table = self.client.get_table(self.instance_id, table_id, app_profile_id)
kwargs["operation_timeout"] = (
kwargs.get("operation_timeout", self.per_operation_timeout) or 20
)
result = table.sample_row_keys(**kwargs)
return result
17 changes: 15 additions & 2 deletions test_proxy/run_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fi
SCRIPT_DIR=$(realpath $(dirname "$0"))
cd $SCRIPT_DIR

export PROXY_SERVER_PORT=50055
export PROXY_SERVER_PORT=$(shuf -i 50000-60000 -n 1)

# download test suite
if [ ! -d "cloud-bigtable-clients-test" ]; then
Expand All @@ -43,6 +43,19 @@ function finish {
}
trap finish EXIT

if [[ $CLIENT_TYPE == "legacy" ]]; then
echo "Using legacy client"
# legacy client does not expose mutate_row. Disable those tests
TEST_ARGS="-skip TestMutateRow_"
fi

if [[ $CLIENT_TYPE != "async" ]]; then
echo "Using legacy client"
# sync and legacy client do not support concurrent streams
TEST_ARGS="$TEST_ARGS -skip _Generic_MultiStream "
fi

# run tests
pushd cloud-bigtable-clients-test/tests
go test -v -proxy_addr=:$PROXY_SERVER_PORT
echo "Running with $TEST_ARGS"
go test -v -proxy_addr=:$PROXY_SERVER_PORT $TEST_ARGS
5 changes: 4 additions & 1 deletion test_proxy/test_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ def format_dict(input_obj):
if client_type == "legacy":
import client_handler_legacy
client = client_handler_legacy.LegacyTestProxyClientHandler(**json_data)
elif client_type == "sync":
import client_handler_data_sync_autogen
client = client_handler_data_sync_autogen.TestProxyClientHandler(**json_data)
else:
client = client_handler_data_async.TestProxyClientHandlerAsync(**json_data)
client_map[client_id] = client
Expand Down Expand Up @@ -150,7 +153,7 @@ def client_handler_process(request_q, queue_pool, client_type="async"):

p = argparse.ArgumentParser()
p.add_argument("--port", dest='port', default="50055")
p.add_argument("--client_type", dest='client_type', default="async", choices=["async", "legacy"])
p.add_argument("--client_type", dest='client_type', default="async", choices=["async", "sync", "legacy"])

if __name__ == "__main__":
port = p.parse_args().port
Expand Down
Loading
Loading