From d74f624b2de8f2afb606977b56e20abee94ac19d Mon Sep 17 00:00:00 2001 From: Victor Chang Date: Wed, 13 Nov 2024 17:08:43 -0800 Subject: [PATCH] Enable Data Flow Tracking Signed-off-by: Victor Chang --- .../CMakeLists.txt | 16 +++++ .../grpc_endoscopy_tool_tracking/README.md | 70 +++++++++++++++++++ .../cpp/CMakeLists.txt | 3 +- .../cpp/cloud/app_cloud_main.cpp | 23 +++++- .../cpp/cloud/grpc_service.hpp | 6 +- .../cpp/edge/app_edge_main.cpp | 70 +++++++++++++++++-- ...p_edge.hpp => app_edge_multi_fragment.hpp} | 6 +- .../cpp/edge/app_edge_single_fragment.hpp | 8 +-- .../cpp/endoscopy_tool_tracking.yaml | 10 ++- .../cpp/edge/video_input_fragment.hpp | 1 + .../cpp/endoscopy_tool_tracking.yaml | 1 + .../cpp/metadata.json | 2 +- .../server/application_factory.cpp | 1 + .../server/application_factory.hpp | 1 + 14 files changed, 197 insertions(+), 21 deletions(-) rename applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/edge/{app_edge.hpp => app_edge_multi_fragment.hpp} (93%) diff --git a/applications/distributed/grpc/grpc_endoscopy_tool_tracking/CMakeLists.txt b/applications/distributed/grpc/grpc_endoscopy_tool_tracking/CMakeLists.txt index 3de3985c..33c69d8d 100644 --- a/applications/distributed/grpc/grpc_endoscopy_tool_tracking/CMakeLists.txt +++ b/applications/distributed/grpc/grpc_endoscopy_tool_tracking/CMakeLists.txt @@ -16,4 +16,20 @@ cmake_minimum_required(VERSION 3.20) project(grpc_endoscopy_tool_tracking LANGUAGES NONE) +# Download the endoscopy sample data +if(HOLOHUB_DOWNLOAD_DATASETS) + include(holoscan_download_data) + holoscan_download_data(endoscopy + URL https://api.ngc.nvidia.com/v2/resources/nvidia/clara-holoscan/holoscan_endoscopy_sample_data/versions/20230222/zip + DOWNLOAD_NAME holoscan_endoscopy_sample_data_20230222.zip + URL_MD5 d54f84a562d29ed560a87d2607eba973 + DOWNLOAD_DIR ${HOLOHUB_DATA_DIR} + GENERATE_GXF_ENTITIES + GXF_ENTITIES_HEIGHT 480 + GXF_ENTITIES_WIDTH 854 + GXF_ENTITIES_CHANNELS 3 + GXF_ENTITIES_FRAMERATE 30 + ) +endif() + add_subdirectory(cpp) diff --git a/applications/distributed/grpc/grpc_endoscopy_tool_tracking/README.md b/applications/distributed/grpc/grpc_endoscopy_tool_tracking/README.md index b2edc672..3093c860 100644 --- a/applications/distributed/grpc/grpc_endoscopy_tool_tracking/README.md +++ b/applications/distributed/grpc/grpc_endoscopy_tool_tracking/README.md @@ -35,6 +35,76 @@ The data is automatically downloaded when building the application. * Building and running the application from the top level Holohub directory: + +### Configuration + +The Edge application runs in a single-fragment mode by default. However, it can be configured to run in a mult-fragment mode, as in the picture above. + +To switch to multi-fragment mode, edit the [endoscopy_tool_tracking.yaml](./cpp/endoscopy_tool_tracking.yaml) YAML file and change `multifragment` to `true`. + +```yaml + +application: + multifragment: false + benchmarking: false +``` + +[Data Flow Tracking](https://docs.nvidia.com/holoscan/sdk-user-guide/flow_tracking.html) can also be enabled by editing the [endoscopy_tool_tracking.yaml](./cpp/endoscopy_tool_tracking.yaml) YAML file and change `benchmarking` to `true`. This enables the built-in mechanism to profile the application and analyze the fine-grained timing properties and data flow between operators. + +For example, on the server side, when a client disconnects, it will output the results for that session: + +```bash +Data Flow Tracking Results: +Total paths: 1 + +Path 1: grpc_request_op,format_converter,lstm_inferer,tool_tracking_postprocessor,grpc_response_op +Number of messages: 663 +Min Latency Message No: 249 +Min end-to-end Latency (ms): 1.868 +Avg end-to-end Latency (ms): 2.15161 +Max Latency Message No: 371 +Max end-to-end Latency (ms): 4.19 + +Number of source messages [format: source operator->transmitter name: number of messages]: +grpc_request_op->output: 683 +``` + +Similarly, on the client side, when it completes playing the video, it will print the results: + +```bash +Data Flow Tracking Results: +Total paths: 3 + +Path 1: incoming_responses,visualizer_op +Number of messages: 663 +Min Latency Message No: 249 +Min end-to-end Latency (ms): 0.214 +Avg end-to-end Latency (ms): 0.374005 +Max Latency Message No: 378 +Max end-to-end Latency (ms): 2.751 + +Path 2: replayer,outgoing_requests +Number of messages: 663 +Min Latency Message No: 379 +Min end-to-end Latency (ms): 24.854 +Avg end-to-end Latency (ms): 27.1886 +Max Latency Message No: 142 +Max end-to-end Latency (ms): 28.003 + +Path 3: replayer,visualizer_op +Number of messages: 663 +Min Latency Message No: 372 +Min end-to-end Latency (ms): 30.966 +Avg end-to-end Latency (ms): 33.325 +Max Latency Message No: 397 +Max end-to-end Latency (ms): 35.479 + +Number of source messages [format: source operator->transmitter name: number of messages]: +incoming_responses->output: 683 +replayer->output: 683 +``` + + ### C++ ```bash diff --git a/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/CMakeLists.txt b/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/CMakeLists.txt index eac40128..d60975c5 100644 --- a/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/CMakeLists.txt +++ b/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/CMakeLists.txt @@ -27,7 +27,8 @@ add_executable(grpc_endoscopy_tool_tracking_cloud add_executable(grpc_endoscopy_tool_tracking_edge edge/app_edge_main.cpp - edge/app_edge.hpp + edge/app_edge_single_fragment.hpp + edge/app_edge_multi_fragment.hpp edge/video_input_fragment.hpp edge/viz_fragment.hpp ) diff --git a/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/cloud/app_cloud_main.cpp b/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/cloud/app_cloud_main.cpp index 30fac352..f9cb5195 100644 --- a/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/cloud/app_cloud_main.cpp +++ b/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/cloud/app_cloud_main.cpp @@ -70,6 +70,21 @@ void signal_handler(int signum) { myThread.join(); } +/** Helper function to parse benchmarking setting from the configuration file */ +void parse_config(const std::string& config_path, bool& benchmarking) { + auto config = holoscan::Config(config_path); + auto& yaml_nodes = config.yaml_nodes(); + for (const auto& yaml_node : yaml_nodes) { + try { + auto application = yaml_node["application"]; + if (application.IsMap()) { benchmarking = application["benchmarking"].as(); } + } catch (std::exception& e) { + HOLOSCAN_LOG_ERROR("Error parsing configuration file: {}", e.what()); + benchmarking = false; + } + } +} + /** Main function */ /** * @file app_cloud_main.cpp @@ -127,12 +142,15 @@ int main(int argc, char** argv) { } } + bool benchmarking = false; + parse_config(config_path, benchmarking); + // Register each gRPC service with a Holoscan application: // - the callback function (create_application_instance_func) is used to create a new instance of // the application when a new RPC call is received. ApplicationFactory::get_instance()->register_application( "EntityStream", - [config_path, data_directory]( + [config_path, data_directory, benchmarking]( std::queue> incoming_request_queue, std::queue> outgoing_response_queue) { @@ -141,6 +159,9 @@ int main(int argc, char** argv) { incoming_request_queue, outgoing_response_queue); application_instance.instance->config(config_path); application_instance.instance->set_data_path(data_directory); + if (benchmarking) { + application_instance.tracker = &application_instance.instance->track(); + } application_instance.future = application_instance.instance->run_async(); return application_instance; }); diff --git a/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/cloud/grpc_service.hpp b/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/cloud/grpc_service.hpp index 5d1054ae..55279752 100644 --- a/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/cloud/grpc_service.hpp +++ b/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/cloud/grpc_service.hpp @@ -15,8 +15,8 @@ * limitations under the License. */ -#ifndef GRPC_H264_ENDOSCOPY_TOOL_TRACKING_CPP_CLOUD_GRPC_SERVICE_HPP -#define GRPC_H264_ENDOSCOPY_TOOL_TRACKING_CPP_CLOUD_GRPC_SERVICE_HPP +#ifndef GRPC_GRPC_ENDOSCOPY_TOOL_TRACKING_CPP_CLOUD_GRPC_SERVICE_HPP +#define GRPC_GRPC_ENDOSCOPY_TOOL_TRACKING_CPP_CLOUD_GRPC_SERVICE_HPP #include #include @@ -102,4 +102,4 @@ class GrpcService { }; } // namespace holohub::grpc_h264_endoscopy_tool_tracking -#endif /* GRPC_H264_ENDOSCOPY_TOOL_TRACKING_CPP_CLOUD_GRPC_SERVICE_HPP */ +#endif /* GRPC_GRPC_ENDOSCOPY_TOOL_TRACKING_CPP_CLOUD_GRPC_SERVICE_HPP */ diff --git a/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/edge/app_edge_main.cpp b/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/edge/app_edge_main.cpp index d9e81129..16fd04b2 100644 --- a/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/edge/app_edge_main.cpp +++ b/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/edge/app_edge_main.cpp @@ -25,7 +25,7 @@ #include "app_edge_single_fragment.hpp" // Run the edge app with two fragments -// #include "app_edge.hpp" +#include "app_edge_multi_fragment.hpp" using namespace holoscan; using namespace holohub::grpc_h264_endoscopy_tool_tracking; @@ -55,6 +55,25 @@ bool parse_arguments(int argc, char** argv, std::string& data_path, std::string& return true; } +/** Helper function to parse fragment mode and benchmarking settings from the configuration file */ +void parse_config(const std::string& config_path, bool& multi_fragment_mode, bool& benchmarking) { + auto config = holoscan::Config(config_path); + auto& yaml_nodes = config.yaml_nodes(); + for (const auto& yaml_node : yaml_nodes) { + try { + auto application = yaml_node["application"]; + if (application.IsMap()) { + multi_fragment_mode = application["multifragment"].as(); + benchmarking = application["benchmarking"].as(); + } + } catch (std::exception& e) { + HOLOSCAN_LOG_ERROR("Error parsing configuration file: {}", e.what()); + multi_fragment_mode = false; + benchmarking = false; + } + } +} + /** Main function */ /** * @file app_edge_main.cpp @@ -110,13 +129,50 @@ int main(int argc, char** argv) { } } - auto app = holoscan::make_application(); + bool multi_fragment_mode = false; + bool benchmarking = false; + parse_config(config_path, multi_fragment_mode, benchmarking); + if (multi_fragment_mode) { + HOLOSCAN_LOG_INFO("Running application in multi-fragment mode"); + auto app = holoscan::make_application(); + + HOLOSCAN_LOG_INFO("Using configuration file from {}", config_path); + app->config(config_path); + + HOLOSCAN_LOG_INFO("Using input data from {}", data_directory); + app->set_datapath(data_directory); + + std::unordered_map trackers; + if (benchmarking) { + HOLOSCAN_LOG_INFO("Benchmarking enabled"); + trackers = app->track_distributed(); + } + + app->run(); + + if (benchmarking) { + for (const auto& [name, tracker] : trackers) { + std::cout << "Fragment: " << name << std::endl; + tracker->print(); + } + } + } else { + HOLOSCAN_LOG_INFO("Running application in single fragment mode"); + auto app = holoscan::make_application(); + + HOLOSCAN_LOG_INFO("Using configuration file from {}", config_path); + app->config(config_path); - HOLOSCAN_LOG_INFO("Using configuration file from {}", config_path); - app->config(config_path); + HOLOSCAN_LOG_INFO("Using input data from {}", data_directory); + app->set_datapath(data_directory); - HOLOSCAN_LOG_INFO("Using input data from {}", data_directory); - app->set_datapath(data_directory); - app->run(); + DataFlowTracker* tracker = nullptr; + if (benchmarking) { + HOLOSCAN_LOG_INFO("Benchmarking enabled"); + tracker = &app->track(); + } + app->run(); + if (benchmarking) { tracker->print(); } + } return 0; } diff --git a/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/edge/app_edge.hpp b/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/edge/app_edge_multi_fragment.hpp similarity index 93% rename from applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/edge/app_edge.hpp rename to applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/edge/app_edge_multi_fragment.hpp index ef0803bb..f1b17568 100644 --- a/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/edge/app_edge.hpp +++ b/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/edge/app_edge_multi_fragment.hpp @@ -30,7 +30,7 @@ namespace holohub::grpc_h264_endoscopy_tool_tracking { using namespace holoscan; /** - * @class AppEdge + * @class AppEdgeMultiFragment * @brief A two-fragment application for the H.264 endoscopy tool tracking application. * * This class inherits from the holoscan::Application and is a client application offloads the @@ -39,9 +39,9 @@ using namespace holoscan; * two systems, separating the input from the visualization. For example, a video surveillance * camera capturing and streaming input to another system displaying the footage. */ -class AppEdge : public holoscan::Application { +class AppEdgeMultiFragment : public holoscan::Application { public: - explicit AppEdge(const std::vector& argv = {}) : Application(argv) {} + explicit AppEdgeMultiFragment(const std::vector& argv = {}) : Application(argv) {} void set_datapath(const std::string& path) { datapath_ = path; } void compose() { diff --git a/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/edge/app_edge_single_fragment.hpp b/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/edge/app_edge_single_fragment.hpp index ef15ffc9..b354bd22 100644 --- a/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/edge/app_edge_single_fragment.hpp +++ b/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/edge/app_edge_single_fragment.hpp @@ -31,7 +31,7 @@ namespace holohub::grpc_h264_endoscopy_tool_tracking { using namespace holoscan; /** - * @class AppEdge + * @class AppEdgeSingleFragment * @brief A two-fragment application for the H.264 endoscopy tool tracking application. * * This class inherits from the holoscan::Application and is a client application offloads the @@ -40,10 +40,10 @@ using namespace holoscan; * on two systems, separating the input from the visualization. For example, a video surveillance * camera capturing and streaming input to another system displaying the footage. */ -class AppEdge : public holoscan::Application { +class AppEdgeSingleFragment : public holoscan::Application { public: - explicit AppEdge(const std::vector& argv = {}) : Application(argv) {} - ~AppEdge() { + explicit AppEdgeSingleFragment(const std::vector& argv = {}) : Application(argv) {} + ~AppEdgeSingleFragment() { entity_client_service_->stop_entity_stream(); } diff --git a/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/endoscopy_tool_tracking.yaml b/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/endoscopy_tool_tracking.yaml index 2ab62c63..2909fd6c 100644 --- a/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/endoscopy_tool_tracking.yaml +++ b/applications/distributed/grpc/grpc_endoscopy_tool_tracking/cpp/endoscopy_tool_tracking.yaml @@ -17,11 +17,19 @@ extensions: - ../../../../../lib/gxf_extensions/libgxf_lstm_tensor_rt_inference.so +application: + title: Endoscopy Tool Tracking - gRPC + version: 1.0 + inputFormats: [] + outputFormats: ["screen"] + multifragment: false # default: false, true to run in multi-fragment mode, false otherwise + benchmarking: false # default: false, true to enable Data Flow Benchmarking, false otherwise + replayer: basename: "surgical_video" frame_rate: 0 # as specified in timestamps repeat: false # default: false - realtime: false # default: true + realtime: true # default: true count: 0 # default: 0 (no frame count restriction) format_converter: diff --git a/applications/distributed/grpc/grpc_h264_endoscopy_tool_tracking/cpp/edge/video_input_fragment.hpp b/applications/distributed/grpc/grpc_h264_endoscopy_tool_tracking/cpp/edge/video_input_fragment.hpp index a6be0322..f6cbac59 100644 --- a/applications/distributed/grpc/grpc_h264_endoscopy_tool_tracking/cpp/edge/video_input_fragment.hpp +++ b/applications/distributed/grpc/grpc_h264_endoscopy_tool_tracking/cpp/edge/video_input_fragment.hpp @@ -119,6 +119,7 @@ class VideoInputFragment : public holoscan::Fragment { entity_client_service_ = std::make_shared( from_config("grpc_client.server_address").as(), from_config("grpc_client.rpc_timeout").as(), + from_config("grpc_client.interrupt").as(), request_queue_, response_queue_, outgoing_requests); diff --git a/applications/distributed/grpc/grpc_h264_endoscopy_tool_tracking/cpp/endoscopy_tool_tracking.yaml b/applications/distributed/grpc/grpc_h264_endoscopy_tool_tracking/cpp/endoscopy_tool_tracking.yaml index 67ff5483..f04fcbc1 100644 --- a/applications/distributed/grpc/grpc_h264_endoscopy_tool_tracking/cpp/endoscopy_tool_tracking.yaml +++ b/applications/distributed/grpc/grpc_h264_endoscopy_tool_tracking/cpp/endoscopy_tool_tracking.yaml @@ -27,6 +27,7 @@ grpc_server: grpc_client: server_address: localhost:50051 rpc_timeout: 5 + interrupt: true scheduler: worker_thread_number: 8 diff --git a/applications/distributed/ucx/ucx_endoscopy_tool_tracking/cpp/metadata.json b/applications/distributed/ucx/ucx_endoscopy_tool_tracking/cpp/metadata.json index cc675163..0550769b 100644 --- a/applications/distributed/ucx/ucx_endoscopy_tool_tracking/cpp/metadata.json +++ b/applications/distributed/ucx/ucx_endoscopy_tool_tracking/cpp/metadata.json @@ -32,7 +32,7 @@ ] }, "run": { - "command": "/ucx_endoscopy_tool_tracking", + "command": "/ucx_endoscopy_tool_tracking --data /endoscopy", "workdir": "holohub_bin" } } diff --git a/operators/grpc_operators/server/application_factory.cpp b/operators/grpc_operators/server/application_factory.cpp index 8c78c8c1..618922c3 100644 --- a/operators/grpc_operators/server/application_factory.cpp +++ b/operators/grpc_operators/server/application_factory.cpp @@ -70,6 +70,7 @@ void ApplicationFactory::destroy_application_instance( if (instance.instance == application_instance) { instance.instance->stop_streaming(); instance.future.wait_for(std::chrono::seconds(1)); + if (instance.tracker != nullptr) { instance.tracker->print(); } application_instances_.erase(service_name); HOLOSCAN_LOG_INFO("Application instance deleted for {}", service_name); return; diff --git a/operators/grpc_operators/server/application_factory.hpp b/operators/grpc_operators/server/application_factory.hpp index d6efcd71..2f1fa49e 100644 --- a/operators/grpc_operators/server/application_factory.hpp +++ b/operators/grpc_operators/server/application_factory.hpp @@ -50,6 +50,7 @@ class HoloscanGrpcApplication; struct ApplicationInstance { std::shared_ptr instance; std::future future; + DataFlowTracker* tracker = nullptr; }; /**