Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add HMAC verification support to webhook plugin #159

Merged
merged 8 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 76 additions & 4 deletions extensions/eda/plugins/event_source/webhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,22 @@
certfile: The optional path to a certificate file to enable TLS support
keyfile: The optional path to a key file to be used together with certfile
password: The optional password to be used when loading the certificate chain
hmac_secret: The optional HMAC secret used to verify the payload from the client
hmac_algo: The optional HMAC algorithm used to calculate the payload hash.
See your python's hashlib.algorithms_available set for available options.
Defaults to sha256
hmac_header: The optional HMAC header sent by the client with the payload signature.
Defaults to x-hub-signature-256
hmac_format: The optional HMAC signature format format.
Supported formats: hex, base64
Defaults to hex

"""

import asyncio
import base64
import hashlib
import hmac
import json
import logging
import ssl
Expand Down Expand Up @@ -56,6 +68,36 @@ def _parse_token(request: web.Request) -> (str, str):
return scheme, token


async def _hmac_verify(request: web.Request) -> bool:
hmac_secret = request.app["hmac_secret"]
hmac_header = request.app["hmac_header"]
hmac_algo = request.app["hmac_algo"]
hmac_format = request.app["hmac_format"]

if hmac_header not in request.headers:
raise web.HTTPBadRequest(text="Signature header not found")

hmac_header_digest = request.headers[hmac_header].strip()

if hmac_header_digest.startswith(f"{hmac_algo}="):
hmac_prefix_len = len(f"{hmac_algo}=")
hmac_header_digest = hmac_header_digest[hmac_prefix_len:]

body = await request.text()

event_hmac = hmac.new(
key=hmac_secret,
msg=body.encode("utf-8"),
digestmod=hmac_algo,
)
if hmac_format == "base64":
event_digest = base64.b64encode(event_hmac.digest()).decode("utf-8")
elif hmac_format == "hex":
event_digest = event_hmac.hexdigest()

return hmac.compare_digest(hmac_header_digest, event_digest)


@web.middleware
async def bearer_auth(request: web.Request, handler: Callable) -> web.StreamResponse:
"""Verify authorization is Bearer type."""
Expand All @@ -69,16 +111,46 @@ async def bearer_auth(request: web.Request, handler: Callable) -> web.StreamResp
return await handler(request)


@web.middleware
async def hmac_verify(request: web.Request, handler: Callable) -> web.StreamResponse:
"""Verify event's HMAC signature."""
hmac_verified = await _hmac_verify(request)
if not hmac_verified:
raise web.HTTPUnauthorized(text="HMAC verification failed")

return await handler(request)


async def main(queue: asyncio.Queue, args: dict[str, Any]) -> None:
"""Receive events via webhook."""
if "port" not in args:
msg = "Missing required argument: port"
raise ValueError(msg)

middlewares = []
app_attrs = {}

if "token" in args:
app = web.Application(middlewares=[bearer_auth])
app["token"] = args["token"]
else:
app = web.Application()
middlewares.append(bearer_auth)
app_attrs["token"] = args["token"]

if "hmac_secret" in args:
middlewares.append(hmac_verify)

app_attrs["hmac_secret"] = args["hmac_secret"].encode("utf-8")
app_attrs["hmac_algo"] = args.get("hmac_algo", "sha256")
if app_attrs["hmac_algo"] not in hashlib.algorithms_available:
msg = f"Unsupported HMAC algorithm: {app_attrs['hmac_algo']}"
raise ValueError(msg)
app_attrs["hmac_header"] = args.get("hmac_header", "x-hub-signature-256")
app_attrs["hmac_format"] = args.get("hmac_format", "hex")
if app_attrs["hmac_format"] not in ["hex", "base64"]:
msg = f"Unsupported HMAC header format {app_attrs['hmac_format']}"
raise ValueError(msg)

app = web.Application(middlewares=middlewares)
for key, value in app_attrs.items():
app[key] = value
app["queue"] = queue

app.add_routes(routes)
Expand Down
19 changes: 19 additions & 0 deletions tests/integration/event_source_webhook/test_webhook_hmac_rules.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
- name: test webhook source plugin
hosts: localhost
sources:
- ansible.eda.webhook:
port: "{{ WH_PORT | default(5000) }}"
hmac_secret: "{{ HMAC_SECRET }}"
hmac_algo: "{{ HMAC_ALGO }}"
rules:
- name: match webhook event
condition: event.payload.ping == "pong"
action:
debug:
msg: "Rule fired successfully"

- name: shutdown
condition: event.payload.shutdown is defined
action:
shutdown:
69 changes: 69 additions & 0 deletions tests/integration/event_source_webhook/test_webhook_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,72 @@ def test_webhook_source_with_busy_port(subprocess_teardown):
stdout, _unused_stderr = proc2.communicate()
assert "address already in use" in stdout.decode()
assert proc2.returncode == 1


def test_webhook_source_hmac_sanity(subprocess_teardown):
"""
Check the successful execution, response and shutdown
of the webhook source plugin.
"""
msgs = [
(
json.dumps({"ping": "pong"}).encode("ascii"),
"sha256=23fff24c4b9835c6179de19103c6c640150d07d8a72c987b030b541a9d988736",
),
(
json.dumps({"shutdown": ""}).encode("ascii"),
"185e5a6124894d6fed1c69c8bea049da241adec83b468c867c4e83627e64d9b9",
),
]

port = 5000
url = f"http://127.0.0.1:{port}/webhook"

env = os.environ.copy()
env["WH_PORT"] = str(port)
env["HMAC_SECRET"] = "secret"
env["HMAC_ALGO"] = "sha256"

rules_file = TESTS_PATH + "/event_source_webhook/test_webhook_hmac_rules.yml"

proc = CLIRunner(
rules=rules_file, envvars="WH_PORT,HMAC_SECRET,HMAC_ALGO", env=env, debug=True
).run_in_background()
subprocess_teardown(proc)

wait_for_events(proc)

for msg, digest in msgs:
headers = {"x-hub-signature-256": digest}
requests.post(url, data=msg, headers=headers)

try:
stdout, _unused_stderr = proc.communicate(timeout=5)
except subprocess.TimeoutExpired:
proc.terminate()
stdout, _unused_stderr = proc.communicate()

assert "Rule fired successfully" in stdout.decode()
assert f"'Host': '127.0.0.1:{port}'" in stdout.decode()
assert proc.returncode == 0


def test_webhook_source_with_unsupported_hmac_algo(subprocess_teardown):
"""
Ensure the CLI responds correctly if the desired HMAC algorithm is not supported.
"""

port = 5000
env = os.environ.copy()
env["WH_PORT"] = str(port)
env["HMAC_SECRET"] = "secret"
env["HMAC_ALGO"] = "invalid_hmac_algo"

rules_file = TESTS_PATH + "/event_source_webhook/test_webhook_hmac_rules.yml"
proc = CLIRunner(
rules=rules_file, envvars="WH_PORT,HMAC_SECRET,HMAC_ALGO", env=env, debug=True
).run_in_background()
proc.wait(timeout=15)
stdout, _unused_stderr = proc.communicate()
assert f"Unsupported HMAC algorithm: {env['HMAC_ALGO']}" in stdout.decode()
assert proc.returncode == 1
Loading
Loading