# Source code for dtscalibration.io.sensornet

import fnmatch
import os
import warnings
from glob import glob

import numpy as np
import pandas as pd
import xarray as xr

from dtscalibration.io.utils import coords_time
from dtscalibration.io.utils import dim_attrs
from dtscalibration.io.utils import open_file


def read_sensornet_files(
    filepathlist=None,
    directory=None,
    file_ext="*.ddf",
    timezone_input_files="UTC",
    timezone_netcdf="UTC",
    silent=False,
    add_internal_fiber_length=50.0,
    fiber_length=None,
    **kwargs,
):
    """Read a folder with measurement files. Each measurement file contains
    values for a single timestep. Remember to check which timezone
    you are working in.

    Parameters
    ----------
    filepathlist : list of str, optional
        List of paths that point to the sensornet files
    directory : str, Path, optional
        Path to folder
    timezone_input_files : str, optional
        Timezone string of the measurement files.
        Remember to check when measurements are taken.
        Also if summertime is used.
    timezone_netcdf : str, optional
        Timezone string of the netcdf file. UTC follows CF-conventions.
    file_ext : str, optional
        file extension of the measurement files
    silent : bool
        If set to True, some verbose texts are not printed to stdout/screen
    add_internal_fiber_length : float
        Set to zero if only the measurements of the fiber connected to the
        DTS system of interest. Set to 50 if you also want to keep the
        internal reference section.
    fiber_length : float
        It is the fiber length between the two connector entering the DTS
        device. If left to `None`, it is approximated with
        `x[-1] - add_internal_fiber_length`.
    kwargs : dict-like, optional
        keyword-arguments are passed to DataStore initialization

    Notes:
    ------
    Compressed sensornet files can not be directly decoded because the files
    are encoded with encoding='windows-1252' instead of UTF-8.

    Returns:
    --------
    datastore : DataStore
        The newly created datastore.
    """
    if filepathlist is None:
        # Also look for files in sub-folders
        filepathlist_unsorted = glob(
            os.path.join(directory, "**", file_ext), recursive=True
        )

        # Make sure that the list of files contains any files
        msg = "No measurement files found in provided directory: \n" + str(directory)
        assert len(filepathlist_unsorted) >= 1, msg

        # sort based on dates in filesname. A simple sorted() is not
        # sufficient as month folders do not sort well
        basenames = [os.path.basename(fp) for fp in filepathlist_unsorted]
        dates = ["".join(bn.split(" ")[2:4]) for bn in basenames]
        i_sort = np.argsort(dates)
        filepathlist = [filepathlist_unsorted[i] for i in i_sort]

        # Check measurements are all from same channel
        chno = [bn.split(" ")[1] for bn in basenames]
        assert (
            len(set(chno)) == 1
        ), "Folder contains measurements from multiple channels"

    # Make sure that the list of files contains any files
    assert len(filepathlist) >= 1, (
        "No measurement files found in provided " "list/directory"
    )

    ddf_version = sensornet_ddf_version_check(filepathlist)

    valid_versions = [
        "Halo DTS v1*",
        "ORYX F/W v1.02 Oryx Data Collector v3*",
        "ORYX F/W v4.00 Oryx Data Collector v3*",
        "Sentinel DTS v5*",
    ]

    valid = any(fnmatch.fnmatch(ddf_version, v_) for v_ in valid_versions)

    # Halo and Sentinel devices store the reverse channel flipped with
    # respect to the forward channel; the ORYX versions do not.
    if fnmatch.fnmatch(ddf_version, "Halo DTS v1*") or fnmatch.fnmatch(
        ddf_version, "Sentinel DTS v5*"
    ):
        flip_reverse_measurements = True
    else:
        flip_reverse_measurements = False

    if not valid:
        # BUG FIX: the warning was previously also raised for the valid
        # "ORYX F/W v1.02" version; only warn for untested (invalid)
        # versions. Also fixed ".dff" typo to ".ddf" in the message.
        warnings.warn(
            f"\n Sensornet .ddf version {ddf_version}"
            " has not been tested.\n Please open an issue on github"
            " and provide an example file"
        )

    data_vars, coords, attrs = read_sensornet_files_routine_v3(
        filepathlist,
        timezone_netcdf=timezone_netcdf,
        timezone_input_files=timezone_input_files,
        silent=silent,
        add_internal_fiber_length=add_internal_fiber_length,
        fiber_length=fiber_length,
        flip_reverse_measurements=flip_reverse_measurements,
    )

    ds = xr.Dataset(data_vars=data_vars, coords=coords, attrs=attrs, **kwargs)
    return ds
