-
Notifications
You must be signed in to change notification settings - Fork 0
/
validate_var.py
69 lines (54 loc) · 2.11 KB
/
validate_var.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#
# Copyright: (c) 2019, Vaclav Tomec <[email protected]>
#
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible.plugins.action import ActionBase
from ansible.module_utils._text import to_native
import json
from jsonschema import validate, FormatChecker
from jsonschema.exceptions import _Error
class ActionModule(ActionBase):
"""Validate if a given variable is compliant with JSON schema"""
_VALID_ARGS = frozenset(('var', 'schema'))
def run(self, tmp=None, task_vars=None):
if task_vars is None:
task_vars = dict()
result = super(ActionModule, self).run(tmp, task_vars)
del tmp # tmp no longer has any effect
# check if arguments are provided
__var = self._task.args.get('var', None)
__schema = self._task.args.get('schema', None)
if __var is None:
result['failed'] = True
result['msg'] = "'var' argument needs to be provided"
return result
if __schema is None:
result['failed'] = True
result['msg'] = "'schema' argument needs to be provided"
return result
try:
# open file
schema_path = self._find_needle('files', __schema)
result['schema_path'] = schema_path
f_schema = open(schema_path, 'r')
except IOError as e:
result['failed'] = True
result['msg'] = "Failed to open %s: %s" % (to_native(__schema), to_native(e))
return result
try:
# load schema
schema = json.loads(f_schema.read())
f_schema.close()
except ValueError as e:
result['failed'] = True
result['msg'] = "Failed to load schema %s: %s" % (to_native(__schema), to_native(e))
return result
try:
# validate data
validate(__var, schema, format_checker=FormatChecker())
except _Error as e:
result['failed'] = True
result['msg'] = "Validation failed: %s" % to_native(e)
return result
return result