import os
import warnings
from pathlib import Path
from xml.etree import ElementTree
import dask
import dask.array as da
import numpy as np
import pandas as pd
import xarray as xr
from dtscalibration.io.utils import coords_time
from dtscalibration.io.utils import dim_attrs
from dtscalibration.io.utils import get_xml_namespace
from dtscalibration.io.utils import open_file
from dtscalibration.io.utils import ziphandle_to_filepathlist
[docs]
def read_silixa_files(
filepathlist=None,
directory=None,
zip_handle=None,
file_ext="*.xml",
timezone_netcdf="UTC",
silent=False,
load_in_memory="auto",
**kwargs,
):
"""Read a folder with measurement files from a device of the Silixa brand. Each measurement file contains
values for a
single timestep. Remember to check which timezone you are working in.
The silixa files are already timezone aware
Parameters
----------
filepathlist : list of str, optional
List of paths that point the the silixa files
directory : str, Path, optional
Path to folder
timezone_netcdf : str, optional
Timezone string of the netcdf file. UTC follows CF-conventions.
file_ext : str, optional
file extension of the measurement files
silent : bool
If set tot True, some verbose texts are not printed to stdout/screen
load_in_memory : {'auto', True, False}
If 'auto' the Stokes data is only loaded to memory for small files
kwargs : dict-like, optional
keyword-arguments are passed to DataStore initialization
Returns:
--------
datastore : DataStore
The newly created datastore.
"""
assert "timezone_input_files" not in kwargs, (
"The silixa files are " "already timezone aware"
)
if filepathlist is None and zip_handle is None:
filepathlist = sorted(Path(directory).glob(file_ext))
# Make sure that the list of files contains any files
assert len(filepathlist) >= 1, (
"No measurement files found in provided " "directory: \n" + str(directory)
)
elif filepathlist is None and zip_handle:
filepathlist = ziphandle_to_filepathlist(fh=zip_handle, extension=file_ext)
# Make sure that the list of files contains any files
assert len(filepathlist) >= 1, (
"No measurement files found in provided " "list/directory"
)
xml_version = silixa_xml_version_check(filepathlist)
if xml_version == 4:
data_vars, coords, attrs = read_silixa_files_routine_v4(
filepathlist,
timezone_netcdf=timezone_netcdf,
silent=silent,
load_in_memory=load_in_memory,
)
elif xml_version in (6, 7, 8):
data_vars, coords, attrs = read_silixa_files_routine_v6(
filepathlist,
xml_version=xml_version,
timezone_netcdf=timezone_netcdf,
silent=silent,
load_in_memory=load_in_memory,
)
else:
raise NotImplementedError(
"Silixa xml version " + f"{xml_version} not implemented"
)
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore", message="Converting non-nanosecond precision timedelta"
)
ds = xr.Dataset(data_vars=data_vars, coords=coords, attrs=attrs, **kwargs)
return ds
def silixa_xml_version_check(filepathlist):
"""Function which tests which version of xml files have to be read.
Parameters
----------
filepathlist
Returns:
--------
"""
sep = ":"
attrs = read_silixa_attrs_singlefile(filepathlist[0], sep)
version_string = attrs["customData:SystemSettings:softwareVersion"]
# Get major version from string. Tested for Ultima v4, v6, v7 XT-DTS v6
major_version = int(version_string.replace(" ", "").split(":")[-1][0])
return major_version
def read_silixa_attrs_singlefile(filename, sep):
"""Parameters
----------
filename
sep
Returns:
--------
"""
import xmltodict
def metakey(meta, dict_to_parse, prefix):
"""Fills the metadata dictionairy with data from dict_to_parse.
The dict_to_parse is the raw data from a silixa xml-file.
dict_to_parse is a nested dictionary to represent the
different levels of hierarchy. For example,
toplevel = {lowlevel: {key: value}}.
This function returns {'toplevel:lowlevel:key': value}.
Where prefix is the flattened hierarchy.
Parameters
----------
meta : dict
the output dictionairy with prcessed metadata
dict_to_parse : dict
prefix
Returns:
--------
"""
for key in dict_to_parse:
if prefix == "":
prefix_parse = key.replace("@", "")
else:
prefix_parse = sep.join([prefix, key.replace("@", "")])
if prefix_parse == sep.join(("logData", "data")):
# skip the LAF , ST data
continue
if hasattr(dict_to_parse[key], "keys"):
# Nested dictionaries, flatten hierarchy.
meta.update(metakey(meta, dict_to_parse[key], prefix_parse))
elif isinstance(dict_to_parse[key], list):
# if the key has values for the multiple channels
for ival, val in enumerate(dict_to_parse[key]):
num_key = prefix_parse + "_" + str(ival)
meta.update(metakey(meta, val, num_key))
else:
meta[prefix_parse] = dict_to_parse[key]
return meta
with open_file(filename) as fh:
doc_ = xmltodict.parse(fh.read())
doc = doc_["wellLogs"]["wellLog"] if "wellLogs" in doc_ else doc_["logs"]["log"]
return metakey({}, doc, "")
def read_silixa_files_routine_v6( # noqa: MC0001
filepathlist,
xml_version=6,
timezone_netcdf="UTC",
silent=False,
load_in_memory="auto",
):
"""Internal routine that reads Silixa files.
Use dtscalibration.read_silixa_files function instead.
The silixa files are already timezone aware
Parameters
----------
load_in_memory
filepathlist
timezone_netcdf
silent
Returns:
--------
"""
# translate names
tld = {"ST": "st", "AST": "ast", "REV-ST": "rst", "REV-AST": "rast", "TMP": "tmp"}
# Open the first xml file using ET, get the name space and amount of data
with open_file(filepathlist[0]) as fh:
xml_tree = ElementTree.parse(fh)
namespace = get_xml_namespace(xml_tree.getroot())
logtree = xml_tree.find(f"./{namespace}log")
logdata_tree = logtree.find(f"./{namespace}logData")
# Amount of datapoints is the size of the logdata tree, corrected for
# the mnemonic list and unit list
nx = len(logdata_tree) - 2
sep = ":"
ns = {"s": namespace[1:-1]}
# Obtain metadata from the first file
attrs = read_silixa_attrs_singlefile(filepathlist[0], sep)
# Add standardised required attributes
attrs["isDoubleEnded"] = attrs["customData:isDoubleEnded"]
double_ended_flag = bool(int(attrs["isDoubleEnded"]))
attrs["forwardMeasurementChannel"] = attrs["customData:forwardMeasurementChannel"]
if double_ended_flag:
attrs["backwardMeasurementChannel"] = attrs[
"customData:reverseMeasurementChannel"
]
else:
attrs["backwardMeasurementChannel"] = "N/A"
# obtain basic data info
data_item_names = (
attrs["logData:mnemonicList"].replace(" ", "").strip(" ").split(",")
)
nitem = len(data_item_names)
ntime = len(filepathlist)
double_ended_flag = bool(int(attrs["isDoubleEnded"]))
chFW = int(attrs["forwardMeasurementChannel"]) - 1 # zero-based
chBW = (
int(attrs["backwardMeasurementChannel"]) - 1 if double_ended_flag else -1
) # zero-based or -1 if single ended
# print summary
if not silent:
print(f"{ntime} files were found, each representing a single timestep")
print(f"{nitem} recorded vars were found: " + ", ".join(data_item_names))
print(f"Recorded at {nx} points along the cable")
if double_ended_flag:
print("The measurement is double ended")
else:
print("The measurement is single ended")
# obtain timeseries from data
timeseries_loc_in_hierarchy = [
("log", "customData", "acquisitionTime"),
("log", "customData", "referenceTemperature"),
("log", "customData", "probe1Temperature"),
("log", "customData", "probe2Temperature"),
("log", "customData", "referenceProbeVoltage"),
("log", "customData", "probe1Voltage"),
("log", "customData", "probe2Voltage"),
(
"log",
"customData",
"UserConfiguration",
"ChannelConfiguration",
"AcquisitionConfiguration",
"AcquisitionTime",
"userAcquisitionTimeFW",
),
]
if double_ended_flag:
timeseries_loc_in_hierarchy.append(
(
"log",
"customData",
"UserConfiguration",
"ChannelConfiguration",
"AcquisitionConfiguration",
"AcquisitionTime",
"userAcquisitionTimeBW",
)
)
timeseries = {
item[-1]: dict(loc=item, array=np.zeros(ntime, dtype=np.float32))
for item in timeseries_loc_in_hierarchy
}
# add units to timeseries (unit of measurement)
for key, item in timeseries.items():
item["uom"] = attrs.get(f"customData:{key}:uom", "")
# Gather data
arr_path = "s:" + "/s:".join(["log", "logData", "data"])
@dask.delayed
def grab_data_per_file(file_handle):
"""Parameters
----------
file_handle
Returns:
--------
"""
with open_file(file_handle, mode="r") as f_h:
eltree = ElementTree.parse(f_h)
arr_el = eltree.findall(arr_path, namespaces=ns)
if not len(arr_el) == nx:
raise ValueError(
"Inconsistent length of x-dimension"
+ "\nCheck if files are mixed up, or if the number of "
+ "data points vary per file."
)
# remove the breaks on both sides of the string
# split the string on the comma
arr_str = [arr_eli.text[1:-1].split(",") for arr_eli in arr_el]
return np.array(arr_str, dtype=float)
data_lst_dly = [grab_data_per_file(fp) for fp in filepathlist]
data_lst = [
da.from_delayed(x, shape=(nx, nitem), dtype=float) for x in data_lst_dly
]
data_arr = da.stack(data_lst).T # .compute()
# Check whether to compute data_arr (if possible 25% faster)
data_arr_cnk = data_arr.rechunk({0: -1, 1: -1, 2: "auto"})
if load_in_memory == "auto" and data_arr_cnk.npartitions <= 5 or load_in_memory:
if not silent:
print("Reading the data from disk")
data_arr = data_arr_cnk.compute()
else:
if not silent:
print("Not reading the data from disk")
data_arr = data_arr_cnk
data_vars = {}
for name, data_arri in zip(data_item_names, data_arr):
if name == "LAF":
continue
if tld[name] in dim_attrs:
data_vars[tld[name]] = (["x", "time"], data_arri, dim_attrs[tld[name]])
else:
raise ValueError(f"Dont know what to do with the {name} data column")
# Obtaining the timeseries data (reference temperature etc)
_ts_dtype = [(k, np.float32) for k in timeseries]
_time_dtype = [
("filename_tstamp", np.int64),
("minDateTimeIndex", "<U29"),
("maxDateTimeIndex", "<U29"),
]
ts_dtype = np.dtype(_ts_dtype + _time_dtype)
@dask.delayed
def grab_timeseries_per_file(file_handle, xml_version):
"""Parameters
----------
file_handle
Returns:
--------
"""
with open_file(file_handle, mode="r") as f_h:
eltree = ElementTree.parse(f_h)
out = []
for _, v in timeseries.items():
# Get all the timeseries data
if "userAcquisitionTimeFW" in v["loc"]:
# requires two namespace searches
path1 = "s:" + "/s:".join(v["loc"][:4])
val1 = eltree.findall(path1, namespaces=ns)
path2 = "s:" + "/s:".join(v["loc"][4:6])
val2 = val1[chFW].find(path2, namespaces=ns)
out.append(val2.text)
elif "userAcquisitionTimeBW" in v["loc"]:
# requires two namespace searches
path1 = "s:" + "/s:".join(v["loc"][:4])
val1 = eltree.findall(path1, namespaces=ns)
path2 = "s:" + "/s:".join(v["loc"][4:6])
val2 = val1[chBW].find(path2, namespaces=ns)
out.append(val2.text)
else:
path = "s:" + "/s:".join(v["loc"])
val = eltree.find(path, namespaces=ns)
out.append(val.text)
# get all the time related data
startDateTimeIndex = eltree.find(
"s:log/s:startDateTimeIndex", namespaces=ns
).text
endDateTimeIndex = eltree.find(
"s:log/s:endDateTimeIndex", namespaces=ns
).text
if isinstance(file_handle, tuple):
file_name = os.path.split(file_handle[0])[-1]
else:
file_name = os.path.split(file_handle)[-1]
if xml_version == 6:
tstamp = np.int64(file_name[10:27])
elif xml_version == 7:
tstamp = np.int64(file_name[15:27])
elif xml_version == 8:
tstamp = np.int64(file_name[-23:-4].replace(".", ""))
else:
raise ValueError(f"Unknown version number: {xml_version}")
out += [tstamp, startDateTimeIndex, endDateTimeIndex]
return np.array(tuple(out), dtype=ts_dtype)
ts_lst_dly = [grab_timeseries_per_file(fp, xml_version) for fp in filepathlist]
ts_lst = [da.from_delayed(x, shape=tuple(), dtype=ts_dtype) for x in ts_lst_dly]
ts_arr = da.stack(ts_lst).compute()
for name in timeseries:
if name in dim_attrs:
data_vars[name] = (("time",), ts_arr[name], dim_attrs[name])
else:
data_vars[name] = (("time",), ts_arr[name])
# construct the coordinate dictionary
if isinstance(filepathlist[0], tuple):
filename_list = [os.path.split(f)[-1] for f, n2 in filepathlist]
else:
filename_list = [os.path.split(f)[-1] for f in filepathlist]
coords = {
"x": ("x", data_arr[0, :, 0], dim_attrs["x"]),
"filename": ("time", filename_list),
"filename_tstamp": ("time", ts_arr["filename_tstamp"]),
}
maxTimeIndex = pd.DatetimeIndex(ts_arr["maxDateTimeIndex"])
dtFW = ts_arr["userAcquisitionTimeFW"].astype("timedelta64[s]")
if not double_ended_flag:
tcoords = coords_time(
maxTimeIndex,
timezone_netcdf=timezone_netcdf,
dtFW=dtFW,
double_ended_flag=double_ended_flag,
)
else:
dtBW = ts_arr["userAcquisitionTimeBW"].astype("timedelta64[s]")
tcoords = coords_time(
maxTimeIndex,
timezone_netcdf=timezone_netcdf,
dtFW=dtFW,
dtBW=dtBW,
double_ended_flag=double_ended_flag,
)
coords.update(tcoords)
return data_vars, coords, attrs
def read_silixa_files_routine_v4( # noqa: MC0001
filepathlist, timezone_netcdf="UTC", silent=False, load_in_memory="auto"
):
"""Internal routine that reads Silixa files.
Use dtscalibration.read_silixa_files function instead.
The silixa files are already timezone aware
Parameters
----------
load_in_memory
filepathlist
timezone_netcdf
silent
Returns:
--------
"""
# translate names
tld = {"ST": "st", "AST": "ast", "REV-ST": "rst", "REV-AST": "rast", "TMP": "tmp"}
# Open the first xml file using ET, get the name space and amount of data
xml_tree = ElementTree.parse(filepathlist[0])
namespace = get_xml_namespace(xml_tree.getroot())
logtree = xml_tree.find(f"./{namespace}wellLog")
logdata_tree = logtree.find(f"./{namespace}logData")
# Amount of datapoints is the size of the logdata tree
nx = len(logdata_tree)
sep = ":"
ns = {"s": namespace[1:-1]}
# Obtain metadata from the first file
attrs = read_silixa_attrs_singlefile(filepathlist[0], sep)
# Add standardised required attributes
attrs["isDoubleEnded"] = attrs["customData:isDoubleEnded"]
double_ended_flag = bool(int(attrs["isDoubleEnded"]))
attrs["forwardMeasurementChannel"] = attrs["customData:forwardMeasurementChannel"]
if double_ended_flag:
attrs["backwardMeasurementChannel"] = attrs[
"customData:reverseMeasurementChannel"
]
else:
attrs["backwardMeasurementChannel"] = "N/A"
chFW = int(attrs["forwardMeasurementChannel"]) - 1 # zero-based
chBW = (
int(attrs["backwardMeasurementChannel"]) - 1 if double_ended_flag else -1
) # zero-based or -1 if single ended
# obtain basic data info
if double_ended_flag:
data_item_names = [attrs[f"logCurveInfo_{x}:mnemonic"] for x in range(6)]
else:
data_item_names = [attrs[f"logCurveInfo_{x}:mnemonic"] for x in range(4)]
nitem = len(data_item_names)
ntime = len(filepathlist)
# print summary
if not silent:
print(f"{ntime} files were found, each representing a single timestep")
print(f"{nitem} recorded vars were found: " + ", ".join(data_item_names))
print(f"Recorded at {nx} points along the cable")
if double_ended_flag:
print("The measurement is double ended")
else:
print("The measurement is single ended")
# obtain timeseries from data
timeseries_loc_in_hierarchy = [
("wellLog", "customData", "acquisitionTime"),
("wellLog", "customData", "referenceTemperature"),
("wellLog", "customData", "probe1Temperature"),
("wellLog", "customData", "probe2Temperature"),
("wellLog", "customData", "referenceProbeVoltage"),
("wellLog", "customData", "probe1Voltage"),
("wellLog", "customData", "probe2Voltage"),
(
"wellLog",
"customData",
"UserConfiguration",
"ChannelConfiguration",
"AcquisitionConfiguration",
"AcquisitionTime",
"userAcquisitionTimeFW",
),
]
if double_ended_flag:
timeseries_loc_in_hierarchy.append(
(
"wellLog",
"customData",
"UserConfiguration",
"ChannelConfiguration",
"AcquisitionConfiguration",
"AcquisitionTime",
"userAcquisitionTimeBW",
)
)
timeseries = {
item[-1]: dict(loc=item, array=np.zeros(ntime, dtype=np.float32))
for item in timeseries_loc_in_hierarchy
}
# add units to timeseries (unit of measurement)
for key, item in timeseries.items():
item["uom"] = attrs.get(f"customData:{key}:uom", "")
# Gather data
arr_path = "s:" + "/s:".join(["wellLog", "logData", "data"])
@dask.delayed
def grab_data_per_file(file_handle):
"""Parameters
----------
file_handle
Returns:
--------
"""
with open_file(file_handle, mode="r") as f_h:
eltree = ElementTree.parse(f_h)
arr_el = eltree.findall(arr_path, namespaces=ns)
if not len(arr_el) == nx:
raise ValueError(
"Inconsistent length of x-dimension"
+ "\nCheck if files are mixed up, or if the number of "
+ "data points vary per file."
)
# remove the breaks on both sides of the string
# split the string on the comma
arr_str = [arr_eli.text.split(",") for arr_eli in arr_el]
return np.array(arr_str, dtype=float)
data_lst_dly = [grab_data_per_file(fp) for fp in filepathlist]
data_lst = [
da.from_delayed(x, shape=(nx, nitem), dtype=float) for x in data_lst_dly
]
data_arr = da.stack(data_lst).T # .compute()
# Check whether to compute data_arr (if possible 25% faster)
data_arr_cnk = data_arr.rechunk({0: -1, 1: -1, 2: "auto"})
if load_in_memory == "auto" and data_arr_cnk.npartitions <= 5 or load_in_memory:
if not silent:
print("Reading the data from disk")
data_arr = data_arr_cnk.compute()
else:
if not silent:
print("Not reading the data from disk")
data_arr = data_arr_cnk
data_vars = {}
for name, data_arri in zip(data_item_names, data_arr):
if name == "LAF":
continue
if tld[name] in dim_attrs:
data_vars[tld[name]] = (["x", "time"], data_arri, dim_attrs[tld[name]])
else:
raise ValueError("Dont know what to do with the" " {name} data column")
# Obtaining the timeseries data (reference temperature etc)
_ts_dtype = [(k, np.float32) for k in timeseries]
_time_dtype = [
("filename_tstamp", np.int64),
("minDateTimeIndex", "<U29"),
("maxDateTimeIndex", "<U29"),
]
ts_dtype = np.dtype(_ts_dtype + _time_dtype)
@dask.delayed
def grab_timeseries_per_file(file_handle):
"""Parameters
----------
file_handle
Returns:
--------
"""
with open_file(file_handle, mode="r") as f_h:
eltree = ElementTree.parse(f_h)
out = []
for k, v in timeseries.items():
# Get all the timeseries data
if "userAcquisitionTimeFW" in v["loc"]:
# requires two namespace searches
path1 = "s:" + "/s:".join(v["loc"][:4])
val1 = eltree.findall(path1, namespaces=ns)
path2 = "s:" + "/s:".join(v["loc"][4:6])
val2 = val1[chFW].find(path2, namespaces=ns)
out.append(val2.text)
elif "userAcquisitionTimeBW" in v["loc"]:
# requires two namespace searches
path1 = "s:" + "/s:".join(v["loc"][:4])
val1 = eltree.findall(path1, namespaces=ns)
path2 = "s:" + "/s:".join(v["loc"][4:6])
val2 = val1[chBW].find(path2, namespaces=ns)
out.append(val2.text)
else:
path = "s:" + "/s:".join(v["loc"])
val = eltree.find(path, namespaces=ns)
out.append(val.text)
# get all the time related data
startDateTimeIndex = eltree.find(
"s:wellLog/s:minDateTimeIndex", namespaces=ns
).text
endDateTimeIndex = eltree.find(
"s:wellLog/s:maxDateTimeIndex", namespaces=ns
).text
if isinstance(file_handle, tuple):
file_name = os.path.split(file_handle[0])[-1]
else:
file_name = os.path.split(file_handle)[-1]
tstamp = np.int64(file_name[10:-4])
out += [tstamp, startDateTimeIndex, endDateTimeIndex]
return np.array(tuple(out), dtype=ts_dtype)
ts_lst_dly = [grab_timeseries_per_file(fp) for fp in filepathlist]
ts_lst = [da.from_delayed(x, shape=tuple(), dtype=ts_dtype) for x in ts_lst_dly]
ts_arr = da.stack(ts_lst).compute()
for name in timeseries:
if name in dim_attrs:
data_vars[name] = (("time",), ts_arr[name], dim_attrs[name])
else:
data_vars[name] = (("time",), ts_arr[name])
# construct the coordinate dictionary
coords = {
"x": ("x", data_arr[0, :, 0], dim_attrs["x"]),
"filename": ("time", [os.path.split(f)[1] for f in filepathlist]),
"filename_tstamp": ("time", ts_arr["filename_tstamp"]),
}
maxTimeIndex = pd.DatetimeIndex(ts_arr["maxDateTimeIndex"])
dtFW = ts_arr["userAcquisitionTimeFW"].astype("timedelta64[s]")
if not double_ended_flag:
tcoords = coords_time(
maxTimeIndex,
timezone_netcdf=timezone_netcdf,
dtFW=dtFW,
double_ended_flag=double_ended_flag,
)
else:
dtBW = ts_arr["userAcquisitionTimeBW"].astype("timedelta64[s]")
tcoords = coords_time(
maxTimeIndex,
timezone_netcdf=timezone_netcdf,
dtFW=dtFW,
dtBW=dtBW,
double_ended_flag=double_ended_flag,
)
coords.update(tcoords)
return data_vars, coords, attrs