def sensornet_ddf_version_check(filepathlist):
    """Function which checks and returns the .ddf file version.

    Parameters
    ----------
    filepathlist : list of str
        Paths to the measurement files; only the first file is inspected.

    Returns:
    --------
    ddf_version : str
        The software version string, with decimal commas replaced by points.

    Raises:
    -------
    ValueError
        If the 'Software version number' key is missing from the file header.
    """
    # Obtain metadata from the first file
    _, meta = read_sensornet_single(filepathlist[0])

    if "Software version number" in meta:
        version_string = meta["Software version number"]
    else:
        raise ValueError(
            "Software version number could not be detected in .ddf file"
            + "Either file is corrupted or not supported"
        )
    ddf_version = version_string.replace(",", ".")

    return ddf_version


def read_sensornet_single(filename):
    """Read the header metadata and the data table of a single sensornet
    .ddf file.

    Parameters
    ----------
    filename : str or Path
        Path to a sensornet .ddf measurement file.

    Returns:
    --------
    data : dict of numpy.ndarray
        Per-point arrays: 'x', 'tmp', 'st', 'ast' and, for combined
        (double-ended) files, additionally 'rst' and 'rast'.
    meta : dict
        Header key/value pairs (decimal commas normalized to points).

    Raises:
    -------
    ValueError
        If the 'differential loss correction' header value is unknown.
    """
    # The header is 26 lines: 25 key/value lines plus one column-names line.
    headerlength = 26

    # The degree-Celsius symbol is unreadable in utf8; sensornet files are
    # encoded with windows-1252.
    with open_file(filename, encoding="windows-1252") as fileobject:
        filelength = sum([1 for _ in fileobject])
    datalength = filelength - headerlength

    meta = {}
    with open_file(filename, encoding="windows-1252") as fileobject:
        # First four header lines use ':\t' as key/value separator
        for ii in range(0, 4):
            fileline = fileobject.readline().split(":\t")
            meta[fileline[0]] = fileline[1].replace("\n", "")

        # Remaining header lines use plain '\t'; values use decimal commas
        for ii in range(4, headerlength - 1):
            fileline = fileobject.readline().split("\t")
            meta[fileline[0]] = fileline[1].replace("\n", "").replace(",", ".")

        # BUG FIX: the column-names line (26th header line) must be consumed
        # before reading data; previously the entire readline was commented
        # out, so the first data row would have been the names row.
        # data_names =
        fileobject.readline().split("\t")

        if meta["differential loss correction"] == "single-ended":
            data = {
                "x": np.zeros(datalength),
                "tmp": np.zeros(datalength),
                "st": np.zeros(datalength),
                "ast": np.zeros(datalength),
            }

            for ii in range(0, datalength):
                fileline = fileobject.readline().replace(",", ".").split("\t")

                data["x"][ii] = float(fileline[0])
                data["tmp"][ii] = float(fileline[1])
                data["st"][ii] = float(fileline[2])
                data["ast"][ii] = float(fileline[3])

        elif meta["differential loss correction"] == "combined":
            data = {
                "x": np.zeros(datalength),
                "tmp": np.zeros(datalength),
                "st": np.zeros(datalength),
                "ast": np.zeros(datalength),
                "rst": np.zeros(datalength),
                "rast": np.zeros(datalength),
            }

            for ii in range(0, datalength):
                fileline = fileobject.readline().replace(",", ".").split("\t")

                data["x"][ii] = float(fileline[0])
                data["tmp"][ii] = float(fileline[1])
                data["st"][ii] = float(fileline[2])
                data["ast"][ii] = float(fileline[3])
                data["rst"][ii] = float(fileline[4])
                data["rast"][ii] = float(fileline[5])

        else:
            raise ValueError(
                'unknown differential loss correction: "'
                + meta["differential loss correction"]
                + '"'
            )

    # Rename the key so it can be stored as a netcdf attribute name
    meta["default loss term dB per km"] = meta["default loss term (dB/km)"]
    del meta["default loss term (dB/km)"]

    return data, meta


