Skip to content

Commit

Permalink
Enable Data Flow Tracking
Browse files Browse the repository at this point in the history
Signed-off-by: Victor Chang <[email protected]>
  • Loading branch information
mocsharp committed Nov 14, 2024
1 parent 7b9cca6 commit d74f624
Show file tree
Hide file tree
Showing 14 changed files with 197 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,20 @@
cmake_minimum_required(VERSION 3.20)
project(grpc_endoscopy_tool_tracking LANGUAGES NONE)

# Download the endoscopy sample data
if(HOLOHUB_DOWNLOAD_DATASETS)
include(holoscan_download_data)
holoscan_download_data(endoscopy
URL https://api.ngc.nvidia.com/v2/resources/nvidia/clara-holoscan/holoscan_endoscopy_sample_data/versions/20230222/zip
DOWNLOAD_NAME holoscan_endoscopy_sample_data_20230222.zip
URL_MD5 d54f84a562d29ed560a87d2607eba973
DOWNLOAD_DIR ${HOLOHUB_DATA_DIR}
GENERATE_GXF_ENTITIES
GXF_ENTITIES_HEIGHT 480
GXF_ENTITIES_WIDTH 854
GXF_ENTITIES_CHANNELS 3
GXF_ENTITIES_FRAMERATE 30
)
endif()

add_subdirectory(cpp)
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,76 @@ The data is automatically downloaded when building the application.

* Building and running the application from the top level Holohub directory:


### Configuration

The Edge application runs in a single-fragment mode by default. However, it can be configured to run in a mult-fragment mode, as in the picture above.

To switch to multi-fragment mode, edit the [endoscopy_tool_tracking.yaml](./cpp/endoscopy_tool_tracking.yaml) YAML file and change `multifragment` to `true`.

