diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index f91dd95ee..4e24cb833 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -149,7 +149,6 @@ jobs:
         chmod a+x builder
         ./builder build -p ${{ env.PACKAGE_NAME }}
 
-
   localhost-test-linux:
     runs-on: ubuntu-20.04 # latest
     steps:
@@ -179,3 +178,33 @@ jobs:
       run: |
         python -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder.pyz')"
         python builder.pyz build -p aws-c-http --cmake-extra=-DENABLE_LOCALHOST_INTEGRATION_TESTS=ON
+
+  localhost-canary-linux:
+    runs-on: ubuntu-20.04 # latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v3
+      - name: Build and test
+        run: |
+          python3 -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder.pyz')"
+          python3 builder.pyz build -p aws-c-http --cmake-extra=-DAWS_BUILD_CANARY=ON
+
+  localhost-canary-mac:
+    runs-on: macos-11 # latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v3
+      - name: Build and test
+        run: |
+          python3 -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder.pyz')"
+          python3 builder.pyz build -p aws-c-http --cmake-extra=-DAWS_BUILD_CANARY=ON
+
+  localhost-canary-win:
+    runs-on: windows-2022 # latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v3
+      - name: Build and test
+        run: |
+          python -c "from urllib.request import urlretrieve; urlretrieve('${{ env.BUILDER_HOST }}/${{ env.BUILDER_SOURCE }}/${{ env.BUILDER_VERSION }}/builder.pyz?run=${{ env.RUN }}', 'builder.pyz')"
+          python builder.pyz build -p aws-c-http --cmake-extra=-DAWS_BUILD_CANARY=ON
diff --git a/CMakeLists.txt b/CMakeLists.txt
index cedc6ae98..bc4d6856a 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -7,6 +7,7 @@ endif()
 
 option(ENABLE_PROXY_INTEGRATION_TESTS "Whether to run the proxy integration tests that rely on pre-configured proxy" OFF)
 option(ENABLE_LOCALHOST_INTEGRATION_TESTS "Whether to run the integration tests that rely on pre-configured localhost" OFF)
+option(AWS_BUILD_CANARY "Whether to build the canary for benchmarking the performance of our HTTP client" OFF)
 
 if (DEFINED CMAKE_PREFIX_PATH)
     file(TO_CMAKE_PATH "${CMAKE_PREFIX_PATH}" CMAKE_PREFIX_PATH)
@@ -101,5 +102,8 @@ if (NOT BYO_CRYPTO AND BUILD_TESTING)
     add_subdirectory(tests)
     if (NOT CMAKE_CROSSCOMPILING)
         add_subdirectory(bin/elasticurl)
+        if (AWS_BUILD_CANARY)
+            add_subdirectory(bin/canary)
+        endif()
     endif()
 endif()
diff --git a/bin/canary/CMakeLists.txt b/bin/canary/CMakeLists.txt
new file mode 100644
index 000000000..2fa8642d3
--- /dev/null
+++ b/bin/canary/CMakeLists.txt
@@ -0,0 +1,28 @@
+project(canary C)
+
+list(APPEND CMAKE_MODULE_PATH "${CMAKE_INSTALL_PREFIX}/lib/cmake")
+
+file(GLOB CANARY_SRC
+        "*.c"
+        )
+
+set(PROJECT_NAME canary)
+add_executable(${PROJECT_NAME} ${CANARY_SRC})
+aws_set_common_properties(${PROJECT_NAME})
+
+target_include_directories(${PROJECT_NAME} PUBLIC
+        $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/include>
+        $<INSTALL_INTERFACE:include>)
+
+target_link_libraries(${PROJECT_NAME} aws-c-http)
+
+if (BUILD_SHARED_LIBS AND NOT WIN32)
+    message(INFO " canary will be built with shared libs, but you may need to set LD_LIBRARY_PATH=${CMAKE_INSTALL_PREFIX}/lib to run the application")
+endif()
+
+install(TARGETS ${PROJECT_NAME}
+        EXPORT ${PROJECT_NAME}-targets
+        COMPONENT Runtime
+        RUNTIME
+        DESTINATION bin
+        COMPONENT Runtime)
diff --git a/bin/canary/main.c b/bin/canary/main.c
new file mode 100644
index 000000000..4ba0a3559
--- /dev/null
+++ b/bin/canary/main.c
@@ -0,0 +1,526 @@
+/**
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * SPDX-License-Identifier: Apache-2.0.
+ */
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <aws/common/allocator.h>
+#include <aws/common/atomics.h>
+#include <aws/common/byte_buf.h>
+#include <aws/common/clock.h>
+#include <aws/common/condition_variable.h>
+#include <aws/common/mutex.h>
+#include <aws/common/task_scheduler.h>
+#include <aws/common/zero.h>
+
+#include <aws/io/channel_bootstrap.h>
+#include <aws/io/event_loop.h>
+#include <aws/io/host_resolver.h>
+#include <aws/io/logging.h>
+#include <aws/io/socket.h>
+#include <aws/io/tls_channel_handler.h>
+#include <aws/io/uri.h>
+
+#include <aws/http/connection.h>
+#include <aws/http/http2_stream_manager.h>
+#include <aws/http/request_response.h>
+
+#ifdef _MSC_VER
+#    pragma warning(disable : 4204) /* Declared initializers */
+#    pragma warning(disable : 4221) /* Local var in declared initializer */
+#endif
+
+#define DEFINE_HEADER(NAME, VALUE)                                                                                     \
+    { .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL(NAME), .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL(VALUE), }
+
+/* TODO: Make these configurable from the command line */
+const struct aws_byte_cursor uri_cursor = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("http://localhost:3280/");
+const int rate_secs = 30; /* Time interval over which each data point is collected */
+const int streams_per_connection = 20;
+const int max_connections = 8;
+const int num_data_to_collect = 5; /* The number of data points to collect */
+const enum aws_log_level log_level = AWS_LOG_LEVEL_NONE;
+const bool direct_connection = false; /* If true, create a single connection and make requests on it.
+                                       * If false, use the stream manager to acquire streams. */
+
+const double rate_threshold = 4000; /* Based on previous test runs: all platforms seem to stay above 4000, but
+                                     * results can vary. TODO: Maybe gather the numbers from previous test runs and
+                                     * make the threshold platform-specific. */
+
+struct aws_http_canary_helper {
+    struct aws_task task;
+    struct aws_event_loop *eventloop;
+
+    int num_collected; /* number of data points collected so far */
+    uint64_t rate_ns;  /* Collect one data point every rate_ns */
+
+    struct aws_atomic_var canary_finished;
+
+    double *results;
+};
+
+struct canary_ctx {
+    struct aws_allocator *allocator;
+    const char *verb;
+    struct aws_uri uri;
+    struct aws_mutex mutex;
+    struct aws_condition_variable c_var;
+
+    enum aws_log_level log_level;
+    struct aws_http_canary_helper helper;
+    struct aws_event_loop_group *el_group;
+    struct aws_http2_stream_manager *manager;
+
+    bool is_shutdown_complete;
+    struct aws_atomic_var streams_failed;
+    struct aws_atomic_var streams_completed;
+
+    int batch_size;
+    struct aws_atomic_var batch_completed;
+
+    struct aws_http_connection *connection;
+};
+
+/************************* Data collector ******************************************/
+
+static void s_collect_data_task(struct aws_task *task, void *arg, enum aws_task_status status) {
+    (void)status;
+    (void)task;
+
+    struct canary_ctx *app_ctx = arg;
+    struct aws_http_canary_helper *helper = &app_ctx->helper;
+
+    /* collect data */
+    size_t stream_completed = aws_atomic_exchange_int(&app_ctx->streams_completed, 0);
+
+    /* TODO: maybe collect the data somewhere instead of just printing it out. */
+    double rate = (double)stream_completed / rate_secs;
+    helper->results[helper->num_collected] = rate;
+    ++helper->num_collected;
+    printf("Loop %d: streams completed per second: %f\n", helper->num_collected, rate);
+    if (helper->num_collected >= num_data_to_collect) {
+        /* done */
+        double sum = 0;
+        for (int i = 0; i < num_data_to_collect; i++) {
+            sum += helper->results[i];
+        }
+        double avg = sum / num_data_to_collect;
+        printf("On average, streams completed per second: %f\n", avg);
+        aws_mem_release(app_ctx->allocator, helper->results);
+        if (avg < rate_threshold) {
+            fprintf(stderr, "The average result is lower than the threshold (%f). Failed\n", rate_threshold);
+            exit(1);
+        }
+
+        aws_atomic_store_int(&helper->canary_finished, 1);
+    } else {
+        /* keep running */
+        uint64_t now = 0;
+        aws_high_res_clock_get_ticks(&now);
+        aws_event_loop_schedule_task_future(helper->eventloop, &helper->task, now + helper->rate_ns);
+    }
+}
+
+void aws_http_canary_helper_init(struct canary_ctx *app_ctx, struct aws_http_canary_helper *helper) {
+
+    helper->eventloop = aws_event_loop_group_get_next_loop(app_ctx->el_group);
+    helper->rate_ns = aws_timestamp_convert(rate_secs, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL);
+    aws_atomic_init_int(&helper->canary_finished, 0);
+    aws_task_init(&helper->task, s_collect_data_task, app_ctx, "data_collector");
+    helper->results = aws_mem_calloc(app_ctx->allocator, num_data_to_collect, sizeof(double));
+
+    uint64_t now = 0;
+    aws_high_res_clock_get_ticks(&now);
+    aws_event_loop_schedule_task_future(helper->eventloop, &helper->task, now + helper->rate_ns);
+}
+
+/************************* Stream callbacks ******************************************/
+
+static void s_on_stream_acquired(struct aws_http_stream *stream, int error_code, void *user_data) {
+    (void)stream;
+    (void)user_data;
+
+    if (error_code) {
+        fprintf(stderr, "stream failed to be acquired from stream manager %s\n", aws_error_debug_str(error_code));
+        exit(1);
+    }
+}
+
+static void s_on_stream_complete(struct aws_http_stream *stream, int error_code, void *user_data) {
+    (void)stream;
+
+    struct canary_ctx *app_ctx = user_data;
+    aws_mutex_lock(&app_ctx->mutex);
+    aws_atomic_fetch_add(&app_ctx->batch_completed, 1);
+    if (error_code) {
+        fprintf(stderr, "stream failed to complete %s\n", aws_error_debug_str(error_code));
+        exit(1);
+    } else {
+        aws_atomic_fetch_add(&app_ctx->streams_completed, 1);
+    }
+    aws_mutex_unlock(&app_ctx->mutex);
+    aws_http_stream_release(stream);
+    aws_condition_variable_notify_one(&app_ctx->c_var);
+}
+
+/************************* Stream manager ops ******************************************/
+
+static bool s_are_batch_completed(void *context) {
+    struct canary_ctx *app_ctx = context;
+    size_t completed = aws_atomic_load_int(&app_ctx->batch_completed);
+    return (int)completed >= app_ctx->batch_size;
+}
+
+static int s_wait_on_batch_complete(struct canary_ctx *app_ctx) {
+
+    aws_mutex_lock(&app_ctx->mutex);
+    int signal_error =
+        aws_condition_variable_wait_pred(&app_ctx->c_var, &app_ctx->mutex, s_are_batch_completed, app_ctx);
+    aws_mutex_unlock(&app_ctx->mutex);
+
+    return signal_error;
+}
+
+static void s_run_stream_manager_test(struct canary_ctx *app_ctx, struct aws_http_message *request) {
+    struct aws_http_make_request_options request_options = {
+        .self_size = sizeof(request_options),
+        .request = request,
+        .user_data = app_ctx,
+        .on_complete = s_on_stream_complete,
+    };
+
+    struct aws_http2_stream_manager_acquire_stream_options acquire_stream_option = {
+        .options = &request_options,
+        .callback = s_on_stream_acquired,
+        .user_data = app_ctx,
+    };
+
+    bool keep_loop = true;
+    while (keep_loop) {
+        /* Run a batch of requests and wait for them all to complete */
+        aws_atomic_store_int(&app_ctx->batch_completed, 0);
+
+        for (int i = 0; i < app_ctx->batch_size; ++i) {
+            aws_http2_stream_manager_acquire_stream(app_ctx->manager, &acquire_stream_option);
+        }
+        /* Once data collection finishes during the wait, no more data points will be recorded, but we still wait
+         * for all the requests we made to complete. */
+        s_wait_on_batch_complete(app_ctx);
+        size_t streams_failed = aws_atomic_load_int(&app_ctx->streams_failed);
+        if (streams_failed > 0) {
+            fprintf(
+                stderr, "%zu streams failed to complete %s\n", streams_failed, aws_error_debug_str(aws_last_error()));
+            exit(1);
+        }
+
+        size_t finished = aws_atomic_load_int(&app_ctx->helper.canary_finished);
+        if (finished) {
+            keep_loop = false;
+        }
+    }
+}
+
+static void s_on_shutdown_complete(void *user_data) {
+    struct canary_ctx *app_ctx = user_data;
+
+    aws_mutex_lock(&app_ctx->mutex);
+    app_ctx->is_shutdown_complete = true;
+    aws_mutex_unlock(&app_ctx->mutex);
+    aws_condition_variable_notify_one(&app_ctx->c_var);
+}
+
+/************************* direct connection ops ******************************************/
+
+static void s_run_direct_connection_test(struct canary_ctx *app_ctx, struct aws_http_message *request) {
+    struct aws_http_make_request_options request_options = {
+        .self_size = sizeof(request_options),
+        .request = request,
+        .user_data = app_ctx,
+        .on_complete = s_on_stream_complete,
+    };
+
+    bool keep_loop = true;
+    while (keep_loop) {
+        /* Run a batch of requests and wait for them all to complete */
+        aws_atomic_store_int(&app_ctx->batch_completed, 0);
+
+        for (int i = 0; i < app_ctx->batch_size; ++i) {
+            struct aws_http_stream *stream = aws_http_connection_make_request(app_ctx->connection, &request_options);
+            aws_http_stream_activate(stream);
+        }
+        /* Once data collection finishes during the wait, no more data points will be recorded, but we still wait
+         * for all the requests we made to complete. */
+        s_wait_on_batch_complete(app_ctx);
+        size_t streams_failed = aws_atomic_load_int(&app_ctx->streams_failed);
+        if (streams_failed > 0) {
+            fprintf(
+                stderr, "%zu streams failed to complete %s\n", streams_failed, aws_error_debug_str(aws_last_error()));
+            exit(1);
+        }
+
+        size_t finished = aws_atomic_load_int(&app_ctx->helper.canary_finished);
+        if (finished) {
+            keep_loop = false;
+        }
+    }
+}
+
+static void s_on_connection_shutdown(struct aws_http_connection *connection, int error_code, void *user_data) {
+    (void)connection;
+    (void)error_code;
+    struct canary_ctx *app_ctx = user_data;
+
+    aws_mutex_lock(&app_ctx->mutex);
+    app_ctx->is_shutdown_complete = true;
+    aws_mutex_unlock(&app_ctx->mutex);
+    aws_condition_variable_notify_one(&app_ctx->c_var);
+}
+
+static void s_on_client_connection_setup(struct aws_http_connection *connection, int error_code, void *user_data) {
+    if (error_code) {
+        fprintf(stderr, "Failed to create connection with error %s\n", aws_error_debug_str(error_code));
+        exit(1);
+    }
+    struct canary_ctx *app_ctx = user_data;
+
+    aws_mutex_lock(&app_ctx->mutex);
+    app_ctx->connection = connection;
+    aws_mutex_unlock(&app_ctx->mutex);
+    aws_condition_variable_notify_one(&app_ctx->c_var);
+}
+
+static bool s_is_connected(void *context) {
+    struct canary_ctx *app_ctx = context;
+    return app_ctx->connection != NULL;
+}
+
+/************************* general ops ******************************************/
+
+static bool s_is_shutdown_complete(void *context) {
+    struct canary_ctx *app_ctx = context;
+    return app_ctx->is_shutdown_complete;
+}
+
+static struct aws_http_message *s_create_request(struct canary_ctx *app_ctx) {
+    struct aws_http_message *request = aws_http2_message_new_request(app_ctx->allocator);
+
+    struct aws_http_header request_headers_src[] = {
+        DEFINE_HEADER(":method", "GET"),
+        {
+            .name = aws_byte_cursor_from_c_str(":scheme"),
+            .value = *aws_uri_scheme(&app_ctx->uri),
+        },
+        {
+            .name = aws_byte_cursor_from_c_str(":path"),
+            .value = *aws_uri_path(&app_ctx->uri),
+        },
+        {
+            .name = aws_byte_cursor_from_c_str(":authority"),
+            .value = *aws_uri_host_name(&app_ctx->uri),
+        },
+    };
+    aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src));
+    return request;
+}
+
+static void s_run_canary(struct canary_ctx *app_ctx) {
+    aws_http_canary_helper_init(app_ctx, &app_ctx->helper);
+    struct aws_http_message *request = s_create_request(app_ctx);
+
+    if (direct_connection) {
+        s_run_direct_connection_test(app_ctx, request);
+    } else {
+        s_run_stream_manager_test(app_ctx, request);
+    }
+
+    aws_http_message_release(request);
+}
+
+int main(int argc, char **argv) {
+    (void)argc;
+    (void)argv;
+
+    struct aws_allocator *allocator = aws_default_allocator();
+
+    aws_http_library_init(allocator);
+
+    struct canary_ctx app_ctx;
+    AWS_ZERO_STRUCT(app_ctx);
+    app_ctx.allocator = allocator;
+    app_ctx.batch_size = max_connections * streams_per_connection;
+    app_ctx.log_level = log_level;
+
+    aws_mutex_init(&app_ctx.mutex);
+    aws_condition_variable_init(&app_ctx.c_var);
+
+    struct aws_logger logger;
+    AWS_ZERO_STRUCT(logger);
+
+    if (app_ctx.log_level) {
+        struct aws_logger_standard_options options = {
+            .level = app_ctx.log_level,
+            .file = stderr,
+        };
+
+        if (aws_logger_init_standard(&logger, allocator, &options)) {
+            fprintf(stderr, "Failed to initialize logger with error %s\n", aws_error_debug_str(aws_last_error()));
+            exit(1);
+        }
+
+        aws_logger_set(&logger);
+    }
+
+    if (aws_uri_init_parse(&app_ctx.uri, allocator, &uri_cursor)) {
+ fprintf(stderr, "Failed to create uri %s\n", aws_error_debug_str(aws_last_error())); + exit(1); + } + + aws_atomic_store_int(&app_ctx.streams_completed, 0); + aws_atomic_store_int(&app_ctx.streams_failed, 0); + + bool use_tls = true; + uint16_t port = 443; + + if (!app_ctx.uri.scheme.len && (app_ctx.uri.port == 80 || app_ctx.uri.port == 8080)) { + use_tls = false; + } else { + if (aws_byte_cursor_eq_c_str_ignore_case(&app_ctx.uri.scheme, "http")) { + use_tls = false; + } + } + + struct aws_tls_ctx *tls_ctx = NULL; + struct aws_tls_ctx_options tls_ctx_options; + AWS_ZERO_STRUCT(tls_ctx_options); + struct aws_tls_connection_options tls_connection_options; + AWS_ZERO_STRUCT(tls_connection_options); + struct aws_tls_connection_options *tls_options = NULL; + + if (use_tls) { + aws_tls_ctx_options_init_default_client(&tls_ctx_options, allocator); + aws_tls_ctx_options_set_verify_peer(&tls_ctx_options, false); + + if (aws_tls_ctx_options_set_alpn_list(&tls_ctx_options, "h2;http/1.1")) { + fprintf(stderr, "Failed to load alpn list with error %s.", aws_error_debug_str(aws_last_error())); + exit(1); + } + + tls_ctx = aws_tls_client_ctx_new(allocator, &tls_ctx_options); + + if (!tls_ctx) { + fprintf(stderr, "Failed to initialize TLS context with error %s.", aws_error_debug_str(aws_last_error())); + exit(1); + } + + aws_tls_connection_options_init_from_ctx(&tls_connection_options, tls_ctx); + if (aws_tls_connection_options_set_server_name(&tls_connection_options, allocator, &app_ctx.uri.host_name)) { + fprintf(stderr, "Failed to set servername with error %s.", aws_error_debug_str(aws_last_error())); + exit(1); + } + tls_options = &tls_connection_options; + + if (app_ctx.uri.port) { + port = app_ctx.uri.port; + } + } else { + port = 80; + if (app_ctx.uri.port) { + port = app_ctx.uri.port; + } + } + + app_ctx.el_group = aws_event_loop_group_new_default(allocator, 0, NULL); + + struct aws_host_resolver_default_options resolver_options = { + .el_group = app_ctx.el_group, + .max_entries = 8, + }; + + struct aws_host_resolver *resolver = aws_host_resolver_new_default(allocator, &resolver_options); + + struct aws_client_bootstrap_options bootstrap_options = { + .event_loop_group = app_ctx.el_group, + .host_resolver = resolver, + }; + struct aws_client_bootstrap *bootstrap = aws_client_bootstrap_new(allocator, &bootstrap_options); + + struct aws_socket_options socket_options = { + .type = AWS_SOCKET_STREAM, + .connect_timeout_ms = 3000, + .keep_alive_timeout_sec = 0, + .keepalive = false, + .keep_alive_interval_sec = 0, + }; + if (!direct_connection) { + struct aws_http2_stream_manager_options sm_options = { + .bootstrap = bootstrap, + .socket_options = &socket_options, + .tls_connection_options = use_tls ? 
+            .tls_connection_options = use_tls ? tls_options : NULL,
+            .host = app_ctx.uri.host_name,
+            .port = port,
+            .max_connections = max_connections,
+            .max_concurrent_streams_per_connection = streams_per_connection,
+            .http2_prior_knowledge = !use_tls,
+            .shutdown_complete_user_data = &app_ctx,
+            .shutdown_complete_callback = s_on_shutdown_complete,
+        };
+        app_ctx.manager = aws_http2_stream_manager_new(allocator, &sm_options);
+    } else {
+        struct aws_http_client_connection_options http_client_options = {
+            .self_size = sizeof(struct aws_http_client_connection_options),
+            .socket_options = &socket_options,
+            .allocator = allocator,
+            .port = port,
+            .host_name = app_ctx.uri.host_name,
+            .bootstrap = bootstrap,
+            .initial_window_size = SIZE_MAX,
+            .tls_options = tls_options,
+            .user_data = &app_ctx,
+            .on_setup = s_on_client_connection_setup,
+            .on_shutdown = s_on_connection_shutdown,
+        };
+        if (aws_http_client_connect(&http_client_options)) {
+            exit(1);
+        }
+        aws_mutex_lock(&app_ctx.mutex);
+        aws_condition_variable_wait_pred(&app_ctx.c_var, &app_ctx.mutex, s_is_connected, &app_ctx);
+        aws_mutex_unlock(&app_ctx.mutex);
+    }
+
+    /* Really do the job */
+    s_run_canary(&app_ctx);
+
+    if (!direct_connection) {
+        aws_http2_stream_manager_release(app_ctx.manager);
+    } else {
+        aws_http_connection_release(app_ctx.connection);
+    }
+
+    aws_mutex_lock(&app_ctx.mutex);
+    aws_condition_variable_wait_pred(&app_ctx.c_var, &app_ctx.mutex, s_is_shutdown_complete, &app_ctx);
+    aws_mutex_unlock(&app_ctx.mutex);
+
+    aws_client_bootstrap_release(bootstrap);
+    aws_host_resolver_release(resolver);
+    aws_event_loop_group_release(app_ctx.el_group);
+
+    if (tls_ctx) {
+        aws_tls_connection_options_clean_up(&tls_connection_options);
+        aws_tls_ctx_release(tls_ctx);
+        aws_tls_ctx_options_clean_up(&tls_ctx_options);
+    }
+
+    aws_http_library_clean_up();
+
+    if (app_ctx.log_level) {
+        aws_logger_clean_up(&logger);
+    }
+
+    aws_uri_clean_up(&app_ctx.uri);
+
+    return 0;
+}
diff --git a/builder.json b/builder.json
index a313381af..d9f1e9362 100644
--- a/builder.json
+++ b/builder.json
@@ -18,6 +18,7 @@
     "pre_build_steps": ["local-server-setup"],
     "test_steps": [
         "aws-c-http-test",
-        ["{python}", "{source_dir}/integration-testing/http_client_test.py", "{install_dir}/bin/elasticurl{exe}"]
+        ["{python}", "{source_dir}/integration-testing/http_client_test.py", "{install_dir}/bin/elasticurl{exe}"],
+        ["{python}", "{source_dir}/integration-testing/http_client_canary.py", "{install_dir}/bin/canary{exe}"]
     ]
 }
diff --git a/include/aws/http/private/h2_connection.h b/include/aws/http/private/h2_connection.h
index 6d42b8316..bea809dc8 100644
--- a/include/aws/http/private/h2_connection.h
+++ b/include/aws/http/private/h2_connection.h
@@ -98,6 +98,17 @@ struct aws_h2_connection {
      * Reduce the space after receiving a flow-controlled frame. Increment after sending WINDOW_UPDATE for
      * connection */
     size_t window_size_self;
+    /* The amount of the self window size that has dropped before the client sends a WINDOW_UPDATE automatically.
+     * When manual management of the connection window is off, the dropped size equals the size of the DATA frames
+     * received.
+     * When manual management of the connection window is on, the dropped size equals the size of all the padding in
+     * the DATA frames received */
+    uint32_t window_size_self_dropped;
+    /* The threshold for sending a WINDOW_UPDATE frame. When window_size_self_dropped exceeds this threshold, the
+     * client automatically sends a WINDOW_UPDATE frame for the dropped size to keep the data flowing.
+     * TODO: expose this to user
+     */
+    uint32_t window_size_self_dropped_threshold;
 
     /* Highest self-initiated stream-id that peer might have processed.
      * Defaults to max stream-id, may be lowered when GOAWAY frame received. */
diff --git a/include/aws/http/private/h2_stream.h b/include/aws/http/private/h2_stream.h
index 62de106c3..d48ff048d 100644
--- a/include/aws/http/private/h2_stream.h
+++ b/include/aws/http/private/h2_stream.h
@@ -84,6 +84,18 @@ struct aws_h2_stream {
      * We allow this value exceed the max window size (int64 can hold much more than 0x7FFFFFFF),
      * We leave it up to the remote peer to detect whether the max window size has been exceeded. */
     int64_t window_size_self;
+    /* The amount of the self window size that has dropped before the client sends a WINDOW_UPDATE automatically.
+     * When manual management of the stream window is off, the dropped size equals the size of the DATA frames
+     * received.
+     * When manual management of the stream window is on, the dropped size equals the size of all the padding in
+     * the DATA frames received */
+    uint32_t window_size_self_dropped;
+    /* The threshold for sending a WINDOW_UPDATE frame. When window_size_self_dropped exceeds this threshold, the
+     * client automatically sends a WINDOW_UPDATE frame for the dropped size to keep the data flowing.
+     * TODO: expose this to user
+     */
+    uint32_t window_size_self_dropped_threshold;
+
     struct aws_http_message *outgoing_message;
     /* All queued writes. If the message provides a body stream, it will be first in this list
      * This list can drain, which results in the stream being put to sleep (moved to waiting_streams_list in
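Both headers add the same pair of fields, so the bookkeeping works identically at connection and stream scope: accumulate the window consumed by incoming DATA frames and only emit a WINDOW_UPDATE once the accumulated amount crosses the threshold, instead of one update per frame. A minimal self-contained sketch of that pattern (hypothetical `window_tracker` names, not the library's API; the real logic lives in `s_decoder_on_data_begin` and `aws_h2_stream_on_decoder_data_begin` in the source changes below):

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Simplified model of the dropped-window bookkeeping added by this diff. */
struct window_tracker {
    uint32_t dropped;   /* window consumed since the last WINDOW_UPDATE was sent */
    uint32_t threshold; /* flush point, e.g. half the initial window */
};

/* Returns the increment to send in a WINDOW_UPDATE frame, or 0 if none is due yet. */
static uint32_t s_on_data_consumed(struct window_tracker *tracker, uint32_t consumed) {
    tracker->dropped += consumed;
    if (tracker->dropped > tracker->threshold) {
        uint32_t increment = tracker->dropped;
        tracker->dropped = 0;
        return increment;
    }
    return 0;
}

int main(void) {
    /* Half of a 65535-byte initial window, mirroring the stream-level default set in h2_stream.c below. */
    struct window_tracker tracker = {.dropped = 0, .threshold = 65535 / 2};

    /* Four 16384-byte DATA frames: every second frame crosses the threshold and flushes. */
    for (int i = 0; i < 4; ++i) {
        uint32_t update = s_on_data_consumed(&tracker, 16384);
        printf("frame %d -> WINDOW_UPDATE increment: %" PRIu32 "\n", i + 1, update);
    }
    return 0;
}
```

Batching the updates this way trades a little slack in the advertised window for far fewer WINDOW_UPDATE frames on the wire.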
diff --git a/integration-testing/http_client_canary.py b/integration-testing/http_client_canary.py
new file mode 100644
index 000000000..d3032ff5f
--- /dev/null
+++ b/integration-testing/http_client_canary.py
@@ -0,0 +1,51 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: Apache-2.0.
+
+import sys
+import os.path
+import subprocess
+
+TIMEOUT = 300
+
+canary_args = sys.argv[1:]
+if not canary_args:
+    print('You must pass the canary cmd prefix')
+    sys.exit(-1)
+
+program_to_run = canary_args[0]
+
+if not os.path.exists(program_to_run):
+    print(f'{program_to_run} is not found, skipping canary test')
+    sys.exit(0)
+
+# We don't have args to pass to the canary yet. TODO: add args for the canary.
+
+
+def run_command(args):
+    # gather all stderr and stdout to a single string that we print only if things go wrong
+    process = subprocess.Popen(
+        args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+    timedout = False
+    try:
+        output = process.communicate(timeout=TIMEOUT)[0]
+    except subprocess.TimeoutExpired:
+        timedout = True
+        process.kill()
+        output = process.communicate()[0]
+    finally:
+        if process.returncode != 0 or timedout:
+            args_str = subprocess.list2cmdline(args)
+            print(args_str)
+            for line in output.splitlines():
+                print(line.decode())
+            if timedout:
+                raise RuntimeError("Timeout happened after {secs} secs from: {cmd}".format(
+                    secs=TIMEOUT, cmd=args_str))
+            else:
+                raise RuntimeError("Return code {code} from: {cmd}".format(
+                    code=process.returncode, cmd=args_str))
+        else:
+            print(output.decode("utf-8"))
+
+
+run_command(canary_args)
diff --git a/integration-testing/http_client_test.py b/integration-testing/http_client_test.py
index 1bd08739b..888cdb2a8 100644
--- a/integration-testing/http_client_test.py
+++ b/integration-testing/http_client_test.py
@@ -19,7 +19,7 @@ if 'bin' in program_to_run:
 
 if not os.path.exists(program_to_run):
-    print('the program_to_run is not found, skip integration test')
+    print(f'{program_to_run} is not found, skipping integration test')
     sys.exit(0)
 
 # Remove args from sys.argv so that unittest doesn't also try to parse them.
diff --git a/source/h2_connection.c b/source/h2_connection.c
index 15ea192f8..7850e0433 100644
--- a/source/h2_connection.c
+++ b/source/h2_connection.c
@@ -384,6 +384,8 @@ static struct aws_h2_connection *s_connection_new(
     connection->thread_data.window_size_peer = AWS_H2_INIT_WINDOW_SIZE;
     connection->thread_data.window_size_self = AWS_H2_INIT_WINDOW_SIZE;
 
+    connection->thread_data.window_size_self_dropped_threshold = 0;
+
     connection->thread_data.goaway_received_last_stream_id = AWS_H2_STREAM_ID_MAX;
     connection->thread_data.goaway_sent_last_stream_id = AWS_H2_STREAM_ID_MAX;
 
@@ -1246,17 +1248,20 @@ struct aws_h2err s_decoder_on_data_begin(
         /* Automatically update the full amount we just received */
         auto_window_update = payload_len;
     }
-
-    if (auto_window_update != 0) {
-        if (s_connection_send_update_window(connection, auto_window_update)) {
+    if (total_padding_bytes) {
+        CONNECTION_LOGF(TRACE, connection, "%" PRIu32 " bytes of padding received.", total_padding_bytes);
+    }
+    connection->thread_data.window_size_self_dropped += auto_window_update;
+    if (connection->thread_data.window_size_self_dropped > connection->thread_data.window_size_self_dropped_threshold) {
+        if (s_connection_send_update_window(connection, connection->thread_data.window_size_self_dropped)) {
             return aws_h2err_from_last_error();
         }
         CONNECTION_LOGF(
             TRACE,
             connection,
-            "Automatically updating connection window by %" PRIu32 "(%" PRIu32 " due to padding).",
-            auto_window_update,
-            total_padding_bytes);
+            "Automatically updating connection window by %" PRIu32 ".",
+            connection->thread_data.window_size_self_dropped);
+        connection->thread_data.window_size_self_dropped = 0;
     }
 
     return AWS_H2ERR_SUCCESS;
@@ -1762,6 +1767,8 @@ static void s_handler_installed(struct aws_channel_handler *handler, struct aws_
         aws_linked_list_push_back(
             &connection->thread_data.outgoing_frames_queue, &connection_window_update_frame->node);
         connection->thread_data.window_size_self += initial_window_update_size;
+        /* For automatic window management, we only update the connection window once it has dropped below 50% of the max. */
+        connection->thread_data.window_size_self_dropped_threshold = AWS_H2_WINDOW_UPDATE_MAX / 2;
     }
     aws_h2_try_write_outgoing_frames(connection);
     return;
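To put the connection-level threshold in perspective, here is a rough back-of-the-envelope check (hedged: it assumes `AWS_H2_WINDOW_UPDATE_MAX` is the RFC 7540 flow-control cap of 2^31 - 1; the constant's actual value lives in the private h2 headers):

```c
#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

int main(void) {
    /* Assumption: AWS_H2_WINDOW_UPDATE_MAX equals the RFC 7540 flow-control cap (2^31 - 1). */
    const uint32_t window_update_max = UINT32_C(2147483647);
    const uint32_t threshold = window_update_max / 2; /* what s_handler_installed sets above */
    const uint32_t frame_payload = 16384;             /* default SETTINGS_MAX_FRAME_SIZE */

    /* How many full-size DATA frames arrive before one connection WINDOW_UPDATE goes out. */
    uint32_t frames_per_update = threshold / frame_payload + 1;
    printf(
        "threshold: %" PRIu32 " bytes -> one connection WINDOW_UPDATE per %" PRIu32 " DATA frames\n",
        threshold,
        frames_per_update);
    return 0;
}
```

Compared to the old behavior of one connection WINDOW_UPDATE per DATA frame, the threshold effectively removes connection-level updates from the hot path, which is exactly what the canary is meant to measure.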
diff --git a/source/h2_stream.c b/source/h2_stream.c
index 85232db00..1f3a2f598 100644
--- a/source/h2_stream.c
+++ b/source/h2_stream.c
@@ -713,6 +713,10 @@ int aws_h2_stream_on_activated(struct aws_h2_stream *stream, enum aws_h2_stream_
     stream->thread_data.window_size_self =
         connection->thread_data.settings_self[AWS_HTTP2_SETTINGS_INITIAL_WINDOW_SIZE];
 
+    if (!connection->base.stream_manual_window_management) {
+        stream->thread_data.window_size_self_dropped_threshold =
+            connection->thread_data.settings_self[AWS_HTTP2_SETTINGS_INITIAL_WINDOW_SIZE] / 2;
+    }
     if (with_data) {
         /* If stream has DATA to send, put it in the outgoing_streams_list, and we'll send data later */
         stream->thread_data.state = AWS_H2_STREAM_STATE_OPEN;
@@ -1078,17 +1082,21 @@ struct aws_h2err aws_h2_stream_on_decoder_data_begin(
             /* Automatically update the full amount we just received */
             auto_window_update = payload_len;
         }
+        if (total_padding_bytes) {
+            AWS_H2_STREAM_LOGF(TRACE, stream, "%" PRIu32 " bytes of padding received.", total_padding_bytes);
+        }
+        stream->thread_data.window_size_self_dropped += auto_window_update;
 
-        if (auto_window_update != 0) {
-            if (s_stream_send_update_window(stream, auto_window_update)) {
+        if (stream->thread_data.window_size_self_dropped > stream->thread_data.window_size_self_dropped_threshold) {
+            if (s_stream_send_update_window(stream, stream->thread_data.window_size_self_dropped)) {
                 return aws_h2err_from_last_error();
             }
             AWS_H2_STREAM_LOGF(
                 TRACE,
                 stream,
-                "Automatically updating stream window by %" PRIu32 "(%" PRIu32 " due to padding).",
-                auto_window_update,
-                total_padding_bytes);
+                "Automatically updating stream window by %" PRIu32 ".",
+                stream->thread_data.window_size_self_dropped);
+            stream->thread_data.window_size_self_dropped = 0;
         }
     }
diff --git a/tests/py_localhost/non_tls_server.py b/tests/py_localhost/non_tls_server.py
index 4f50973f2..9b9f5b88b 100644
--- a/tests/py_localhost/non_tls_server.py
+++ b/tests/py_localhost/non_tls_server.py
@@ -16,7 +16,6 @@ def send_response(conn, event):
         stream_id=stream_id,
         headers=[
             (':status', '200'),
-            ('content-length', str(len(response_data))),
         ],
     )
     conn.send_data(
diff --git a/tests/test_h2_client.c b/tests/test_h2_client.c
index 0c6c18406..9a9142613 100644
--- a/tests/test_h2_client.c
+++ b/tests/test_h2_client.c
@@ -2463,24 +2463,18 @@ TEST_CASE(h2_client_stream_send_window_update) {
     const char *body_src = "hello";
     ASSERT_SUCCESS(h2_fake_peer_send_data_frame_str(&s_tester.peer, stream_id, body_src, false /*end_stream*/));
 
-    /* check that 2 WINDOW_UPDATE frames have been sent.
-     * 1 for the connection, and 1 for the stream */
+    /* Check that only 1 WINDOW_UPDATE frame has been sent:
+     * 1 for the connection, and none for the stream, since the stream window is only updated after it drops below
+     * half of the original window */
     testing_channel_drain_queued_tasks(&s_tester.testing_channel);
     ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer));
 
     struct h2_decoded_frame *stream_window_update_frame = h2_decode_tester_find_stream_frame(
         &s_tester.peer.decode, AWS_H2_FRAME_T_WINDOW_UPDATE, stream_id, 0 /*idx*/, NULL);
-    ASSERT_NOT_NULL(stream_window_update_frame);
-    ASSERT_UINT_EQUALS(5, stream_window_update_frame->window_size_increment);
+    ASSERT_NULL(stream_window_update_frame);
 
-    struct h2_decoded_frame *connection_window_update_frame = h2_decode_tester_find_stream_frame(
-        &s_tester.peer.decode,
-        AWS_H2_FRAME_T_WINDOW_UPDATE,
-        0 /*stream_id*/,
-        initial_window_update_index + 1 /*idx*/,
-        NULL);
-    ASSERT_NOT_NULL(connection_window_update_frame);
-    ASSERT_UINT_EQUALS(5, connection_window_update_frame->window_size_increment);
+    /* For testing automatic window updates, we have localhost_integ_h2_download_stress, which downloads a file over
+     * a single connection and stream. If the window were not updated properly, that test would fail. */
 
     /* clean up */
     aws_http_headers_release(response_headers);
@@ -3941,17 +3935,11 @@ TEST_CASE(h2_client_manual_window_management_user_send_conn_window_update) {
     } else {
         ASSERT_SUCCESS(h2_fake_peer_send_data_frame(&s_tester.peer, stream_id, body_cursor, false /*end_stream*/));
     }
-    /* manually update the stream and connection flow-control window. */
-    aws_http_stream_update_window(stream_tester.stream, body_size);
+    /* Manually update the connection flow-control window. */
     aws_http2_connection_update_window(s_tester.connection, (uint32_t)body_size);
 
     testing_channel_drain_queued_tasks(&s_tester.testing_channel);
     ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer));
 
-    struct h2_decoded_frame *stream_window_update_frame = h2_decode_tester_find_stream_frame(
-        &s_tester.peer.decode, AWS_H2_FRAME_T_WINDOW_UPDATE, stream_id, 0 /*idx*/, NULL);
-    ASSERT_NOT_NULL(stream_window_update_frame);
-    ASSERT_UINT_EQUALS(body_size, stream_window_update_frame->window_size_increment);
-
     struct h2_decoded_frame *connection_window_update_frame = h2_decode_tester_find_stream_frame(
         &s_tester.peer.decode, AWS_H2_FRAME_T_WINDOW_UPDATE, 0 /*stream_id*/, 0 /*idx*/, NULL);
     ASSERT_NOT_NULL(connection_window_update_frame);
diff --git a/tests/test_localhost_integ.c b/tests/test_localhost_integ.c
index 0424c7771..213df2ea7 100644
--- a/tests/test_localhost_integ.c
+++ b/tests/test_localhost_integ.c
@@ -402,12 +402,6 @@ static int s_localhost_integ_h2_upload_stress(struct aws_allocator *allocator, v
     s_tester.alloc = allocator;
 
     size_t length = 2500000000UL;
-#ifdef AWS_OS_LINUX
-    /* Using Python hyper h2 server frame work, met a weird upload performance issue on Linux. Our client against nginx
-     * platform has not met the same issue. We assume it's because the server framework implementation. Use lower
-     * number of linux */
-    length = 250000000UL;
-#endif
 
     struct aws_string *http_localhost_host = NULL;
     if (aws_get_environment_value(allocator, s_http_localhost_env_var, &http_localhost_host) ||