def read_sensornet_files_routine_v3(
    filepathlist,
    timezone_netcdf="UTC",
    timezone_input_files="UTC",
    silent=False,
    add_internal_fiber_length=50.0,
    fiber_length=None,
    flip_reverse_measurements=False,
):
    """Internal routine that reads Sensor files.
    Use dtscalibration.read_sensornet_files function instead.

    Parameters
    ----------
    filepathlist : list of str
        Paths to the measurement files, one file per timestep.
    timezone_netcdf : str
        Timezone string of the netcdf file. UTC follows CF-conventions.
    timezone_input_files : str
        Timezone string of the measurement files.
    silent : bool
        If True, the summary is not printed to stdout.
    add_internal_fiber_length : float
        Set to zero if only the measurements of the fiber connected to the
        DTS system of interest. Set to 50 if you also want to keep the
        internal reference section.
    fiber_length : float
        It is the fiber length between the two connector entering the DTS
        device.
    flip_reverse_measurements : bool
        If True, the backward-channel arrays are reversed to align them with
        the forward channel.

    Returns:
    --------
    data_vars, coords, attrs
        Dictionaries suitable for constructing an xarray.Dataset.
    """
    # Obtain metadata from the first file
    data, meta = read_sensornet_single(filepathlist[0])

    # Pop keys from the meta dict which are variable over time
    popkeys = (
        "T ext. ref 1 (°C)",
        "T ext. ref 2 (°C)",
        "T internal ref (°C)",
        "date",
        "time",
        "gamma",
        "k internal",
        "k external",
    )
    # plain loop instead of a side-effect list comprehension
    for popkey in popkeys:
        meta.pop(popkey)
    attrs = meta

    # Add standardised required attributes
    if meta["differential loss correction"] == "single-ended":
        attrs["isDoubleEnded"] = "0"
    elif meta["differential loss correction"] == "combined":
        attrs["isDoubleEnded"] = "1"

    double_ended_flag = bool(int(attrs["isDoubleEnded"]))

    attrs["forwardMeasurementChannel"] = meta["forward channel"][-1]
    # BUG FIX: the branches were swapped. Only double-ended ("combined")
    # measurements have a reverse channel; single-ended gets "N/A".
    if double_ended_flag:
        attrs["backwardMeasurementChannel"] = meta["reverse channel"][-1]
    else:
        attrs["backwardMeasurementChannel"] = "N/A"

    # obtain basic data info
    nx = data["x"].size
    ntime = len(filepathlist)

    # print summary
    if not silent:
        print(f"{ntime} files were found, each representing a single timestep")
        print(f"Recorded at {nx} points along the cable")

        if double_ended_flag:
            print("The measurement is double ended")
        else:
            print("The measurement is single ended")

    # Gather data
    # x has already been read. should not change over time
    xraw = data["x"]

    # Define all variables
    referenceTemperature = np.zeros(ntime)
    probe1temperature = np.zeros(ntime)
    probe2temperature = np.zeros(ntime)
    gamma_ddf = np.zeros(ntime)
    k_internal = np.zeros(ntime)
    k_external = np.zeros(ntime)
    acquisitiontimeFW = np.zeros(ntime)
    acquisitiontimeBW = np.zeros(ntime)

    timestamp = [""] * ntime
    ST = np.zeros((nx, ntime))
    AST = np.zeros((nx, ntime))
    TMP = np.zeros((nx, ntime))

    if double_ended_flag:
        REV_ST = np.zeros((nx, ntime))
        REV_AST = np.zeros((nx, ntime))

    for ii in range(ntime):
        data, meta = read_sensornet_single(filepathlist[ii])

        timestamp[ii] = pd.DatetimeIndex([meta["date"] + " " + meta["time"]])[0]
        probe1temperature[ii] = float(meta["T ext. ref 1 (°C)"])
        probe2temperature[ii] = float(meta["T ext. ref 2 (°C)"])
        referenceTemperature[ii] = float(meta["T internal ref (°C)"])
        gamma_ddf[ii] = float(meta["gamma"])
        k_internal[ii] = float(meta["k internal"])
        k_external[ii] = float(meta["k external"])
        acquisitiontimeFW[ii] = float(meta["forward acquisition time"])
        acquisitiontimeBW[ii] = float(meta["reverse acquisition time"])

        ST[:, ii] = data["st"]
        AST[:, ii] = data["ast"]
        TMP[:, ii] = data["tmp"]

        if double_ended_flag:
            REV_ST[:, ii] = data["rst"]
            REV_AST[:, ii] = data["rast"]

    # Approximate the fiber length when it was not provided by the caller
    if fiber_length is None and double_ended_flag:
        fiber_length = np.max([0.0, xraw[-1] - add_internal_fiber_length])
    elif fiber_length is None and not double_ended_flag:
        fiber_length = xraw[-1]

    assert fiber_length > 0.0, (
        "`fiber_length` is not defined. Use key"
        "word argument in read function." + str(fiber_length)
    )

    # Translate distances to array indices along the raw x axis
    fiber_start_index = (np.abs(xraw + add_internal_fiber_length)).argmin()
    fiber_0_index = np.abs(xraw).argmin()
    fiber_1_index = (np.abs(xraw - fiber_length)).argmin()
    fiber_n_indices = fiber_1_index - fiber_0_index
    fiber_n_indices_internal = fiber_0_index - fiber_start_index
    if double_ended_flag:
        fiber_end_index = np.min([xraw.size, fiber_1_index + fiber_n_indices_internal])
    else:
        fiber_end_index = fiber_1_index

    if double_ended_flag:
        if not flip_reverse_measurements:
            # fiber length how the backward channel is aligned
            fiber_length_raw = float(meta["fibre end"])
            fiber_bw_1_index = np.abs(xraw - fiber_length_raw).argmin()
            fiber_bw_end_index = np.min(
                [xraw.size, fiber_bw_1_index + (fiber_end_index - fiber_1_index)]
            )
            fiber_bw_start_index = np.max(
                [0, fiber_bw_1_index - fiber_n_indices - fiber_n_indices_internal]
            )

            if (fiber_end_index - fiber_start_index) == (
                fiber_bw_end_index - fiber_bw_start_index
            ):
                REV_ST = REV_ST[fiber_bw_start_index:fiber_bw_end_index]
                REV_AST = REV_AST[fiber_bw_start_index:fiber_bw_end_index]
            else:
                # fall back to the forward-channel indices when the
                # backward window has a different length
                REV_ST = REV_ST[fiber_start_index:fiber_end_index]
                REV_AST = REV_AST[fiber_start_index:fiber_end_index]

        else:
            # Use the fiber indices from the forward channel
            n_indices_internal_left = fiber_0_index - fiber_start_index
            n_indices_internal_right = np.max([0, fiber_end_index - fiber_1_index])
            n_indices_internal_shortest = np.min(
                [n_indices_internal_left, n_indices_internal_right]
            )
            fiber_start_index = fiber_0_index - n_indices_internal_shortest
            fiber_end_index = (
                fiber_0_index + fiber_n_indices + n_indices_internal_shortest
            )
            # reversed slice flips the backward channel onto the forward axis
            REV_ST = REV_ST[fiber_end_index:fiber_start_index:-1]
            REV_AST = REV_AST[fiber_end_index:fiber_start_index:-1]

    x = xraw[fiber_start_index:fiber_end_index]
    TMP = TMP[fiber_start_index:fiber_end_index]
    ST = ST[fiber_start_index:fiber_end_index]
    AST = AST[fiber_start_index:fiber_end_index]

    data_vars = {
        "st": (["x", "time"], ST, dim_attrs["st"]),
        "ast": (["x", "time"], AST, dim_attrs["ast"]),
        "tmp": (["x", "time"], TMP, dim_attrs["tmp"]),
        "probe1Temperature": (
            "time",
            probe1temperature,
            {
                "name": "Probe 1 temperature",
                "description": "reference probe 1 " "temperature",
                "units": r"$^\circ$C",
            },
        ),
        "probe2Temperature": (
            "time",
            probe2temperature,
            {
                "name": "Probe 2 temperature",
                "description": "reference probe 2 " "temperature",
                "units": r"$^\circ$C",
            },
        ),
        "referenceTemperature": (
            "time",
            referenceTemperature,
            {
                "name": "reference temperature",
                "description": "Internal reference " "temperature",
                "units": r"$^\circ$C",
            },
        ),
        "gamma_ddf": (
            "time",
            gamma_ddf,
            {
                "name": "gamma ddf",
                "description": "machine " "calibrated gamma",
                "units": "-",
            },
        ),
        "k_internal": (
            "time",
            k_internal,
            {
                "name": "k internal",
                "description": "machine calibrated " "internal k",
                "units": "-",
            },
        ),
        "k_external": (
            "time",
            k_external,
            {
                # BUG FIX: was a copy-paste of "reference temperature"
                "name": "k external",
                "description": "machine calibrated " "external k",
                "units": "-",
            },
        ),
        "userAcquisitionTimeFW": (
            "time",
            acquisitiontimeFW,
            dim_attrs["userAcquisitionTimeFW"],
        ),
        "userAcquisitionTimeBW": (
            "time",
            acquisitiontimeBW,
            dim_attrs["userAcquisitionTimeBW"],
        ),
    }

    if double_ended_flag:
        data_vars["rst"] = (["x", "time"], REV_ST, dim_attrs["rst"])
        data_vars["rast"] = (["x", "time"], REV_AST, dim_attrs["rast"])

    filenamelist = [os.path.split(f)[-1] for f in filepathlist]

    coords = {"x": ("x", x, dim_attrs["x"]), "filename": ("time", filenamelist)}

    dtFW = data_vars["userAcquisitionTimeFW"][1].astype("timedelta64[s]")
    dtBW = data_vars["userAcquisitionTimeBW"][1].astype("timedelta64[s]")
    if not double_ended_flag:
        tcoords = coords_time(
            np.array(timestamp).astype("datetime64[ns]"),
            timezone_netcdf=timezone_netcdf,
            timezone_input_files=timezone_input_files,
            dtFW=dtFW,
            double_ended_flag=double_ended_flag,
        )
    else:
        tcoords = coords_time(
            np.array(timestamp).astype("datetime64[ns]"),
            timezone_netcdf=timezone_netcdf,
            timezone_input_files=timezone_input_files,
            dtFW=dtFW,
            dtBW=dtBW,
            double_ended_flag=double_ended_flag,
        )

    coords.update(tcoords)

    return data_vars, coords, attrs