diff --git a/extensions/eda/plugins/event_source/webhook.py b/extensions/eda/plugins/event_source/webhook.py index f9e2227e..8ae85259 100644 --- a/extensions/eda/plugins/event_source/webhook.py +++ b/extensions/eda/plugins/event_source/webhook.py @@ -12,10 +12,22 @@ certfile: The optional path to a certificate file to enable TLS support keyfile: The optional path to a key file to be used together with certfile password: The optional password to be used when loading the certificate chain + hmac_secret: The optional HMAC secret used to verify the payload from the client + hmac_algo: The optional HMAC algorithm used to calculate the payload hash. + See your python's hashlib.algorithms_available set for available options. + Defaults to sha256 + hmac_header: The optional HMAC header sent by the client with the payload signature. + Defaults to x-hub-signature-256 + hmac_format: The optional HMAC signature format format. + Supported formats: hex, base64 + Defaults to hex """ import asyncio +import base64 +import hashlib +import hmac import json import logging import ssl @@ -56,6 +68,36 @@ def _parse_token(request: web.Request) -> (str, str): return scheme, token +async def _hmac_verify(request: web.Request) -> bool: + hmac_secret = request.app["hmac_secret"] + hmac_header = request.app["hmac_header"] + hmac_algo = request.app["hmac_algo"] + hmac_format = request.app["hmac_format"] + + if hmac_header not in request.headers: + raise web.HTTPBadRequest(text="Signature header not found") + + hmac_header_digest = request.headers[hmac_header].strip() + + if hmac_header_digest.startswith(f"{hmac_algo}="): + hmac_prefix_len = len(f"{hmac_algo}=") + hmac_header_digest = hmac_header_digest[hmac_prefix_len:] + + body = await request.text() + + event_hmac = hmac.new( + key=hmac_secret, + msg=body.encode("utf-8"), + digestmod=hmac_algo, + ) + if hmac_format == "base64": + event_digest = base64.b64encode(event_hmac.digest()).decode("utf-8") + elif hmac_format == "hex": + event_digest = event_hmac.hexdigest() + + return hmac.compare_digest(hmac_header_digest, event_digest) + + @web.middleware async def bearer_auth(request: web.Request, handler: Callable) -> web.StreamResponse: """Verify authorization is Bearer type.""" @@ -69,16 +111,46 @@ async def bearer_auth(request: web.Request, handler: Callable) -> web.StreamResp return await handler(request) +@web.middleware +async def hmac_verify(request: web.Request, handler: Callable) -> web.StreamResponse: + """Verify event's HMAC signature.""" + hmac_verified = await _hmac_verify(request) + if not hmac_verified: + raise web.HTTPUnauthorized(text="HMAC verification failed") + + return await handler(request) + + async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None: """Receive events via webhook.""" if "port" not in args: msg = "Missing required argument: port" raise ValueError(msg) + + middlewares = [] + app_attrs = {} + if "token" in args: - app = web.Application(middlewares=[bearer_auth]) - app["token"] = args["token"] - else: - app = web.Application() + middlewares.append(bearer_auth) + app_attrs["token"] = args["token"] + + if "hmac_secret" in args: + middlewares.append(hmac_verify) + + app_attrs["hmac_secret"] = args["hmac_secret"].encode("utf-8") + app_attrs["hmac_algo"] = args.get("hmac_algo", "sha256") + if app_attrs["hmac_algo"] not in hashlib.algorithms_available: + msg = f"Unsupported HMAC algorithm: {app_attrs['hmac_algo']}" + raise ValueError(msg) + app_attrs["hmac_header"] = args.get("hmac_header", "x-hub-signature-256") + app_attrs["hmac_format"] = args.get("hmac_format", "hex") + if app_attrs["hmac_format"] not in ["hex", "base64"]: + msg = f"Unsupported HMAC header format {app_attrs['hmac_format']}" + raise ValueError(msg) + + app = web.Application(middlewares=middlewares) + for key, value in app_attrs.items(): + app[key] = value app["queue"] = queue app.add_routes(routes) diff --git a/tests/integration/event_source_webhook/test_webhook_hmac_rules.yml b/tests/integration/event_source_webhook/test_webhook_hmac_rules.yml new file mode 100644 index 00000000..6986c231 --- /dev/null +++ b/tests/integration/event_source_webhook/test_webhook_hmac_rules.yml @@ -0,0 +1,19 @@ +--- +- name: test webhook source plugin + hosts: localhost + sources: + - ansible.eda.webhook: + port: "{{ WH_PORT | default(5000) }}" + hmac_secret: "{{ HMAC_SECRET }}" + hmac_algo: "{{ HMAC_ALGO }}" + rules: + - name: match webhook event + condition: event.payload.ping == "pong" + action: + debug: + msg: "Rule fired successfully" + + - name: shutdown + condition: event.payload.shutdown is defined + action: + shutdown: diff --git a/tests/integration/event_source_webhook/test_webhook_source.py b/tests/integration/event_source_webhook/test_webhook_source.py index b8162529..0a65bef6 100644 --- a/tests/integration/event_source_webhook/test_webhook_source.py +++ b/tests/integration/event_source_webhook/test_webhook_source.py @@ -86,3 +86,72 @@ def test_webhook_source_with_busy_port(subprocess_teardown): stdout, _unused_stderr = proc2.communicate() assert "address already in use" in stdout.decode() assert proc2.returncode == 1 + + +def test_webhook_source_hmac_sanity(subprocess_teardown): + """ + Check the successful execution, response and shutdown + of the webhook source plugin. + """ + msgs = [ + ( + json.dumps({"ping": "pong"}).encode("ascii"), + "sha256=23fff24c4b9835c6179de19103c6c640150d07d8a72c987b030b541a9d988736", + ), + ( + json.dumps({"shutdown": ""}).encode("ascii"), + "185e5a6124894d6fed1c69c8bea049da241adec83b468c867c4e83627e64d9b9", + ), + ] + + port = 5000 + url = f"http://127.0.0.1:{port}/webhook" + + env = os.environ.copy() + env["WH_PORT"] = str(port) + env["HMAC_SECRET"] = "secret" + env["HMAC_ALGO"] = "sha256" + + rules_file = TESTS_PATH + "/event_source_webhook/test_webhook_hmac_rules.yml" + + proc = CLIRunner( + rules=rules_file, envvars="WH_PORT,HMAC_SECRET,HMAC_ALGO", env=env, debug=True + ).run_in_background() + subprocess_teardown(proc) + + wait_for_events(proc) + + for msg, digest in msgs: + headers = {"x-hub-signature-256": digest} + requests.post(url, data=msg, headers=headers) + + try: + stdout, _unused_stderr = proc.communicate(timeout=5) + except subprocess.TimeoutExpired: + proc.terminate() + stdout, _unused_stderr = proc.communicate() + + assert "Rule fired successfully" in stdout.decode() + assert f"'Host': '127.0.0.1:{port}'" in stdout.decode() + assert proc.returncode == 0 + + +def test_webhook_source_with_unsupported_hmac_algo(subprocess_teardown): + """ + Ensure the CLI responds correctly if the desired HMAC algorithm is not supported. + """ + + port = 5000 + env = os.environ.copy() + env["WH_PORT"] = str(port) + env["HMAC_SECRET"] = "secret" + env["HMAC_ALGO"] = "invalid_hmac_algo" + + rules_file = TESTS_PATH + "/event_source_webhook/test_webhook_hmac_rules.yml" + proc = CLIRunner( + rules=rules_file, envvars="WH_PORT,HMAC_SECRET,HMAC_ALGO", env=env, debug=True + ).run_in_background() + proc.wait(timeout=15) + stdout, _unused_stderr = proc.communicate() + assert f"Unsupported HMAC algorithm: {env['HMAC_ALGO']}" in stdout.decode() + assert proc.returncode == 1 diff --git a/tests/unit/event_source/test_webhook.py b/tests/unit/event_source/test_webhook.py index 4fa9fdb7..70e34c33 100644 --- a/tests/unit/event_source/test_webhook.py +++ b/tests/unit/event_source/test_webhook.py @@ -23,6 +23,27 @@ async def post_code(server_task, info): server_task.cancel() +async def assert_post( + server_task, info, expected_status=HTTPStatus.OK, expected_text=None +): + url = f'http://{info["host"]}/{info["endpoint"]}' + payload = info["payload"] + headers = {} + + if "token" in info: + headers["Authorization"] = f"Bearer {info['token']}" + + if "hmac_header" in info: + headers[info["hmac_header"]] = info["hmac_digest"] + + async with aiohttp.ClientSession() as session: + async with session.post(url, json=payload, headers=headers) as resp: + server_task.cancel() + assert resp.status == expected_status + if expected_text: + assert expected_text in await resp.text() + + async def cancel_code(server_task): server_task.cancel() @@ -77,3 +98,284 @@ async def do_request(): plugin_task = asyncio.create_task(start_server(queue, args)) request_task = asyncio.create_task(do_request()) await asyncio.gather(plugin_task, request_task) + + +@pytest.mark.asyncio +async def test_post_hmac_hex_endpoint(): + queue = asyncio.Queue() + + args = { + "host": "127.0.0.1", + "port": 8000, + "hmac_secret": "secret", + "hmac_algo": "sha256", + "hmac_header": "x-hub-signature-256", + "hmac_format": "hex", + } + + plugin_task = asyncio.create_task(start_server(queue, args)) + + task_info = { + "payload": {"src_path": "https://example.com/payload"}, + "hmac_header": args["hmac_header"], + "hmac_digest": "sha256=9ec8272937a36a4b4427d4f9ab7b0425856c5ef5d7e1b496f864aaf99c1910ca", # noqa: E501 + "endpoint": "test", + "host": f'{args["host"]}:{args["port"]}', + } + + post_task = asyncio.create_task(assert_post(plugin_task, task_info)) + + await asyncio.gather(plugin_task, post_task) + + data = await queue.get() + assert data["payload"] == task_info["payload"] + assert data["meta"]["endpoint"] == task_info["endpoint"] + assert data["meta"]["headers"]["Host"] == task_info["host"] + + +@pytest.mark.asyncio +async def test_post_hmac_hex_wo_digest_prefix_endpoint(): + queue = asyncio.Queue() + + args = { + "host": "127.0.0.1", + "port": 8000, + "hmac_secret": "secret", + "hmac_algo": "sha256", + "hmac_header": "x-hub-signature-256", + "hmac_format": "hex", + } + + plugin_task = asyncio.create_task(start_server(queue, args)) + + task_info = { + "payload": {"src_path": "https://example.com/payload"}, + "hmac_header": args["hmac_header"], + "hmac_digest": "9ec8272937a36a4b4427d4f9ab7b0425856c5ef5d7e1b496f864aaf99c1910ca", # noqa: E501 + "endpoint": "test", + "host": f'{args["host"]}:{args["port"]}', + } + + post_task = asyncio.create_task(assert_post(plugin_task, task_info)) + + await asyncio.gather(plugin_task, post_task) + + data = await queue.get() + assert data["payload"] == task_info["payload"] + assert data["meta"]["endpoint"] == task_info["endpoint"] + assert data["meta"]["headers"]["Host"] == task_info["host"] + + +@pytest.mark.asyncio +async def test_post_hmac_hex_endpoint_invalid_signature(): + queue = asyncio.Queue() + + args = { + "host": "127.0.0.1", + "port": 8000, + "hmac_secret": "secret", + "hmac_algo": "sha256", + "hmac_header": "x-hub-signature-256", + "hmac_format": "hex", + } + + plugin_task = asyncio.create_task(start_server(queue, args)) + + task_info = { + "payload": {"src_path": "https://example.com/payload"}, + "hmac_header": args["hmac_header"], + "hmac_digest": "sha256=11f8feeab79372c842f0097fc105dd66d90c41412ab9d3c4071859d7b6ae864b", # noqa: E501 + "endpoint": "test", + "host": f'{args["host"]}:{args["port"]}', + } + + post_task = asyncio.create_task( + assert_post(plugin_task, task_info, HTTPStatus.UNAUTHORIZED) + ) + + await asyncio.gather(plugin_task, post_task) + + +@pytest.mark.asyncio +async def test_post_hmac_hex_endpoint_missing_signature(): + queue = asyncio.Queue() + + args = { + "host": "127.0.0.1", + "port": 8000, + "hmac_secret": "secret", + "hmac_algo": "sha256", + "hmac_header": "x-hub-signature-256", + "hmac_format": "hex", + } + + plugin_task = asyncio.create_task(start_server(queue, args)) + + task_info = { + "payload": {"src_path": "https://example.com/payload"}, + "hmac_header": "x-not-a-signature-header", + "hmac_digest": "sha256=205009e3e895e0fe0ff982e1020dd0fb4b6d16cf9c666652b3492e20429ccdb8", # noqa: E501 + "endpoint": "test", + "host": f'{args["host"]}:{args["port"]}', + } + + post_task = asyncio.create_task( + assert_post(plugin_task, task_info, HTTPStatus.BAD_REQUEST) + ) + + await asyncio.gather(plugin_task, post_task) + + +@pytest.mark.asyncio +async def test_post_hmac_base64_endpoint(): + queue = asyncio.Queue() + + args = { + "host": "127.0.0.1", + "port": 8000, + "hmac_secret": "secret", + "hmac_algo": "sha256", + "hmac_header": "x-custom-signature", + "hmac_format": "base64", + } + + plugin_task = asyncio.create_task(start_server(queue, args)) + + task_info = { + "payload": {"src_path": "https://example.com/payload"}, + "hmac_header": args["hmac_header"], + "hmac_digest": "sha256=nsgnKTejaktEJ9T5q3sEJYVsXvXX4bSW+GSq+ZwZEMo=", + "endpoint": "test", + "host": f'{args["host"]}:{args["port"]}', + } + + post_task = asyncio.create_task(assert_post(plugin_task, task_info)) + + await asyncio.gather(plugin_task, post_task) + + data = await queue.get() + assert data["payload"] == task_info["payload"] + assert data["meta"]["endpoint"] == task_info["endpoint"] + assert data["meta"]["headers"]["Host"] == task_info["host"] + + +@pytest.mark.asyncio +async def test_post_hmac_base64_endpoint_invalid_signature(): + queue = asyncio.Queue() + + args = { + "host": "127.0.0.1", + "port": 8000, + "hmac_secret": "secret", + "hmac_algo": "sha256", + "hmac_header": "x-hub-signature-256", + "hmac_format": "hex", + } + + plugin_task = asyncio.create_task(start_server(queue, args)) + + task_info = { + "payload": {"src_path": "https://example.com/payload"}, + "hmac_header": args["hmac_header"], + "hmac_digest": "nsgnKTejaktEJ9T5q3sEJYVsXvXX4bSW+GSq+ZwZEMo=", + "endpoint": "test", + "host": f'{args["host"]}:{args["port"]}', + } + + post_task = asyncio.create_task( + assert_post(plugin_task, task_info, HTTPStatus.UNAUTHORIZED) + ) + + await asyncio.gather(plugin_task, post_task) + + +@pytest.mark.asyncio +async def test_post_token_and_hmac_hex_endpoint(): + queue = asyncio.Queue() + + args = { + "host": "127.0.0.1", + "port": 8000, + "token": "secret", + "hmac_secret": "secret", + } + + plugin_task = asyncio.create_task(start_server(queue, args)) + + task_info = { + "payload": {"src_path": "https://example.com/payload"}, + "hmac_header": "x-hub-signature-256", + "hmac_digest": "sha256=9ec8272937a36a4b4427d4f9ab7b0425856c5ef5d7e1b496f864aaf99c1910ca", # noqa: E501 + "token": args["token"], + "endpoint": "test", + "host": f'{args["host"]}:{args["port"]}', + } + + post_task = asyncio.create_task(assert_post(plugin_task, task_info)) + + await asyncio.gather(plugin_task, post_task) + + data = await queue.get() + assert data["payload"] == task_info["payload"] + assert data["meta"]["endpoint"] == task_info["endpoint"] + assert data["meta"]["headers"]["Host"] == task_info["host"] + + +@pytest.mark.asyncio +async def test_post_token_and_hmac_hex_endpoint_invalid_signature(): + queue = asyncio.Queue() + + args = args = { + "host": "127.0.0.1", + "port": 8000, + "token": "secret", + "hmac_secret": "secret", + } + + plugin_task = asyncio.create_task(start_server(queue, args)) + + task_info = { + "payload": {"src_path": "https://example.com/payload"}, + "hmac_header": "x-hub-signature-256", + "hmac_digest": "11f8feeab79372c842f0097fc105dd66d90c41412ab9d3c4071859d7b6ae864b", # noqa: E501 + "token": args["token"], + "endpoint": "test", + "host": f'{args["host"]}:{args["port"]}', + } + + expected_text = "HMAC verification failed" + post_task = asyncio.create_task( + assert_post(plugin_task, task_info, HTTPStatus.UNAUTHORIZED, expected_text) + ) + + await asyncio.gather(plugin_task, post_task) + + +@pytest.mark.asyncio +async def test_post_token_and_hmac_hex_endpoint_invalid_token(): + queue = asyncio.Queue() + + args = { + "host": "127.0.0.1", + "port": 8000, + "token": "secret", + "hmac_secret": "secret", + } + + plugin_task = asyncio.create_task(start_server(queue, args)) + + task_info = { + "payload": {"src_path": "https://example.com/payload"}, + "hmac_header": "x-hub-signature-256", + "hmac_digest": "11f8feeab79372c842f0097fc105dd66d90c41412ab9d3c4071859d7b6ae864b", # noqa: E501 + "token": "invalid_token", + "endpoint": "test", + "host": f'{args["host"]}:{args["port"]}', + } + + expected_text = "Invalid authorization token" + post_task = asyncio.create_task( + assert_post(plugin_task, task_info, HTTPStatus.UNAUTHORIZED, expected_text) + ) + + await asyncio.gather(plugin_task, post_task)