diff --git a/BUILDING.md b/BUILDING.md index d6834db5e..76f001ec6 100644 --- a/BUILDING.md +++ b/BUILDING.md @@ -26,7 +26,8 @@ But RPP is header-only library, so, without enabling any extra options is just c - `RPP_BUILD_TESTS` - (ON/OFF) build unit tests (default OFF) - `RPP_BUILD_EXAMPLES` - (ON/OFF) build examples of usage of RPP (default OFF) - `RPP_BUILD_SFML_CODE` - (ON/OFF) build RPP code related to SFML or not (default OFF) - requires SFML to be installed -- `RPP_BUILD_QT_CODE` - (ON/OFF) build RPPQT related code (examples/tests)(rppqt module doesn't requires this one) (default OFF) - requires QT5/6 to be installed +- `RPP_BUILD_QT_CODE` - (ON/OFF) build QT related code (examples/tests)(rppqt module doesn't requires this one) (default OFF) - requires QT5/6 to be installed +- `RPP_BUILD_GRPC_CODE` - (ON/OFF) build GRPC related code (examples/tests)(rppgrpc module doesn't requires this one) (default OFF) - requires grpc++/protobuf to be installed By default, it provides rpp and rppqt INTERFACE modules. diff --git a/CMakePresets.json b/CMakePresets.json index 09a71abae..50eaa16eb 100644 --- a/CMakePresets.json +++ b/CMakePresets.json @@ -138,6 +138,13 @@ "RPP_BUILD_QT_CODE" : "ON" } }, + { + "name" : "build-grpc", + "hidden": true, + "cacheVariables": { + "RPP_BUILD_GRPC_CODE" : "ON" + } + }, { "name" : "use-conan", "hidden": true, @@ -149,6 +156,7 @@ }, { "name": "build-dir", + "hidden": true, "binaryDir": "${sourceDir}/build" }, @@ -160,7 +168,7 @@ }, { "name": "ci-coverage-gcc", - "inherits": ["ci-build", "build-tests", "build-qt", "ci-unix", "ci-gcc"], + "inherits": ["ci-build", "build-tests", "build-qt", "build-grpc", "ci-unix", "ci-gcc"], "cacheVariables": { "RPP_ENABLE_COVERAGE": "ON", "CMAKE_CXX_FLAGS": "-O0 -g --coverage -fkeep-inline-functions -fkeep-static-functions -fprofile-arcs -ftest-coverage -fno-inline -fno-omit-frame-pointer -fno-optimize-sibling-calls", @@ -171,7 +179,7 @@ }, { "name": "ci-coverage-clang", - "inherits": ["ci-build", "build-tests", "build-qt", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-qt", "build-grpc", "ci-unix", "ci-clang"], "cacheVariables": { "RPP_ENABLE_COVERAGE": "ON", "CMAKE_CXX_FLAGS": "-fprofile-instr-generate -fcoverage-mapping", @@ -189,51 +197,51 @@ }, { "name": "ci-sanitize-asan", - "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "ci-unix", "ci-clang"], "cacheVariables": { "CMAKE_CXX_FLAGS": "-fsanitize=address -fno-optimize-sibling-calls -fsanitize-address-use-after-scope -fno-omit-frame-pointer -g -O1" } }, { "name": "ci-sanitize-lsan", - "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "ci-unix", "ci-clang"], "cacheVariables": { "CMAKE_CXX_FLAGS": "-fsanitize=leak -fno-omit-frame-pointer -g -O1" } }, { "name": "ci-sanitize-msan", - "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "ci-unix", "ci-clang"], "cacheVariables": { "CMAKE_CXX_FLAGS": "-fsanitize=memory -fno-optimize-sibling-calls -fsanitize-memory-track-origins=2 -fno-omit-frame-pointer -g -O2" } }, { "name": "ci-sanitize-ubsan", - "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "ci-unix", "ci-clang"], + "inherits": ["ci-build", "build-tests", "build-benchmarks", "build-qt", "build-grpc", "ci-unix", "ci-clang"], "cacheVariables": { "CMAKE_CXX_FLAGS": "-fsanitize=undefined" } }, { "name": "ci-macos-tests", - "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-sfml", "ci-unix"] + "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-unix"] }, { "name": "ci-ubuntu-clang-tests", - "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-sfml", "ci-unix", "ci-clang", "cppcheck", "clang-tidy"] + "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-unix", "ci-clang", "cppcheck", "clang-tidy"] }, { "name": "ci-ubuntu-gcc-tests", - "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-sfml", "ci-unix", "ci-gcc", "cppcheck", "clang-tidy"] + "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-unix", "ci-gcc", "cppcheck", "clang-tidy"] }, { "name": "ci-windows-tests", - "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-sfml", "ci-win64"] + "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-win64"] }, { "name": "ci-ubuntu-clang-tests-no-checks", - "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-sfml", "ci-unix", "ci-clang" ] + "inherits": ["ci-build", "build-tests", "build-examples", "build-qt", "build-grpc", "build-sfml", "ci-unix", "ci-clang" ] }, diff --git a/Doxyfile b/Doxyfile index 92b818846..393db99f7 100644 --- a/Doxyfile +++ b/Doxyfile @@ -947,6 +947,7 @@ INPUT = src/rpp \ src/extensions \ src/examples/rpp/doxygen \ src/examples/rppqt/doxygen \ + src/examples/rppgrpc/doxygen \ docs # This tag can be used to specify the character encoding of the source files @@ -1078,7 +1079,8 @@ EXCLUDE_SYMBOLS = # command). EXAMPLE_PATH = src/examples/rpp/doxygen \ - src/examples/rppqt/doxygen + src/examples/rppqt/doxygen \ + src/examples/rppgrpc/doxygen # If the value of the EXAMPLE_PATH tag contains directories, you can use the # EXAMPLE_PATTERNS tag to specify one or more wildcard pattern (like *.cpp and diff --git a/cmake/dependencies.cmake b/cmake/dependencies.cmake index d343ba044..8bad63ed2 100644 --- a/cmake/dependencies.cmake +++ b/cmake/dependencies.cmake @@ -14,6 +14,33 @@ macro(rpp_handle_3rdparty TARGET_NAME) set_target_properties(${TARGET_NAME} PROPERTIES INTERFACE_SYSTEM_INCLUDE_DIRECTORIES $) endmacro() +macro(rpp_fetch_library_extended NAME URL TAG TARGET_NAME) + Include(FetchContent) + set(BUILD_SHARED_LIBS OFF CACHE INTERNAL "Build SHARED libraries") + + Set(FETCHCONTENT_QUIET FALSE) + + FetchContent_Declare( + ${NAME} + GIT_REPOSITORY ${URL} + GIT_TAG ${TAG} + GIT_SHALLOW TRUE + GIT_PROGRESS TRUE + GIT_SUBMODULES "" + ) + + FetchContent_MakeAvailable(${NAME}) + rpp_handle_3rdparty(${TARGET_NAME}) +endmacro() + +macro(rpp_fetch_library NAME URL TAG) + find_package(${NAME} QUIET) + if (NOT ${NAME}_FOUND) + message("-- RPP: Fetching ${NAME}...") + rpp_fetch_library_extended(${NAME} ${URL} ${TAG} ${NAME}) + endif() +endmacro() + # ===================== SFML ======================= if (RPP_BUILD_SFML_CODE AND RPP_BUILD_EXAMPLES) find_package(SFML COMPONENTS graphics system window REQUIRED) @@ -42,32 +69,43 @@ if (RPP_BUILD_QT_CODE AND (RPP_BUILD_TESTS OR RPP_BUILD_EXAMPLES)) endmacro() endif() -macro(rpp_fetch_library_extended NAME URL TAG TARGET_NAME) - Include(FetchContent) - set(BUILD_SHARED_LIBS OFF CACHE INTERNAL "Build SHARED libraries") +# ========================== GRPC ==================================== +if (RPP_BUILD_GRPC_CODE AND (RPP_BUILD_TESTS OR RPP_BUILD_EXAMPLES)) + find_package(Protobuf CONFIG REQUIRED) + find_package(gRPC CONFIG REQUIRED) - Set(FETCHCONTENT_QUIET FALSE) + rpp_handle_3rdparty(gRPC::grpc++) + rpp_handle_3rdparty(protobuf::protobuf) - FetchContent_Declare( - ${NAME} - GIT_REPOSITORY ${URL} - GIT_TAG ${TAG} - GIT_SHALLOW TRUE - GIT_PROGRESS TRUE - GIT_SUBMODULES "" - ) + macro(rpp_add_proto_target TARGET FILES) + add_library(${TARGET} STATIC ${FILES}) - FetchContent_MakeAvailable(${NAME}) - rpp_handle_3rdparty(${TARGET_NAME}) -endmacro() + target_link_libraries(${TARGET} + PUBLIC + gRPC::grpc++ + protobuf::libprotobuf + ${grpc_LIBRARIES_TARGETS} + ) -macro(rpp_fetch_library NAME URL TAG) - find_package(${NAME} QUIET) - if (NOT ${NAME}_FOUND) - message("-- RPP: Fetching ${NAME} from ${URL} by ${TAG}...") - rpp_fetch_library_extended(${NAME} ${URL} ${TAG} ${NAME}) - endif() -endmacro() + target_include_directories(${TARGET} PUBLIC ${CMAKE_CURRENT_BINARY_DIR}) + + get_target_property(grpc_cpp_plugin_location gRPC::grpc_cpp_plugin LOCATION ) + protobuf_generate(TARGET ${TARGET} OUT_VAR PROTO_FILES LANGUAGE cpp ) + + protobuf_generate( + TARGET ${TARGET} + LANGUAGE grpc + OUT_VAR GRPC_PROTO_FILES + # PROTOC_OUT_DIR "${PROTO_BINARY_DIR}" + PLUGIN protoc-gen-grpc=${grpc_cpp_plugin_location} + GENERATE_EXTENSIONS .grpc.pb.h .grpc.pb.cc) + + + set_target_properties(${TARGET} PROPERTIES INTERFACE_SYSTEM_INCLUDE_DIRECTORIES $) + set_target_properties(${TARGET} PROPERTIES CXX_CLANG_TIDY "") + set_target_properties(${TARGET} PROPERTIES CXX_CPPCHECK "") + endmacro() +endif() # ==================== RXCPP ======================= if (RPP_BUILD_RXCPP AND RPP_BUILD_BENCHMARKS) diff --git a/cmake/install-rules.cmake b/cmake/install-rules.cmake index 500b20bab..f38560340 100644 --- a/cmake/install-rules.cmake +++ b/cmake/install-rules.cmake @@ -8,6 +8,7 @@ install( DIRECTORY src/rpp src/extensions/rppqt + src/extensions/rppgrpc DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}" COMPONENT @@ -25,6 +26,11 @@ install( EXPORT RPPTargets INCLUDES DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/rppqt" ) +install( + TARGETS rppgrpc + EXPORT RPPTargets + INCLUDES DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/rppgrpc" +) write_basic_package_version_file( "${package}ConfigVersion.cmake" diff --git a/cmake/variables.cmake b/cmake/variables.cmake index 8d91fdb67..93a57a500 100644 --- a/cmake/variables.cmake +++ b/cmake/variables.cmake @@ -82,6 +82,7 @@ endfunction() # ------------ Options to tweak --------------------- option(RPP_BUILD_SFML_CODE "Enable SFML support in examples/code." OFF) option(RPP_BUILD_QT_CODE "Enable QT support in examples/code." OFF) +option(RPP_BUILD_GRPC_CODE "Enable grpc++ support in examples/code." OFF) if (RPP_DEVELOPER_MODE) option(RPP_BUILD_TESTS "Build unit tests tree." OFF) @@ -100,6 +101,9 @@ if (RPP_DEVELOPER_MODE) if (RPP_BUILD_BENCHMARKS) set(CONAN_ARGS "${CONAN_ARGS};-o rpp/*:with_benchmarks=True") endif() + if (RPP_BUILD_GRPC_CODE) + set(CONAN_ARGS "${CONAN_ARGS};-o rpp/*:with_grpc=True") + endif() endif() if(RPP_ENABLE_COVERAGE) diff --git a/conanfile.py b/conanfile.py index f7bb8f74a..355f386da 100644 --- a/conanfile.py +++ b/conanfile.py @@ -32,10 +32,10 @@ def requirements(self): if self.options.with_sfml: self.requires("sfml/2.6.1", options={"audio": False}) - # if self.options.with_grpc: - # self.requires("grpc/1.54.3", transitive_libs=True, transitive_headers=True) - # self.requires("protobuf/3.21.12") - # self.requires("libmount/2.39", override=True) + if self.options.with_grpc: + self.requires("grpc/1.54.3", transitive_libs=True, transitive_headers=True) + self.requires("protobuf/3.21.12") + self.requires("libmount/2.39", override=True) if self.options.with_cmake: self.tool_requires("cmake/3.29.3") diff --git a/src/examples/CMakeLists.txt b/src/examples/CMakeLists.txt index 041d2c3ef..fa5f9428a 100644 --- a/src/examples/CMakeLists.txt +++ b/src/examples/CMakeLists.txt @@ -3,3 +3,7 @@ add_subdirectory(rpp) if(RPP_BUILD_QT_CODE) add_subdirectory(rppqt) endif() + +if (RPP_BUILD_GRPC_CODE) + add_subdirectory(rppgrpc) +endif() diff --git a/src/examples/rppgrpc/CMakeLists.txt b/src/examples/rppgrpc/CMakeLists.txt new file mode 100644 index 000000000..36e62e1b1 --- /dev/null +++ b/src/examples/rppgrpc/CMakeLists.txt @@ -0,0 +1,2 @@ +# add_subdirectory(communication) +add_subdirectory(doxygen) diff --git a/src/examples/rppgrpc/communication/CMakeLists.txt b/src/examples/rppgrpc/communication/CMakeLists.txt new file mode 100644 index 000000000..779b39cad --- /dev/null +++ b/src/examples/rppgrpc/communication/CMakeLists.txt @@ -0,0 +1,9 @@ +set(TARGET rppgrpc_communication) + +rpp_add_proto_target(${TARGET}_proto protocol.proto) + +add_executable(${TARGET}_server server.cpp) +target_link_libraries(${TARGET}_server PRIVATE ${TARGET}_proto rppgrpc) + +add_executable(${TARGET}_client client.cpp) +target_link_libraries(${TARGET}_client PRIVATE ${TARGET}_proto rppgrpc) diff --git a/src/examples/rppgrpc/communication/client.cpp b/src/examples/rppgrpc/communication/client.cpp new file mode 100644 index 000000000..b3ad2d9e8 --- /dev/null +++ b/src/examples/rppgrpc/communication/client.cpp @@ -0,0 +1,78 @@ +#include + +#include +#include + +#include "protocol.grpc.pb.h" +#include "protocol.pb.h" + + +int main() +{ + auto channel = grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials()); + if (!channel) + { + std::cout << "NO CHANNEL" << std::endl; + return 0; + } + auto stub = TestService::NewStub(channel); + if (!stub) + { + std::cout << "NO STUB" << std::endl; + return 0; + } + + std::array ctx{}; + auto d = rpp::composite_disposable_wrapper::make(); + + rpp::subjects::publish_subject bidi_requests{}; + rpp::subjects::publish_subject bidi_responses{}; + bidi_responses.get_observable().subscribe(d, [](const Response& v) { + std::cout << "[BidireactionalResponse]: " << v.ShortDebugString() << std::endl; + }); + + rppgrpc::add_client_reactor(&TestService::StubInterface::async_interface::Bidirectional, + *stub->async(), + &ctx[0], + bidi_requests.get_observable() + | rpp::ops::take_while([](const std::string& v) { return v != "0"; }) + | rpp::ops::map([](const std::string& v) { + Request i{}; + i.set_value(std::string{"BidiRequest "} + v); + return i; + }), + bidi_responses.get_observer()); + + rppgrpc::add_client_reactor(&TestService::StubInterface::async_interface::ClientSide, + *stub->async(), + &ctx[1], + bidi_responses.get_observable() + | rpp::ops::map([](const Response& response) { + Request request{}; + request.set_value(std::string{"ClientSideRequest "} + response.value()); + return request; + }), + rpp::make_lambda_observer(d, [](const Response& v) { + std::cout << "[ClientsideResponse]: " << v.ShortDebugString() << std::endl; + })); + Request req{}; + rppgrpc::add_client_reactor(&TestService::StubInterface::async_interface::ServerSide, + *stub->async(), + &ctx[2], + &req, + rpp::make_lambda_observer(d, [](const Response& v) { + std::cout << "[ServerSideResponse]: " << v.ShortDebugString() << std::endl; + })); + + std::cout << "SUBSCRIBED" << std::endl; + + std::string in{}; + while (!d.is_disposed()) + { + std::getline(std::cin, in); + bidi_requests.get_observer().on_next(in); + in.clear(); + } + + return 0; +} diff --git a/src/examples/rppgrpc/communication/protocol.proto b/src/examples/rppgrpc/communication/protocol.proto new file mode 100644 index 000000000..8703bde05 --- /dev/null +++ b/src/examples/rppgrpc/communication/protocol.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +message Request { + string Value = 1; +} + +message Response { + string Value = 1; +} + +service TestService { + rpc ServerSide(Request) returns (stream Response) {} + rpc ClientSide(stream Request) returns (Response) {} + rpc Bidirectional(stream Request) returns (stream Response) {} +} diff --git a/src/examples/rppgrpc/communication/server.cpp b/src/examples/rppgrpc/communication/server.cpp new file mode 100644 index 000000000..90654870f --- /dev/null +++ b/src/examples/rppgrpc/communication/server.cpp @@ -0,0 +1,68 @@ + +#include + +#include +#include + +#include "protocol.grpc.pb.h" +#include "protocol.pb.h" + +class Service : public TestService::CallbackService +{ +public: + Service() + { + client_side_requests.get_observable().subscribe([](const Request& s) { std::cout << "[ClientSideRequest]: " << s.ShortDebugString() << std::endl; }); + } + + grpc::ServerBidiReactor<::Request, ::Response>* Bidirectional(::grpc::CallbackServerContext* /*context*/) override + { + rpp::subjects::publish_subject response{}; + rpp::subjects::publish_subject request{}; + request.get_observable() + | rpp::ops::subscribe([](const Request& s) { std::cout << "[BidireactionalRequest]: " << s.ShortDebugString() << std::endl; }); + request.get_observable() + | rpp::ops::map([](const Request& request) { + Response response{}; + response.set_value(std::string{"BidiResponse "} + request.value()); + return response; + }) + | rpp::ops::subscribe(response.get_observer()); + return rppgrpc::make_server_reactor(response.get_observable(), request.get_observer()); + } + + ::grpc::ServerReadReactor<::Request>* ClientSide(::grpc::CallbackServerContext* /*context*/, ::Response* /*response*/) override + { + return rppgrpc::make_server_reactor(client_side_requests.get_observer()); + } + + ::grpc::ServerWriteReactor<::Response>* ServerSide(::grpc::CallbackServerContext* /*context*/, const ::Request* /*request*/) override + { + return rppgrpc::make_server_reactor(client_side_requests.get_observable() + | rpp::ops::map([](const Request& v) { + Response response{}; + response.set_value(std::string{"ServerSideResponse "} + v.value()); + return response; + })); + } + +private: + rpp::subjects::publish_subject client_side_requests{}; +}; + +int main() +{ + Service service{}; + grpc::ServerBuilder builder{}; + + std::string server_address("localhost:50051"); + + builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); + builder.RegisterService(&service); + + auto server(builder.BuildAndStart()); + std::cout << "Server listening on " << server_address << std::endl; + server->Wait(); + + return 0; +} diff --git a/src/examples/rppgrpc/doxygen/CMakeLists.txt b/src/examples/rppgrpc/doxygen/CMakeLists.txt new file mode 100644 index 000000000..e99bba244 --- /dev/null +++ b/src/examples/rppgrpc/doxygen/CMakeLists.txt @@ -0,0 +1,11 @@ +file(GLOB_RECURSE FILES "*.cpp") + +rpp_add_proto_target(doyxgen_grpc_proto protocol.proto) + +foreach(SOURCE ${FILES}) + get_filename_component(BASE_NAME ${SOURCE} NAME_WE) + set(TARGET ${BASE_NAME}_doxygen_sample) + add_executable(${TARGET} ${SOURCE}) + target_link_libraries(${TARGET} PRIVATE rpp rppgrpc doyxgen_grpc_proto) + set_target_properties(${TARGET} PROPERTIES FOLDER Examples/rppqt/Doxygen) +endforeach() diff --git a/src/examples/rppgrpc/doxygen/client_reactor.cpp b/src/examples/rppgrpc/doxygen/client_reactor.cpp new file mode 100644 index 000000000..db5562478 --- /dev/null +++ b/src/examples/rppgrpc/doxygen/client_reactor.cpp @@ -0,0 +1,60 @@ +#include + +#include +#include + +#include "protocol.grpc.pb.h" +/** + * \example client_reactor.cpp + **/ + +int main() // NOLINT(bugprone-exception-escape) +{ + { + //! [bidi_reactor] + auto channel = grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials()); + auto stub = TestService::NewStub(channel); + + grpc::ClientContext ctx{}; + const auto reactor = new rppgrpc::client_bidi_reactor(); + stub->async()->Bidirectional(&ctx, reactor); + reactor->get_observable().subscribe([](const Response&) {}); + + reactor->init(); + + reactor->get_observer().on_next(Request{}); + //! [bidi_reactor] + } + { + //! [read_reactor] + auto channel = grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials()); + auto stub = TestService::NewStub(channel); + + grpc::ClientContext ctx{}; + const auto reactor = new rppgrpc::client_read_reactor(); + Request req{}; + stub->async()->ServerSide(&ctx, &req, reactor); + reactor->get_observable().subscribe([](const Response&) {}); + + reactor->init(); + //! [read_reactor] + } + { + //! [write_reactor] + auto channel = grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials()); + auto stub = TestService::NewStub(channel); + + grpc::ClientContext ctx{}; + const auto reactor = new rppgrpc::client_write_reactor(); + Response resp{}; + stub->async()->ClientSide(&ctx, &resp, reactor); + reactor->get_observable().subscribe([](const rpp::utils::none&) {}); + + reactor->init(); + + reactor->get_observer().on_next(Request{}); + //! [write_reactor] + } + + return 0; +} diff --git a/src/examples/rppgrpc/doxygen/protocol.proto b/src/examples/rppgrpc/doxygen/protocol.proto new file mode 100644 index 000000000..8703bde05 --- /dev/null +++ b/src/examples/rppgrpc/doxygen/protocol.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +message Request { + string Value = 1; +} + +message Response { + string Value = 1; +} + +service TestService { + rpc ServerSide(Request) returns (stream Response) {} + rpc ClientSide(stream Request) returns (Response) {} + rpc Bidirectional(stream Request) returns (stream Response) {} +} diff --git a/src/extensions/CMakeLists.txt b/src/extensions/CMakeLists.txt index 63cf503d3..397ebb5b2 100644 --- a/src/extensions/CMakeLists.txt +++ b/src/extensions/CMakeLists.txt @@ -1 +1,2 @@ add_subdirectory(rppqt) +add_subdirectory(rppgrpc) diff --git a/src/extensions/rppgrpc/CMakeLists.txt b/src/extensions/rppgrpc/CMakeLists.txt new file mode 100644 index 000000000..68720d72c --- /dev/null +++ b/src/extensions/rppgrpc/CMakeLists.txt @@ -0,0 +1,11 @@ +# ReactivePlusPlus library +# +# Copyright Aleksey Loginov 2022 - present. +# Distributed under the Boost Software License, Version 1.0. +# (See accompanying file LICENSE_1_0.txt or copy at +# https://www.boost.org/LICENSE_1_0.txt) +# +# Project home: https://github.com/victimsnino/ReactivePlusPlus +# + +rpp_add_library(rppgrpc) diff --git a/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp new file mode 100644 index 000000000..76a70c8ea --- /dev/null +++ b/src/extensions/rppgrpc/rppgrpc/client_reactor.hpp @@ -0,0 +1,297 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include + +#include +#include +#include + +#include + +namespace rppgrpc +{ + /** + * @brief RPP's based implementation for grpc client bidirectional reactor. + * @details To use it you need: + * - create it via `new` operator OR be sure it is alive while it is used inside grpc. + * - pass it to `stub->async()->GrpcBidirectionalStream(ctx, reactor);` + * - call `reactor->init()` method for actual starting of grpc logic + * - to access values FROM stream you can subscribe to observable obtained via `reactor->get_observable()` (same observable WOULD emit on_completed in case of successful stream termination and on_error in case of some errors with grpc stream) + * - to pass values TO stream you can emit values to observer obtained via `reactor->get_observer()` + */ + template + class client_bidi_reactor final : public grpc::ClientBidiReactor + { + using Base = grpc::ClientBidiReactor; + + public: + client_bidi_reactor() + { + m_requests.get_observable().subscribe( + [this] T>(T&& message) { + std::lock_guard lock{m_write_mutex}; + m_write.push_back(std::forward(message)); + if (m_write.size() == 1) + Base::StartWrite(&m_write.front()); + }, + [this](const std::exception_ptr&) { + std::lock_guard lock{m_write_mutex}; + m_finished = true; + + if (m_write.size() == 0) + Base::StartWritesDone(); + }, + [this]() { + std::lock_guard lock{m_write_mutex}; + m_finished = true; + + if (m_write.size() == 0) + Base::StartWritesDone(); + }); + } + + void init() + { + Base::StartCall(); + Base::StartRead(&m_read); + } + + auto get_observer() + { + return m_requests.get_observer(); + } + + auto get_observable() + { + return m_observer.get_observable(); + } + + private: + using Base::StartCall; + using Base::StartRead; + + void OnReadDone(bool ok) override + { + if (!ok) + return; + + m_observer.get_observer().on_next(m_read); + Base::StartRead(&m_read); + } + + void OnWriteDone(bool ok) override + { + if (!ok) + return; + + std::lock_guard lock{m_write_mutex}; + m_write.pop_front(); + + if (!m_write.empty()) + { + Base::StartWrite(&m_write.front()); + } + else if (m_finished) + { + Base::StartWritesDone(); + } + } + + void OnDone(const grpc::Status& s) override + { + m_requests.get_disposable().dispose(); + + if (s.ok()) + { + m_observer.get_observer().on_completed(); + } + else + { + m_observer.get_observer().on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()})); + } + delete this; + } + + private: + rpp::subjects::serialized_publish_subject m_requests{}; + + rpp::subjects::publish_subject m_observer; + Response m_read{}; + + std::mutex m_write_mutex{}; + std::deque m_write{}; + bool m_finished{}; + }; + + /** + * @brief RPP's based implementation for grpc client write reactor + * @details To use it you need: + * - create it via `new` operator OR be sure it is alive while it is used inside grpc. + * - pass it to `stub->async()->GrpcWriteStream(ctx, &request, reactor);` + * - call `reactor->init()` method for actual starting of grpc logic + * - to pass values TO stream you can emit values to observer obtained via `reactor->get_observer()` + * - reactor provides `reactor->get_observable()` method but such as observable emits nothing and can be used only to be notified about completion/error + */ + template + class client_write_reactor final : public grpc::ClientWriteReactor + { + using Base = grpc::ClientWriteReactor; + + public: + client_write_reactor() + { + m_requests.get_observable().subscribe( + [this] T>(T&& message) { + std::lock_guard lock{m_write_mutex}; + m_write.push_back(std::forward(message)); + if (m_write.size() == 1) + Base::StartWrite(&m_write.front()); + }, + [this](const std::exception_ptr&) { + std::lock_guard lock{m_write_mutex}; + m_finished = true; + + if (m_write.size() == 0) + Base::StartWritesDone(); + }, + [this]() { + std::lock_guard lock{m_write_mutex}; + m_finished = true; + + if (m_write.size() == 0) + Base::StartWritesDone(); + }); + } + + void init() + { + Base::StartCall(); + } + + auto get_observer() + { + return m_requests.get_observer(); + } + + auto get_observable() + { + return m_observer.get_observable(); + } + + private: + using Base::StartCall; + + void OnWriteDone(bool ok) override + { + if (!ok) + return; + + std::lock_guard lock{m_write_mutex}; + m_write.pop_front(); + + if (!m_write.empty()) + { + Base::StartWrite(&m_write.front()); + } + else if (m_finished) + { + Base::StartWritesDone(); + } + } + + void OnDone(const grpc::Status& s) override + { + m_requests.get_disposable().dispose(); + + if (s.ok()) + { + m_observer.get_observer().on_completed(); + } + else + { + m_observer.get_observer().on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()})); + } + delete this; + } + + private: + rpp::subjects::serialized_publish_subject m_requests{}; + rpp::subjects::publish_subject m_observer; + + std::mutex m_write_mutex{}; + std::deque m_write{}; + bool m_finished{}; + }; + + /** + * @brief RPP's based implementation for grpc client read reactor. + * @details To use it you need: + * - create it via `new` operator OR be sure it is alive while it is used inside grpc. + * - pass it to `stub->async()->GrpcReadStream(ctx, &response, reactor);` + * - call `reactor->init()` method for actual starting of grpc logic + * - to access values FROM stream you can subscribe to observable obtained via `reactor->get_observable()` (same observable WOULD emit on_completed in case of successful stream termination and on_error in case of some errors with grpc stream) + */ + template + class client_read_reactor final : public grpc::ClientReadReactor + { + using Base = grpc::ClientReadReactor; + + public: + client_read_reactor() + { + } + + void init() + { + Base::StartCall(); + Base::StartRead(&m_read); + } + + auto get_observable() + { + return m_observer.get_observable(); + } + + private: + using Base::StartCall; + using Base::StartRead; + + void OnReadDone(bool ok) override + { + if (!ok) + return; + + m_observer.get_observer().on_next(m_read); + Base::StartRead(&m_read); + } + + void OnDone(const grpc::Status& s) override + { + if (s.ok()) + { + m_observer.get_observer().on_completed(); + } + else + { + m_observer.get_observer().on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{s.error_message()})); + } + delete this; + } + + private: + rpp::subjects::publish_subject m_observer; + Response m_read{}; + }; +} // namespace rppgrpc diff --git a/src/extensions/rppgrpc/rppgrpc/fwd.hpp b/src/extensions/rppgrpc/rppgrpc/fwd.hpp new file mode 100644 index 000000000..d807537f8 --- /dev/null +++ b/src/extensions/rppgrpc/rppgrpc/fwd.hpp @@ -0,0 +1,30 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +/** + * @defgroup rppgrpc RPPGRPC + * @brief RppGrpc is extension of RPP which enables support of grpc library. + */ + +namespace rppgrpc +{ + template + class client_bidi_reactor; + + template + class client_write_reactor; + + template + class client_read_reactor; +} // namespace rppgrpc diff --git a/src/extensions/rppgrpc/rppgrpc/rppgrpc.hpp b/src/extensions/rppgrpc/rppgrpc/rppgrpc.hpp new file mode 100644 index 000000000..78306d613 --- /dev/null +++ b/src/extensions/rppgrpc/rppgrpc/rppgrpc.hpp @@ -0,0 +1,15 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include +#include +// #include diff --git a/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp b/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp new file mode 100644 index 000000000..44f7689cb --- /dev/null +++ b/src/extensions/rppgrpc/rppgrpc/server_reactor.hpp @@ -0,0 +1,113 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include + +#include +#include +#include + +#include + +namespace rppgrpc::details +{ + template + class server_bidi_reactor final : public grpc::ServerBidiReactor, Response> + { + using Request = rpp::utils::extract_observer_type_t; + using Base = grpc::ServerBidiReactor; + + public: + template Observable, rpp::constraint::decayed_same_as TObserver> + server_bidi_reactor(const Observable& messages, TObserver&& events) + : m_observer{std::forward(events)} + , m_disposable{messages.subscribe_with_disposable([this] T>(T&& message) { + std::lock_guard lock{m_write_mutex}; + m_write.push_back(std::forward(message)); + if (m_write.size() == 1) + Base::StartWrite(&m_write.front()); }, + [this](const std::exception_ptr&) { + Base::Finish(grpc::Status{grpc::StatusCode::INTERNAL, "Internal error happens"}); + }, + [this]() { + Base::Finish(grpc::Status::OK); + })} + { + Base::StartSendInitialMetadata(); + Base::StartRead(&m_read); + } + + private: + void OnReadDone(bool ok) override + { + if (!ok) + { + m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnReadDone is not ok"})); + Base::Finish(grpc::Status::CANCELLED); + return; + } + m_observer.on_next(m_read); + Base::StartRead(&m_read); + } + + void OnWriteDone(bool ok) override + { + if (!ok) + { + m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnWriteDone is not ok"})); + Base::Finish(grpc::Status::CANCELLED); + return; + } + + std::lock_guard lock{m_write_mutex}; + m_write.pop_front(); + + if (!m_write.empty()) + { + Base::StartWrite(&m_write.front()); + } + } + + void OnDone() override + { + m_observer.on_completed(); + Destroy(); + } + + void OnCancel() override + { + m_observer.on_error(std::make_exception_ptr(rppgrpc::utils::reactor_failed{"OnCancel called"})); + Base::Finish(grpc::Status::CANCELLED); + } + + private: + void Destroy() + { + m_disposable.dispose(); + delete this; + } + + private: + Observer m_observer; + rpp::disposable_wrapper m_disposable; + + Request m_read{}; + + std::mutex m_write_mutex{}; + std::deque m_write{}; + }; +} // namespace rppgrpc::details +namespace rppgrpc +{ +} // namespace rppgrpc diff --git a/src/extensions/rppgrpc/rppgrpc/utils/exceptions.hpp b/src/extensions/rppgrpc/rppgrpc/utils/exceptions.hpp new file mode 100644 index 000000000..6e3124496 --- /dev/null +++ b/src/extensions/rppgrpc/rppgrpc/utils/exceptions.hpp @@ -0,0 +1,22 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +namespace rppgrpc::utils +{ + struct reactor_failed : public std::runtime_error + { + using std::runtime_error::runtime_error; + }; + +} // namespace rppgrpc::utils diff --git a/src/rpp/rpp/subjects/behavior_subject.hpp b/src/rpp/rpp/subjects/behavior_subject.hpp index 261548c88..3ab5dc960 100644 --- a/src/rpp/rpp/subjects/behavior_subject.hpp +++ b/src/rpp/rpp/subjects/behavior_subject.hpp @@ -17,7 +17,6 @@ #include #include -#include #include namespace rpp::subjects::details diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 855142c86..f28168ae5 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -24,6 +24,11 @@ macro(add_test_target target_name module files) rpp_add_qt_support_to_executable(${TARGET}) endif() + if (${module} STREQUAL rppgrpc) + rpp_add_proto_target(${TARGET}_proto rppgrpc/proto.proto) + target_link_libraries(${TARGET} PRIVATE ${TARGET}_proto) + endif() + target_compile_features(${TARGET} PRIVATE cxx_std_20) if(MSVC) @@ -50,3 +55,7 @@ rpp_register_tests(rpp) if (RPP_BUILD_QT_CODE) rpp_register_tests(rppqt) endif() + +if (RPP_BUILD_GRPC_CODE) + rpp_register_tests(rppgrpc) +endif() diff --git a/src/tests/rppgrpc/proto.proto b/src/tests/rppgrpc/proto.proto new file mode 100644 index 000000000..a7e1c0f32 --- /dev/null +++ b/src/tests/rppgrpc/proto.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +message Request { + uint32 Value = 1; +} + +message Response { + uint32 Value = 1; +} + +service TestService { + rpc ServerSide(Request) returns (stream Response) {} + rpc ClientSide(stream Request) returns (Response) {} + rpc Bidirectional(stream Request) returns (stream Response) {} +} diff --git a/src/tests/rppgrpc/test_async_client.cpp b/src/tests/rppgrpc/test_async_client.cpp new file mode 100644 index 000000000..880314d96 --- /dev/null +++ b/src/tests/rppgrpc/test_async_client.cpp @@ -0,0 +1,415 @@ +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "rpp_trompeloil.hpp" + +#include + +struct service : public trompeloeil::mock_interface +{ + IMPLEMENT_MOCK3(ServerSide); + IMPLEMENT_MOCK3(ClientSide); + IMPLEMENT_MOCK2(Bidirectional); +}; + +void wait(const std::unique_ptr& e) +{ + while (!e->is_satisfied()) + { + std::this_thread::sleep_for(std::chrono::seconds{1}); + } +} + +TEST_CASE("async client reactor") +{ + grpc::ServerBuilder builder{}; + trompeloeil::sequence s{}; + + auto mock_service = std::make_unique(); + + builder.RegisterService(mock_service.get()); + + auto server(builder.BuildAndStart()); + const auto channel = server->InProcessChannel({}); + + const auto stub = TestService::NewStub(channel, {}); + + rpp::subjects::publish_subject subj{}; + mock_observer out_mock{}; + + SECTION("bidirectional") + { + grpc::ClientContext ctx{}; + + const auto bidi_reactor = new rppgrpc::client_bidi_reactor(); + bidi_reactor->get_observable() | rpp::ops::map([](const Response& out) { return out.value(); }) | rpp::ops::observe_on(rpp::schedulers::new_thread{}) | rpp::ops::subscribe(out_mock); + subj.get_observable() | rpp::ops::map([](int v) { Request request{}; request.set_value(v); return request; }) | rpp::ops::subscribe(bidi_reactor->get_observer()); + + stub->async()->Bidirectional(&ctx, bidi_reactor); + SECTION("no stream job - completion") + { + REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)).RETURN(grpc::Status::OK).IN_SEQUENCE(s); + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + auto t = std::thread{[&] { + bidi_reactor->init(); + }}; + + wait(last); + t.join(); + } + + SECTION("error status - error") + { + REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)).RETURN(grpc::Status::CANCELLED).IN_SEQUENCE(s); + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); + auto t = std::thread{[&] { + bidi_reactor->init(); + }}; + + wait(last); + t.join(); + } + + SECTION("manual server-side cancel") + { + REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + ctx.TryCancel(); + }); + + auto t = std::thread{[&] { + bidi_reactor->init(); + }}; + + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); + + wait(last); + t.join(); + } + + SECTION("manual client-side completion") + { + REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + Request request{}; + while (_2->Read(&request)) + { + } + }); + + auto t = std::thread{[&] { + bidi_reactor->init(); + }}; + + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + subj.get_observer().on_completed(); + + wait(last); + t.join(); + } + + SECTION("client-side write + completion") + { + std::promise> results{}; + REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + std::vector reads{}; + Request request{}; + while (_2->Read(&request)) + { + reads.push_back(request.value()); + } + results.set_value(reads); + }); + + auto t = std::thread{[&] { + bidi_reactor->init(); + }}; + + subj.get_observer().on_next(1); + subj.get_observer().on_next(2); + + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + subj.get_observer().on_completed(); + + auto f = results.get_future(); + REQUIRE(f.wait_for(std::chrono::seconds{1}) == std::future_status::ready); + CHECK(f.get() == std::vector{1, 2}); + + wait(last); + t.join(); + } + + SECTION("client-side read + completion") + { + REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + Response response{}; + + for (int v : {1, 2, 3}) + { + response.set_value(v); + _2->Write(response); + } + }); + auto t = std::thread{[&] { + bidi_reactor->init(); + }}; + + REQUIRE_CALL(*out_mock, on_next_rvalue(1)).IN_SEQUENCE(s); + REQUIRE_CALL(*out_mock, on_next_rvalue(2)).IN_SEQUENCE(s); + REQUIRE_CALL(*out_mock, on_next_rvalue(3)).IN_SEQUENCE(s); + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + + wait(last); + t.join(); + } + + SECTION("client-side read-write + completeion") + { + REQUIRE_CALL(*mock_service, Bidirectional(trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + Request request{}; + while (_2->Read(&request)) + { + Response response{}; + response.set_value(request.value() * 10); + _2->Write(response); + } + }); + + auto t = std::thread{[&] { + bidi_reactor->init(); + }}; + + REQUIRE_CALL(*out_mock, on_next_rvalue(10)).IN_SEQUENCE(s); + subj.get_observer().on_next(1); + + REQUIRE_CALL(*out_mock, on_next_rvalue(20)).IN_SEQUENCE(s); + subj.get_observer().on_next(2); + + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + subj.get_observer().on_completed(); + + wait(last); + t.join(); + } + } + SECTION("server-side") + { + grpc::ClientContext ctx{}; + + const auto read_reactor = new rppgrpc::client_read_reactor(); + read_reactor->get_observable() | rpp::ops::map([](const Response& out) { return out.value(); }) | rpp::ops::observe_on(rpp::schedulers::new_thread{}) | rpp::ops::subscribe(out_mock); + + SECTION("empty request") + { + stub->async()->ServerSide(&ctx, nullptr, read_reactor); + + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); + auto t = std::thread{[&] { + read_reactor->init(); + }}; + + wait(last); + t.join(); + } + SECTION("normal request") + { + Request r{}; + stub->async()->ServerSide(&ctx, &r, read_reactor); + + SECTION("no stream job - completion") + { + REQUIRE_CALL(*mock_service, ServerSide(trompeloeil::_, trompeloeil::_, trompeloeil::_)).RETURN(grpc::Status::OK).IN_SEQUENCE(s); + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + auto t = std::thread{[&] { + read_reactor->init(); + }}; + + wait(last); + t.join(); + } + + SECTION("error status - error") + { + REQUIRE_CALL(*mock_service, ServerSide(trompeloeil::_, trompeloeil::_, trompeloeil::_)).RETURN(grpc::Status::CANCELLED).IN_SEQUENCE(s); + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); + auto t = std::thread{[&] { + read_reactor->init(); + }}; + + wait(last); + t.join(); + } + + SECTION("manual server-side cancel") + { + REQUIRE_CALL(*mock_service, ServerSide(trompeloeil::_, trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + ctx.TryCancel(); + }); + + auto t = std::thread{[&] { + read_reactor->init(); + }}; + + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); + + wait(last); + t.join(); + } + + SECTION("client-side read + completion") + { + REQUIRE_CALL(*mock_service, ServerSide(trompeloeil::_, trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + Response response{}; + + for (int v : {1, 2, 3}) + { + response.set_value(v); + _3->Write(response); + } + }); + auto t = std::thread{[&] { + read_reactor->init(); + }}; + + REQUIRE_CALL(*out_mock, on_next_rvalue(1)).IN_SEQUENCE(s); + REQUIRE_CALL(*out_mock, on_next_rvalue(2)).IN_SEQUENCE(s); + REQUIRE_CALL(*out_mock, on_next_rvalue(3)).IN_SEQUENCE(s); + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + + wait(last); + t.join(); + } + } + } + SECTION("client-side") + { + grpc::ClientContext ctx{}; + + const auto write_reactor = new rppgrpc::client_write_reactor(); + write_reactor->get_observable() | rpp::ops::map([](const rpp::utils::none& out) { return 0; }) | rpp::ops::observe_on(rpp::schedulers::new_thread{}) | rpp::ops::subscribe(out_mock); + subj.get_observable() | rpp::ops::map([](int v) { Request request{}; request.set_value(v); return request; }) | rpp::ops::subscribe(write_reactor->get_observer()); + + Response r{}; + stub->async()->ClientSide(&ctx, &r, write_reactor); + SECTION("no stream job - completion") + { + REQUIRE_CALL(*mock_service, ClientSide(trompeloeil::_, trompeloeil::_, trompeloeil::_)).RETURN(grpc::Status::OK).IN_SEQUENCE(s); + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + auto t = std::thread{[&] { + write_reactor->init(); + }}; + + wait(last); + t.join(); + } + + SECTION("error status - error") + { + REQUIRE_CALL(*mock_service, ClientSide(trompeloeil::_, trompeloeil::_, trompeloeil::_)).RETURN(grpc::Status::CANCELLED).IN_SEQUENCE(s); + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); + auto t = std::thread{[&] { + write_reactor->init(); + }}; + + wait(last); + t.join(); + } + + SECTION("manual server-side cancel") + { + REQUIRE_CALL(*mock_service, ClientSide(trompeloeil::_, trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + ctx.TryCancel(); + }); + + auto t = std::thread{[&] { + write_reactor->init(); + }}; + + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_error(trompeloeil::_)).IN_SEQUENCE(s); + + wait(last); + t.join(); + } + + SECTION("manual client-side completion") + { + REQUIRE_CALL(*mock_service, ClientSide(trompeloeil::_, trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + Request request{}; + while (_2->Read(&request)) + { + } + }); + + auto t = std::thread{[&] { + write_reactor->init(); + }}; + + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + subj.get_observer().on_completed(); + + wait(last); + t.join(); + } + + SECTION("client-side write + completion") + { + std::promise> results{}; + REQUIRE_CALL(*mock_service, ClientSide(trompeloeil::_, trompeloeil::_, trompeloeil::_)) + .RETURN(grpc::Status::OK) + .LR_SIDE_EFFECT({ + std::vector reads{}; + Request request{}; + while (_2->Read(&request)) + { + reads.push_back(request.value()); + } + results.set_value(reads); + }); + + auto t = std::thread{[&] { + write_reactor->init(); + }}; + + subj.get_observer().on_next(1); + subj.get_observer().on_next(2); + + const auto last = NAMED_REQUIRE_CALL(*out_mock, on_completed()).IN_SEQUENCE(s); + subj.get_observer().on_completed(); + + auto f = results.get_future(); + REQUIRE(f.wait_for(std::chrono::seconds{1}) == std::future_status::ready); + CHECK(f.get() == std::vector{1, 2}); + + wait(last); + t.join(); + } + } + server->Shutdown(); + server->Wait(); +}