```yaml

application:
multifragment: false
benchmarking: false
```
[Data Flow Tracking](https://docs.nvidia.com/holoscan/sdk-user-guide/flow_tracking.html) can also be enabled by editing the [endoscopy_tool_tracking.yaml](./cpp/endoscopy_tool_tracking.yaml) YAML file and change `benchmarking` to `true`. This enables the built-in mechanism to profile the application and analyze the fine-grained timing properties and data flow between operators.

For example, on the server side, when a client disconnects, it will output the results for that session:

```bash
Data Flow Tracking Results:
Total paths: 1
Path 1: grpc_request_op,format_converter,lstm_inferer,tool_tracking_postprocessor,grpc_response_op
Number of messages: 663
Min Latency Message No: 249
Min end-to-end Latency (ms): 1.868
Avg end-to-end Latency (ms): 2.15161
Max Latency Message No: 371
Max end-to-end Latency (ms): 4.19
Number of source messages [format: source operator->transmitter name: number of messages]:
grpc_request_op->output: 683
```

Similarly, on the client side, when it completes playing the video, it will print the results:

```bash
Data Flow Tracking Results:
Total paths: 3
Path 1: incoming_responses,visualizer_op
Number of messages: 663
Min Latency Message No: 249
Min end-to-end Latency (ms): 0.214
Avg end-to-end Latency (ms): 0.374005
Max Latency Message No: 378
Max end-to-end Latency (ms): 2.751
Path 2: replayer,outgoing_requests
Number of messages: 663
Min Latency Message No: 379
Min end-to-end Latency (ms): 24.854
Avg end-to-end Latency (ms): 27.1886
Max Latency Message No: 142
Max end-to-end Latency (ms): 28.003
Path 3: replayer,visualizer_op
Number of messages: 663
Min Latency Message No: 372
Min end-to-end Latency (ms): 30.966
Avg end-to-end Latency (ms): 33.325
Max Latency Message No: 397
Max end-to-end Latency (ms): 35.479
Number of source messages [format: source operator->transmitter name: number of messages]:
incoming_responses->output: 683
replayer->output: 683
```


### C++

```bash
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ add_executable(grpc_endoscopy_tool_tracking_cloud

add_executable(grpc_endoscopy_tool_tracking_edge
edge/app_edge_main.cpp
edge/app_edge.hpp
edge/app_edge_single_fragment.hpp
edge/app_edge_multi_fragment.hpp
edge/video_input_fragment.hpp
edge/viz_fragment.hpp
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,21 @@ void signal_handler(int signum) {
myThread.join();
}

/** Helper function to parse benchmarking setting from the configuration file */
void parse_config(const std::string& config_path, bool& benchmarking) {
auto config = holoscan::Config(config_path);
auto& yaml_nodes = config.yaml_nodes();
for (const auto& yaml_node : yaml_nodes) {
try {
auto application = yaml_node["application"];
if (application.IsMap()) { benchmarking = application["benchmarking"].as<bool>(); }
} catch (std::exception& e) {
HOLOSCAN_LOG_ERROR("Error parsing configuration file: {}", e.what());
benchmarking = false;
}
}
}

/** Main function */
/**
* @file app_cloud_main.cpp
Expand Down Expand Up @@ -127,12 +142,15 @@ int main(int argc, char** argv) {
}
}

bool benchmarking = false;
parse_config(config_path, benchmarking);

// Register each gRPC service with a Holoscan application:
// - the callback function (create_application_instance_func) is used to create a new instance of
// the application when a new RPC call is received.
ApplicationFactory::get_instance()->register_application(
"EntityStream",
[config_path, data_directory](
[config_path, data_directory, benchmarking](
std::queue<std::shared_ptr<nvidia::gxf::Entity>> incoming_request_queue,
std::queue<std::shared_ptr<EntityResponse>>
outgoing_response_queue) {
Expand All @@ -141,6 +159,9 @@ int main(int argc, char** argv) {
incoming_request_queue, outgoing_response_queue);
application_instance.instance->config(config_path);
application_instance.instance->set_data_path(data_directory);
if (benchmarking) {
application_instance.tracker = &application_instance.instance->track();
}
application_instance.future = application_instance.instance->run_async();
return application_instance;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
* limitations under the License.
*/

#ifndef GRPC_H264_ENDOSCOPY_TOOL_TRACKING_CPP_CLOUD_GRPC_SERVICE_HPP
#define GRPC_H264_ENDOSCOPY_TOOL_TRACKING_CPP_CLOUD_GRPC_SERVICE_HPP
#ifndef GRPC_GRPC_ENDOSCOPY_TOOL_TRACKING_CPP_CLOUD_GRPC_SERVICE_HPP
#define GRPC_GRPC_ENDOSCOPY_TOOL_TRACKING_CPP_CLOUD_GRPC_SERVICE_HPP

#include <fmt/format.h>
#include <holoscan/holoscan.hpp>
Expand Down Expand Up @@ -102,4 +102,4 @@ class GrpcService {
};
} // namespace holohub::grpc_h264_endoscopy_tool_tracking

#endif /* GRPC_H264_ENDOSCOPY_TOOL_TRACKING_CPP_CLOUD_GRPC_SERVICE_HPP */
#endif /* GRPC_GRPC_ENDOSCOPY_TOOL_TRACKING_CPP_CLOUD_GRPC_SERVICE_HPP */
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "app_edge_single_fragment.hpp"

// Run the edge app with two fragments
// #include "app_edge.hpp"
#include "app_edge_multi_fragment.hpp"

using namespace holoscan;
using namespace holohub::grpc_h264_endoscopy_tool_tracking;
Expand Down Expand Up @@ -55,6 +55,25 @@ bool parse_arguments(int argc, char** argv, std::string& data_path, std::string&
return true;
}

/** Helper function to parse fragment mode and benchmarking settings from the configuration file */
void parse_config(const std::string& config_path, bool& multi_fragment_mode, bool& benchmarking) {
auto config = holoscan::Config(config_path);
auto& yaml_nodes = config.yaml_nodes();
for (const auto& yaml_node : yaml_nodes) {
try {
auto application = yaml_node["application"];
if (application.IsMap()) {
multi_fragment_mode = application["multifragment"].as<bool>();
benchmarking = application["benchmarking"].as<bool>();
}
} catch (std::exception& e) {
HOLOSCAN_LOG_ERROR("Error parsing configuration file: {}", e.what());
multi_fragment_mode = false;
benchmarking = false;
}
}
}

/** Main function */
/**
* @file app_edge_main.cpp
Expand Down Expand Up @@ -110,13 +129,50 @@ int main(int argc, char** argv) {
}
}

auto app = holoscan::make_application<AppEdge>();
bool multi_fragment_mode = false;
bool benchmarking = false;
parse_config(config_path, multi_fragment_mode, benchmarking);
if (multi_fragment_mode) {
HOLOSCAN_LOG_INFO("Running application in multi-fragment mode");
auto app = holoscan::make_application<AppEdgeMultiFragment>();

HOLOSCAN_LOG_INFO("Using configuration file from {}", config_path);
app->config(config_path);

HOLOSCAN_LOG_INFO("Using input data from {}", data_directory);
app->set_datapath(data_directory);

std::unordered_map<std::string, DataFlowTracker*> trackers;
if (benchmarking) {
HOLOSCAN_LOG_INFO("Benchmarking enabled");
trackers = app->track_distributed();
}

app->run();

if (benchmarking) {
for (const auto& [name, tracker] : trackers) {
std::cout << "Fragment: " << name << std::endl;
tracker->print();
}
}
} else {
HOLOSCAN_LOG_INFO("Running application in single fragment mode");
auto app = holoscan::make_application<AppEdgeSingleFragment>();

HOLOSCAN_LOG_INFO("Using configuration file from {}", config_path);
app->config(config_path);

HOLOSCAN_LOG_INFO("Using configuration file from {}", config_path);
app->config(config_path);
HOLOSCAN_LOG_INFO("Using input data from {}", data_directory);
app->set_datapath(data_directory);

HOLOSCAN_LOG_INFO("Using input data from {}", data_directory);
app->set_datapath(data_directory);
app->run();
DataFlowTracker* tracker = nullptr;
if (benchmarking) {
HOLOSCAN_LOG_INFO("Benchmarking enabled");
tracker = &app->track();
}
app->run();
if (benchmarking) { tracker->print(); }
}
return 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace holohub::grpc_h264_endoscopy_tool_tracking {
using namespace holoscan;

/**
* @class AppEdge
* @class AppEdgeMultiFragment
* @brief A two-fragment application for the H.264 endoscopy tool tracking application.
*
* This class inherits from the holoscan::Application and is a client application offloads the
Expand All @@ -39,9 +39,9 @@ using namespace holoscan;
* two systems, separating the input from the visualization. For example, a video surveillance
* camera capturing and streaming input to another system displaying the footage.
*/
class AppEdge : public holoscan::Application {
class AppEdgeMultiFragment : public holoscan::Application {
public:
explicit AppEdge(const std::vector<std::string>& argv = {}) : Application(argv) {}
explicit AppEdgeMultiFragment(const std::vector<std::string>& argv = {}) : Application(argv) {}
void set_datapath(const std::string& path) { datapath_ = path; }

void compose() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace holohub::grpc_h264_endoscopy_tool_tracking {
using namespace holoscan;

/**
* @class AppEdge
* @class AppEdgeSingleFragment
* @brief A two-fragment application for the H.264 endoscopy tool tracking application.
*
* This class inherits from the holoscan::Application and is a client application offloads the
Expand All @@ -40,10 +40,10 @@ using namespace holoscan;
* on two systems, separating the input from the visualization. For example, a video surveillance
* camera capturing and streaming input to another system displaying the footage.
*/
class AppEdge : public holoscan::Application {
class AppEdgeSingleFragment : public holoscan::Application {
public:
explicit AppEdge(const std::vector<std::string>& argv = {}) : Application(argv) {}
~AppEdge() {
explicit AppEdgeSingleFragment(const std::vector<std::string>& argv = {}) : Application(argv) {}
~AppEdgeSingleFragment() {
entity_client_service_->stop_entity_stream();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,19 @@
extensions:
- ../../../../../lib/gxf_extensions/libgxf_lstm_tensor_rt_inference.so

application:
title: Endoscopy Tool Tracking - gRPC
version: 1.0
inputFormats: []
outputFormats: ["screen"]
multifragment: false # default: false, true to run in multi-fragment mode, false otherwise
benchmarking: false # default: false, true to enable Data Flow Benchmarking, false otherwise

replayer:
basename: "surgical_video"
frame_rate: 0 # as specified in timestamps
repeat: false # default: false
realtime: false # default: true
realtime: true # default: true
count: 0 # default: 0 (no frame count restriction)

format_converter:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ class VideoInputFragment : public holoscan::Fragment {
entity_client_service_ = std::make_shared<EntityClientService>(
from_config("grpc_client.server_address").as<std::string>(),
from_config("grpc_client.rpc_timeout").as<uint32_t>(),
from_config("grpc_client.interrupt").as<bool>(),
request_queue_,
response_queue_,
outgoing_requests);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ grpc_server:
grpc_client:
server_address: localhost:50051
rpc_timeout: 5
interrupt: true

scheduler:
worker_thread_number: 8
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
]
},
"run": {
"command": "<holohub_app_bin>/ucx_endoscopy_tool_tracking",
"command": "<holohub_app_bin>/ucx_endoscopy_tool_tracking --data <holohub_data_dir>/endoscopy",
"workdir": "holohub_bin"
}
}
Expand Down
1 change: 1 addition & 0 deletions operators/grpc_operators/server/application_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ void ApplicationFactory::destroy_application_instance(
if (instance.instance == application_instance) {
instance.instance->stop_streaming();
instance.future.wait_for(std::chrono::seconds(1));
if (instance.tracker != nullptr) { instance.tracker->print(); }
application_instances_.erase(service_name);
HOLOSCAN_LOG_INFO("Application instance deleted for {}", service_name);
return;
Expand Down
1 change: 1 addition & 0 deletions operators/grpc_operators/server/application_factory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class HoloscanGrpcApplication;
struct ApplicationInstance {
std::shared_ptr<HoloscanGrpcApplication> instance;
std::future<void> future;
DataFlowTracker* tracker = nullptr;
};

/**
Expand Down

0 comments on commit d74f624

Please sign in to comment.