-
Notifications
You must be signed in to change notification settings - Fork 22
/
base.py
476 lines (390 loc) · 16.4 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
"""Copyright 2020 Google Inc.
All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import print_function
import datetime
import re
from civics_cdf_validator import loggers
from civics_cdf_validator import stats
from lxml import etree
class SchemaHandler(object):
    """Shared helpers for anything that inspects XML schema documents."""

    _XSCHEMA_NAMESPACE = "http://www.w3.org/2001/XMLSchema"
    _XSCHEMA_INSTANCE_NAMESPACE = "http://www.w3.org/2001/XMLSchema-instance"
    # Fully qualified name of the xsi:type attribute as stored in `attrib`.
    _TYPE_ATTRIB = "{%s}type" % (_XSCHEMA_INSTANCE_NAMESPACE)

    def get_element_class(self, element):
        """Return the xsi:type of the element, or its tag when no type is set.

        Args:
          element: an element exposing `tag` and `attrib`, or None.

        Returns:
          None when element is None, the xsi:type attribute value when present,
          otherwise the element tag.
        """
        if element is None:
            return None
        attributes = element.attrib
        if self._TYPE_ATTRIB in attributes:
            return attributes[self._TYPE_ATTRIB]
        return element.tag

    def strip_schema_ns(self, element):
        """Return the element tag without the XML Schema namespace prefix.

        Args:
          element: an element exposing `tag`.

        Returns:
          The tag with a leading "{<XMLSchema namespace>}" removed, the tag
          unchanged when it carries no such prefix, or None when the tag is not
          string-like.
        """
        tag = element.tag
        if not hasattr(tag, "startswith"):
            # Comment tags return a function
            return None
        ns_prefix = "{%s}" % self._XSCHEMA_NAMESPACE
        return tag[len(ns_prefix):] if tag.startswith(ns_prefix) else tag

    def get_elements_by_class(self, element, element_name):
        """Find descendants that are element_name by tag or by xsi:type.

        Args:
          element: the element to search under.
          element_name: tag name / xsi:type value to look for.

        Returns:
          A list holding the tag matches followed by the xsi:type matches.
        """
        # Descendants whose tag is element_name.
        matches = element.findall(".//" + element_name)
        # Descendants of any tag whose xsi:type is element_name.
        typed_matches = element.xpath(
            ".//*[@xsi:type ='%s']" % (element_name),
            namespaces={"xsi": self._XSCHEMA_INSTANCE_NAMESPACE})
        matches.extend(typed_matches)
        return matches
class BaseRule(SchemaHandler):
    """Common foundation for validation rules.

    Stores the election tree, the schema tree and an optional OCD-ID validator
    handed in at construction time. Subclasses supply `elements` and `check`.
    """

    def __init__(self, election_tree, schema_tree, ocd_id_validator=None):
        super().__init__()
        self.ocd_id_validator = ocd_id_validator
        self.schema_tree = schema_tree
        self.election_tree = election_tree

    def elements(self):
        """Return a list of all the elements this rule checks.

        Raises:
          NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError

    def check(self, element):
        """Check whether the given element follows best practices.

        Args:
          element: the element to validate.

        Raises:
          NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError

    def set_option(self, option):
        """Apply a commandline option to this rule as an instance attribute.

        Args:
          option: commandline option object exposing `option_name` and
            `option_value`.

        Raises:
          ElectionException: the rule has no attribute named like the option.
        """
        attribute = option.option_name
        if hasattr(self, attribute):
            setattr(self, attribute, option.option_value)
            return
        raise loggers.ElectionException("Invalid attribute set")

    def setup(self):
        """Hook for rule specific preparation before checking; no-op here."""
class TreeRule(BaseRule):
    """Rule that inspects the entire tree instead of individual elements."""

    def elements(self):
        """Return the one-item list ["tree"] naming what this rule checks."""
        tree_key = "tree"
        return [tree_key]

    def check(self):
        """Check the entire tree; takes no element and does nothing here."""
class ValidReferenceRule(TreeRule):
    """Rule that makes sure reference values are properly defined.

    Subclasses provide the set of referencing values and the set of defined
    values; `check` reports every reference that has no definition.
    """

    def __init__(
        self,
        election_tree,
        schema_tree,
        missing_element="data",
        **kwargs,
    ):
        """Initialise the rule.

        Args:
          election_tree: forwarded to the parent constructor.
          schema_tree: forwarded to the parent constructor.
          missing_element: label used in the error message to name the kind of
            thing that is missing.
          **kwargs: forwarded to the parent constructor.
        """
        super(ValidReferenceRule, self).__init__(
            election_tree, schema_tree, **kwargs
        )
        self.missing_element = missing_element

    def _gather_reference_values(self):
        """Collect a set of all values that are referencing a pre-defined value.

        Ex: A party leader ID should reference an ID from a PersonCollection.
        This method should return a set of all party leader IDs.

        Raises:
          NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError

    def _gather_defined_values(self):
        """Collect a set of the pre-defined values that are being referenced.

        Ex: A party leader ID should reference an ID from a PersonCollection.
        This method should return a set of all PersonIDs from the
        PersonCollection.

        Raises:
          NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError

    def check(self):
        """Raise an error listing every reference without a definition.

        Raises:
          ElectionError: at least one referenced value is not in the set of
            defined values.
        """
        reference_ids = self._gather_reference_values()
        defined_ids = self._gather_defined_values()
        invalid_references = reference_ids - defined_ids
        if invalid_references:
            # Sets have no stable iteration order; sort so the same feed always
            # produces the same message.
            raise loggers.ElectionError.from_message(
                "No defined {} for {} found in the feed.".format(
                    self.missing_element,
                    ", ".join(sorted(invalid_references))))
class DateRule(BaseRule):
    """Base rule used for date validations.

    When validating dates, this rule can be used to gather start and
    end date values.
    """

    def __init__(self, election_tree, schema_file, **kwargs):
        super(DateRule, self).__init__(election_tree, schema_file, **kwargs)
        self.start_elem = None
        self.start_date = None
        self.end_elem = None
        self.end_date = None
        self.error_log = []

    def reset_instance_vars(self):
        """Reset instance variables to initial state.

        Due to ordered procedure of validator, instance vars created in init
        are not getting reset when same rule is run on different elements.
        """
        self.start_elem = None
        self.start_date = None
        self.end_elem = None
        self.end_date = None
        self.error_log = []

    def gather_dates(self, element):
        """Gather StartDate and EndDate values for the provided element.

        An election element should have a start and end date in the desired
        format. These dates should be extracted and set as instance variables
        to be used in validation checks.

        When a date child is absent or has no text, the corresponding
        `start_date` / `end_date` attribute is left as it was.

        Args:
          element: A parent element that contains StartDate and EndDate
            children.

        Raises:
          ElectionError: dates need to be properly formatted.
        """
        error_log = []

        self.start_elem = element.find("StartDate")
        if self.start_elem is not None and self.start_elem.text is not None:
            self.start_date = PartialDate.init_partial_date(
                self.start_elem.text)
            if self.start_date is None:
                error_message = (
                    "The StartDate text should be of the formats: yyyy-mm-dd,"
                    " or yyyy, or yyyy-mm")
                error_log.append(
                    loggers.LogEntry(error_message, [self.start_elem]))

        self.end_elem = element.find("EndDate")
        if self.end_elem is not None and self.end_elem.text is not None:
            self.end_date = PartialDate.init_partial_date(self.end_elem.text)
            if self.end_date is None:
                error_message = ("The EndDate text should be of the formats: "
                                 "yyyy-mm-dd, or yyyy, or yyyy-mm")
                error_log.append(
                    loggers.LogEntry(error_message, [self.end_elem]))

        if error_log:
            raise loggers.ElectionError(error_log)

    def is_date_in_past(self, date):
        """Check if a date is in the past.

        Args:
          date: the PartialDate to compare with the current UTC date.

        Returns:
          True when `date.is_older_than(today)` is positive, False otherwise.
        """
        # datetime.utcnow() is deprecated; an aware "now" in UTC yields the
        # same year/month/day.
        today = datetime.datetime.now(datetime.timezone.utc)
        today_partial_date = PartialDate(today.year, today.month, today.day)
        delta = date.is_older_than(today_partial_date)
        return delta > 0

    def check_for_date_not_in_past(self, date, date_elem):
        """Log an error when the given date is in the past.

        Args:
          date: the PartialDate to check; None is ignored.
          date_elem: the element attached to the log entry.
        """
        if date is not None and self.is_date_in_past(date):
            error_message = """The date {} is in the past.""".format(date)
            self.error_log.append(loggers.LogEntry(error_message, [date_elem]))

    def check_for_date_in_past(self, date, date_elem):
        """Log an error when the given date is not in the past.

        Args:
          date: the PartialDate to check; None is ignored.
          date_elem: the element attached to the log entry.
        """
        if date is not None and not self.is_date_in_past(date):
            error_message = """The date {} is not in the past.""".format(date)
            self.error_log.append(loggers.LogEntry(error_message, [date_elem]))

    def check_end_after_start(self):
        """Log an error when the EndDate is before the StartDate.

        Nothing is checked unless both dates have been gathered.
        """
        if self.start_date is not None and self.end_date is not None:
            start_end_delta = self.start_date.is_older_than(self.end_date)
            # A negative delta means the end date precedes the start date.
            if start_end_delta < 0:
                error_message = (
                    "The dates (start: {}, end: {}) are invalid.\n"
                    "The end date must be the same or after the start date."
                ).format(self.start_date, self.end_date)
                self.error_log.append(
                    loggers.LogEntry(error_message, [self.end_elem]))
class PartialDate():
    """A date known to year, year-month, or year-month-day precision."""

    REGEX_PATTERN = re.compile(
        r"^(?P<year>[0-9]{4})(?:-(?P<month>[0-9]{2}))?(?:-(?P<day>[0-9]{2}))?$")

    def __init__(self, year=None, month=None, day=None):
        self.year = year
        self.month = month
        self.day = day

    def __str__(self):
        """Render as yyyy, yyyy-mm or yyyy-mm-dd; "Not defined" otherwise."""
        if self.is_only_year_date():
            return "%s" % self.year
        elif self.is_month_date():
            return "%s-%s" % (self.year, str(self.month).zfill(2))
        elif self.is_complete_date():
            return "%s-%s-%s" % (self.year, str(self.month).zfill(2), str(
                self.day).zfill(2))
        else:
            return "Not defined"

    @classmethod
    def init_partial_date(cls, date_string):
        """Parse a yyyy, yyyy-mm or yyyy-mm-dd string into a PartialDate.

        Args:
          date_string: the text to parse.

        Returns:
          A PartialDate, or None when the text does not match the pattern, the
          month is outside 1..12, or a complete date is not a real calendar
          date.
        """
        match_object = cls.REGEX_PATTERN.match(date_string)
        if match_object is None:
            return None

        year, month, day = (
            int(value) if value is not None else None
            for value in match_object.group("year", "month", "day"))

        # The pattern accepts any two digits, so both "00" and "13".."99" have
        # to be rejected here; year-month dates get no other validation.
        if month is not None and not 1 <= month <= 12:
            return None

        partial_date = cls(year, month, day)
        if partial_date.is_complete_date():
            try:
                # Let the standard library reject impossible days (e.g. Feb 30).
                datetime.date(year, month, day)
            except ValueError:
                return None
        return partial_date

    def is_older_than(self, other_date):
        """Compares 2 dates/partial dates.

        The result is `other_date - self` at the coarsest precision shared by
        both dates, so it is positive when this date is the older one.

        Args:
          other_date: date to be compared.

        Returns:
          The difference between the years if either date only contains a
          year, or if the years differ.
          The difference between the months if the years are the same and
          either date only contains year and month, or if the months differ.
          The difference between the days if both dates are complete and
          their year and month are the same.
        """
        year_delta = other_date.year - self.year
        if self.is_only_year_date() or other_date.is_only_year_date():
            return year_delta
        if year_delta != 0:
            return year_delta

        month_delta = other_date.month - self.month
        if self.is_month_date() or other_date.is_month_date():
            return month_delta
        if month_delta != 0:
            return month_delta

        return other_date.day - self.day

    def is_only_year_date(self):
        """Return True when only the year is set."""
        return self.year is not None and self.month is None and self.day is None

    def is_month_date(self):
        """Return True when year and month are set but the day is not."""
        return (self.year is not None and self.month is not None
                and self.day is None)

    def is_complete_date(self):
        """Return True when year, month and day are all set."""
        return (
            self.year is not None
            and self.month is not None
            and self.day is not None
        )
class MissingFieldRule(BaseRule):
    """Check for required fields for given entity types and field names."""

    def get_severity(self):
        """Return 0 for Info, 1 for Warning, or 2 for Error.

        Raises:
          NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError

    def element_field_mapping(self):
        """Return a map of element tag to list of required fields.

        Raises:
          NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError

    def setup(self):
        """Resolve the configured severity to the exception class to raise.

        Raises:
          Exception: the severity is not a valid index into the handled
            severities.
        """
        severity = self.get_severity()
        handled_severities = loggers.handled_severities()
        # Valid indexes are 0 .. len - 1; the previous `>` comparison let
        # `severity == len` through to the lookup below.
        if not 0 <= severity < len(handled_severities):
            raise Exception(("Invalid severity. Must be either 0 (Info), "
                             "1 (Warning), or 2 (Error)"))
        self.exception = handled_severities[severity]

    def elements(self):
        """Return the element tags that have required fields configured."""
        return list(self.element_field_mapping().keys())

    def check(self, element):
        """Report every required field that is absent or blank on the element.

        A field counts as missing when the child is not found, has no text, or
        its text is only whitespace.

        Args:
          element: the element to check; its tag selects the required fields.

        Raises:
          The exception class chosen in `setup`, carrying one log entry per
          missing field.
        """
        error_log = []
        required_field_tags = self.element_field_mapping()[element.tag]
        for field_tag in required_field_tags:
            required_field = element.find(field_tag)
            if (required_field is None or required_field.text is None
                    or not required_field.text.strip()):
                error_log.append(loggers.LogEntry(
                    "The element {} is missing field {}.".format(
                        element.tag, field_tag), [element]))
        if error_log:
            raise self.exception(error_log)
class RuleOption:
    """Name/value pair describing one option to apply to a rule."""

    # Class-level defaults; class_name is not assigned anywhere in this class.
    class_name = None
    option_name = None
    option_value = None

    def __init__(self, option_name, option_value):
        """Store the option name and the value to assign to it."""
        self.option_name, self.option_value = option_name, option_value
class RulesRegistry(SchemaHandler):
    """Registry of rules and the elements they check."""

    _TOP_LEVEL_ENTITIES = set(
        ["Party", "GpUnit", "Office", "Person", "Candidate", "Contest"])

    def __init__(
        self,
        election_file,
        schema_file,
        rule_classes_to_check,
        rule_options,
        ocd_id_validator,
    ):
        """Store the inputs; nothing is parsed until `check_rules` runs.

        Args:
          election_file: source handed to `etree.parse` for the election feed.
          schema_file: source handed to `etree.parse` for the schema.
          rule_classes_to_check: iterable of rule classes to instantiate.
          rule_options: mapping of rule class name to a list of options.
          ocd_id_validator: passed through to every rule instance.
        """
        self.election_file = election_file
        self.schema_file = schema_file
        self.rule_classes_to_check = rule_classes_to_check
        self.rule_options = rule_options
        self.ocd_id_validator = ocd_id_validator
        self.registry = {}
        self.exceptions_wrapper = loggers.ExceptionListWrapper()
        self.election_tree = None
        # Defined up front so the attribute exists before check_rules parses
        # the schema.
        self.schema_tree = None

    def register_rules(self):
        """Instantiate every rule and file it under the elements it checks.

        For each rule class: build an instance, apply any options registered
        under the class name, call its `setup`, then add the instance to
        `self.registry` under each distinct name from its `elements()`.
        """
        for rule in self.rule_classes_to_check:
            rule_instance = rule(
                self.election_tree,
                self.schema_tree,
                ocd_id_validator=self.ocd_id_validator,
            )
            for option in self.rule_options.get(rule.__name__, ()):
                rule_instance.set_option(option)
            rule_instance.setup()
            for element in set(rule_instance.elements()):
                self.registry.setdefault(element, []).append(rule_instance)

    def print_exceptions(self, severity, verbose):
        """Delegate printing of collected exceptions to the wrapper."""
        self.exceptions_wrapper.print_exceptions(severity, verbose)

    def get_all_exceptions(self):
        """Return the result of the wrapper's `get_all_exceptions`.

        The result used to be discarded, so callers always received None.
        """
        return self.exceptions_wrapper.get_all_exceptions()

    def count_stats(self):
        """Aggregates the counts for each top level entity.

        Prints a header and then one stats object per entity type that has at
        least one instance in the election tree. Does nothing when no election
        tree has been parsed.
        """
        # Compare with None explicitly; lxml warns against truth-testing
        # elements.
        if self.election_tree is None:
            return

        # Find the top-level entities.
        entity_path_str = ".//{0}Collection//{1}"
        print("\n" + " " * 5 + "Entity and Attribute Counts:")
        for entity_name in stats.ENTITY_STATS:
            entity_instances = self.election_tree.findall(
                entity_path_str.format(entity_name, entity_name))
            if not entity_instances:
                continue
            # If top-level entity exists, instantiate a stat counter with
            # total.
            entity_stats = stats.ENTITY_STATS[entity_name](
                len(entity_instances))
            # Then for each possible nested attribute, add count for those.
            for attr in entity_stats.attribute_counts:
                for instance in entity_instances:
                    entity_stats.increment_attribute(
                        attr, len(instance.findall(".//{}".format(attr))))
            print(entity_stats)

    def check_rules(self):
        """Checks all rules.

        Parses the schema and election files, registers the rules, runs the
        rules filed under "tree" once each, then walks the election tree and
        runs the rules filed under each element's class. Election exceptions
        raised by rules are passed to the exceptions wrapper with the rule
        class name. A parse failure is recorded as a fatal exception and stops
        processing.
        """
        try:
            self.schema_tree = etree.parse(self.schema_file)
            self.election_tree = etree.parse(self.election_file)
        except etree.LxmlError as e:
            exp = loggers.ElectionFatal.from_message(
                "Fatal Error. XML file could not be parsed. {}".format(e))
            self.exceptions_wrapper.exception_handler(exp)
            return

        self.register_rules()

        for rule in self.registry.get("tree", []):
            try:
                rule.check()
            except loggers.ElectionException as e:
                rule_name = rule.__class__.__name__
                self.exceptions_wrapper.exception_handler(e, rule_name)

        for _, element in etree.iterwalk(self.election_tree, events=("end",)):
            tag = self.get_element_class(element)
            if not tag or tag not in self.registry:
                continue
            for element_rule in self.registry[tag]:
                try:
                    element_rule.check(element)
                except loggers.ElectionException as e:
                    rule_name = element_rule.__class__.__name__
                    self.exceptions_wrapper.exception_handler(e, rule_name)