Source code for dtscalibration.io.apsensing

import os
import re
import warnings
from pathlib import Path
from xml.etree import ElementTree

import dask
import dask.array as da
import numpy as np
import pandas as pd
import xarray as xr

from dtscalibration.io.utils import dim_attrs
from dtscalibration.io.utils import get_xml_namespace
from dtscalibration.io.utils import open_file

dim_attrs_apsensing = dict(dim_attrs)
dim_attrs_apsensing["TEMP"] = dim_attrs_apsensing.pop("tmp")
dim_attrs_apsensing["TEMP"]["name"] = "TEMP"
dim_attrs_apsensing.pop("acquisitionTime")
dim_attrs_apsensing.pop("userAcquisitionTimeFW")
dim_attrs_apsensing.pop("userAcquisitionTimeBW")


[docs] def read_apsensing_files( filepathlist=None, directory=None, file_ext="*.xml", timezone_input_files="UTC", timezone_netcdf="UTC", silent=False, load_in_memory="auto", load_tra_arrays=False, **kwargs, ): """Read a folder with measurement files from a device of the Sensortran brand. Each measurement file contains values for a single timestep. Remember to check which timezone you are working in. Parameters ---------- filepathlist : list of str, optional List of paths that point the the silixa files directory : str, Path, optional Path to folder timezone_input_files : str, optional Timezone string of the measurement files. Remember to check when measurements are taken. Also if summertime is used. timezone_netcdf : str, optional Timezone string of the netcdf file. UTC follows CF-conventions. file_ext : str, optional file extension of the measurement files silent : bool If set tot True, some verbose texts are not printed to stdout/screen load_in_memory : {'auto', True, False} If 'auto' the Stokes data is only loaded to memory for small files load_tra_arrays : bool If true and tra files available, the tra array data will imported. Current implementation is limited to in-memory reading only. kwargs : dict-like, optional keyword-arguments are passed to DataStore initialization Notes: ------ Only XML files are supported for now Returns: -------- datastore : DataStore The newly created datastore. """ if not file_ext == "*.xml": raise NotImplementedError("Only .xml files are supported for now") if filepathlist is None: filepathlist = sorted(Path(directory).glob(file_ext)) # Make sure that the list of files contains any files assert len(filepathlist) >= 1, ( "No measurement files found in provided " "directory: \n" + str(directory) ) # Make sure that the list of files contains any files assert len(filepathlist) >= 1, ( "No measurement files found in provided " "list/directory" ) device = apsensing_xml_version_check(filepathlist) valid_devices = ["N4386B"] if device in valid_devices: pass else: warnings.warn( "AP sensing device {device}" " has not been tested.\nPlease open an issue on github" " and provide an example file" ) data_vars, coords, attrs = read_apsensing_files_routine( filepathlist, timezone_netcdf=timezone_netcdf, timezone_input_files=timezone_input_files, silent=silent, load_in_memory=load_in_memory, ) # add .tra data if it is available tra_exists, tra_filepathlist = check_if_tra_exists(filepathlist) if tra_exists: print(".tra files are present and will be read") data_dict_list = [] for _, tra_file in enumerate(tra_filepathlist): data_dict = read_single_tra_file(tra_file, load_tra_arrays) data_dict_list.append(data_dict) data_vars = append_to_data_vars_structure( data_vars, data_dict_list, load_tra_arrays ) ds = xr.Dataset(data_vars=data_vars, coords=coords, attrs=attrs, **kwargs) return ds
def apsensing_xml_version_check(filepathlist): """Function which tests which version of xml files are read. Parameters ---------- filepathlist Returns: -------- """ sep = ":" attrs, _ = read_apsensing_attrs_singlefile(filepathlist[0], sep) deviceid_serialnb = attrs["wellbore:dtsInstalledSystemSet:dtsInstalledSystem:uid"] deviceid = deviceid_serialnb.split("-")[0] return deviceid def read_apsensing_files_routine( filepathlist, timezone_input_files="UTC", timezone_netcdf="UTC", silent=False, load_in_memory="auto", ): """Internal routine that reads AP Sensing files. Use dtscalibration.read_apsensing_files function instead. The AP sensing files are not timezone aware Parameters ---------- filepathlist timezone_input_files timezone_netcdf silent load_in_memory Returns: -------- """ assert ( timezone_input_files == "UTC" and timezone_netcdf == "UTC" ), "Only UTC timezones supported" # translate names tld = {"ST": "st", "AST": "ast", "REV-ST": "rst", "REV-AST": "rast", "TEMP": "tmp"} # Open the first xml file using ET, get the name space and amount of data xml_tree = ElementTree.parse(filepathlist[0]) namespace = get_xml_namespace(xml_tree.getroot()) logtree = xml_tree.find( ( "{0}wellSet/{0}well/{0}wellboreSet/{0}wellbore" + "/{0}wellLogSet/{0}wellLog" ).format(namespace) ) logdata_tree = logtree.find(f"./{namespace}logData") # Amount of datapoints is the size of the logdata tree nx = len(logdata_tree) sep = ":" ns = {"s": namespace[1:-1]} # Obtain metadata from the first file attrs, skip_chars = read_apsensing_attrs_singlefile(filepathlist[0], sep) # Add standardised required attributes # No example of DE file available attrs["isDoubleEnded"] = "0" double_ended_flag = bool(int(attrs["isDoubleEnded"])) attrs["forwardMeasurementChannel"] = attrs[ "wellbore:dtsMeasurementSet:dtsMeasurement:connectedToFiber:uidRef" ] attrs["backwardMeasurementChannel"] = "N/A" data_item_names = [ attrs[f"wellbore:wellLogSet:wellLog:logCurveInfo_{x}:mnemonic"] for x in range(0, 4) ] nitem = len(data_item_names) ntime = len(filepathlist) # print summary if not silent: print(f"{ntime} files were found, each representing a single timestep") print(f"{nitem} recorded vars were found: " + ", ".join(data_item_names)) print(f"Recorded at {nx} points along the cable") if double_ended_flag: print("The measurement is double ended") else: print("The measurement is single ended") # Gather data arr_path = "s:" + "/s:".join( [ "wellSet", "well", "wellboreSet", "wellbore", "wellLogSet", "wellLog", "logData", "data", ] ) @dask.delayed def grab_data_per_file(file_handle): """Parameters ---------- file_handle Returns: -------- """ with open_file(file_handle, mode="r") as f_h: if skip_chars: f_h.read(3) eltree = ElementTree.parse(f_h) arr_el = eltree.findall(arr_path, namespaces=ns) if not len(arr_el) == nx: raise ValueError( "Inconsistent length of x-dimension" + "\nCheck if files are mixed up, or if the number of " + "data points vary per file." ) # remove the breaks on both sides of the string # split the string on the comma arr_str = [arr_eli.text.split(",") for arr_eli in arr_el] return np.array(arr_str, dtype=float) data_lst_dly = [grab_data_per_file(fp) for fp in filepathlist] data_lst = [ da.from_delayed(x, shape=(nx, nitem), dtype=float) for x in data_lst_dly ] data_arr = da.stack(data_lst).T # .compute() # Check whether to compute data_arr (if possible 25% faster) data_arr_cnk = data_arr.rechunk({0: -1, 1: -1, 2: "auto"}) if load_in_memory == "auto" and data_arr_cnk.npartitions <= 5 or load_in_memory: if not silent: print("Reading the data from disk") data_arr = data_arr_cnk.compute() else: if not silent: print("Not reading the data from disk") data_arr = data_arr_cnk data_vars = {} for name, data_arri in zip(data_item_names, data_arr): if name == "LAF": continue if tld[name] in dim_attrs_apsensing: data_vars[tld[name]] = ( ["x", "time"], data_arri, dim_attrs_apsensing[tld[name]], ) elif name in dim_attrs_apsensing: data_vars[tld[name]] = (["x", "time"], data_arri, dim_attrs_apsensing[name]) else: raise ValueError("Dont know what to do with the" + f" {name} data column") _time_dtype = [("filename_tstamp", np.int64), ("acquisitionTime", "<U29")] ts_dtype = np.dtype(_time_dtype) @dask.delayed def grab_timeseries_per_file(file_handle): """Parameters ---------- file_handle Returns: -------- """ with open_file(file_handle, mode="r") as f_h: if skip_chars: f_h.read(3) eltree = ElementTree.parse(f_h) out = [] # get all the time related data creationDate = eltree.find( ( "{0}wellSet/{0}well/{0}wellboreSet" + "/{0}wellbore/{0}wellLogSet" + "/{0}wellLog/{0}creationDate" ).format(namespace) ).text if isinstance(file_handle, tuple): file_name = os.path.split(file_handle[0])[-1] else: file_name = os.path.split(file_handle)[-1] tstamp = np.int64(file_name[-20:-4]) out += [tstamp, creationDate] return np.array(tuple(out), dtype=ts_dtype) ts_lst_dly = [grab_timeseries_per_file(fp) for fp in filepathlist] ts_lst = [da.from_delayed(x, shape=tuple(), dtype=ts_dtype) for x in ts_lst_dly] ts_arr = da.stack(ts_lst).compute() data_vars["creationDate"] = ( ("time",), [pd.Timestamp(str(item[1])) for item in ts_arr], ) # construct the coordinate dictionary coords = { "x": ("x", data_arr[0, :, 0], dim_attrs_apsensing["x"]), "filename": ("time", [os.path.split(f)[1] for f in filepathlist]), "time": data_vars["creationDate"], } return data_vars, coords, attrs def read_apsensing_attrs_singlefile(filename, sep): """Parameters ---------- filename sep Returns: -------- """ from xml.parsers.expat import ExpatError import xmltodict def metakey(meta, dict_to_parse, prefix): """Fills the metadata dictionairy with data from dict_to_parse. The dict_to_parse is the raw data from a silixa xml-file. dict_to_parse is a nested dictionary to represent the different levels of hierarchy. For example, toplevel = {lowlevel: {key: value}}. This function returns {'toplevel:lowlevel:key': value}. Where prefix is the flattened hierarchy. Parameters ---------- meta : dict the output dictionairy with prcessed metadata dict_to_parse : dict prefix Returns: -------- """ for key in dict_to_parse: if prefix == "": prefix_parse = key.replace("@", "") else: prefix_parse = sep.join([prefix, key.replace("@", "")]) items = ("wellbore", "wellLogSet", "wellLog", "logData", "data") if prefix_parse == sep.join(items): continue if hasattr(dict_to_parse[key], "keys"): # Nested dictionaries, flatten hierarchy. meta.update(metakey(meta, dict_to_parse[key], prefix_parse)) elif isinstance(dict_to_parse[key], list): # if the key has values for the multiple channels for ival, val in enumerate(dict_to_parse[key]): num_key = prefix_parse + "_" + str(ival) meta.update(metakey(meta, val, num_key)) else: meta[prefix_parse] = dict_to_parse[key] return meta with open_file(filename) as fh: data = fh.read() try: doc_ = xmltodict.parse(data) skip_chars = False # the first 3 characters can be weird, skip them except ExpatError: doc_ = xmltodict.parse(data[3:]) skip_chars = True doc = doc_["WITSMLComposite"]["wellSet"]["well"]["wellboreSet"] return metakey({}, doc, ""), skip_chars def check_if_tra_exists(filepathlist): """ Using AP Sensing N4386B both POSC (.xml) export and trace (.tra) export can be used to log measurements. This function checks, whether both export options were turned on simultaneously. All files .xml and .tra must be placed in the same directory. Parameters ---------- filepathlist : list of str List of paths that point the the .xml files Notes: ------ All files .xml and .tra must be placed in the same directory. Returns: -------- tra_available : boolean, True only, when all .xml files have a corresponding .tra file sorted_tra_filepathlist . list of str if tra_available is True: This list contains a list of filepaths for the .tra file. The list is sorted the same as the input .xml filepath list, because both were generated with the sorted(...) command and are thus sorted by their identical timestamps. """ directory = Path(filepathlist[0]).parent # create list of .tra files in directory sorted_tra_filepathlist = sorted(directory.glob("*.tra")) tra_files = "\n".join( [file.name for file in sorted_tra_filepathlist] ) # make it one big string tra_timestamps = set( re.findall(r"(\d{14}).tra", tra_files) ) # find 14 digits followed by .tra xml_timestamps = "\n".join([file.name for file in filepathlist]) xml_timestamps = set( re.findall(r"(\d{14}).xml", xml_timestamps) ) # note that these are sets now diff = xml_timestamps - tra_timestamps if len(diff) == len( xml_timestamps ): # No tra data - that may be intended --> warning. msg = f"Not all .xml files have a matching .tra file.\n Missing are time following timestamps {diff}. Not loading .tra data." warnings.warn(msg) return False, [] elif len(diff) > 0: msg = f"Not all .xml files have a matching .tra file.\n Missing are time following timestamps {diff}." raise ValueError(msg) diff = tra_timestamps - xml_timestamps if len(diff) > 0: msg = f"Not all .tra files have a matching .xml file.\n Missing are time following timestamps {diff}." raise ValueError(msg) return True, sorted_tra_filepathlist def parse_tra_numbers(val: str): """parsing helper function used by function read_single_tra_file() to determine correct datatype of read string. Parameters ---------- val : str String value of tra file item Returns: -------- val : Value in correct datatype (boolean, int, float and string supported), """ if val == "True": return True if val == "False": return False if val.isdigit(): return int(val) try: # a bit ugly, but sadly there is no string.isfloat() method... return float(val) except ValueError: return val def read_single_tra_file(tra_filepath, load_tra_arrays): """ Using AP Sensing N4386B both POSC (.xml) export and trace (.tra) export can be used to log measurements. This function reads the .tra data and appends it to the dask array, which was read from the POSC export (.xml) file. .tra files contain different data then the .xml files from POSC export - more metadata - log_ratio and loss(attenuation) calculated by device - PT100 sensor data (optional only if sensors are connnected to device) Parameters ---------- tra_filepathlist: list of str List of paths that point the the .tra files load_tra_arrays: boolean If False, the array data taken along the fibre (distance, temperature, log_ratio and loss) are not imported. Notes: ------ more metadata could be read from the .tra file and stored in the dask array Returns: -------- data_dict : dict containing time series measured fibre data by distance PT100 reference as float timestamp data other metadata """ with open(tra_filepath) as f: file = f.readlines() data = [line for line in file if line != "\n"] # drops out empty lines data_dict = {} current_section = None for line_with_break in data: line = line_with_break.replace("\n", "") # drops out linebreaks if line.startswith("["): # detects new section and sets it as current section current_section = line.replace("[", "").replace("]", "") data_dict[current_section] = {} else: content = line.split(";") content = [parse_tra_numbers(val) for val in content] content_name = content[0] if ( len(content) == 2 ): # = metadata & data after trace data (optional sensors and time stamp) data_dict[current_section][content_name] = content[1] elif load_tra_arrays: # == trace data containing distance, temperature, logratio, attenuation # read only when requested by load_tra_arrays=True data_dict[current_section][content_name] = tuple(content[1:]) trace_key = [key for key in data_dict if "Trace." in key][ 0 ] # find key of trace n in "Trace.n" is unknown data_dict["trace_key"] = trace_key return data_dict def append_to_data_vars_structure(data_vars, data_dict_list, load_tra_arrays): """ append data from .tra files to data_vars structure. (The data_vars structure is later on used to initialize the x-array dataset). Parameters ---------- data_vars: dictionary containing *.xml data data_dict_list: list of dictionaries each dictionary in the list contains the data of one .tra file load_tra_arrays: boolean If False, the array data taken along the fibre (distance, temperature, log_ratio and loss) were not imported and thus not in data_dict_list Returns: -------- data_vars : dictionary containing *.xml data and *.tra data """ # First derive numer of measurements in each file (should be constant): data_dict = data_dict_list[0] tr_key = data_dict["trace_key"] n_measurements = len([key for key in data_dict[tr_key] if isinstance(key, int)]) if load_tra_arrays: # Initialize arrays distances = np.zeros((len(data_dict_list), n_measurements)) t_by_dts = np.zeros((len(data_dict_list), n_measurements)) log_ratio = np.zeros((len(data_dict_list), n_measurements)) loss = np.zeros((len(data_dict_list), n_measurements)) for idx, data_dict in enumerate(data_dict_list): # first get distance, t_by_dts, log_ratio and loss as list from dictionary tr_key = data_dict["trace_key"] data_keys = [key for key in data_dict[tr_key] if isinstance(key, int)] data = np.array([data_dict[tr_key][key] for key in data_keys]).T # Fill in pre-initialized array distances[idx] = data[0] t_by_dts[idx] = data[1] log_ratio[idx] = data[2] loss[idx] = data[3] # add log_ratio and attenaution to data_vars data_vars["log_ratio_by_dts"] = (("time", "x"), log_ratio) data_vars["loss_by_dts"] = (("time", "x"), loss) # add reference temperature data, if they exist for idx_ref_temp in range(1, 5): if f"Ref.Temperature.Sensor.{idx_ref_temp}" in data_dict[tr_key]: ref_temps = [] for _, data_dict in enumerate(data_dict_list): tr_key = data_dict["trace_key"] ref_temps.append( data_dict[tr_key][f"Ref.Temperature.Sensor.{idx_ref_temp}"] ) data_vars[f"probe{idx_ref_temp}Temperature"] = (("time",), ref_temps) # check if files match by comparing timestamps and dts temperature for idx_t in range(0, len(data_dict_list)): # check timestamps data_dict = data_dict_list[idx_t] tr_key = data_dict["trace_key"] dd_ts = pd.Timestamp( int(data_dict[tr_key]["Date.Year"]), int(data_dict[tr_key]["Date.Month"]), int(data_dict[tr_key]["Date.Day"]), int(data_dict[tr_key]["Time.Hour"]), int(data_dict[tr_key]["Time.Minute"]), int(data_dict[tr_key]["Time.Second"]), ) err_msg = f"fatal error in allocation of .xml and .tra data.\nxml file {data_vars['creationDate'][1][idx_t]}\ntra file {str(dd_ts)}\n\n" if not data_vars["creationDate"][1][idx_t] == dd_ts: raise Exception(err_msg) # check dts temperature only if loaded if load_tra_arrays: for idx_x in [0, 2, 5]: if not data_vars["tmp"][1][idx_x][idx_t] == t_by_dts[idx_t][idx_x]: # fatal error in allocation of .tra and .xml data raise Exception(err_msg) return data_vars