Skip to content

Commit

Permalink
Split ingestion pipeline (#61)
Browse files Browse the repository at this point in the history
* Creates a `validate_fhirflat` function to split out validation from the ingestion function

* Adds `validate` CLI

* rename expandCoding

* Add documentation
  • Loading branch information
pipliggins committed Aug 6, 2024
1 parent e159bd7 commit 627c07e
Show file tree
Hide file tree
Showing 19 changed files with 862 additions and 262 deletions.
42 changes: 42 additions & 0 deletions docs/howto/conversion-data.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,45 @@ The equivalent function to the CLI described above can be used as
```
fhirflat.convert_data_to_flat("data_file_path", "sheet_id", "%Y-%m-%d", "Brazil/East")
```

## Conversion without validation

If you wish to convert your data into FHIRflat, but not perform validation to check the
converted data conforms to the FHIR spec, you can add the `--no-validate` flag:

```bash
fhirflat transform data-file google-sheet-id date-format timezone-name --no-validate
```

The equivalent library function is
```python
fhirflat.convert_data_to_flat(<data_file_path>, <sheet_id>, <date_format>, <timezone>, validate=False)
```

We strongly recommend you don't do this unless necessary for time constraints; some
errors in conversion can cause the parquet file to fail to save (e.g. if columns contain
mixed types due to errors which would be caught during validation).

Data which is already in a FHIRflat format can be validated against the schema using

```bash
fhirflat validate <folder_name>
```

where `folder_name` is the path to the folder containing your flat files. The files **must**
be named according to the corresponding FHIR resource, e.g. the folder containing flat
Encounter data must be named `encounter.parquet`.

The folder can be provided in a compressed format, e.g. zipped; you can specifiy this
using
```bash
fhirflat validate <file_name> -c "zip"
```

The output folder of validated data will be compressed using the same format.

The equivalent library function is

```python
fhirflat.validate(<file_name>, compress_format="zip")
```
6 changes: 5 additions & 1 deletion fhirflat/__main__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import sys

from .ingest import main as ingest_to_flat
from .ingest import validate_cli as validate


def main():
Expand All @@ -10,16 +11,19 @@ def main():
Available subcommands:
transform - Convert raw data into FHIRflat files
validate - Validate FHIRflat files against FHIR schemas
"""
)
sys.exit(1)
subcommand = sys.argv[1]
if subcommand not in ["transform"]:
if subcommand not in ["transform", "validate"]:
print("fhirflat: unrecognised subcommand", subcommand)
sys.exit(1)
sys.argv = sys.argv[1:]
if subcommand == "transform":
ingest_to_flat()
elif subcommand == "validate":
validate()
else:
pass

Expand Down
10 changes: 5 additions & 5 deletions fhirflat/fhir2flat.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,19 +115,19 @@ def single_or_list(x):
return df.groupby(df.index).agg(single_or_list)


def expandCoding(df: pd.DataFrame, column_name: str) -> pd.DataFrame:
def condenseCoding(df: pd.DataFrame, column_name: str) -> pd.DataFrame:
"""
Turns a column containing a list of dictionaries with coding information into
2 columns containing a list of strings with the coding information, and the text.
[ {"system": "http://loinc.org", "code": "1234", "display": "Test"} ]
becomes
[ "http://loinc.org/1234" ], ["Test"]
[ "http://loinc.org|1234" ], ["Test"]
If a "text" field has already been provided, this overrides the display.
"""

def expand(
def condense(
row: pd.Series, column_name: str, text_present: bool = False
) -> pd.Series:
codes = row[column_name]
Expand All @@ -148,7 +148,7 @@ def expand(
if column_name.removesuffix(".coding") + ".text" in df.columns:
text_present = True

df = df.apply(lambda x: expand(x, column_name, text_present), axis=1)
df = df.apply(lambda x: condense(x, column_name, text_present), axis=1)

if not text_present:
df.insert(
Expand Down Expand Up @@ -291,7 +291,7 @@ def fhir2flat(resource: FHIRFlatBase, lists: list | None = None) -> pd.DataFrame

# expand all instances of the "coding" list
for coding in df.columns[df.columns.str.endswith("coding")]:
df = expandCoding(df, coding)
df = condenseCoding(df, coding)

# condense all references
for reference in df.columns[df.columns.str.endswith("reference")]:
Expand Down
44 changes: 4 additions & 40 deletions fhirflat/flat2fhir.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
from fhir.resources.fhirprimitiveextension import FHIRPrimitiveExtension
from fhir.resources.period import Period
from fhir.resources.quantity import Quantity
from pydantic.v1 import BaseModel
from pydantic.v1.error_wrappers import ValidationError

from .util import (
find_data_class,
get_fhirtype,
get_local_extension_type,
group_keys,
Expand All @@ -21,15 +21,15 @@ def create_codeable_concept(
) -> dict[str, list[str]]:
"""Re-creates a codeableConcept structure from the FHIRflat representation."""

# for reading in from ingestion pipeline
# for creating backbone elements
if name + ".code" in old_dict and name + ".system" in old_dict:
raw_codes: str | float | list[str | None] = old_dict.get(name + ".code")
if raw_codes is not None and not isinstance(raw_codes, list):
formatted_code = (
raw_codes if isinstance(raw_codes, str) else str(int(raw_codes))
)
codes = [old_dict[name + ".system"] + "|" + formatted_code]
elif raw_codes is None:
elif not raw_codes:
codes = raw_codes
else:
formatted_codes = [
Expand Down Expand Up @@ -174,48 +174,12 @@ def set_datatypes(k, v_dict, klass) -> dict:
return {s.split(".", 1)[1]: v_dict[s] for s in v_dict}


def find_data_class(data_class: list[BaseModel] | BaseModel, k: str) -> BaseModel:
"""
Finds the type class for item k within the data class.
Parameters
----------
data_class: list[BaseModel] or BaseModel
The data class to search within. If a list, the function will search for the
a class with a matching title to k.
k: str
The property to search for within the data class
"""

if isinstance(data_class, list):
title_matches = [k.lower() == c.schema()["title"].lower() for c in data_class]
result = [x for x, y in zip(data_class, title_matches, strict=True) if y]
if len(result) == 1:
return get_fhirtype(k)
else:
raise ValueError(f"Couldn't find a matching class for {k} in {data_class}")

else:
k_schema = data_class.schema()["properties"].get(k)

base_class = (
k_schema.get("items").get("type")
if k_schema.get("items") is not None
else k_schema.get("type")
)

if base_class is None:
assert k_schema.get("type") == "array"

base_class = [opt.get("type") for opt in k_schema["items"]["anyOf"]]
return get_fhirtype(base_class)


def expand_concepts(data: dict[str, str], data_class: type[_DomainResource]) -> dict:
"""
Combines columns containing flattened FHIR concepts back into
JSON-like structures.
"""

groups = group_keys(data.keys())
group_classes = {}

Expand Down
122 changes: 118 additions & 4 deletions fhirflat/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import dateutil.parser
import numpy as np
import pandas as pd
from pyarrow.lib import ArrowTypeError

import fhirflat
from fhirflat.util import get_local_resource, group_keys
Expand Down Expand Up @@ -437,6 +438,7 @@ def convert_data_to_flat(
mapping_files_types: tuple[dict, dict] | None = None,
sheet_id: str | None = None,
subject_id="subjid",
validate: bool = True,
compress_format: None | str = None,
):
"""
Expand Down Expand Up @@ -465,13 +467,22 @@ def convert_data_to_flat(
be named by resource, and contain the mapping for that resource.
subject_id: str
The name of the column containing the subject ID in the data file.
validate: bool
Whether to validate the FHIRflat files after creation.
compress_format: optional str
If the output folder should be zipped, and if so with what format.
"""

if not mapping_files_types and not sheet_id:
raise TypeError("Either mapping_files_types or sheet_id must be provided")

if not validate:
warnings.warn(
"Validation of the FHIRflat files has been disabled. ",
UserWarning,
stacklevel=2,
)

if not os.path.exists(folder_name):
os.makedirs(folder_name)

Expand Down Expand Up @@ -522,10 +533,29 @@ def convert_data_to_flat(
else:
raise ValueError(f"Unknown mapping type {t}")

errors = resource.ingest_to_flat(
df,
os.path.join(folder_name, resource.__name__.lower()),
)
flat_nonvalidated = resource.ingest_to_flat(df)

if validate:
valid_flat, errors = resource.validate_fhirflat(flat_nonvalidated)

valid_flat.to_parquet(
f"{os.path.join(folder_name, resource.__name__.lower())}.parquet"
)
else:
errors = None
try:
flat_nonvalidated.to_parquet(
f"{os.path.join(folder_name, resource.__name__.lower())}.parquet"
)
except ArrowTypeError as e:
warnings.warn(
f"Error writing {resource.__name__.lower()}.parquet: {e}\n"
"This is likely due to a validation error, re-run without "
"--no-validate.",
UserWarning,
stacklevel=2,
)
continue

end_time = timeit.default_timer()
total_time = end_time - start_time
Expand All @@ -550,6 +580,60 @@ def convert_data_to_flat(
shutil.rmtree(folder_name)


def validate(folder_name: str, compress_format: str | None = None):
"""
Takes a folder containing (optionally compressed) FHIRflat files and validates them
against the FHIR. File names **must** correspond to the FHIR resource types they
represent. E.g. a file containing Patient resources must be named "patient.parquet".
Parameters
----------
folder_name
The path to the folder containing the FHIRflat files, or compressed file.
compress_format
The format to compress the validated files into.
"""

if Path(folder_name).is_file():
directory = Path(folder_name).with_suffix("")
shutil.unpack_archive(folder_name, extract_dir=directory)
else:
directory = folder_name

for file in Path(directory).glob("*.parquet"):
df = pd.read_parquet(file)
resource = file.stem
resource_type = get_local_resource(resource, case_insensitive=True)

valid_flat, errors = resource_type.validate_fhirflat(df, return_frames=True)

if errors is not None:

valid_flat.to_parquet(os.path.join(directory, f"{resource}_valid.parquet"))
errors.to_csv(
os.path.join(directory, f"{resource}_errors.csv"), index=False
)
error_length = len(errors)
print(
f"{error_length} rows in {file.name} have validation errors. "
f"Errors saved to {resource}_errors.csv. "
f"Valid rows saved to {resource}_valid.parquet"
)
else:
print(f"{file.name} is valid")
print("Validation complete")

if compress_format:
new_directory = str(directory) + "_validated"
shutil.make_archive(
new_directory,
format=compress_format,
root_dir=directory,
)
shutil.rmtree(directory)
print(f"Validated files saved as {new_directory}.{compress_format}")


def main():
parser = argparse.ArgumentParser(
description="Convert data to FHIRflat parquet files",
Expand Down Expand Up @@ -579,6 +663,13 @@ def main():
default="subjid",
)

parser.add_argument(
"--no-validate",
help="Do the data conversion without validation",
dest="validate",
action="store_false",
)

parser.add_argument(
"-c",
"--compress",
Expand All @@ -595,9 +686,32 @@ def main():
folder_name=args.output,
sheet_id=args.sheet_id,
subject_id=args.subject_id,
validate=args.validate,
compress_format=args.compress,
)


def validate_cli():
parser = argparse.ArgumentParser(
description="Validate FHIRflat parquet files against the FHIR schema",
prog="fhirflat validate",
)
parser.add_argument("folder", help="File path to folder containing FHIRflat files")

parser.add_argument(
"-c",
"--compress_format",
help="Format to compress the output into",
choices=["zip", "tar", "gztar", "bztar", "xztar"],
)

args = parser.parse_args()

validate(
args.folder,
compress_format=args.compress_format,
)


if __name__ == "__main__":
main()
Loading

0 comments on commit 627c07e

Please sign in to comment.