diff --git a/docker-compose.override.yml b/docker-compose.override.yml index 292c5ea7..f34bd412 100644 --- a/docker-compose.override.yml +++ b/docker-compose.override.yml @@ -17,4 +17,4 @@ services: mosquitto: ports: - 1883:1883 - - 8884:8884 + - 8884:8884 \ No newline at end of file diff --git a/docs/source/reference/running/data-pipeline-plugins.rst b/docs/source/reference/running/data-pipeline-plugins.rst index a1c5234b..89adeef6 100644 --- a/docs/source/reference/running/data-pipeline-plugins.rst +++ b/docs/source/reference/running/data-pipeline-plugins.rst @@ -98,6 +98,27 @@ A typical BUFR4 plugin workflow definition would be defined as follows: notify: true # trigger GeoJSON publishing for API and UI file-pattern: '^.*\.bin$' +``wis2box.data.cap_message.CAPMessageData`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +This plugin takes the incoming XML file, then validates it against the +`CAP v1.2 schema `_ +and verifies the digital signature before publishing. + +The validation is performed using the `capvalidator `_ +package. + +A typical CAP message plugin workflow definition would be defined as follows: + +.. code-block:: yaml + + xml: + - plugin: wis2box.data.cap_message.CAPMessageData + notify: true + buckets: + - ${WIS2BOX_STORAGE_INCOMING} + file-pattern: '^.*\.xml$' + ``wis2box.data.universal.UniversalData`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/tests/data/CAP/sc_example.xml b/tests/data/CAP/sc_example.xml new file mode 100644 index 00000000..5886f5fd --- /dev/null +++ b/tests/data/CAP/sc_example.xml @@ -0,0 +1,71 @@ + + + + urn:oid:2.49.0.0.690.0.2024.5.19.13.18.0 + info@meteo.gov.sc + 2024-05-19T17:18:00+04:00 + Actual + Alert + Public + + en + Met + Strong Winds + Immediate + Moderate + Observed + General Public + 2024-05-19T17:30:00+04:00 + 2024-05-19T17:30:00+04:00 + 2024-05-19T23:30:00+04:00 + Seychelles Meteorological Authority + Moderate to strong south-easterly winds over Aldabra area + Moderate to strong south-easterly winds over Aldabra area associated with a severe tropical storm Ialy on 19th May 2024 from 5pm to 11pm + Beware of strong south-easterly winds of 40km/hr gusting to 60km/hr causing rough seas. Mariners are advised to take extra precautions when navigating these areas + https://www.meteo.sc/alerts/severe-tropical-storm-ialy-is-expected-to-cause-moderate-to-strong-south-easterly-winds-over-aldabra-area-on-19th-may-2024/ + info@meteo.gov.sc + + Aldabra area + -9.186965,46.078276 -9.336746,45.936002 -9.57065,45.983427 -9.748309,46.116216 -9.897844,46.249006 -10.000609,46.42922 -9.953902,46.694799 -9.692216,46.770679 -9.467752,46.770679 -9.327387,46.742224 -9.186965,46.694799 -9.140146,46.533555 -9.121416,46.362825 -9.186965,46.078276 + + + + + + + + + + + + + s9RKcAph3khDLX4nOQQDZ7c23uVZqCDJziRZznnh3nA= + + + liScT3sHEpNO1TXEDY1nMC5FoBnQruioH/xkU1rNRKfK8Y3y4/lRz64ueJfoWGYB/N3NWLkMhQ6V0X4lHGPyfA== + + + MIIDeTCCAv6gAwIBAgISA7FZkY9qfwdIeZgOZUlggQr+MAoGCCqGSM49BAMDMDIx +CzAJBgNVBAYTAlVTMRYwFAYDVQQKEw1MZXQncyBFbmNyeXB0MQswCQYDVQQDEwJF +NTAeFw0yNDA3MDIxMTIzNDlaFw0yNDA5MzAxMTIzNDhaMBcxFTATBgNVBAMTDHd3 +dy5tZXRlby5zYzBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABPnwJK7C7LIL7c3L +DXDTbFlLSHx8VXbbSKFqkr0v8xjseuNjh8IXFm95mvkdk7q1S0SUYyYn3d8a0krt +2qhGqCKjggINMIICCTAOBgNVHQ8BAf8EBAMCB4AwHQYDVR0lBBYwFAYIKwYBBQUH +AwEGCCsGAQUFBwMCMAwGA1UdEwEB/wQCMAAwHQYDVR0OBBYEFCQFyLRIaHFpqT8I +nTKhv8oSaTW1MB8GA1UdIwQYMBaAFJ8rX888IU+dBLftKyzExnCL0tcNMFUGCCsG +AQUFBwEBBEkwRzAhBggrBgEFBQcwAYYVaHR0cDovL2U1Lm8ubGVuY3Iub3JnMCIG +CCsGAQUFBzAChhZodHRwOi8vZTUuaS5sZW5jci5vcmcvMBcGA1UdEQQQMA6CDHd3 +dy5tZXRlby5zYzATBgNVHSAEDDAKMAgGBmeBDAECATCCAQMGCisGAQQB1nkCBAIE +gfQEgfEA7wB2AHb/iD8KtvuVUcJhzPWHujS0pM27KdxoQgqf5mdMWjp0AAABkHNm +tr8AAAQDAEcwRQIhANX48FhFLRl8W0qsVh12vz2F92wr2aKId+AQ/0kvE+a0AiA/ +eB4KudtHm4LJL7VSVL7UvffuEPOsY+PvoongycjZpQB1AEiw42vapkc0D+VqAvqd +MOscUgHLVt0sgdm7v6s52IRzAAABkHNmvkUAAAQDAEYwRAIgFtFOST10XUPf2BYT +xBBvHVqU98eB2hwQtgVJ4hJP5RoCIF0wDotvI7r+kamXqgvee+/ig4NP2ZbqaLP6 +a2/T5cjnMAoGCCqGSM49BAMDA2kAMGYCMQCyLgDsI/yPYKkI1zM3zs0w7iI23MfZ +BGuNKUUa7qHLR1O6eNnEmrSH24bdzXdacRoCMQCLW6bf0Y1mwuJN+jBCjTbyCe+F +1ZEDJBb2AKxTZpWdVdtfRErY5BxHuACOm9SlXGE= + + + + + \ No newline at end of file diff --git a/tests/data/metadata/discovery/int-wmo-test-cap.yml b/tests/data/metadata/discovery/int-wmo-test-cap.yml new file mode 100644 index 00000000..70cfd4fd --- /dev/null +++ b/tests/data/metadata/discovery/int-wmo-test-cap.yml @@ -0,0 +1,64 @@ +wis2box: + retention: P30D + topic_hierarchy: int-wmo-test/data/core/weather/advisories-warnings + centre_id: int-wmo-test + data_mappings: + plugins: + xml: + - plugin: wis2box.data.cap_message.CAPMessageData + notify: true + buckets: + - ${WIS2BOX_STORAGE_INCOMING} + file-pattern: '^.*\.xml$' + +mcf: + version: 1.0 + +metadata: + identifier: urn:wmo:md:int_wmo_test:cap + hierarchylevel: dataset + +identification: + title: CAP Alerts test dataset + abstract: CAP Alerts test dataset + dates: + creation: 2023-03-26 + keywords: + default: + keywords: + - CAP + - warnings + - alerts + wmo: + keywords: + - weather + keywords_type: themes + vocabulary: + name: Earth system disciplines as defined by the WMO Unified Data Policy, Resolution 1 (Cg-Ext(2021), Annex 1. + url: https://codes.wmo.int/topic-hierarchy/earth-system-discipline + extents: + spatial: + - bbox: [-180.0, -90.0, 180.0, 90.0] + crs: 4326 + temporal: + - begin: 2024-07-02 + end: null + resolution: P1H + wmo_data_policy: core + +contact: + host: + organization: WMO + url: https://wmo.int + individualname: Firstname Lastname + positionname: Position Name + phone: null + fax: null + address: null + city: null + administrativearea: null + postalcode: null + country: Switzerland + email: you@example.com + hoursofservice: 0700h - 1500h UTC + contactinstructions: email diff --git a/tests/requirements.txt b/tests/requirements.txt index 3e58e592..4a353aca 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -1,5 +1,6 @@ flake8 +paho-mqtt<2 pytest pywcmp pywis-pubsub -requests +requests \ No newline at end of file diff --git a/tests/test_publish_cap_message.py b/tests/test_publish_cap_message.py new file mode 100644 index 00000000..8958c373 --- /dev/null +++ b/tests/test_publish_cap_message.py @@ -0,0 +1,57 @@ +############################################################################### +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +############################################################################### + +import base64 +import json + +import paho.mqtt.publish as publish + +BROKER_USERNAME = 'wis2box' +BROKER_PASSWORD = 'wis2box' +BROKER_HOST = 'localhost' +BROKER_PORT = '1883' + +filename = 'tests/data/cap/sc_example.xml' + +# create a message containing the CAP alert in +# the data field as base64 encoded bytes +with open(filename, 'rb') as file: + data = base64.b64encode(file.read()).decode() + +msg = { + 'metadata_id': 'urn:wmo:md:int_wmo_test:cap', + 'data': data, + 'filename': filename.split('/')[-1] +} + +# publish notification on internal broker +private_auth = { + 'username': BROKER_USERNAME, + 'password': BROKER_PASSWORD +} + +publish.single(topic='wis2box/cap/publication', + payload=json.dumps(msg), + qos=1, + retain=False, + hostname=BROKER_HOST, + port=int(BROKER_PORT), + auth=private_auth) diff --git a/wis2box-broker/entrypoint.sh b/wis2box-broker/entrypoint.sh index c8bf940d..9a71986a 100644 --- a/wis2box-broker/entrypoint.sh +++ b/wis2box-broker/entrypoint.sh @@ -24,4 +24,15 @@ fi sed -i "s#_WIS2BOX_BROKER_QUEUE_MAX#$WIS2BOX_BROKER_QUEUE_MAX#" /mosquitto/config/mosquitto.conf sed -i "s#_WIS2BOX_BROKER_USERNAME#$WIS2BOX_BROKER_USERNAME#" /mosquitto/config/acl.conf +for i in `env | grep -Ee "\> /mosquitto/config/acl.conf + echo "topic readwrite ${!topic}" >> /mosquitto/config/acl.conf +done + /usr/sbin/mosquitto -c /mosquitto/config/mosquitto.conf diff --git a/wis2box-management/Dockerfile b/wis2box-management/Dockerfile index ddd38e6b..4776c56f 100644 --- a/wis2box-management/Dockerfile +++ b/wis2box-management/Dockerfile @@ -19,7 +19,7 @@ # ############################################################################### -FROM ubuntu:focal +FROM ubuntu:jammy LABEL maintainer="tomkralidis@gmail.com; mlimper@wmo.int" @@ -40,6 +40,8 @@ RUN apt-get update -y && apt-get install -y ${DEBIAN_PACKAGES} \ https://github.com/wmo-cop/pyoscar/archive/refs/tags/0.7.0.zip \ https://github.com/geopython/pygeometa/archive/refs/tags/0.16.0.zip \ https://github.com/wmo-im/pywis-topics/archive/refs/tags/0.2.0.zip \ + # install cap validator + && pip3 install --no-cache-dir capvalidator>=0.1.0-dev4 \ # install shapely && pip3 install --no-cache-dir cython pygeos==0.13 \ && pip3 install shapely \ diff --git a/wis2box-management/requirements.txt b/wis2box-management/requirements.txt index 27199bb6..ef1c7b4a 100644 --- a/wis2box-management/requirements.txt +++ b/wis2box-management/requirements.txt @@ -9,3 +9,4 @@ pywis-pubsub pywis-topics PyYAML requests +capvalidator>=0.1.0-dev4 \ No newline at end of file diff --git a/wis2box-management/wis2box/data/cap_message.py b/wis2box-management/wis2box/data/cap_message.py new file mode 100644 index 00000000..a64526d1 --- /dev/null +++ b/wis2box-management/wis2box/data/cap_message.py @@ -0,0 +1,95 @@ +############################################################################### +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +############################################################################### + +from datetime import datetime +import logging +from pathlib import Path +from typing import Union + +from capvalidator import validate_cap_message, get_dates + +from wis2box.data.base import BaseAbstractData + +LOGGER = logging.getLogger(__name__) + + +class CAPMessageData(BaseAbstractData): + """ + DataPublish: + + transform sets output_data to input_data + using metadata received by the plugin + """ + + def __init__(self, defs: dict) -> None: + """ + CAPMessageData data initializer + + :param def: `dict` object of resource mappings + + :returns: `None` + """ + + super().__init__(defs) + + def transform(self, input_data: Union[Path, bytes], + filename: str = '') -> bool: + """ + Transform input_data to output_data + + :param input_data: input data + :param filename: filename of input data + :param _meta: metadata of input data + + :returns: `bool` of result + """ + + suffix = filename.split('.')[-1] + rmk = filename.split('.')[0] + input_bytes = self.as_bytes(input_data) + + # get the sent date from the CAP XML + sent_date = get_dates(input_bytes).sent + # convert isoformat to datetime + _meta = {} + _meta['data_date'] = datetime.fromisoformat(sent_date) + # add relative filepath to _meta + _meta['relative_filepath'] = self.get_local_filepath(_meta['data_date']) # noqa + + # validate the CAP XML string content using the capvalidator package + result = validate_cap_message(input_bytes, strict=False) + if not result.passed: + LOGGER.error( + f'Invalid CAP XML, not publishing. Reason: {result.message}') + return False + + LOGGER.info( + f'CAP XML is valid, publishing to wis2box. {result.message}') + + self.output_data[rmk] = { + suffix: input_bytes, + '_meta': _meta + } + return True + + def get_local_filepath(self, date_): + yyyymmdd = date_.strftime('%Y-%m-%d') + return Path(yyyymmdd) / 'wis' / self.metadata_id diff --git a/wis2box-management/wis2box/pubsub/subscribe.py b/wis2box-management/wis2box/pubsub/subscribe.py index dceee6ec..a4e09557 100644 --- a/wis2box-management/wis2box/pubsub/subscribe.py +++ b/wis2box-management/wis2box/pubsub/subscribe.py @@ -36,13 +36,15 @@ from wis2box.data_mappings import get_data_mappings from wis2box.data.message import MessageData + from wis2box.env import (DATADIR, DOCKER_BROKER, - STORAGE_SOURCE, STORAGE_ARCHIVE) + STORAGE_SOURCE, STORAGE_ARCHIVE, + STORAGE_INCOMING) from wis2box.handler import Handler, NotHandledError import wis2box.metadata.discovery as discovery_metadata from wis2box.plugin import load_plugin, PLUGINS from wis2box.pubsub.message import gcm - +from wis2box.storage import put_data LOGGER = logging.getLogger(__name__) @@ -99,23 +101,24 @@ def handle(self, filepath): msg = f'handle() error: {err}' raise err - def handle_publish(self, message): + def handle_publish(self, message, publisher='wis2box'): LOGGER.debug('Loading MessageData plugin to publish data from message') # noqa topic_hierarchy = message['channel'] metadata_id = message.get('metadata_id') + # if metadata_id not provided, log error and return if metadata_id is None: LOGGER.error('metadata_id not provided in message received on topic wis2box/data/publication') # noqa # ensure topic_hierarchy starts with 'origin/a/wis2/' if not topic_hierarchy.startswith('origin/a/wis2/'): topic_hierarchy = f'origin/a/wis2/{topic_hierarchy}' + defs = { 'topic_hierarchy': topic_hierarchy, '_meta': message['_meta'], 'notify': True, 'metadata_id': metadata_id } - MessageData(defs=defs) plugin = MessageData(defs=defs) try: input_bytes = base64.b64decode(message['data'].encode('utf-8')) @@ -155,6 +158,26 @@ def on_message_handler(self, client, userdata, msg): while len(mp.active_children()) == mp.cpu_count(): sleep(0.05) mp.Process(target=self.handle, args=(filepath,)).start() + elif topic == 'wis2box/cap/publication': + LOGGER.debug('Publishing data received by cap-editor') + # get filename and data from message and store in incoming-data + metadata_id = message.get('metadata_id') + if metadata_id is None: + LOGGER.error('metadata_id not found in message') + return False + filename = message.get('filename') + if filename is None: + LOGGER.error('filename not found in message') + return False + data = message.get('data') + if data is None: + LOGGER.error('data not found in message') + return False + # convert base64 encoded data to bytes + data_bytes = base64.b64decode(data.encode('utf-8')) + # store data in incoming-data + path = f'{STORAGE_INCOMING}/{metadata_id}/{filename}' + put_data(data_bytes, path) elif topic == 'wis2box/data/publication': LOGGER.debug('Publishing data') self.handle_publish(message)