Skip to content

Commit

Permalink
Add UDP Sender for Testing (#1)
Browse files Browse the repository at this point in the history
  • Loading branch information
Teddy-van-Jerry committed Aug 16, 2023
1 parent 5989cb5 commit b17dd74
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 0 deletions.
15 changes: 15 additions & 0 deletions scripts/self_test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/bin/sh

exe_name="bin/m2c-udp-process"

# First setup Python UDP packets sender
python3 tests/udp_sender.py 127.0.0.1 1234 -q -i0.1 &
python_pid=$!

# Then run the test
${exe_name} r tests/self.yml -V # verbose mode

# Kill the background Python script process
kill ${python_pid}

echo "Test finished."
40 changes: 40 additions & 0 deletions tests/udp_sender.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# UDP packet sender for testing purposes
# Usage: python3 udp_sender.py <host> <port> [-q] [-i <interval>]
#
# Author: Wuqiong Zhao ([email protected])
# Date: 2023-08-16

import socket
import time
import argparse

message = "#" + "0" * 1039 # 1040 bytes

def send_udp_packet(host, port, data, quiet=False):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.sendto(data.encode(), (host, port))
if not quiet:
print(f"Sent packet with length {len(data)} to {host}:{port}")
except Exception as e:
print(f"Error sending UDP packet: {e}")
finally:
sock.close()

def main():
parser = argparse.ArgumentParser(description="Send UDP packets at regular intervals")
parser.add_argument("host", help="Destination host IP address")
parser.add_argument("port", type=int, help="Destination port number")
parser.add_argument("-q", "--quiet", action="store_true", help="Suppress packet information printing")
parser.add_argument("-i", "--interval", type=float, default=0.1, help="Packet sending interval in seconds")
args = parser.parse_args()

try:
while True:
send_udp_packet(args.host, args.port, message, args.quiet)
time.sleep(args.interval)
except KeyboardInterrupt:
print("\nUDP packet sending stopped.")

if __name__ == "__main__":
main()

0 comments on commit b17dd74

Please sign in to comment.