diff --git a/src/dtscalibration/__init__.py b/src/dtscalibration/__init__.py index 4ecabb02..cd531011 100644 --- a/src/dtscalibration/__init__.py +++ b/src/dtscalibration/__init__.py @@ -5,12 +5,12 @@ from dtscalibration.datastore_utils import merge_double_ended from dtscalibration.datastore_utils import shift_double_ended from dtscalibration.datastore_utils import suggest_cable_shift_double_ended -from dtscalibration.io import open_datastore -from dtscalibration.io import open_mf_datastore -from dtscalibration.io import read_apsensing_files -from dtscalibration.io import read_sensornet_files -from dtscalibration.io import read_sensortran_files -from dtscalibration.io import read_silixa_files +from dtscalibration.io.apsensing import read_apsensing_files +from dtscalibration.io.datastore import open_datastore +from dtscalibration.io.datastore import open_mf_datastore +from dtscalibration.io.sensornet import read_sensornet_files +from dtscalibration.io.sensortran import read_sensortran_files +from dtscalibration.io.silixa import read_silixa_files from dtscalibration.plot import plot_accuracy from dtscalibration.plot import plot_location_residuals_double_ended from dtscalibration.plot import plot_residuals_reference_sections diff --git a/src/dtscalibration/datastore.py b/src/dtscalibration/datastore.py index d25b4ff6..2b1a6064 100644 --- a/src/dtscalibration/datastore.py +++ b/src/dtscalibration/datastore.py @@ -21,7 +21,7 @@ from dtscalibration.datastore_utils import get_params_from_pval_double_ended from dtscalibration.datastore_utils import get_params_from_pval_single_ended from dtscalibration.datastore_utils import ufunc_per_section_helper -from dtscalibration.io_utils import _dim_attrs +from dtscalibration.io.utils import _dim_attrs from dtscalibration.variance_helpers import variance_stokes_constant_helper from dtscalibration.variance_helpers import variance_stokes_exponential_helper from dtscalibration.variance_helpers import variance_stokes_linear_helper @@ -80,7 +80,13 @@ class DataStore(xr.Dataset): """ def __init__(self, *args, autofill_dim_attrs=True, **kwargs): - super().__init__(*args, **kwargs) + with warnings.catch_warnings(): + # Filter out nanosecond precision warning: no good way to avoid ATM. + warnings.filterwarnings( + "ignore", + message="Converting non-nanosecond precision timedelta values to nanosecond precision.", + ) + super().__init__(*args, **kwargs) # check order of the dimensions of the data_vars # first 'x' (if in initiated DataStore), then 'time', then the rest @@ -2256,8 +2262,8 @@ def average_single_ended( ---------- p_val : array-like, optional Define `p_val`, `p_var`, `p_cov` if you used an external function - for calibration. Has size 2 + `nt`. First value is :math:`\gamma`, - second is :math:`\Delta \\alpha`, others are :math:`C` for each + for calibration. Has size 2 + `nt`. First value is :math:`\\gamma`, + second is :math:`\\Delta \\alpha`, others are :math:`C` for each timestep. If set to False, no uncertainty in the parameters is propagated into the confidence intervals. 
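The io-module split in the __init__.py hunk above keeps the public entry points re-exported at package level, so downstream imports should be unaffected; a minimal sketch (assuming the package is installed from this branch):

# Public API unchanged: readers remain importable from the package root.
from dtscalibration import read_silixa_files

# New with this refactor: the same reader also lives in a dedicated submodule.
from dtscalibration.io.silixa import read_silixa_files as read_silixa_direct

# Both names resolve to the same function object.
assert read_silixa_files is read_silixa_direct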
Similar to the spec sheets of the DTS diff --git a/src/dtscalibration/datastore_utils.py b/src/dtscalibration/datastore_utils.py index 0d54dd31..71726888 100644 --- a/src/dtscalibration/datastore_utils.py +++ b/src/dtscalibration/datastore_utils.py @@ -1,3 +1,4 @@ +import warnings from typing import TYPE_CHECKING from typing import Optional from typing import Union @@ -1146,8 +1147,13 @@ def suggest_cable_shift_double_ended( rast = ds["rast"].data[:nx2] x2 = ds["x"].data[i_shift:] - i_f = np.log(st / ast) - i_b = np.log(rst / rast) + with warnings.catch_warnings(): + # Supress log(x/0) warnings. The data will result in NaNs values. + warnings.filterwarnings( + "ignore", message="invalid value encountered in log" + ) + i_f = np.log(st / ast) + i_b = np.log(rst / rast) att = (i_b - i_f) / 2 # varianble E in article diff --git a/src/dtscalibration/io.py b/src/dtscalibration/io.py deleted file mode 100644 index 413851f5..00000000 --- a/src/dtscalibration/io.py +++ /dev/null @@ -1,562 +0,0 @@ -import fnmatch -import glob -import inspect -import os -import warnings - -import numpy as np -import xarray as xr - -from dtscalibration import DataStore -from dtscalibration.io_utils import apsensing_xml_version_check -from dtscalibration.io_utils import read_apsensing_files_routine -from dtscalibration.io_utils import read_sensornet_files_routine_v3 -from dtscalibration.io_utils import read_sensortran_files_routine -from dtscalibration.io_utils import read_silixa_files_routine_v4 -from dtscalibration.io_utils import read_silixa_files_routine_v6 -from dtscalibration.io_utils import sensornet_ddf_version_check -from dtscalibration.io_utils import sensortran_binary_version_check -from dtscalibration.io_utils import silixa_xml_version_check -from dtscalibration.io_utils import ziphandle_to_filepathlist - - -def open_datastore( - filename_or_obj, - group=None, - decode_cf=True, - mask_and_scale=None, - decode_times=True, - concat_characters=True, - decode_coords=True, - engine=None, - chunks=None, - lock=None, - cache=None, - drop_variables=None, - backend_kwargs=None, - load_in_memory=False, - **kwargs, -): - """Load and decode a datastore from a file or file-like object. Most - arguments are passed to xarray.open_dataset(). - - Parameters - ---------- - filename_or_obj : str, Path, file or xarray.backends.*DataStore - Strings and Path objects are interpreted as a path to a netCDF file - or an OpenDAP URL and opened with python-netCDF4, unless the filename - ends with .gz, in which case the file is gunzipped and opened with - scipy.io.netcdf (only netCDF3 supported). File-like objects are opened - with scipy.io.netcdf (only netCDF3 supported). - group : str, optional - Path to the netCDF4 group in the given file to open (only works for - netCDF4 files). - decode_cf : bool, optional - Whether to decode these variables, assuming they were saved according - to CF conventions. - mask_and_scale : bool, optional - If True, replace array values equal to `_FillValue` with NA and scale - values according to the formula `original_values * scale_factor + - add_offset`, where `_FillValue`, `scale_factor` and `add_offset` are - taken from variable attributes (if they exist). If the `_FillValue` or - `missing_value` attribute contains multiple values a warning will be - issued and all array values matching one of the multiple values will - be replaced by NA. mask_and_scale defaults to True except for the - pseudonetcdf backend. 
- decode_times : bool, optional - If True, decode times encoded in the standard NetCDF datetime format - into datetime objects. Otherwise, leave them encoded as numbers. - concat_characters : bool, optional - If True, concatenate along the last dimension of character arrays to - form string arrays. Dimensions will only be concatenated over (and - removed) if they have no corresponding variable and if they are only - used as the last dimension of character arrays. - decode_coords : bool, optional - If True, decode the 'coordinates' attribute to identify coordinates in - the resulting dataset. - engine : {'netcdf4', 'scipy', 'pydap', 'h5netcdf', 'pynio', - 'pseudonetcdf'}, optional - Engine to use when reading files. If not provided, the default engine - is chosen based on available dependencies, with a preference for - 'netcdf4'. - chunks : int or dict, optional - If chunks is provided, it used to load the new dataset into dask - arrays. ``chunks={}`` loads the dataset with dask using a single - chunk for all arrays. - lock : False, True or threading.Lock, optional - If chunks is provided, this argument is passed on to - :py:func:`dask.array.from_array`. By default, a global lock is - used when reading data from netCDF files with the netcdf4 and h5netcdf - engines to avoid issues with concurrent access when using dask's - multithreaded backend. - cache : bool, optional - If True, cache data loaded from the underlying datastore in memory as - NumPy arrays when accessed to avoid reading from the underlying data- - store multiple times. Defaults to True unless you specify the `chunks` - argument to use dask, in which case it defaults to False. Does not - change the behavior of coordinates corresponding to dimensions, which - always load their data from disk into a ``pandas.Index``. - drop_variables: string or iterable, optional - A variable or list of variables to exclude from being parsed from the - dataset. This may be useful to drop variables with problems or - inconsistent values. - backend_kwargs: dictionary, optional - A dictionary of keyword arguments to pass on to the backend. This - may be useful when backend options would improve performance or - allow user control of dataset processing. - - Returns - ------- - dataset : Dataset - The newly created dataset. - - See Also - -------- - xarray.open_dataset - xarray.load_dataset - """ - - xr_kws = inspect.signature(xr.open_dataset).parameters.keys() - - ds_kwargs = {k: v for k, v in kwargs.items() if k not in xr_kws} - - if chunks is None: - chunks = {} - - with xr.open_dataset( - filename_or_obj, - group=group, - decode_cf=decode_cf, - mask_and_scale=mask_and_scale, - decode_times=decode_times, - concat_characters=concat_characters, - decode_coords=decode_coords, - engine=engine, - chunks=chunks, - lock=lock, - cache=cache, - drop_variables=drop_variables, - backend_kwargs=backend_kwargs, - ) as ds_xr: - ds = DataStore( - data_vars=ds_xr.data_vars, - coords=ds_xr.coords, - attrs=ds_xr.attrs, - **ds_kwargs, - ) - - # to support deprecated st_labels - ds = ds.rename_labels(assertion=False) - - if load_in_memory: - if "cache" in kwargs: - raise TypeError("cache has no effect in this context") - return ds.load() - - else: - return ds - - -def open_mf_datastore( - path=None, paths=None, combine="by_coords", load_in_memory=False, **kwargs -): - """ - Open a datastore from multiple netCDF files. This script assumes the - datastore was split along the time dimension. 
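For context, a minimal usage sketch of open_datastore as documented above; the netCDF filename is hypothetical:

from dtscalibration import open_datastore

# Open a previously saved DataStore; unrecognised kwargs are passed to
# DataStore.__init__, the remaining arguments go to xarray.open_dataset.
ds = open_datastore("measurements.nc", load_in_memory=True)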
But only variables with a - time dimension should be concatenated in the time dimension. Other - options from xarray do not support this. - - Parameters - ---------- - combine : {'by_coords', 'nested'}, optional - Leave it at by_coords - path : str - A file path to the stored netcdf files with an asterisk in the - filename to list all. Ensure you have leading zeros in the file - numbering. - paths : list - Define you own list of file paths. - Returns - ------- - dataset : Dataset - The newly created dataset. - """ - from xarray.backends.api import open_mfdataset - - if paths is None: - paths = sorted(glob.glob(path)) - assert paths, "No files match found with: " + path - - with open_mfdataset(paths=paths, combine=combine, **kwargs) as xds: - ds = DataStore(data_vars=xds.data_vars, coords=xds.coords, attrs=xds.attrs) - - # to support deprecated st_labels - ds = ds.rename_labels(assertion=False) - - if load_in_memory: - if "cache" in kwargs: - raise TypeError("cache has no effect in this context") - return ds.load() - - else: - return ds - - -def read_silixa_files( - filepathlist=None, - directory=None, - zip_handle=None, - file_ext="*.xml", - timezone_netcdf="UTC", - silent=False, - load_in_memory="auto", - **kwargs, -): - """Read a folder with measurement files from a device of the Silixa brand. Each measurement file contains - values for a - single timestep. Remember to check which timezone you are working in. - - The silixa files are already timezone aware - - Parameters - ---------- - filepathlist : list of str, optional - List of paths that point the the silixa files - directory : str, Path, optional - Path to folder - timezone_netcdf : str, optional - Timezone string of the netcdf file. UTC follows CF-conventions. - file_ext : str, optional - file extension of the measurement files - silent : bool - If set tot True, some verbose texts are not printed to stdout/screen - load_in_memory : {'auto', True, False} - If 'auto' the Stokes data is only loaded to memory for small files - kwargs : dict-like, optional - keyword-arguments are passed to DataStore initialization - - Returns - ------- - datastore : DataStore - The newly created datastore. 
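A minimal usage sketch of read_silixa_files matching the docstring above; the directory path is hypothetical:

from dtscalibration import read_silixa_files

# Each Silixa XML file holds a single timestep; the files are already timezone aware.
ds = read_silixa_files(
    directory="data/silixa_measurements",
    file_ext="*.xml",
    timezone_netcdf="UTC",
    load_in_memory="auto",
)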
- """ - - assert "timezone_input_files" not in kwargs, ( - "The silixa files are " "already timezone aware" - ) - - if filepathlist is None and zip_handle is None: - filepathlist = sorted(glob.glob(os.path.join(directory, file_ext))) - - # Make sure that the list of files contains any files - assert ( - len(filepathlist) >= 1 - ), "No measurement files found in provided " "directory: \n" + str(directory) - - elif filepathlist is None and zip_handle: - filepathlist = ziphandle_to_filepathlist(fh=zip_handle, extension=file_ext) - - # Make sure that the list of files contains any files - assert len(filepathlist) >= 1, ( - "No measurement files found in provided " "list/directory" - ) - - xml_version = silixa_xml_version_check(filepathlist) - - if xml_version == 4: - data_vars, coords, attrs = read_silixa_files_routine_v4( - filepathlist, - timezone_netcdf=timezone_netcdf, - silent=silent, - load_in_memory=load_in_memory, - ) - - elif xml_version in (6, 7, 8): - data_vars, coords, attrs = read_silixa_files_routine_v6( - filepathlist, - xml_version=xml_version, - timezone_netcdf=timezone_netcdf, - silent=silent, - load_in_memory=load_in_memory, - ) - - else: - raise NotImplementedError( - "Silixa xml version " + f"{xml_version} not implemented" - ) - - ds = DataStore(data_vars=data_vars, coords=coords, attrs=attrs, **kwargs) - return ds - - -def read_sensortran_files( - directory, timezone_input_files="UTC", timezone_netcdf="UTC", silent=False, **kwargs -): - """Read a folder with measurement files from a device of the Sensortran - brand. Each measurement file contains values for a single timestep. Remember - to check which timezone you are working in. - - The sensortran files are already timezone aware - - Parameters - ---------- - directory : str, Path - Path to folder containing BinaryRawDTS and BinaryTemp files - timezone_input_files : str, optional - Timezone string of the measurement files. - Remember to check when measurements are taken. - Also if summertime is used. - timezone_netcdf : str, optional - Timezone string of the netcdf file. UTC follows CF-conventions. - silent : bool - If set tot True, some verbose texts are not printed to stdout/screen - kwargs : dict-like, optional - keyword-arguments are passed to DataStore initialization - - Returns - ------- - datastore : DataStore - The newly created datastore. 
- """ - - filepathlist_dts = sorted(glob.glob(os.path.join(directory, "*BinaryRawDTS.dat"))) - - # Make sure that the list of files contains any files - assert ( - len(filepathlist_dts) >= 1 - ), "No RawDTS measurement files found " "in provided directory: \n" + str(directory) - - filepathlist_temp = [f.replace("RawDTS", "Temp") for f in filepathlist_dts] - - for ii, fname in enumerate(filepathlist_dts): - # Check if corresponding temperature file exists - if not os.path.isfile(filepathlist_temp[ii]): - raise FileNotFoundError( - "Could not find BinaryTemp " + f"file corresponding to {fname}" - ) - - version = sensortran_binary_version_check(filepathlist_dts) - - if version == 3: - data_vars, coords, attrs = read_sensortran_files_routine( - filepathlist_dts, - filepathlist_temp, - timezone_input_files=timezone_input_files, - timezone_netcdf=timezone_netcdf, - silent=silent, - ) - else: - raise NotImplementedError( - "Sensortran binary version " + f"{version} not implemented" - ) - - ds = DataStore(data_vars=data_vars, coords=coords, attrs=attrs, **kwargs) - return ds - - -def read_apsensing_files( - filepathlist=None, - directory=None, - file_ext="*.xml", - timezone_input_files="UTC", - timezone_netcdf="UTC", - silent=False, - load_in_memory="auto", - **kwargs, -): - """Read a folder with measurement files from a device of the Sensortran - brand. Each measurement file contains values for a single timestep. - Remember to check which timezone you are working in. - - Parameters - ---------- - filepathlist : list of str, optional - List of paths that point the the silixa files - directory : str, Path, optional - Path to folder - timezone_input_files : str, optional - Timezone string of the measurement files. - Remember to check when measurements are taken. - Also if summertime is used. - timezone_netcdf : str, optional - Timezone string of the netcdf file. UTC follows CF-conventions. - file_ext : str, optional - file extension of the measurement files - silent : bool - If set tot True, some verbose texts are not printed to stdout/screen - load_in_memory : {'auto', True, False} - If 'auto' the Stokes data is only loaded to memory for small files - kwargs : dict-like, optional - keyword-arguments are passed to DataStore initialization - - Notes - ----- - Only XML files are supported for now - - Returns - ------- - datastore : DataStore - The newly created datastore. 
- """ - if not file_ext == "*.xml": - raise NotImplementedError("Only .xml files are supported for now") - - if filepathlist is None: - filepathlist = sorted(glob.glob(os.path.join(directory, file_ext))) - - # Make sure that the list of files contains any files - assert ( - len(filepathlist) >= 1 - ), "No measurement files found in provided " "directory: \n" + str(directory) - - # Make sure that the list of files contains any files - assert len(filepathlist) >= 1, ( - "No measurement files found in provided " "list/directory" - ) - - device = apsensing_xml_version_check(filepathlist) - - valid_devices = ["CP320"] - - if device in valid_devices: - pass - - else: - warnings.warn( - "AP sensing device {device}" - " has not been tested.\nPlease open an issue on github" - " and provide an example file" - ) - - data_vars, coords, attrs = read_apsensing_files_routine( - filepathlist, - timezone_netcdf=timezone_netcdf, - timezone_input_files=timezone_input_files, - silent=silent, - load_in_memory=load_in_memory, - ) - - ds = DataStore(data_vars=data_vars, coords=coords, attrs=attrs, **kwargs) - return ds - - -def read_sensornet_files( - filepathlist=None, - directory=None, - file_ext="*.ddf", - timezone_input_files="UTC", - timezone_netcdf="UTC", - silent=False, - add_internal_fiber_length=50.0, - fiber_length=None, - **kwargs, -): - """Read a folder with measurement files. Each measurement file contains - values for a single timestep. Remember to check which timezone - you are working in. - - Parameters - ---------- - filepathlist : list of str, optional - List of paths that point the the silixa files - directory : str, Path, optional - Path to folder - timezone_input_files : str, optional - Timezone string of the measurement files. - Remember to check when measurements are taken. - Also if summertime is used. - timezone_netcdf : str, optional - Timezone string of the netcdf file. UTC follows CF-conventions. - file_ext : str, optional - file extension of the measurement files - silent : bool - If set tot True, some verbose texts are not printed to stdout/screen - add_internal_fiber_length : float - Set to zero if only the measurements of the fiber connected to the DTS - system of interest. Set to 50 if you also want to keep the internal - reference section. - fiber_length : float - It is the fiber length between the two connector entering the DTS - device. If left to `None`, it is approximated with - `x[-1] - add_internal_fiber_length`. - kwargs : dict-like, optional - keyword-arguments are passed to DataStore initialization - - Notes - ----- - Compressed sensornet files can not be directly decoded, - because the files are encoded with encoding='windows-1252' instead of - UTF-8. - - Returns - ------- - datastore : DataStore - The newly created datastore. - """ - if filepathlist is None: - # Also look for files in sub-folders - filepathlist_unsorted = glob.glob( - os.path.join(directory, "**", file_ext), recursive=True - ) - - # Make sure that the list of files contains any files - msg = "No measurement files found in provided directory: \n" + str(directory) - assert len(filepathlist_unsorted) >= 1, msg - - # sort based on dates in filesname. 
A simple sorted() is not sufficient - # as month folders do not sort well - basenames = [os.path.basename(fp) for fp in filepathlist_unsorted] - dates = ["".join(bn.split(" ")[2:4]) for bn in basenames] - i_sort = np.argsort(dates) - filepathlist = [filepathlist_unsorted[i] for i in i_sort] - - # Check measurements are all from same channel - chno = [bn.split(" ")[1] for bn in basenames] - assert ( - len(set(chno)) == 1 - ), "Folder contains measurements from multiple channels" - - # Make sure that the list of files contains any files - assert len(filepathlist) >= 1, ( - "No measurement files found in provided " "list/directory" - ) - - ddf_version = sensornet_ddf_version_check(filepathlist) - - valid_versions = [ - "Halo DTS v1*", - "ORYX F/W v1.02 Oryx Data Collector v3*", - "ORYX F/W v4.00 Oryx Data Collector v3*", - "Sentinel DTS v5*", - ] - - valid = any([fnmatch.fnmatch(ddf_version, v_) for v_ in valid_versions]) - - if valid: - if fnmatch.fnmatch(ddf_version, "Halo DTS v1*"): - flip_reverse_measurements = True - elif fnmatch.fnmatch(ddf_version, "Sentinel DTS v5*"): - flip_reverse_measurements = True - else: - flip_reverse_measurements = False - - else: - flip_reverse_measurements = False - warnings.warn( - f"Sensornet .dff version {ddf_version}" - " has not been tested.\nPlease open an issue on github" - " and provide an example file" - ) - - data_vars, coords, attrs = read_sensornet_files_routine_v3( - filepathlist, - timezone_netcdf=timezone_netcdf, - timezone_input_files=timezone_input_files, - silent=silent, - add_internal_fiber_length=add_internal_fiber_length, - fiber_length=fiber_length, - flip_reverse_measurements=flip_reverse_measurements, - ) - - ds = DataStore(data_vars=data_vars, coords=coords, attrs=attrs, **kwargs) - return ds diff --git a/src/dtscalibration/io/__init__.py b/src/dtscalibration/io/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/dtscalibration/io/apsensing.py b/src/dtscalibration/io/apsensing.py new file mode 100644 index 00000000..ec013bc6 --- /dev/null +++ b/src/dtscalibration/io/apsensing.py @@ -0,0 +1,428 @@ +import os +import warnings +from pathlib import Path +from xml.etree import ElementTree + +import dask +import dask.array as da +import numpy as np +import pandas as pd + +from dtscalibration import DataStore +from dtscalibration.io.utils import dim_attrs +from dtscalibration.io.utils import get_xml_namespace +from dtscalibration.io.utils import open_file + +dim_attrs_apsensing = dict(dim_attrs) +dim_attrs_apsensing["TEMP"] = dim_attrs_apsensing.pop("tmp") +dim_attrs_apsensing["TEMP"]["name"] = "TEMP" +dim_attrs_apsensing.pop("acquisitionTime") +dim_attrs_apsensing.pop("userAcquisitionTimeFW") +dim_attrs_apsensing.pop("userAcquisitionTimeBW") + + +def read_apsensing_files( + filepathlist=None, + directory=None, + file_ext="*.xml", + timezone_input_files="UTC", + timezone_netcdf="UTC", + silent=False, + load_in_memory="auto", + **kwargs, +): + """Read a folder with measurement files from a device of the Sensortran + brand. Each measurement file contains values for a single timestep. + Remember to check which timezone you are working in. + + Parameters + ---------- + filepathlist : list of str, optional + List of paths that point the the silixa files + directory : str, Path, optional + Path to folder + timezone_input_files : str, optional + Timezone string of the measurement files. + Remember to check when measurements are taken. + Also if summertime is used. 
+ timezone_netcdf : str, optional + Timezone string of the netcdf file. UTC follows CF-conventions. + file_ext : str, optional + file extension of the measurement files + silent : bool + If set tot True, some verbose texts are not printed to stdout/screen + load_in_memory : {'auto', True, False} + If 'auto' the Stokes data is only loaded to memory for small files + kwargs : dict-like, optional + keyword-arguments are passed to DataStore initialization + + Notes + ----- + Only XML files are supported for now + + Returns + ------- + datastore : DataStore + The newly created datastore. + """ + if not file_ext == "*.xml": + raise NotImplementedError("Only .xml files are supported for now") + + if filepathlist is None: + filepathlist = sorted(Path(directory).glob(file_ext)) + + # Make sure that the list of files contains any files + assert ( + len(filepathlist) >= 1 + ), "No measurement files found in provided " "directory: \n" + str(directory) + + # Make sure that the list of files contains any files + assert len(filepathlist) >= 1, ( + "No measurement files found in provided " "list/directory" + ) + + device = apsensing_xml_version_check(filepathlist) + + valid_devices = ["CP320"] + + if device in valid_devices: + pass + + else: + warnings.warn( + "AP sensing device {device}" + " has not been tested.\nPlease open an issue on github" + " and provide an example file" + ) + + data_vars, coords, attrs = read_apsensing_files_routine( + filepathlist, + timezone_netcdf=timezone_netcdf, + timezone_input_files=timezone_input_files, + silent=silent, + load_in_memory=load_in_memory, + ) + + ds = DataStore(data_vars=data_vars, coords=coords, attrs=attrs, **kwargs) + return ds + + +def apsensing_xml_version_check(filepathlist): + """Function which tests which version of xml files are read. + + Parameters + ---------- + filepathlist + + Returns + ------- + + """ + + sep = ":" + attrs, _ = read_apsensing_attrs_singlefile(filepathlist[0], sep) + + return attrs["wellbore:uid"] + + +def read_apsensing_files_routine( + filepathlist, + timezone_input_files="UTC", + timezone_netcdf="UTC", + silent=False, + load_in_memory="auto", +): + """ + Internal routine that reads AP Sensing files. + Use dtscalibration.read_apsensing_files function instead. 
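The routine below assembles the per-file data lazily with dask, the same pattern used by the Silixa reader; a stripped-down sketch of that pattern, with hypothetical shapes and file names:

import dask
import dask.array as da
import numpy as np

nx, nitem = 1000, 4  # hypothetical: points along the fiber, recorded variables


@dask.delayed
def grab_data_per_file(path):
    # In the real routine this parses one XML file; here it is stubbed out.
    return np.zeros((nx, nitem))


files = ["file_0.xml", "file_1.xml"]  # hypothetical file list
delayed = [grab_data_per_file(fp) for fp in files]
arrays = [da.from_delayed(x, shape=(nx, nitem), dtype=float) for x in delayed]
data_arr = da.stack(arrays).T  # (nitem, nx, ntime); only computed when needed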
+ + The AP sensing files are not timezone aware + + Parameters + ---------- + filepathlist + timezone_input_files + timezone_netcdf + silent + load_in_memory + + Returns + ------- + + """ + assert ( + timezone_input_files == "UTC" and timezone_netcdf == "UTC" + ), "Only UTC timezones supported" + + # translate names + tld = {"ST": "st", "AST": "ast", "REV-ST": "rst", "REV-AST": "rast", "TEMP": "tmp"} + + # Open the first xml file using ET, get the name space and amount of data + xml_tree = ElementTree.parse(filepathlist[0]) + namespace = get_xml_namespace(xml_tree.getroot()) + + logtree = xml_tree.find( + ( + "{0}wellSet/{0}well/{0}wellboreSet/{0}wellbore" + + "/{0}wellLogSet/{0}wellLog" + ).format(namespace) + ) + logdata_tree = logtree.find(f"./{namespace}logData") + + # Amount of datapoints is the size of the logdata tree + nx = len(logdata_tree) + + sep = ":" + ns = {"s": namespace[1:-1]} + + # Obtain metadata from the first file + attrs, skip_chars = read_apsensing_attrs_singlefile(filepathlist[0], sep) + + # Add standardised required attributes + # No example of DE file available + attrs["isDoubleEnded"] = "0" + double_ended_flag = bool(int(attrs["isDoubleEnded"])) + + attrs["forwardMeasurementChannel"] = attrs[ + "wellbore:dtsMeasurementSet:dtsMeasurement:connectedToFiber:uidRef" + ] + attrs["backwardMeasurementChannel"] = "N/A" + + data_item_names = [ + attrs[f"wellbore:wellLogSet:wellLog:logCurveInfo_{x}:mnemonic"] + for x in range(0, 4) + ] + + nitem = len(data_item_names) + + ntime = len(filepathlist) + + # print summary + if not silent: + print("%s files were found, each representing a single timestep" % ntime) + print("%s recorded vars were found: " % nitem + ", ".join(data_item_names)) + print("Recorded at %s points along the cable" % nx) + + if double_ended_flag: + print("The measurement is double ended") + else: + print("The measurement is single ended") + + # Gather data + arr_path = "s:" + "/s:".join( + [ + "wellSet", + "well", + "wellboreSet", + "wellbore", + "wellLogSet", + "wellLog", + "logData", + "data", + ] + ) + + @dask.delayed + def grab_data_per_file(file_handle): + """ + + Parameters + ---------- + file_handle + + Returns + ------- + + """ + + with open_file(file_handle, mode="r") as f_h: + if skip_chars: + f_h.read(3) + eltree = ElementTree.parse(f_h) + arr_el = eltree.findall(arr_path, namespaces=ns) + + if not len(arr_el) == nx: + raise ValueError( + "Inconsistent length of x-dimension" + + "\nCheck if files are mixed up, or if the number of " + + "data points vary per file." 
+ ) + + # remove the breaks on both sides of the string + # split the string on the comma + arr_str = [arr_eli.text.split(",") for arr_eli in arr_el] + return np.array(arr_str, dtype=float) + + data_lst_dly = [grab_data_per_file(fp) for fp in filepathlist] + + data_lst = [ + da.from_delayed(x, shape=(nx, nitem), dtype=float) for x in data_lst_dly + ] + data_arr = da.stack(data_lst).T # .compute() + + # Check whether to compute data_arr (if possible 25% faster) + data_arr_cnk = data_arr.rechunk({0: -1, 1: -1, 2: "auto"}) + if load_in_memory == "auto" and data_arr_cnk.npartitions <= 5: + if not silent: + print("Reading the data from disk") + data_arr = data_arr_cnk.compute() + elif load_in_memory: + if not silent: + print("Reading the data from disk") + data_arr = data_arr_cnk.compute() + else: + if not silent: + print("Not reading the data from disk") + data_arr = data_arr_cnk + + data_vars = {} + for name, data_arri in zip(data_item_names, data_arr): + if name == "LAF": + continue + + if tld[name] in dim_attrs_apsensing: + data_vars[tld[name]] = ( + ["x", "time"], + data_arri, + dim_attrs_apsensing[tld[name]], + ) + elif name in dim_attrs_apsensing: + data_vars[tld[name]] = (["x", "time"], data_arri, dim_attrs_apsensing[name]) + else: + raise ValueError("Dont know what to do with the" + f" {name} data column") + + _time_dtype = [("filename_tstamp", np.int64), ("acquisitionTime", "= 1, msg + + # sort based on dates in filesname. A simple sorted() is not sufficient + # as month folders do not sort well + basenames = [os.path.basename(fp) for fp in filepathlist_unsorted] + dates = ["".join(bn.split(" ")[2:4]) for bn in basenames] + i_sort = np.argsort(dates) + filepathlist = [filepathlist_unsorted[i] for i in i_sort] + + # Check measurements are all from same channel + chno = [bn.split(" ")[1] for bn in basenames] + assert ( + len(set(chno)) == 1 + ), "Folder contains measurements from multiple channels" + + # Make sure that the list of files contains any files + assert len(filepathlist) >= 1, ( + "No measurement files found in provided " "list/directory" + ) + + ddf_version = sensornet_ddf_version_check(filepathlist) + + valid_versions = [ + "Halo DTS v1*", + "ORYX F/W v1.02 Oryx Data Collector v3*", + "ORYX F/W v4.00 Oryx Data Collector v3*", + "Sentinel DTS v5*", + ] + + valid = any([fnmatch.fnmatch(ddf_version, v_) for v_ in valid_versions]) + + if valid: + if fnmatch.fnmatch(ddf_version, "Halo DTS v1*"): + flip_reverse_measurements = True + elif fnmatch.fnmatch(ddf_version, "Sentinel DTS v5*"): + flip_reverse_measurements = True + else: + flip_reverse_measurements = False + + else: + flip_reverse_measurements = False + warnings.warn( + f"Sensornet .dff version {ddf_version}" + " has not been tested.\nPlease open an issue on github" + " and provide an example file" + ) + + data_vars, coords, attrs = read_sensornet_files_routine_v3( + filepathlist, + timezone_netcdf=timezone_netcdf, + timezone_input_files=timezone_input_files, + silent=silent, + add_internal_fiber_length=add_internal_fiber_length, + fiber_length=fiber_length, + flip_reverse_measurements=flip_reverse_measurements, + ) + + ds = DataStore(data_vars=data_vars, coords=coords, attrs=attrs, **kwargs) + return ds + + +def sensornet_ddf_version_check(filepathlist): + """Function which checks and returns the .ddf file version + + Parameters + ---------- + filepathlist + + Returns + ------- + ddf_version + + """ + + # Obtain metadata fro mthe first file + _, meta = read_sensornet_single(filepathlist[0]) + + if "Software version 
number" in meta.keys(): + version_string = meta["Software version number"] + else: + raise ValueError( + "Software version number could not be detected in .ddf file" + + "Either file is corrupted or not supported" + ) + + ddf_version = version_string.replace(",", ".") + + return ddf_version + + +def read_sensornet_single(filename): + """ + + Parameters + ---------- + filename + + Returns + ------- + + """ + headerlength = 26 + + # The $\circ$ Celsius symbol is unreadable in utf8 + with open_file(filename, encoding="windows-1252") as fileobject: + filelength = sum([1 for _ in fileobject]) + datalength = filelength - headerlength + + meta = {} + with open_file(filename, encoding="windows-1252") as fileobject: + for ii in range(0, 4): + fileline = fileobject.readline().split(":\t") + meta[fileline[0]] = fileline[1].replace("\n", "") + + for ii in range(4, headerlength - 1): + fileline = fileobject.readline().split("\t") + meta[fileline[0]] = fileline[1].replace("\n", "").replace(",", ".") + + # data_names = + fileobject.readline().split("\t") + + if meta["differential loss correction"] == "single-ended": + data = { + "x": np.zeros(datalength), + "tmp": np.zeros(datalength), + "st": np.zeros(datalength), + "ast": np.zeros(datalength), + } + + for ii in range(0, datalength): + fileline = fileobject.readline().replace(",", ".").split("\t") + + data["x"][ii] = float(fileline[0]) + data["tmp"][ii] = float(fileline[1]) + data["st"][ii] = float(fileline[2]) + data["ast"][ii] = float(fileline[3]) + + elif meta["differential loss correction"] == "combined": + data = { + "x": np.zeros(datalength), + "tmp": np.zeros(datalength), + "st": np.zeros(datalength), + "ast": np.zeros(datalength), + "rst": np.zeros(datalength), + "rast": np.zeros(datalength), + } + + for ii in range(0, datalength): + fileline = fileobject.readline().replace(",", ".").split("\t") + + data["x"][ii] = float(fileline[0]) + data["tmp"][ii] = float(fileline[1]) + data["st"][ii] = float(fileline[2]) + data["ast"][ii] = float(fileline[3]) + data["rst"][ii] = float(fileline[4]) + data["rast"][ii] = float(fileline[5]) + + else: + raise ValueError( + 'unknown differential loss correction: "' + + meta["differential loss correction"] + + '"' + ) + + meta["default loss term dB per km"] = meta["default loss term (dB/km)"] + del meta["default loss term (dB/km)"] + + return data, meta + + +def read_sensornet_files_routine_v3( + filepathlist, + timezone_netcdf="UTC", + timezone_input_files="UTC", + silent=False, + add_internal_fiber_length=50.0, + fiber_length=None, + flip_reverse_measurements=False, +): + """ + Internal routine that reads Sensor files. + Use dtscalibration.read_sensornet_files function instead. + + Parameters + ---------- + filepathlist + timezone_netcdf + timezone_input_files + silent + add_internal_fiber_length : float + Set to zero if only the measurements of the fiber connected to the DTS + system of interest. Set to 50 if you also want to keep the internal + reference section. + fiber_length : float + It is the fiber length between the two connector entering the DTS + device. + + Returns + ------- + + """ + + # Obtain metadata from the first file + data, meta = read_sensornet_single(filepathlist[0]) + + # Pop keys from the meta dict which are variable over time + popkeys = ( + "T ext. ref 1 (°C)", + "T ext. 
ref 2 (°C)", + "T internal ref (°C)", + "date", + "time", + "gamma", + "k internal", + "k external", + ) + [meta.pop(key) for key in popkeys] + attrs = meta + + # Add standardised required attributes + if meta["differential loss correction"] == "single-ended": + attrs["isDoubleEnded"] = "0" + elif meta["differential loss correction"] == "combined": + attrs["isDoubleEnded"] = "1" + + double_ended_flag = bool(int(attrs["isDoubleEnded"])) + + attrs["forwardMeasurementChannel"] = meta["forward channel"][-1] + if double_ended_flag: + attrs["backwardMeasurementChannel"] = "N/A" + else: + attrs["backwardMeasurementChannel"] = meta["reverse channel"][-1] + + # obtain basic data info + nx = data["x"].size + + ntime = len(filepathlist) + + # chFW = int(attrs['forwardMeasurementChannel']) - 1 # zero-based + # if double_ended_flag: + # chBW = int(attrs['backwardMeasurementChannel']) - 1 # zero-based + # else: + # # no backward channel is negative value. writes better to netcdf + # chBW = -1 + + # print summary + if not silent: + print("%s files were found," % ntime + " each representing a single timestep") + print("Recorded at %s points along the cable" % nx) + + if double_ended_flag: + print("The measurement is double ended") + else: + print("The measurement is single ended") + + # Gather data + # x has already been read. should not change over time + xraw = data["x"] + + # Define all variables + referenceTemperature = np.zeros(ntime) + probe1temperature = np.zeros(ntime) + probe2temperature = np.zeros(ntime) + gamma_ddf = np.zeros(ntime) + k_internal = np.zeros(ntime) + k_external = np.zeros(ntime) + acquisitiontimeFW = np.zeros(ntime) + acquisitiontimeBW = np.zeros(ntime) + + timestamp = [""] * ntime + ST = np.zeros((nx, ntime)) + AST = np.zeros((nx, ntime)) + TMP = np.zeros((nx, ntime)) + + if double_ended_flag: + REV_ST = np.zeros((nx, ntime)) + REV_AST = np.zeros((nx, ntime)) + + for ii in range(ntime): + data, meta = read_sensornet_single(filepathlist[ii]) + + timestamp[ii] = pd.DatetimeIndex([meta["date"] + " " + meta["time"]])[0] + probe1temperature[ii] = float(meta["T ext. ref 1 (°C)"]) + probe2temperature[ii] = float(meta["T ext. ref 2 (°C)"]) + referenceTemperature[ii] = float(meta["T internal ref (°C)"]) + gamma_ddf[ii] = float(meta["gamma"]) + k_internal[ii] = float(meta["k internal"]) + k_external[ii] = float(meta["k external"]) + acquisitiontimeFW[ii] = float(meta["forward acquisition time"]) + acquisitiontimeBW[ii] = float(meta["reverse acquisition time"]) + + ST[:, ii] = data["st"] + AST[:, ii] = data["ast"] + TMP[:, ii] = data["tmp"] + + if double_ended_flag: + REV_ST[:, ii] = data["rst"] + REV_AST[:, ii] = data["rast"] + + if fiber_length is None and double_ended_flag: + fiber_length = np.max([0.0, xraw[-1] - add_internal_fiber_length]) + elif fiber_length is None and not double_ended_flag: + fiber_length = xraw[-1] + else: + pass + + assert fiber_length > 0.0, ( + "`fiber_length` is not defined. Use key" + "word argument in read function." 
+ str(fiber_length) + ) + + fiber_start_index = (np.abs(xraw + add_internal_fiber_length)).argmin() + fiber_0_index = np.abs(xraw).argmin() + fiber_1_index = (np.abs(xraw - fiber_length)).argmin() + fiber_n_indices = fiber_1_index - fiber_0_index + fiber_n_indices_internal = fiber_0_index - fiber_start_index + if double_ended_flag: + fiber_end_index = np.min([xraw.size, fiber_1_index + fiber_n_indices_internal]) + else: + fiber_end_index = fiber_1_index + + if double_ended_flag: + if not flip_reverse_measurements: + # fiber length how the backward channel is aligned + fiber_length_raw = float(meta["fibre end"]) + fiber_bw_1_index = np.abs(xraw - fiber_length_raw).argmin() + fiber_bw_end_index = np.min( + [xraw.size, fiber_bw_1_index + (fiber_end_index - fiber_1_index)] + ) + fiber_bw_start_index = np.max( + [0, fiber_bw_1_index - fiber_n_indices - fiber_n_indices_internal] + ) + + REV_ST = REV_ST[fiber_bw_start_index:fiber_bw_end_index] + REV_AST = REV_AST[fiber_bw_start_index:fiber_bw_end_index] + + else: + # Use the fiber indices from the forward channel + n_indices_internal_left = fiber_0_index - fiber_start_index + n_indices_internal_right = np.max([0, fiber_end_index - fiber_1_index]) + n_indices_internal_shortest = np.min( + [n_indices_internal_left, n_indices_internal_right] + ) + fiber_start_index = fiber_0_index - n_indices_internal_shortest + fiber_end_index = ( + fiber_0_index + fiber_n_indices + n_indices_internal_shortest + ) + REV_ST = REV_ST[fiber_end_index:fiber_start_index:-1] + REV_AST = REV_AST[fiber_end_index:fiber_start_index:-1] + + x = xraw[fiber_start_index:fiber_end_index] + TMP = TMP[fiber_start_index:fiber_end_index] + ST = ST[fiber_start_index:fiber_end_index] + AST = AST[fiber_start_index:fiber_end_index] + + data_vars = { + "st": (["x", "time"], ST, dim_attrs["st"]), + "ast": (["x", "time"], AST, dim_attrs["ast"]), + "tmp": (["x", "time"], TMP, dim_attrs["tmp"]), + "probe1Temperature": ( + "time", + probe1temperature, + { + "name": "Probe 1 temperature", + "description": "reference probe 1 " "temperature", + "units": r"$^\circ$C", + }, + ), + "probe2Temperature": ( + "time", + probe2temperature, + { + "name": "Probe 2 temperature", + "description": "reference probe 2 " "temperature", + "units": r"$^\circ$C", + }, + ), + "referenceTemperature": ( + "time", + referenceTemperature, + { + "name": "reference temperature", + "description": "Internal reference " "temperature", + "units": r"$^\circ$C", + }, + ), + "gamma_ddf": ( + "time", + gamma_ddf, + { + "name": "gamma ddf", + "description": "machine " "calibrated gamma", + "units": "-", + }, + ), + "k_internal": ( + "time", + k_internal, + { + "name": "k internal", + "description": "machine calibrated " "internal k", + "units": "-", + }, + ), + "k_external": ( + "time", + k_external, + { + "name": "reference temperature", + "description": "machine calibrated " "external k", + "units": "-", + }, + ), + "userAcquisitionTimeFW": ( + "time", + acquisitiontimeFW, + dim_attrs["userAcquisitionTimeFW"], + ), + "userAcquisitionTimeBW": ( + "time", + acquisitiontimeBW, + dim_attrs["userAcquisitionTimeBW"], + ), + } + + if double_ended_flag: + data_vars["rst"] = (["x", "time"], REV_ST, dim_attrs["rst"]) + data_vars["rast"] = (["x", "time"], REV_AST, dim_attrs["rast"]) + + filenamelist = [os.path.split(f)[-1] for f in filepathlist] + + coords = {"x": ("x", x, dim_attrs["x"]), "filename": ("time", filenamelist)} + + dtFW = data_vars["userAcquisitionTimeFW"][1].astype("timedelta64[s]") + dtBW = 
data_vars["userAcquisitionTimeBW"][1].astype("timedelta64[s]") + if not double_ended_flag: + tcoords = coords_time( + np.array(timestamp).astype("datetime64[ns]"), + timezone_netcdf=timezone_netcdf, + timezone_input_files=timezone_input_files, + dtFW=dtFW, + double_ended_flag=double_ended_flag, + ) + else: + tcoords = coords_time( + np.array(timestamp).astype("datetime64[ns]"), + timezone_netcdf=timezone_netcdf, + timezone_input_files=timezone_input_files, + dtFW=dtFW, + dtBW=dtBW, + double_ended_flag=double_ended_flag, + ) + + coords.update(tcoords) + + return data_vars, coords, attrs diff --git a/src/dtscalibration/io/sensortran.py b/src/dtscalibration/io/sensortran.py new file mode 100644 index 00000000..0d2ff728 --- /dev/null +++ b/src/dtscalibration/io/sensortran.py @@ -0,0 +1,320 @@ +import struct +from pathlib import Path +from typing import Any +from typing import Union + +import numpy as np + +from dtscalibration import DataStore +from dtscalibration.io.utils import coords_time +from dtscalibration.io.utils import dim_attrs + + +def read_sensortran_files( + directory: Union[str, Path], + timezone_input_files: str = "UTC", + timezone_netcdf: str = "UTC", + silent: bool = False, + **kwargs, +) -> DataStore: + """Read a folder with measurement files from a device of the Sensortran + brand. Each measurement file contains values for a single timestep. Remember + to check which timezone you are working in. + + The sensortran files are already timezone aware + + Parameters + ---------- + directory : str, Path + Path to folder containing BinaryRawDTS and BinaryTemp files + timezone_input_files : str, optional + Timezone string of the measurement files. + Remember to check when measurements are taken. + Also if summertime is used. + timezone_netcdf : str, optional + Timezone string of the netcdf file. UTC follows CF-conventions. + silent : bool + If set tot True, some verbose texts are not printed to stdout/screen + kwargs : dict-like, optional + keyword-arguments are passed to DataStore initialization + + Returns + ------- + DataStore + The newly created datastore. + """ + + filepathlist_dts = sorted(Path(directory).glob("*BinaryRawDTS.dat")) + + # Make sure that the list of files contains any files + assert ( + len(filepathlist_dts) >= 1 + ), "No RawDTS measurement files found " "in provided directory: \n" + str(directory) + + filepathlist_temp = [ + Path(str(f).replace("RawDTS", "Temp")) for f in filepathlist_dts + ] + + for ii, fname in enumerate(filepathlist_dts): + # Check if corresponding temperature file exists + if not Path(filepathlist_temp[ii]).is_file(): + raise FileNotFoundError( + "Could not find BinaryTemp " + f"file corresponding to {fname}" + ) + + version = sensortran_binary_version_check(filepathlist_dts) + + if version == 3: + data_vars, coords, attrs = read_sensortran_files_routine( + filepathlist_dts, + filepathlist_temp, + timezone_input_files=timezone_input_files, + timezone_netcdf=timezone_netcdf, + silent=silent, + ) + else: + raise NotImplementedError( + "Sensortran binary version " + f"{version} not implemented" + ) + + ds = DataStore(data_vars=data_vars, coords=coords, attrs=attrs, **kwargs) + return ds + + +def sensortran_binary_version_check(filepathlist: list[Path]): + """Function which tests which version the sensortran binaries are. 
+ + Parameters + ---------- + filepathlist + + Returns + ------- + + """ + fname = filepathlist[0] + + with fname.open(mode="rb") as f: + f.read(2) + version = struct.unpack(" tuple[dict[str, Any], dict[str, Any], dict]: + """ + Internal routine that reads sensortran files. + Use dtscalibration.read_sensortran_files function instead. + + The sensortran files are in UTC time + + Parameters + ---------- + filepathlist_dts + filepathlist_temp + timezone_netcdf + silent + + Returns + ------- + + """ + assert timezone_input_files == "UTC", "The sensortran files are always in UTC time." + + # Obtain metadata from the first file + data_dts, meta_dts = read_sensortran_single(filepathlist_dts[0]) + data_temp, meta_temp = read_sensortran_single(filepathlist_temp[0]) + + attrs = meta_dts + + # Add standardised required attributes + attrs["isDoubleEnded"] = "0" + + attrs["forwardMeasurementChannel"] = meta_dts["channel_id"] - 1 + attrs["backwardMeasurementChannel"] = "N/A" + + # obtain basic data info + nx = meta_temp["num_points"] + + ntime = len(filepathlist_dts) + + # print summary + if not silent: + print("%s files were found," % ntime + " each representing a single timestep") + print("Recorded at %s points along the cable" % nx) + + print("The measurement is single ended") + + # Gather data + # x has already been read. should not change over time + x = data_temp["x"] + + # Define all variables + referenceTemperature = np.zeros(ntime) + acquisitiontimeFW = np.ones(ntime) + + timestamp = [""] * ntime + ST = np.zeros((nx, ntime), dtype=np.int32) + AST = np.zeros((nx, ntime), dtype=np.int32) + TMP = np.zeros((nx, ntime)) + + ST_zero = np.zeros(ntime) + AST_zero = np.zeros(ntime) + + for ii in range(ntime): + data_dts, meta_dts = read_sensortran_single(filepathlist_dts[ii]) + data_temp, meta_temp = read_sensortran_single(filepathlist_temp[ii]) + + timestamp[ii] = data_dts["time"] + + referenceTemperature[ii] = data_temp["reference_temperature"] - 273.15 + + ST[:, ii] = data_dts["st"][:nx] + AST[:, ii] = data_dts["ast"][:nx] + # The TMP can vary by 1 or 2 datapoints, dynamically assign the values + TMP[: meta_temp["num_points"], ii] = data_temp["tmp"][:nx] + + zero_index = (meta_dts["num_points"] - nx) // 2 + ST_zero[ii] = np.mean(data_dts["st"][nx + zero_index :]) + AST_zero[ii] = np.mean(data_dts["ast"][nx + zero_index :]) + + data_vars = { + "st": (["x", "time"], ST, dim_attrs["st"]), + "ast": (["x", "time"], AST, dim_attrs["ast"]), + "tmp": ( + ["x", "time"], + TMP, + { + "name": "tmp", + "description": "Temperature calibrated by device", + "units": meta_temp["y_units"], + }, + ), + "referenceTemperature": ( + "time", + referenceTemperature, + { + "name": "reference temperature", + "description": "Internal reference " "temperature", + "units": r"$^\circ$C", + }, + ), + "st_zero": ( + ["time"], + ST_zero, + { + "name": "ST_zero", + "description": "Stokes zero count", + "units": meta_dts["y_units"], + }, + ), + "ast_zero": ( + ["time"], + AST_zero, + { + "name": "AST_zero", + "description": "anit-Stokes zero count", + "units": meta_dts["y_units"], + }, + ), + "userAcquisitionTimeFW": ( + "time", + acquisitiontimeFW, + dim_attrs["userAcquisitionTimeFW"], + ), + } + + coords = { + "x": ( + "x", + x, + { + "name": "distance", + "description": "Length along fiber", + "long_description": "Starting at connector " + "of forward channel", + "units": "m", + }, + ), + "filename": ("time", [f.name for f in filepathlist_dts]), + "filename_temp": ("time", [f.name for f in filepathlist_temp]), + } + + dtFW = 
data_vars["userAcquisitionTimeFW"][1].astype("timedelta64[s]") # type: ignore + + tcoords = coords_time( + np.array(timestamp).astype("datetime64[ns]"), + timezone_netcdf=timezone_netcdf, + timezone_input_files="UTC", + dtFW=dtFW, + double_ended_flag=False, + ) + + coords.update(tcoords) + + return data_vars, coords, attrs + + +def read_sensortran_single(file: Path) -> tuple[dict, dict]: + """ + Internal routine that reads a single sensortran file. + Use dtscalibration.read_sensortran_files function instead. + + Parameters + ---------- + file + + Returns + ------- + data, metadata + """ + import struct + from datetime import datetime + + meta = {} + data = {} + with file.open(mode="rb") as f: + meta["survey_type"] = struct.unpack("= 1 + ), "No measurement files found in provided " "directory: \n" + str(directory) + + elif filepathlist is None and zip_handle: + filepathlist = ziphandle_to_filepathlist(fh=zip_handle, extension=file_ext) + + # Make sure that the list of files contains any files + assert len(filepathlist) >= 1, ( + "No measurement files found in provided " "list/directory" + ) + + xml_version = silixa_xml_version_check(filepathlist) + + if xml_version == 4: + data_vars, coords, attrs = read_silixa_files_routine_v4( + filepathlist, + timezone_netcdf=timezone_netcdf, + silent=silent, + load_in_memory=load_in_memory, + ) + + elif xml_version in (6, 7, 8): + data_vars, coords, attrs = read_silixa_files_routine_v6( + filepathlist, + xml_version=xml_version, + timezone_netcdf=timezone_netcdf, + silent=silent, + load_in_memory=load_in_memory, + ) + + else: + raise NotImplementedError( + "Silixa xml version " + f"{xml_version} not implemented" + ) + + ds = DataStore(data_vars=data_vars, coords=coords, attrs=attrs, **kwargs) + return ds + + +def silixa_xml_version_check(filepathlist): + """Function which tests which version of xml files have to be read. + + Parameters + ---------- + filepathlist + + Returns + ------- + + """ + + sep = ":" + attrs = read_silixa_attrs_singlefile(filepathlist[0], sep) + + version_string = attrs["customData:SystemSettings:softwareVersion"] + + # Get major version from string. Tested for Ultima v4, v6, v7 XT-DTS v6 + major_version = int(version_string.replace(" ", "").split(":")[-1][0]) + + return major_version + + +def read_silixa_attrs_singlefile(filename, sep): + """ + + Parameters + ---------- + filename + sep + + Returns + ------- + + """ + import xmltodict + + def metakey(meta, dict_to_parse, prefix): + """ + Fills the metadata dictionairy with data from dict_to_parse. + The dict_to_parse is the raw data from a silixa xml-file. + dict_to_parse is a nested dictionary to represent the + different levels of hierarchy. For example, + toplevel = {lowlevel: {key: value}}. + This function returns {'toplevel:lowlevel:key': value}. + Where prefix is the flattened hierarchy. + + Parameters + ---------- + meta : dict + the output dictionairy with prcessed metadata + dict_to_parse : dict + prefix + + Returns + ------- + + """ + + for key in dict_to_parse: + if prefix == "": + prefix_parse = key.replace("@", "") + else: + prefix_parse = sep.join([prefix, key.replace("@", "")]) + + if prefix_parse == sep.join(("logData", "data")): + # skip the LAF , ST data + continue + + if hasattr(dict_to_parse[key], "keys"): + # Nested dictionaries, flatten hierarchy. 
+ meta.update(metakey(meta, dict_to_parse[key], prefix_parse)) + + elif isinstance(dict_to_parse[key], list): + # if the key has values for the multiple channels + for ival, val in enumerate(dict_to_parse[key]): + num_key = prefix_parse + "_" + str(ival) + meta.update(metakey(meta, val, num_key)) + else: + meta[prefix_parse] = dict_to_parse[key] + + return meta + + with open_file(filename) as fh: + doc_ = xmltodict.parse(fh.read()) + + if "wellLogs" in doc_.keys(): + doc = doc_["wellLogs"]["wellLog"] + else: + doc = doc_["logs"]["log"] + + return metakey({}, doc, "") + + +def read_silixa_files_routine_v6( # noqa: MC0001 + filepathlist, + xml_version=6, + timezone_netcdf="UTC", + silent=False, + load_in_memory="auto", +): + """ + Internal routine that reads Silixa files. + Use dtscalibration.read_silixa_files function instead. + + The silixa files are already timezone aware + + Parameters + ---------- + load_in_memory + filepathlist + timezone_netcdf + silent + + Returns + ------- + + """ + + # translate names + tld = {"ST": "st", "AST": "ast", "REV-ST": "rst", "REV-AST": "rast", "TMP": "tmp"} + + # Open the first xml file using ET, get the name space and amount of data + + with open_file(filepathlist[0]) as fh: + xml_tree = ElementTree.parse(fh) + + namespace = get_xml_namespace(xml_tree.getroot()) + + logtree = xml_tree.find(f"./{namespace}log") + logdata_tree = logtree.find(f"./{namespace}logData") + + # Amount of datapoints is the size of the logdata tree, corrected for + # the mnemonic list and unit list + nx = len(logdata_tree) - 2 + + sep = ":" + ns = {"s": namespace[1:-1]} + + # Obtain metadata from the first file + attrs = read_silixa_attrs_singlefile(filepathlist[0], sep) + + # Add standardised required attributes + attrs["isDoubleEnded"] = attrs["customData:isDoubleEnded"] + double_ended_flag = bool(int(attrs["isDoubleEnded"])) + + attrs["forwardMeasurementChannel"] = attrs["customData:forwardMeasurementChannel"] + if double_ended_flag: + attrs["backwardMeasurementChannel"] = attrs[ + "customData:reverseMeasurementChannel" + ] + else: + attrs["backwardMeasurementChannel"] = "N/A" + + # obtain basic data info + data_item_names = ( + attrs["logData:mnemonicList"].replace(" ", "").strip(" ").split(",") + ) + nitem = len(data_item_names) + + ntime = len(filepathlist) + + double_ended_flag = bool(int(attrs["isDoubleEnded"])) + chFW = int(attrs["forwardMeasurementChannel"]) - 1 # zero-based + if double_ended_flag: + chBW = int(attrs["backwardMeasurementChannel"]) - 1 # zero-based + else: + # no backward channel is negative value. 
writes better to netcdf + chBW = -1 + + # print summary + if not silent: + print(f"{ntime} files were found, each representing a single timestep") + print(f"{nitem} recorded vars were found: " + ", ".join(data_item_names)) + print(f"Recorded at {nx} points along the cable") + + if double_ended_flag: + print("The measurement is double ended") + else: + print("The measurement is single ended") + + # obtain timeseries from data + timeseries_loc_in_hierarchy = [ + ("log", "customData", "acquisitionTime"), + ("log", "customData", "referenceTemperature"), + ("log", "customData", "probe1Temperature"), + ("log", "customData", "probe2Temperature"), + ("log", "customData", "referenceProbeVoltage"), + ("log", "customData", "probe1Voltage"), + ("log", "customData", "probe2Voltage"), + ( + "log", + "customData", + "UserConfiguration", + "ChannelConfiguration", + "AcquisitionConfiguration", + "AcquisitionTime", + "userAcquisitionTimeFW", + ), + ] + + if double_ended_flag: + timeseries_loc_in_hierarchy.append( + ( + "log", + "customData", + "UserConfiguration", + "ChannelConfiguration", + "AcquisitionConfiguration", + "AcquisitionTime", + "userAcquisitionTimeBW", + ) + ) + + timeseries = { + item[-1]: dict(loc=item, array=np.zeros(ntime, dtype=np.float32)) + for item in timeseries_loc_in_hierarchy + } + + # add units to timeseries (unit of measurement) + for key, item in timeseries.items(): + if f"customData:{key}:uom" in attrs: + item["uom"] = attrs[f"customData:{key}:uom"] + else: + item["uom"] = "" + + # Gather data + arr_path = "s:" + "/s:".join(["log", "logData", "data"]) + + @dask.delayed + def grab_data_per_file(file_handle): + """ + + Parameters + ---------- + file_handle + + Returns + ------- + + """ + with open_file(file_handle, mode="r") as f_h: + eltree = ElementTree.parse(f_h) + arr_el = eltree.findall(arr_path, namespaces=ns) + + if not len(arr_el) == nx: + raise ValueError( + "Inconsistent length of x-dimension" + + "\nCheck if files are mixed up, or if the number of " + + "data points vary per file." + ) + + # remove the breaks on both sides of the string + # split the string on the comma + arr_str = [arr_eli.text[1:-1].split(",") for arr_eli in arr_el] + + return np.array(arr_str, dtype=float) + + data_lst_dly = [grab_data_per_file(fp) for fp in filepathlist] + data_lst = [ + da.from_delayed(x, shape=(nx, nitem), dtype=float) for x in data_lst_dly + ] + data_arr = da.stack(data_lst).T # .compute() + + # Check whether to compute data_arr (if possible 25% faster) + data_arr_cnk = data_arr.rechunk({0: -1, 1: -1, 2: "auto"}) + if load_in_memory == "auto" and data_arr_cnk.npartitions <= 5: + if not silent: + print("Reading the data from disk") + data_arr = data_arr_cnk.compute() + elif load_in_memory: + if not silent: + print("Reading the data from disk") + data_arr = data_arr_cnk.compute() + else: + if not silent: + print("Not reading the data from disk") + data_arr = data_arr_cnk + + data_vars = {} + for name, data_arri in zip(data_item_names, data_arr): + if name == "LAF": + continue + if tld[name] in dim_attrs: + data_vars[tld[name]] = (["x", "time"], data_arri, dim_attrs[tld[name]]) + else: + raise ValueError(f"Dont know what to do with the {name} data column") + + # Obtaining the timeseries data (reference temperature etc) + _ts_dtype = [(k, np.float32) for k in timeseries] + _time_dtype = [ + ("filename_tstamp", np.int64), + ("minDateTimeIndex", " 0.0, ( - "`fiber_length` is not defined. Use key" - "word argument in read function." 
+ str(fiber_length) - ) - - fiber_start_index = (np.abs(xraw + add_internal_fiber_length)).argmin() - fiber_0_index = np.abs(xraw).argmin() - fiber_1_index = (np.abs(xraw - fiber_length)).argmin() - fiber_n_indices = fiber_1_index - fiber_0_index - fiber_n_indices_internal = fiber_0_index - fiber_start_index - if double_ended_flag: - fiber_end_index = np.min([xraw.size, fiber_1_index + fiber_n_indices_internal]) - else: - fiber_end_index = fiber_1_index - - if double_ended_flag: - if not flip_reverse_measurements: - # fiber length how the backward channel is aligned - fiber_length_raw = float(meta["fibre end"]) - fiber_bw_1_index = np.abs(xraw - fiber_length_raw).argmin() - fiber_bw_end_index = np.min( - [xraw.size, fiber_bw_1_index + (fiber_end_index - fiber_1_index)] - ) - fiber_bw_start_index = np.max( - [0, fiber_bw_1_index - fiber_n_indices - fiber_n_indices_internal] - ) - - REV_ST = REV_ST[fiber_bw_start_index:fiber_bw_end_index] - REV_AST = REV_AST[fiber_bw_start_index:fiber_bw_end_index] - - else: - # Use the fiber indices from the forward channel - n_indices_internal_left = fiber_0_index - fiber_start_index - n_indices_internal_right = np.max([0, fiber_end_index - fiber_1_index]) - n_indices_internal_shortest = np.min( - [n_indices_internal_left, n_indices_internal_right] - ) - fiber_start_index = fiber_0_index - n_indices_internal_shortest - fiber_end_index = ( - fiber_0_index + fiber_n_indices + n_indices_internal_shortest - ) - REV_ST = REV_ST[fiber_end_index:fiber_start_index:-1] - REV_AST = REV_AST[fiber_end_index:fiber_start_index:-1] - - x = xraw[fiber_start_index:fiber_end_index] - TMP = TMP[fiber_start_index:fiber_end_index] - ST = ST[fiber_start_index:fiber_end_index] - AST = AST[fiber_start_index:fiber_end_index] - - data_vars = { - "st": (["x", "time"], ST, dim_attrs["st"]), - "ast": (["x", "time"], AST, dim_attrs["ast"]), - "tmp": (["x", "time"], TMP, dim_attrs["tmp"]), - "probe1Temperature": ( - "time", - probe1temperature, - { - "name": "Probe 1 temperature", - "description": "reference probe 1 " "temperature", - "units": r"$^\circ$C", - }, - ), - "probe2Temperature": ( - "time", - probe2temperature, - { - "name": "Probe 2 temperature", - "description": "reference probe 2 " "temperature", - "units": r"$^\circ$C", - }, - ), - "referenceTemperature": ( - "time", - referenceTemperature, - { - "name": "reference temperature", - "description": "Internal reference " "temperature", - "units": r"$^\circ$C", - }, - ), - "gamma_ddf": ( - "time", - gamma_ddf, - { - "name": "gamma ddf", - "description": "machine " "calibrated gamma", - "units": "-", - }, - ), - "k_internal": ( - "time", - k_internal, - { - "name": "k internal", - "description": "machine calibrated " "internal k", - "units": "-", - }, - ), - "k_external": ( - "time", - k_external, - { - "name": "reference temperature", - "description": "machine calibrated " "external k", - "units": "-", - }, - ), - "userAcquisitionTimeFW": ( - "time", - acquisitiontimeFW, - dim_attrs["userAcquisitionTimeFW"], - ), - "userAcquisitionTimeBW": ( - "time", - acquisitiontimeBW, - dim_attrs["userAcquisitionTimeBW"], - ), - } - - if double_ended_flag: - data_vars["rst"] = (["x", "time"], REV_ST, dim_attrs["rst"]) - data_vars["rast"] = (["x", "time"], REV_AST, dim_attrs["rast"]) - - filenamelist = [os.path.split(f)[-1] for f in filepathlist] - - coords = {"x": ("x", x, dim_attrs["x"]), "filename": ("time", filenamelist)} - - dtFW = data_vars["userAcquisitionTimeFW"][1].astype("timedelta64[s]") - dtBW = 
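# --- editor's aside (illustrative sketch, not part of the diff) -------------
# Hedged sketch of the index bookkeeping in the sensornet routine above:
# positions along the cable are matched to the internal-fiber offset and the
# user-supplied fiber length by taking the nearest sample with argmin. Names
# and numbers below are illustrative only.
import numpy as np


def fiber_slice(x, fiber_length, add_internal_fiber_length=50.0):
    start = np.abs(x + add_internal_fiber_length).argmin()  # internal fiber before the connector
    i0 = np.abs(x).argmin()                                 # connector (x = 0)
    i1 = np.abs(x - fiber_length).argmin()                  # user-defined fiber end
    return start, i0, i1


# usage with a made-up coordinate axis:
#   x = np.linspace(-60.0, 2100.0, 1700)
#   fiber_slice(x, fiber_length=2000.0)
# -----------------------------------------------------------------------------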
data_vars["userAcquisitionTimeBW"][1].astype("timedelta64[s]") - if not double_ended_flag: - tcoords = coords_time( - np.array(timestamp).astype("datetime64[ns]"), - timezone_netcdf=timezone_netcdf, - timezone_input_files=timezone_input_files, - dtFW=dtFW, - double_ended_flag=double_ended_flag, - ) - else: - tcoords = coords_time( - np.array(timestamp).astype("datetime64[ns]"), - timezone_netcdf=timezone_netcdf, - timezone_input_files=timezone_input_files, - dtFW=dtFW, - dtBW=dtBW, - double_ended_flag=double_ended_flag, - ) - - coords.update(tcoords) - - return data_vars, coords, attrs - - -def read_silixa_attrs_singlefile(filename, sep): - """ - - Parameters - ---------- - filename - sep - - Returns - ------- - - """ - import xmltodict - - def metakey(meta, dict_to_parse, prefix): - """ - Fills the metadata dictionairy with data from dict_to_parse. - The dict_to_parse is the raw data from a silixa xml-file. - dict_to_parse is a nested dictionary to represent the - different levels of hierarchy. For example, - toplevel = {lowlevel: {key: value}}. - This function returns {'toplevel:lowlevel:key': value}. - Where prefix is the flattened hierarchy. - - Parameters - ---------- - meta : dict - the output dictionairy with prcessed metadata - dict_to_parse : dict - prefix - - Returns - ------- - - """ - - for key in dict_to_parse: - if prefix == "": - prefix_parse = key.replace("@", "") - else: - prefix_parse = sep.join([prefix, key.replace("@", "")]) - - if prefix_parse == sep.join(("logData", "data")): - # skip the LAF , ST data - continue - - if hasattr(dict_to_parse[key], "keys"): - # Nested dictionaries, flatten hierarchy. - meta.update(metakey(meta, dict_to_parse[key], prefix_parse)) - - elif isinstance(dict_to_parse[key], list): - # if the key has values for the multiple channels - for ival, val in enumerate(dict_to_parse[key]): - num_key = prefix_parse + "_" + str(ival) - meta.update(metakey(meta, val, num_key)) - else: - meta[prefix_parse] = dict_to_parse[key] - - return meta - - with open_file(filename) as fh: - doc_ = xmltodict.parse(fh.read()) - - if "wellLogs" in doc_.keys(): - doc = doc_["wellLogs"]["wellLog"] - else: - doc = doc_["logs"]["log"] - - return metakey({}, doc, "") - - -def read_sensortran_files_routine( - filepathlist_dts, - filepathlist_temp, - timezone_input_files="UTC", - timezone_netcdf="UTC", - silent=False, -): - """ - Internal routine that reads sensortran files. - Use dtscalibration.read_sensortran_files function instead. - - The sensortran files are in UTC time - - Parameters - ---------- - filepathlist_dts - filepathlist_temp - timezone_netcdf - silent - - Returns - ------- - - """ - assert timezone_input_files == "UTC", "The sensortran files are always in UTC time." - - # Obtain metadata from the first file - data_dts, meta_dts = read_sensortran_single(filepathlist_dts[0]) - data_temp, meta_temp = read_sensortran_single(filepathlist_temp[0]) - - attrs = meta_dts - - # Add standardised required attributes - attrs["isDoubleEnded"] = "0" - - attrs["forwardMeasurementChannel"] = meta_dts["channel_id"] - 1 - attrs["backwardMeasurementChannel"] = "N/A" - - # obtain basic data info - nx = meta_temp["num_points"] - - ntime = len(filepathlist_dts) - - # print summary - if not silent: - print("%s files were found," % ntime + " each representing a single timestep") - print("Recorded at %s points along the cable" % nx) - - print("The measurement is single ended") - - # Gather data - # x has already been read. 
should not change over time - x = data_temp["x"] - - # Define all variables - referenceTemperature = np.zeros(ntime) - acquisitiontimeFW = np.ones(ntime) - - timestamp = [""] * ntime - ST = np.zeros((nx, ntime), dtype=np.int32) - AST = np.zeros((nx, ntime), dtype=np.int32) - TMP = np.zeros((nx, ntime)) - - ST_zero = np.zeros(ntime) - AST_zero = np.zeros(ntime) - - for ii in range(ntime): - data_dts, meta_dts = read_sensortran_single(filepathlist_dts[ii]) - data_temp, meta_temp = read_sensortran_single(filepathlist_temp[ii]) - - timestamp[ii] = data_dts["time"] - - referenceTemperature[ii] = data_temp["reference_temperature"] - 273.15 - - ST[:, ii] = data_dts["st"][:nx] - AST[:, ii] = data_dts["ast"][:nx] - # The TMP can vary by 1 or 2 datapoints, dynamically assign the values - TMP[: meta_temp["num_points"], ii] = data_temp["tmp"][:nx] - - zero_index = (meta_dts["num_points"] - nx) // 2 - ST_zero[ii] = np.mean(data_dts["st"][nx + zero_index :]) - AST_zero[ii] = np.mean(data_dts["ast"][nx + zero_index :]) - - data_vars = { - "st": (["x", "time"], ST, dim_attrs["st"]), - "ast": (["x", "time"], AST, dim_attrs["ast"]), - "tmp": ( - ["x", "time"], - TMP, - { - "name": "tmp", - "description": "Temperature calibrated by device", - "units": meta_temp["y_units"], - }, - ), - "referenceTemperature": ( - "time", - referenceTemperature, - { - "name": "reference temperature", - "description": "Internal reference " "temperature", - "units": r"$^\circ$C", - }, - ), - "st_zero": ( - ["time"], - ST_zero, - { - "name": "ST_zero", - "description": "Stokes zero count", - "units": meta_dts["y_units"], - }, - ), - "ast_zero": ( - ["time"], - AST_zero, - { - "name": "AST_zero", - "description": "anit-Stokes zero count", - "units": meta_dts["y_units"], - }, - ), - "userAcquisitionTimeFW": ( - "time", - acquisitiontimeFW, - dim_attrs["userAcquisitionTimeFW"], - ), - } - - filenamelist_dts = [os.path.split(f)[-1] for f in filepathlist_dts] - filenamelist_temp = [os.path.split(f)[-1] for f in filepathlist_temp] - - coords = { - "x": ( - "x", - x, - { - "name": "distance", - "description": "Length along fiber", - "long_description": "Starting at connector " + "of forward channel", - "units": "m", - }, - ), - "filename": ("time", filenamelist_dts), - "filename_temp": ("time", filenamelist_temp), - } - - dtFW = data_vars["userAcquisitionTimeFW"][1].astype("timedelta64[s]") - - tcoords = coords_time( - np.array(timestamp).astype("datetime64[ns]"), - timezone_netcdf=timezone_netcdf, - timezone_input_files="UTC", - dtFW=dtFW, - double_ended_flag=False, - ) - - coords.update(tcoords) - - return data_vars, coords, attrs - - -def read_sensortran_single(fname): - """ - Internal routine that reads a single sensortran file. - Use dtscalibration.read_sensortran_files function instead. - - Parameters - ---------- - fname - - Returns - ------- - data, metadata - """ - import struct - from datetime import datetime - - meta = {} - data = {} - with open(fname, "rb") as f: - meta["survey_type"] = struct.unpack("
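# --- editor's aside (illustrative sketch, not part of the diff) -------------
# Hedged sketch of fixed-layout binary parsing in the spirit of
# read_sensortran_single above: fields are read sequentially with struct.unpack
# using little-endian format codes. The field layout below is invented for
# illustration and is NOT the real sensortran file format.
import struct


def read_header(path):
    with open(path, "rb") as f:
        survey_type, = struct.unpack("<h", f.read(2))            # int16
        num_points, = struct.unpack("<i", f.read(4))              # int32
        reference_temperature, = struct.unpack("<d", f.read(8))   # float64
    return {
        "survey_type": survey_type,
        "num_points": num_points,
        "reference_temperature": reference_temperature,
    }
# -----------------------------------------------------------------------------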