From b17dd7457942604a7c37607c29a0684b59ac9aee Mon Sep 17 00:00:00 2001 From: Teddy van Jerry Date: Wed, 16 Aug 2023 11:28:15 -0700 Subject: [PATCH] Add UDP Sender for Testing (#1) --- scripts/self_test.sh | 15 +++++++++++++++ tests/udp_sender.py | 40 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+) create mode 100755 scripts/self_test.sh create mode 100644 tests/udp_sender.py diff --git a/scripts/self_test.sh b/scripts/self_test.sh new file mode 100755 index 0000000..85b2b0a --- /dev/null +++ b/scripts/self_test.sh @@ -0,0 +1,15 @@ +#!/bin/sh + +exe_name="bin/m2c-udp-process" + +# First setup Python UDP packets sender +python3 tests/udp_sender.py 127.0.0.1 1234 -q -i0.1 & +python_pid=$! + +# Then run the test +${exe_name} r tests/self.yml -V # verbose mode + +# Kill the background Python script process +kill ${python_pid} + +echo "Test finished." diff --git a/tests/udp_sender.py b/tests/udp_sender.py new file mode 100644 index 0000000..b3fd998 --- /dev/null +++ b/tests/udp_sender.py @@ -0,0 +1,40 @@ +# UDP packet sender for testing purposes +# Usage: python3 udp_sender.py [-q] [-i ] +# +# Author: Wuqiong Zhao (me@wqzhao.org) +# Date: 2023-08-16 + +import socket +import time +import argparse + +message = "#" + "0" * 1039 # 1040 bytes + +def send_udp_packet(host, port, data, quiet=False): + try: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.sendto(data.encode(), (host, port)) + if not quiet: + print(f"Sent packet with length {len(data)} to {host}:{port}") + except Exception as e: + print(f"Error sending UDP packet: {e}") + finally: + sock.close() + +def main(): + parser = argparse.ArgumentParser(description="Send UDP packets at regular intervals") + parser.add_argument("host", help="Destination host IP address") + parser.add_argument("port", type=int, help="Destination port number") + parser.add_argument("-q", "--quiet", action="store_true", help="Suppress packet information printing") + parser.add_argument("-i", "--interval", type=float, default=0.1, help="Packet sending interval in seconds") + args = parser.parse_args() + + try: + while True: + send_udp_packet(args.host, args.port, message, args.quiet) + time.sleep(args.interval) + except KeyboardInterrupt: + print("\nUDP packet sending stopped.") + +if __name__ == "__main__": + main()