"""This module defines the Measurement class, the central data structure of ixdat
An ixdat Measurement is a collection of references to DataSeries and the metadata needed
to combine them, i.e. "build" the combined dataset. It has a number of general methods
to visualize and analyze the combined dataset. Measurement is also the base class for a
number of technique-specific Measurement-derived classes.
A Measurement will typically be accompanied by one or more Calibration. This module
also defines the base class for Calibration, while technique-specific Calibration
classes will be defined in the corresponding module in ./techniques/
"""
import json
import numpy as np
from .db import Saveable, PlaceHolderObject, fill_object_list
from .data_series import (
DataSeries,
TimeSeries,
ValueSeries,
ConstantValue,
append_series,
time_shifted,
get_tspans_from_mask,
)
from .projects.samples import Sample
from .projects.lablogs import LabLog
from .exporters.csv_exporter import CSVExporter
from .plotters.value_plotter import ValuePlotter
from .exceptions import BuildError, SeriesNotFoundError, TechniqueError, ReadError
from .tools import deprecate, tstamp_to_string
class Measurement(Saveable):
    """The Measurement class"""

    # ------ table description class attributes --------
    table_name = "measurement"
    # Columns saved directly in the measurement's own table row:
    column_attrs = {
        "name",
        "technique",
        "metadata",
        "aliases",
        "sample_name",
        "tstamp",
    }
    # Linker tables: {attribute: (linked_table_name, id_attribute)}
    extra_linkers = {
        "component_measurements": ("measurements", "m_ids"),
        "measurement_calibrations": ("calibrations", "c_ids"),
        "measurement_series": ("data_series", "s_ids"),
    }
    # Child objects saved to (and loaded from) their own tables:
    child_attrs = ["component_measurements", "calibration_list", "series_list"]
    # TODO: child_attrs should be derivable from extra_linkers?

    # ---- measurement class attributes, can be overwritten in inheriting classes ---- #
    control_technique_name = None
    """Name of the control technique primarily used to control the experiment"""

    control_series_name = None
    """Name (or alias) for main time variable or main time-dependent value variable,
    typically of the control technique"""

    selector_name = "selector"
    """Name of the default selector"""

    selection_series_names = ("file_number",)
    """Name of the default things to use to construct the selector"""

    series_constructors = {
        "file_number": "_build_file_number_series",
        "selector": "_build_selector_series",
    }
    """Series which should be constructed from other series by the specified method
    and cached the first time they are looked up"""

    essential_series_names = None
    """Series which should always be present"""

    # Default visualization and export tools; instantiated per-measurement in __init__:
    default_plotter = ValuePlotter
    default_exporter = CSVExporter
    def __init__(
        self,
        name,
        technique=None,
        metadata=None,
        s_ids=None,
        series_list=None,
        c_ids=None,
        calibration_list=None,
        m_ids=None,
        component_measurements=None,
        aliases=None,
        reader=None,
        plotter=None,
        exporter=None,
        sample=None,
        lablog=None,
        tstamp=None,
    ):
        """initialize a measurement

        Args:
            name (str): The name of the measurement
            metadata (dict): Free-form measurement metadata. Must be json-compatible.
            technique (str): The measurement technique
            s_ids (list of int): The id's of the measurement's DataSeries, if
                to be loaded (instead of given directly in series_list)
            series_list (list of DataSeries): The measurement's DataSeries
            c_ids (list of int): The id's of the measurement's Calibrations, if
                to be loaded (instead of given directly in calibration_list)
            calibration_list: The measurement's Calibrations
            m_ids (list of int): The id's of the component measurements, if to be
                loaded. None unless this is a combined measurement (typically
                corresponding to more than one file).
            component_measurements (list of Measurements): The measurements of which
                this measurement is a combination
            aliases (dict): Alternative names for DataSeries for versatile access
            reader (Reader): The file reader (None unless read from a file)
            plotter (Plotter): The visualization tool for the measurement
            exporter (Exporter): The exporting tool for the measurement
            sample (Sample or str): The sample being measured
            lablog (LabLog): The log entry with e.g. notes taken during the measurement
            tstamp (float): The nominal starting time of the measurement, used for
                data selection, visualization, and exporting.
        """
        super().__init__()
        self.name = name
        self.technique = technique
        self.metadata = metadata or {}
        self.reader = reader
        # Strings are interpreted as names of objects to load or create:
        if isinstance(sample, str):
            sample = Sample.load_or_make(sample)
        self.sample = sample
        if isinstance(lablog, str):
            lablog = LabLog.load_or_make(lablog)
        self.lablog = lablog
        # fill_object_list combines directly-given objects with placeholders for the
        # given id's; placeholders are resolved lazily by the corresponding properties:
        self._series_list = fill_object_list(series_list, s_ids, cls=DataSeries)
        self._component_measurements = fill_object_list(
            component_measurements, m_ids, cls=Measurement
        )
        self._calibration_list = fill_object_list(
            calibration_list, c_ids, cls=Calibration
        )
        self._tstamp = tstamp
        # Cache of series already looked up or built, keyed by lookup name:
        self._cached_series = {}
        self._aliases = aliases or {}
        self.plotter = plotter or self.__class__.default_plotter(measurement=self)
        self.exporter = exporter or self.__class__.default_exporter(measurement=self)
        # defining these methods here gets them the right docstrings :D
        self.plot_measurement = self.plotter.plot_measurement
        self.plot = self.plotter.plot_measurement
        self.export = self.exporter.export
        # TODO: ... but we need to think a bit more about how to most elegantly and
        #   dynamically choose plotters (Nice idea from Anna:
        #   https://github.com/ixdat/ixdat/issues/32)
def __str__(self):
"""Return string representation"""
tseries_to_valueseries = {}
for series in self.series_list:
if isinstance(series, TimeSeries):
if series not in tseries_to_valueseries:
tseries_to_valueseries[series] = []
else:
if series.tseries in tseries_to_valueseries:
tseries_to_valueseries[series.tseries].append(series)
else:
tseries_to_valueseries[series.tseries] = [series]
out = []
for tseries, value_serieses in tseries_to_valueseries.items():
out.append("┏ " + str(tseries))
for n, value_series in enumerate(value_serieses):
if n == len(value_serieses) - 1:
out.append("┗━ " + str(value_series))
else:
out.append("┣━ " + str(value_series))
return (
f"{self.__class__.__name__} '{self.name}' with {len(self.series_list)} "
"series\n\n"
"Series list:\n" + "\n".join(out)
)
[docs] @classmethod
def from_dict(cls, obj_as_dict):
"""Return an object of the measurement class of the right technique
Args:
obj_as_dict (dict): The full serializaiton (rows from table and aux
tables) of the measurement. obj_as_dict["technique"] specifies the
technique class to use, from TECHNIQUE_CLASSES
"""
# TODO: see if there isn't a way to put the import at the top of the module.
# see: https://github.com/ixdat/ixdat/pull/1#discussion_r546437410
from .techniques import TECHNIQUE_CLASSES
# certain objects stored in the Measurement, but only saved as their names.
# __init__() will get the object from the name, but the argument is
# called like the object either way. For example __init__() takes an argument
# called `sample` which can be an ixdat.Sample or a string interpreted as the
# name of the sample to load. Subsequently, the sample name is accessible as
# the property `sample_name`. But in the database is only saved the sample's
# name as a string with the key/column "sample_name". So
# obj_as_dict["sample_name"] needs to be renamed obj_as_dict["sample"] before
# obj_as_dict can be passed to __init__.
# TODO: This is a rather general problem (see, e.g. DataSeries.unit vs
# DataSeries.unit_name) and as such should be moved to db.Saveable
# see: https://github.com/ixdat/ixdat/pull/5#discussion_r565090372.
# Will be fixed with the table definition PR.
objects_saved_as_their_name = ["sample"]
for object_type_str in objects_saved_as_their_name:
object_name_str = object_type_str + "_name"
if object_name_str in obj_as_dict:
obj_as_dict[object_type_str] = obj_as_dict[object_name_str]
del obj_as_dict[object_name_str]
if obj_as_dict["technique"] in TECHNIQUE_CLASSES:
# This makes it so that from_dict() can be used to initiate for any more
# derived technique, so long as obj_as_dict specifies the technique name!
technique_class = TECHNIQUE_CLASSES[obj_as_dict["technique"]]
if not issubclass(technique_class, cls):
# But we never want obj_as_dict["technique"] to take us to a *less*
# specific technique, if the user has been intentional about which
# class they call `as_dict` from (e.g. via a Reader)!
technique_class = cls
else:
technique_class = cls
try:
measurement = technique_class(**obj_as_dict)
except TypeError as e:
raise TechniqueError(
"ixdat ran into an error while trying to set up an object of type "
f"{technique_class}. This usually happens when ixdat isn't able "
f"to correctly determine the measurement technique.\n"
f"The error:\n {e}\n\n" # two space are intended
"Consider passing the `technique` argument into the read() function.\n"
"The available techniques are:\n"
f" {list(TECHNIQUE_CLASSES.keys())}" # again intended
) # adding `from None` here would avoid repeating the message in `e`...
# ...but it can be useful to have the full traceback!
return measurement
[docs] @classmethod
def read(cls, path_to_file, reader=None, **kwargs):
"""Return a Measurement object from parsing a file with the specified reader
Args:
path_to_file (Path or str): The path to the file to read
reader (str or Reader class): The (name of the) reader to read the file
with. If not specified, ixdat will try to determine the reader from the
file suffix.
kwargs: key-word arguments are passed on to the reader's read() method.
"""
if not reader:
# Check if there is a default reader based on the file's suffix
from .readers.reading_tools import get_default_reader_name
reader = get_default_reader_name(path_to_file)
if not reader:
raise ValueError(
f"There is no default reader for files of the type {path_to_file}. "
"Please specify a reader to read this file."
)
if isinstance(reader, str):
# TODO: see if there isn't a way to put the import at the top of the module.
# see: https://github.com/ixdat/ixdat/pull/1#discussion_r546437471
from .readers import READER_CLASSES
reader = READER_CLASSES[reader]()
obj = reader.read(path_to_file, cls=cls, **kwargs)
if getattr(obj.__class__, "essential_series_names", None):
for series_name in obj.__class__.essential_series_names:
try:
_ = obj[series_name] # this also caches it.
except SeriesNotFoundError:
raise SeriesNotFoundError(
f"{reader} loaded without {obj.__class__.__name__} "
f"essential series '{series_name}'"
)
return obj
[docs] @classmethod
def read_url(cls, url, reader=None, **kwargs):
"""Read a url (via a temporary file) using the specified reader"""
from .readers.reading_tools import url_to_file
path_to_temp_file = url_to_file(url)
measurement = cls.read(path_to_temp_file, reader=reader, **kwargs)
path_to_temp_file.unlink()
return measurement
[docs] @classmethod
def read_set(
cls,
path_to_file_start=True,
part=None,
suffix=None,
file_list=None,
reader=None,
**kwargs,
):
"""Read and append a set of files.
Args:
path_to_file_start (Path or str): The path to the files to read including
the shared start of the file name: `Path(path_to_file).parent` is
interpreted as the folder where the file are.
`Path(path_to_file).name` is interpreted as the shared start of the files
to be appended.
Alternatively, path_to_file_start can be a folder, in which case all
files in that folder (with the specified suffix) are included.
part (Path or str): A path where the folder is the folder containing data
and the name is a part of the name of each of the files to be read and
combined.
suffix (str): If a suffix is given, only files with the specified ending are
added to the file list
file_list (list of Path): As an alternative to path_to_file_start, the
exact files to append can be specified in a list
reader (str or Reader class): The (name of the) reader to read the files with
kwargs: Key-word arguments are passed via cls.read() to the reader's read()
method, AND to cls.from_component_measurements()
"""
from .readers.reading_tools import get_file_list
file_list = file_list or get_file_list(path_to_file_start, part, suffix)
if not file_list:
raise ReadError(
"No files found! Please check that there are files satisfying:\n"
f"path_to_file_start={path_to_file_start}, part={part}, suffix={suffix}"
)
component_measurements = [
cls.read(f, reader=reader, **kwargs) for f in file_list
]
measurement = None
for meas in component_measurements:
measurement = measurement + meas if measurement else meas
return measurement
    @classmethod
    def from_component_measurements(
        cls, component_measurements, keep_originals=True, sorted=True, **kwargs
    ):
        """Return a measurement with the data contained in the component measurements

        TODO: This function "builds" the resulting measurement, i.e. it appends series
            of the same name rather than keeping all the original copies. This should be
            made more explicit, and a `build()` method should take over some of the work.

        Args:
            component_measurements (list of Measurement)
            keep_originals: Whether to keep a list of component_measurements referenced.
                This may result in redundant numpy arrays in RAM.
            sorted (bool): Whether to sort the series according to time
                (NOTE: this parameter shadows the builtin `sorted`; it can't be
                renamed without breaking callers that pass it by keyword.)
            kwargs: key-word arguments are added to the dictionary for cls.from_dict()

        Returns cls: the combined measurement.
        """
        # First prepare everything but the series_list in the object dictionary
        obj_as_dict = component_measurements[0].as_dict()
        obj_as_dict.update(kwargs)
        # The id lists would refer to the first component's own children; drop them:
        del obj_as_dict["m_ids"], obj_as_dict["s_ids"]
        if keep_originals:
            obj_as_dict["component_measurements"] = component_measurements
        # Now, prepare the built series. First, we loop through the component
        # measurements and get all the data and metadata organized in a dictionary:
        series_as_dicts = {}
        tstamp = component_measurements[0].tstamp
        for meas in component_measurements:
            tstamp_i = meas.tstamp  # save this for later.
            meas.tstamp = tstamp  # so that the time vectors share a t=0
            for s_name in meas.series_names:
                series = meas[s_name]
                if s_name in series_as_dicts:
                    # Same-named series from a later component: append its data.
                    series_as_dicts[s_name]["data"] = np.append(
                        series_as_dicts[s_name]["data"], series.data
                    )
                else:
                    series_as_dicts[s_name] = series.as_dict()
                    series_as_dicts[s_name]["data"] = series.data
                    if isinstance(series, ValueSeries):
                        # This will serve to match it to a TimeSeries later:
                        series_as_dicts[s_name]["t_name"] = series.tseries.name
            meas.tstamp = tstamp_i  # so it's not changed in the outer scope
        # Now we make DataSeries, starting with all the TimeSeries
        tseries_dict = {}
        sort_indeces = {}
        for name, s_as_dict in series_as_dicts.items():
            # Only TimeSeries dicts carry a "tstamp" key:
            if "tstamp" in s_as_dict:
                if sorted:
                    sort_indeces[name] = np.argsort(s_as_dict["data"])
                    s_as_dict["data"] = s_as_dict["data"][sort_indeces[name]]
                tseries_dict[name] = TimeSeries.from_dict(s_as_dict)
        # And then ValueSeries, and put both in with the TimeSeries
        series_list = []
        for name, s_as_dict in series_as_dicts.items():
            if name in tseries_dict:
                series_list.append(tseries_dict[name])
            elif "t_name" in s_as_dict:
                tseries = tseries_dict[s_as_dict["t_name"]]
                if s_as_dict["data"].shape == tseries.shape:
                    # Then we assume that the time and value data have lined up
                    # successfully! :D
                    if sorted:
                        # Reorder the values with the same permutation as their tseries:
                        s_as_dict["data"] = s_as_dict["data"][sort_indeces[tseries.name]]
                    vseries = ValueSeries(
                        name=name,
                        data=s_as_dict["data"],
                        unit_name=s_as_dict["unit_name"],
                        tseries=tseries,
                    )
                else:
                    # this will be the case if vseries sharing the same tseries
                    # are not present in the same subset of component_measurements.
                    # In that case just append the vseries even though some tdata gets
                    # duplicated.
                    vseries = append_series(
                        [
                            s
                            for m in component_measurements
                            for s in m.series_list
                            if s.name == name
                        ],
                        sorted=sorted,
                    )
                series_list.append(vseries)
        # Finally, add the series to the dictionary representation and return the object
        obj_as_dict["series_list"] = series_list
        return cls.from_dict(obj_as_dict)
    @property
    def tstamp(self):
        """Float: The unix epoch time used by the measurement as t=0"""
        return self._tstamp

    @tstamp.setter
    def tstamp(self, tstamp):
        # Resetting the tstamp needs to clear the cache, so series are returned wrt the
        # new timestamp.
        self.clear_cache()
        self._tstamp = tstamp

    @property
    def yyMdd(self):
        """Str: the measurement's tstamp rendered as a date ("native_date" format)."""
        return tstamp_to_string(self.tstamp, string_format="native_date")

    @property
    def metadata_json_string(self):
        """Measurement metadata as a JSON-formatted string"""
        return json.dumps(self.metadata, indent=4)

    @property
    def sample_name(self):
        """Name of the sample on which the measurement was conducted

        Returns None (implicitly) if no sample is set.
        """
        if self.sample:
            return self.sample.name
    @property
    def component_measurements(self):
        """List of the component measurements of which this measurement is a combination

        For a pure measurement (not a measurement set), this is itself in a list.
        """
        for i, m in enumerate(self._component_measurements):
            if isinstance(m, PlaceHolderObject):
                # This is where we find objects from a Backend including MemoryBackend:
                self._component_measurements[i] = m.get_object()
        return self._component_measurements

    @property
    def m_ids(self):
        """List of the id's of a combined measurement's component measurements

        FIXME: m.id can be (backend, id) if it's not on the active backend.
            This is as of now necessary to find it if you're only given self.as_dict()
            see https://github.com/ixdat/ixdat/pull/11#discussion_r746632897
        """
        if not self._component_measurements:
            return None
        # Note: goes through the property, so any placeholders get resolved first.
        return [m.short_identity for m in self.component_measurements]

    @property
    def calibration_list(self):
        """List of calibrations (with placeholders filled)"""
        for i, c in enumerate(self._calibration_list):
            if isinstance(c, PlaceHolderObject):
                # This is where we find objects from a Backend including MemoryBackend:
                self._calibration_list[i] = c.get_object()
        return self._calibration_list

    @property
    def calibrations(self):
        """For overriding: List of calibrations with any needed manipulation done."""
        return self.calibration_list

    @property
    def c_ids(self):
        """List of the id's of the measurement's Calibrations

        FIXME: c.id can be (backend, id) if it's not on the active backend.
            This is as of now necessary to find it if you're only given self.as_dict()
            see https://github.com/ixdat/ixdat/pull/11#discussion_r746632897
        """
        return [c.short_identity for c in self.calibration_list]
def add_calibration(self, calibration):
self._calibration_list = [calibration] + self._calibration_list
self.clear_cache()
[docs] def calibrate(self, *args, **kwargs):
"""Add a calibration of the Measurement's default calibration type
The calibration class is determined by the measurement's `technique`.
*args and **kwargs are passed to the calibration class's `__init__`.
Raises:
TechniqueError if no calibration class for the measurement's technique
"""
from .techniques import CALIBRATION_CLASSES
if self.technique in CALIBRATION_CLASSES:
calibration_class = CALIBRATION_CLASSES[self.technique]
else:
raise TechniqueError(
f"{self!r} is of technique '{self.technique}, for which there is not an "
"available default calibration. Instead, import one of the following "
"classes to initiate a calibration, and then use `add_calibration`. "
f"\nOptions: \n{CALIBRATION_CLASSES}"
)
self.add_calibration(calibration_class(*args, **kwargs))
self.clear_cache()
    @property
    @deprecate(
        last_supported_release="0.1",
        update_message=(
            "At present, ixdat measurements have a `calibration_list` but no compound "
            "`calibration`, and this property just returns the first from the list."
        ),
        hard_deprecation_release=None,
    )
    def calibration(self):
        # Deprecated accessor: returns only the first (most recently added) calibration.
        return self.calibration_list[0]

    @calibration.setter
    @deprecate(
        last_supported_release="0.1",
        update_message=(
            "Setting `calibration` is deprecated. For now it clears `calibration_list` "
            "and replaces it with a single calibration. "
            "Use `add_calibration()` instead."
        ),
        hard_deprecation_release="0.3",
    )
    def calibration(self, calibration):
        # Deprecated setter: discards all existing calibrations. Note it does NOT
        # clear the series cache, unlike add_calibration().
        self._calibration_list = [calibration]
    @property
    def series_list(self):
        """List of the DataSeries containing the measurement's data"""
        for i, s in enumerate(self._series_list):
            if isinstance(s, PlaceHolderObject):
                # This is where we find objects from a Backend including MemoryBackend:
                self._series_list[i] = s.get_object()
        return self._series_list

    @property
    def s_ids(self):
        """List of the id's of the measurement's DataSeries

        FIXME: m.id can be (backend, id) if it's not on the active backend.
            This is as of now necessary to find it if you're only given self.as_dict()
            see https://github.com/ixdat/ixdat/pull/11#discussion_r746632897
        """
        # NOTE(review): unlike m_ids/c_ids, this iterates the raw `_series_list`
        # (placeholders included) — presumably to avoid loading full series just to
        # report their id's; confirm placeholders expose `short_identity`.
        return [series.short_identity for series in self._series_list]
@property
def series_names(self):
"""Set of the names of the series in the measurement"""
return set([series.name for series in self.series_list])
@property
def value_names(self):
"""Set of the names of the VSeries in the measurement's DataSeries"""
return set([vseries.name for vseries in self.value_series])
@property
def time_names(self):
"""Set of the names of the VSeries in the measurement's DataSeries"""
return set([tseries.name for tseries in self.time_series])
@property
def value_series(self):
"""List of the VSeries in the measurement's DataSeries"""
return [series for series in self.series_list if isinstance(series, ValueSeries)]
@property
def time_series(self):
"""List of the TSeries in the measurement's DataSeries. NOT timeshifted!"""
return [series for series in self.series_list if isinstance(series, TimeSeries)]
@property
def aliases(self):
"""Dictionary of {key: series_names} pointing to where desired raw data is
TODO: get the possible aliases based on calibrations, etc, in here.
"""
return self._aliases.copy()
@property
def reverse_aliases(self):
"""{series_name: standard_names} indicating how raw data can be accessed"""
rev_aliases = {}
for name, other_names in self.aliases.items():
for other_name in other_names:
if other_name in rev_aliases:
rev_aliases[other_name].append(name)
else:
rev_aliases[other_name] = [name]
return rev_aliases
[docs] def get_series_names(self, key):
"""Return list: series names for key found by (recursive) lookup in aliases"""
keys = [key] if key in self.series_names else []
for k in self.aliases.get(key, []):
keys += self.get_series_names(k)
return keys
    def __getitem__(self, key):
        """Return the built measurement DataSeries with its name specified by key

        This method does the following:
        0. Check that the key is a string. If a technique supports lookup of other
           types, the technique class should implement that in its `__getitem__`
           before calling `super().__getitem__`.
        1. check if `key` is in in the cache. If so return the cached data series
        2. find or build the desired data series by the first possible of:
           A. Check if `key` corresponds to a method in `series_constructors`. If
              so, build the data series with that method.
           B. Check if the `calibration`'s `calibrate_series` returns a data series
              for `key` given the data in this measurement. (Note that the
              `calibration` will typically start with raw data looked C, below.)
           C. Generate a list of data series and append them:
              i. Check if `key` is in `aliases`. If so, append all the data series
                 returned for each key in `aliases[key]`.
              ii. Otherwise, check if there are data series in `series_list` that
                 have `key` as their `name`. If so, append them.
           D. Finally, check if the user is using a suffix.
              i. If `key` ends with "-y" or "-v", look it up with the suffix removed.
              ii. If `key` ends with "-x" or "-t", look up `key` with the suffix
                 removed and use instead the corresponding `tseries`.
        3. Cache and return the data series found or built in (2).

        Step (2) above, the searching step, is outsourced to the method
        `get_series(key)`.

        Notice that some calls of `__getitem__` can be recursive. For example, we
        suppose that a new `ECMeasurement` is read from a source that calls raw
        potential `Ewe/V`, and that this measurement is then calibrated:

        >>> ec_meas = Measurement.read(...)
        >>> ec_meas.aliases
        {..., 'raw_potential': ['Ewe/V'], ...}
        >>> ec_meas["raw_potential"]  # first lookup, explained below
        ValueSeries("Ewe/V", ...)
        >>> ec_meas.calibrate_RE(RE_vs_RHE=0.7)
        >>> ec_meas["potential"]  # second lookup, explained below
        ValueSeries("U_{RHE} / [V]", ...)

        - The first lookup, with `key="raw_potential"`, (1) checks for
          "raw_potential" in the cache, doesn't find it; then (2A) checks in
          `series_constructors`, doesn't find it; (2B) asks the calibration for
          "raw_potential" and doesn't get anything back; and finally (2Ci) checks
          `aliases` for raw potential where it finds that "raw_potential" is called
          "Ewe/V". Then it looks up again, this time with `key="Ewe/V"`, which it doesn't
          find in (1) the cache, (2A) `series_consturctors`, (2B) the calibration, or
          (2Ci) `aliases`, but does find in (2Cii) `series_list`. There is only one
          data series named "Ewe/V" so no appending is necessary, but it does ensure that
          the series has the measurement's `tstamp` before cache'ing and returning it.
          Now we're back in the original lookup, from which __getitem__ (3) caches
          the data series (which still has the name "Ewe/V") as "raw_potential" and
          returns it.
        - The second lookup, with `key="potential"`, (1) checks for "potential" in
          the cache, doesn't find it; then (2A) checks in `series_constructors`,
          doesn't find it; and then (2B) asks the calibration for "potential". The
          calibration knows that when asked for "potential" it should look for
          "raw_potential" and add `RE_vs_RHE`. So it does a lookup with
          `key="raw_potential"` and (1) finds it in the cache. The calibration does
          the math and returns a new data series for the calibrated potential, bringing
          us back to the original lookup. The data series returned by the
          calibration is then (3) cached and returned to the user.

        Note that, if the user had not looked up "raw_potential" before looking up
        "potential", "raw_potential" would not have been in the cache and the first
        lookup above would have been nested in the second.

        Args:
            key (str): The name of a DataSeries (see above)

        Raises:
            SeriesNotFoundError if none of the above lookups find the key.

        Side-effects:
            if key is not already in the cache, it gets added

        Returns:
            The (calibrated) (appended) dataseries for key with the right t=0.
        """
        # step 0
        if not isinstance(key, str):
            message = f"Invalid lookup for {type(self)} object: {key}."
            message += f" The key type was {type(key)}. Expected a string."
            if isinstance(key, int):
                message += (
                    " Note: Integer lookup is possible for SpectroMeasurement and"
                    " CyclicVoltammogram objects. If you expected a measurement"
                    " containing spectra or index-able cycles,"
                    " please check your file reading."
                )
            raise TypeError(message)
        # step 1
        if key in self._cached_series:
            return self._cached_series[key]
        # step 2
        series = self.get_series(key)
        # Finally, wherever we found the series, cache it and return it.
        # step 3.
        self._cache_series(key, series)
        return series
    def _cache_series(self, key, series):
        """Cache `series` such that it can be looked up with its name or with `key`."""
        self._cached_series[key] = series  # now it can be looked up with by `key`
        # If the name of the series is not `key`, we can get in a situation where
        # looking up the series name raises a SeriesNotFoundError. To avoid this
        # problematic situation, we check if it can be looked up, and if not,
        # add it a second time to the cached_series, now under `series.name`.
        # (Note the lookup itself may build and cache further series as a side effect.)
        try:
            _ = self[series.name]
        except SeriesNotFoundError:
            self._cached_series[series.name] = series
    def get_series(self, key):
        """Find or build the data series corresponding to key without direct cache'ing

        See more detailed documentation under `__getitem__`, for which this is a
        helper method. This method (A) looks for a method for `key` in the measurement's
        `series_constructors`; (B) requests its `calibration` for `key`; and if those
        fail appends the data series that either (Ci) are returned by looking up the
        key's `aliases` or (Cii) have `key` as their name; and finally (D) check if the
        user was using a key with a suffix.

        Args:
            key (str): The key to look up

        Returns DataSeries: the data series corresponding to key
        Raises SeriesNotFoundError if no series found for key
        """
        # A
        if key in self.series_constructors:
            return getattr(self, self.series_constructors[key])()
        # B
        for calibration in self.calibrations:
            series = calibration.calibrate_series(key, measurement=self)
            # ^ the calibration will call __getitem__ with the name of the
            #   corresponding raw data and return a new series with calibrated data
            #   if possible. Otherwise it will return None.
            if series:
                return series
        # C (note: direct name matches (ii) are gathered before alias lookups (i);
        #    both contribute to the appended result)
        series_to_append = []
        if key in self.series_names:  # ii
            # Then we'll append any series matching the desired name
            series_to_append += [s for s in self.series_list if s.name == key]
        if key in self.aliases:  # i
            # Then we'll look up the aliases instead and append them
            for k in self.aliases[key]:
                if k == key:  # this would result in infinite recursion.
                    print(  # TODO: Real warnings.
                        "WARNING!!!\n"
                        f"\t{self!r} has {key} in its aliases for {key}:\n"
                        f"\tself.aliases['{key}'] = {self.aliases[key]}"
                    )
                    continue
                try:
                    series_to_append.append(self[k])
                except SeriesNotFoundError:
                    continue
        # If the key is something in the data, by now we have series to append.
        if series_to_append:
            # the following if's are to do as little extra manipulation as possible:
            if len(series_to_append) == 1:  # no appending needed
                if series_to_append[0].tstamp == self.tstamp:  # no time-shifting needed
                    return series_to_append[0]
                return time_shifted(series_to_append[0], tstamp=self.tstamp)
            return append_series(series_to_append, name=key, tstamp=self.tstamp)
        # D
        if key.endswith("-t") or key.endswith("-x"):
            return self[key[:-2]].tseries
        if key.endswith("-v") or key.endswith("-y"):
            return self[key[:-2]]
        raise SeriesNotFoundError(f"{self!r} does not contain '{key}'")
[docs] def replace_series(self, series_name, new_series=None):
"""Remove an existing series, add a series to the measurement, or both.
FIXME: This will not appear to change the series for the user if the
measurement's calibration returns something for ´series_name´, since
__getitem__ asks the calibration before looking in series_list.
Args:
series_name (str): The name of a series. If the measurement has (raw) data
series with this name, cached series with this name, and/or aliases for
this name, they will be removed.
new_series (DataSeries): Optional new series to append to the measurement's
series_list. To sanity check, it must have ´series_name´ as its ´name´.
"""
if new_series and not series_name == new_series.name:
raise TypeError(
f"Cannot replace {series_name} in {self!r} with {new_series}. "
f"Names must agree."
)
if series_name in self._cached_series:
del self._cached_series[series_name]
if series_name in self._aliases:
del self._aliases[series_name]
new_series_list = [s for s in self.series_list if not s.name == series_name]
if new_series:
new_series_list.append(new_series)
self._series_list = new_series_list
[docs] def clear_cache(self):
"""Clear the cache so derived series are constructed again with updated info"""
self._cached_series = {}
[docs] def correct_data(self, value_name, new_data):
"""Replace the old data for ´value_name´ (str) with ´new_data` (np array)"""
old_vseries = self[value_name]
new_vseries = ValueSeries(
name=value_name,
unit_name=old_vseries.unit_name,
data=new_data,
tseries=old_vseries.tseries,
)
self.replace_series(value_name, new_vseries)
[docs] def grab(self, item, tspan=None, include_endpoints=False, tspan_bg=None):
"""Return a value vector with the corresponding time vector
Grab is the *canonical* way to retrieve numerical time-dependent data from a
measurement in ixdat. The first argument is always the name of the value to get
time-resolved data for (the name of a ValueSeries). The second, optional,
argument is a timespan to select the data for.
Two vectors are returned: first time (t), then value (v). They are of the same
length so that `v` can be plotted against `t`, integrated over `t`, interpolated
via `t`, etc. `t` and `v` are returned in the units of their DataSeries.
TODO: option to specifiy desired units
Typical usage::
t, v = measurement.grab("potential", tspan=[0, 100])
Args:
item (str): The name of the DataSeries to grab data for
TODO: Should this be called "name" or "key" instead? And/or, should
the argument to __getitem__ be called "item" instead of "key"?
tspan (iter of float): Defines the timespan with its first and last values.
Optional. By default the entire time of the measurement is included.
include_endpoints (bool): Whether to add a points at t = tspan[0] and
t = tspan[-1] to the data returned. This makes trapezoidal integration
less dependent on the time resolution. Default is False.
tspan_bg (iterable): Optional. A timespan defining when `item` is at its
baseline level. The average value of `item` in this interval will be
subtracted from the values returned.
"""
vseries = self[item]
tseries = vseries.tseries
v = vseries.data
t = tseries.data + tseries.tstamp - self.tstamp
if tspan is not None: # np arrays don't boolean well :(
if include_endpoints:
if t[0] < tspan[0]: # then add a point to include tspan[0]
v_0 = np.interp(tspan[0], t, v)
t = np.append(tspan[0], t)
v = np.append(v_0, v)
if tspan[-1] < t[-1]: # then add a point to include tspan[-1]
v_end = np.interp(tspan[-1], t, v)
t = np.append(t, tspan[-1])
v = np.append(v, v_end)
mask = np.logical_and(tspan[0] <= t, t <= tspan[-1])
t, v = t[mask], v[mask]
if tspan_bg:
t_bg, v_bg = self.grab(item, tspan=tspan_bg)
v = v - np.mean(v_bg)
return t, v
[docs] def grab_for_t(self, item, t, tspan_bg=None):
"""Return a numpy array with the value of item interpolated to time t
Args:
item (str): The name of the value to grab
t (np array): The time vector to grab the value for
tspan_bg (iterable): Optional. A timespan defining when `item` is at its
baseline level. The average value of `item` in this interval will be
subtracted from what is returned.
"""
vseries = self[item]
tseries = vseries.tseries
v_0 = vseries.data
t_0 = tseries.data + tseries.tstamp - self.tstamp
v = np.interp(t, t_0, v_0)
if tspan_bg:
t_bg, v_bg = self.grab(item, tspan=tspan_bg)
v = v - np.mean(v_bg)
return v
[docs] def integrate(self, item, tspan=None, ax=None):
"""Return the time integral of item in the specified timespan"""
t, v = self.grab(item, tspan, include_endpoints=True)
if ax:
if ax == "new":
ax = self.plotter.new_ax(ylabel=item)
# FIXME: xlabel=self[item].tseries.name gives a problem :(
ax.plot(t, v, color="k", label=item)
ax.fill_between(t, v, np.zeros(t.shape), where=v > 0, color="g", alpha=0.3)
ax.fill_between(
t, v, np.zeros(t.shape), where=v < 0, color="g", alpha=0.1, hatch="//"
)
return np.trapz(v, t)
@property
def t(self):
    """numpy array: the time data of the measurement's control series."""
    control_series = self[self.control_series_name]
    return control_series.t
@property
def t_name(self):
    """str: the name of the TimeSeries of the measurement's control series."""
    control_series = self[self.control_series_name]
    return control_series.tseries.name
def _build_file_number_series(self):
    """Build a `file_number` series based on component measurements times.

    Each component measurement (or self, if there are none) of the control
    technique contributes a constant value equal to its index, spanning its
    control tseries. The pieces are then appended into one series.
    """
    file_number_pieces = []
    components = self.component_measurements or [self]
    for number, component in enumerate(components):
        # only components of the control technique get a file number:
        if (
            self.control_technique_name
            and component.technique != self.control_technique_name
        ):
            continue
        if self.control_series_name:
            try:
                tseries = component[self.control_series_name].tseries
            except SeriesNotFoundError:
                # skip components that lack the control series:
                continue
        else:
            # with no control series defined, use the component's first tseries:
            tseries = component.time_series[0]
        file_number_pieces.append(
            ConstantValue(
                name="file_number", unit_name="", data=number, tseries=tseries
            )
        )
    return append_series(file_number_pieces, name="file_number", tstamp=self.tstamp)
def _build_selector_series(
    self, selector_string=None, columns=None, extra_columns=None
):
    """Build a `selector` series which demarcates the data.

    The `selector` is a series which can be used to conveniently and powerfully
    grab sections of the data. It is built up from less powerful demarcation
    series in the raw data (like `cycle_number`, `step_number`, `loop_number`,
    etc) and `file_number` by counting the cumulative changes in those series.
    See slide 3 of:
    https://www.dropbox.com/s/sjxzr52fw8yml5k/21E18_DWS3_cont.pptx?dl=0

    Args:
        selector_string (str): The name to use for the selector series
        columns (list): The list of demarcation series. The demarcation series
            have to have equal-length tseries, which should be the one pointed
            to by the measurement's `control_series_name`.
        extra_columns (list): Extra demarcation series to include if needed.
    """
    selector_string = selector_string or self.selector_name
    columns = columns or self.selection_series_names
    if extra_columns:
        columns += extra_columns
    # True at every point where any of the demarcation series changes value:
    changes = np.tile(False, self.t.shape)
    for column in columns:
        try:
            data = self[column].data
        except SeriesNotFoundError:
            continue
        if len(data) == 0:
            print("WARNING: " + column + " is empty")
            continue
        if len(data) != len(changes):
            print("WARNING: " + column + " has an unexpected length")
            continue
        # compare each point with the previous one to find where changes occur:
        shifted = np.append(data[0], data[:-1])
        changes = np.logical_or(changes, shifted != data)
    # the cumulative sum increases by one each time any demarcation changes:
    return ValueSeries(
        name=selector_string,
        unit_name="",
        data=np.cumsum(changes),
        tseries=self[self.control_series_name].tseries,
    )
def rebuild_selector(self, selector_string=None, columns=None, extra_columns=None):
    """Build, cache, and return a new selector series for the measurement.

    This can be useful if a user wants to change how their measurement counts
    sections (for example, only count sections when technique or file number
    changes).

    Args:
        selector_string (str): The name to use for the selector series
        columns (list): The list of demarcation series. The demarcation series
            have to have the same tseries, which should be the one pointed to
            by the measurement's `control_series_name`.
        extra_columns (list): Extra demarcation series to include if needed.
    """
    name = selector_string or self.selector_name
    series = self._build_selector_series(
        selector_string=name, columns=columns, extra_columns=extra_columns
    )
    # cache it so that self[name] returns the new series from now on:
    self._cache_series(name, series)
    return series
@property
def selector(self):
    """The measurement's default selector series (looked up by selector_name)."""
    series_name = self.selector_name
    return self[series_name]
@property
def data_cols(self):
    """set of str: the names of all of the measurement's VSeries and TSeries."""
    # set comprehension (idiomatic, ruff C403) instead of set([...]):
    return {series.name for series in self.value_series + self.time_series}
def get_original_m_ids_of_series(self, series):
    """Return a list of id's of component measurements to which `series` belongs.

    Args:
        series (DataSeries): The series whose component measurement(s) to find.
    """
    # FIXME: the whole id vs short_identity issue
    #   see https://github.com/ixdat/ixdat/pull/11#discussion_r746632897
    return [
        measurement.id
        for measurement in self.component_measurements
        if series.short_identity in measurement.s_ids
    ]
@property
def tspan(self):
    """The minimum timespan (with respect to self.tstamp) containing all the data

    Returns None if the measurement has no TimeSeries, or if any of its time
    vectors is empty.
    """
    if not self.time_names:  # No TimeSeries in the measurement means no tspan.
        return None
    starts = []
    finishes = []
    for time_name in self.time_names:
        t = self[time_name].data
        if len(t) == 0:
            # an empty time vector means no well-defined tspan:
            return None
        starts.append(t[0])
        finishes.append(t[-1])
    return [min(starts), max(finishes)]
def cut(self, tspan, t_zero=None):
    """Return a new measurement with the data in the given time interval

    Args:
        tspan (iter of float): The time interval to use, relative to self.tstamp
            tspan[0] is the start time of the interval, and tspan[-1] is the end
            time of the interval. Using tspan[-1] means you can directly use a
            long time vector that you have at hand to describe the time interval
            you're looking for.
        t_zero (float or str): The time in the measurement to set to t=0. If a
            float, it is interpreted as wrt the original tstamp. String options
            include "start", which puts t=0 at the start of the cut interval.

    Returns:
        Measurement: a new measurement of the same class containing only the
            data whose time falls within `tspan`.
    """
    # Start with self's dictionary representation, but
    # we don't want original series (s_ids) or component_measurements (m_ids):
    obj_as_dict = self.as_dict(exclude=["s_ids", "m_ids"])
    # first, cut the series list:
    new_series_list = []
    # cache cutting results per tseries identity so each is only cut once:
    time_cutting_stuff = {}  # {tseries_id: (mask, new_tseries)}
    for series in self.series_list:
        try:
            tseries = series.tseries
            if tseries is None:
                # treat a None tseries the same as a missing one:
                raise AttributeError
        except AttributeError:  # series independent of time are unaffected by cut
            new_series_list.append(series)
        else:
            t_identity = tseries.full_identity
            if t_identity in time_cutting_stuff:
                # this tseries has already been cut; reuse the cached result:
                mask, new_tseries = time_cutting_stuff[t_identity]
            else:
                # shift onto self's tstamp before comparing with tspan:
                t = tseries.t + tseries.tstamp - self.tstamp
                mask = np.logical_and(tspan[0] <= t, t <= tspan[-1])
                new_tseries = TimeSeries(
                    name=tseries.name,
                    unit_name=tseries.unit_name,
                    tstamp=tseries.tstamp,
                    data=tseries.data[mask],
                )
                time_cutting_stuff[t_identity] = (mask, new_tseries)
            if True not in mask:
                # none of this series' data lies within tspan; drop it:
                continue
            if False not in mask:
                # all of this series' data lies within tspan; keep it as-is:
                new_series_list.append(series)
            elif series.full_identity == t_identity:
                # the series is itself the tseries; use the already-cut one:
                new_series_list.append(new_tseries)
            else:
                # a partially-included value series: cut its data with the mask
                # and point it at the cut tseries:
                new_series = series.__class__(
                    name=series.name,
                    unit_name=series.unit_name,
                    data=series.data[mask],
                    tseries=new_tseries,
                )
                new_series_list.append(new_series)
    obj_as_dict["series_list"] = new_series_list
    # then cut the component measurements.
    new_component_measurements = []
    for m in self._component_measurements:
        # FIXME: This is perhaps overkill, to make new cut component measurements,
        #   as it duplicates data (a big no)... especially bad because
        #   new_measurement.save() saves them.
        #   The step is here in order for file_number to get built correctly.
        if not m.tspan:
            # if it has no TimeSeries it must be a "constant". Best to include:
            new_component_measurements.append(m)
            continue
        # Otherwise we have to cut it according to the present tspan,
        # shifted onto the component's own tstamp:
        dt = m.tstamp - self.tstamp
        try:
            tspan_m = [tspan[0] - dt, tspan[1] - dt]
        except IndexError:  # Apparently this can happen for empty files. See:
            continue  # https://github.com/ixdat/ixdat/issues/93
        if m.tspan[-1] < tspan_m[0] or tspan_m[-1] < m.tspan[0]:
            # the component lies entirely outside the requested interval:
            continue
        new_component_measurements.append(m.cut(tspan_m))
    obj_as_dict["component_measurements"] = new_component_measurements
    new_measurement = self.__class__.from_dict(obj_as_dict)
    if t_zero:
        if t_zero == "start":
            new_measurement.tstamp += tspan[0]
        else:
            # t_zero is a float interpreted wrt the original tstamp:
            new_measurement.tstamp += t_zero
    return new_measurement
def multicut(self, tspans):
    """Return a selection of the measurement including each of the given tspans

    Args:
        tspans (iter of iter of float): The timespans to include, each
            interpreted as in `cut`.

    Returns:
        Measurement: the appended results of cutting self with each tspan,
            or None if `tspans` is empty.
    """
    new_measurement = None
    for tspan in tspans:
        cut_measurement = self.cut(tspan)
        # compare with None explicitly rather than by truthiness, so that a
        # falsy-but-valid measurement still gets appended rather than replaced:
        if new_measurement is None:
            new_measurement = cut_measurement
        else:
            new_measurement = new_measurement + cut_measurement
    return new_measurement
def select_value(self, *args, **kwargs):
    """Return a selection of the measurement where a criterion is matched.

    Specifically, this method returns a new Measurement where the time(s)
    returned are those where the values match the provided criteria, i.e. the
    part of the measurement where `self[series_name] == value`

    Can only take one arg or kwarg!
    The `series_name` is `self.selector_name` if given an argument without
    keyword. If given a keyword argument, the keyword is the name of the
    series to select on. Either way the argument is the `value` to be
    selected for.

    The method finds all time intervals for which `self[series_name] == value`
    It then cuts the measurement according to each time interval and adds these
    segments together.

    TODO: This can maybe be done better, i.e. without chopping series.
    TODO: Some way of less than and greater than kwargs.
        Ideally you should be able to say e.g., `select(cycle=1, 0.5<potential<1)`
        But this is hard,
        see: https://github.com/ixdat/ixdat/pull/11#discussion_r677272239

    Raises:
        BuildError: if not exactly one criterion is given, or if an argument
            is given without keyword and the measurement has no default
            selector.
    """
    if len(args) + len(kwargs) != 1:
        raise BuildError("Need exactly 1 arg. Use `select_values` for more.")
    if args:
        if not self.selector_name:
            # the error message names the actual attribute, selector_name:
            raise BuildError(
                f"{self!r} does not have a default selector "
                f"(Measurement.selector_name), and so selection only works "
                f"with kwargs."
            )
        kwargs[self.selector_name] = args[0]
    ((series_name, value),) = kwargs.items()
    # The time and values of the series to be selected on:
    t, v = self.grab(series_name)
    # This mask is true everywhere on `t` that the condition is met:
    mask = v == value  # linter doesn't realize this is a np array
    # Now we have to convert that to timespans on which the condition is met.
    # This means finding the start and finish times of the intervals on which
    # mask is True. This is done with a helper function:
    tspans = get_tspans_from_mask(t, mask)
    # now we go through the tspans, cutting the measurement and appending the
    # results:
    return self.multicut(tspans)
def select_values(self, *args, selector_name=None, **kwargs):
    """Return a selection of the measurement based on one or several criteria

    Specifically, this method returns a new Measurement where the time(s)
    returned are those where the values match the provided criteria, i.e. the
    part of the measurement where `self[series_name] == value`

    Any series can be selected for using the series name as a key-word.
    Arguments can be single acceptable values or lists of acceptable values.
    You can select for one or more series without valid python variable names
    by providing the kwargs using ** notation (see last example below).
    Arguments without key-word are considered valid values of the default
    selector, which is normally `self.selector_name` but can also be specified
    here using the key-word argument `selector_name`. Multiple criteria are
    applied sequentially, i.e. you get the intersection of satisfying parts.

    Examples of valid calls given a measurement `meas`:
    ```
    # to select where the default selector is 3, use:
    meas.select_values(3)
    # to select for where the default selector is 4 or 5:
    meas.select_values(4, 5)
    # to select for where "cycle" (i.e. the value of meas["cycle"].data) is 4:
    meas.select_values(cycle=4)
    # to select for where "loop_number" is 1 AND "cycle" is 3, 4, or 5:
    meas.select_values(loop_number=1, cycle=[3, 4, 5])
    # to select for where "cycle number" (notice the space) is 2 or 3:
    meas.select_values([2, 3], selector_name="cycle number")
    # which is equivalent to:
    meas.select_values(**{"cycle number": [2, 3]})
    ```

    Args:
        args (tuple): Argument(s) given without keyword are understood as
            acceptable value(s) for the selector (that named by selector_name
            or self.selector_name).
        selector_name: The name of the selector to which the args specify
        kwargs (dict): Each key-word argument is understood as the name
            of a series and its acceptable value(s).
    """
    if args:
        # Then we must interpret the arguments as allowed values of a selector,
        # either specified in the kwargs or the Measurement's default selector:
        selector_name = selector_name or self.selector_name
        if not selector_name:
            # FIX: was f"{self:r}", an invalid format spec which would itself
            # raise while building this message; `!r` is the repr conversion.
            raise BuildError(
                f"{self!r} does not have a default selector_name "
                f"(Measurement.selector_name), and so selection only works "
                f"with a selector_name specified "
                f"(see `help(Measurement.select_values)`)"
            )
        # Get the args into a simple list:
        flat_args = []
        for arg in args:
            if hasattr(arg, "__iter__"):
                flat_args += list(arg)
            else:
                flat_args.append(arg)
        if selector_name in kwargs:
            # FIX: this message was not an f-string, so the placeholder was
            # printed literally; it also now names the selector actually used.
            raise ValueError(
                f"Don't call select_values with both arguments and "
                f"'{selector_name}' as a key-word argument"
            )
        # FIX: was kwargs[self.selector_name], which ignored an explicitly
        # passed selector_name (the one validated just above):
        kwargs[selector_name] = flat_args
    t = self.t
    # start with everything allowed; each criterion then narrows the mask:
    mask = np.tile(np.array([True]), t.shape)
    for series_name, allowed_values in kwargs.items():
        if not hasattr(allowed_values, "__iter__"):
            allowed_values = [allowed_values]
        v = self.grab_for_t(series_name, t)
        # a point passes this criterion if it matches any allowed value:
        submask = np.tile(np.array([False]), t.shape)
        for allowed_value in allowed_values:
            submask = np.logical_or(submask, v == allowed_value)
        # all criteria must be met (intersection):
        mask = np.logical_and(mask, submask)
    tspans = get_tspans_from_mask(t, mask)
    return self.multicut(tspans)
def select(self, *args, tspan=None, **kwargs):
    """`cut` (with tspan) and `select_values` (with *args and/or **kwargs).

    These all work for measurements that have a default selector and/or the
    indicated columns:
    - `meas.select(1, 2)`
    - `meas.select(tspan=[200, 300])`
    - `meas.select(range(10))`
    - `meas.select(cycle=4)`
    - `meas.select(**{"cycle number": [20, 21]})`
    - `meas.select(loop_number=1, tspan=[1000, 2000])`
    - `meas.select(1, range(5, 20), file_number=1, tspan=[1000, 2000])`
    """
    selection = self
    # first narrow down by time, if a tspan is given:
    if tspan:
        selection = selection.cut(tspan=tspan)
    # then narrow down by values, if any criteria are given:
    if args or kwargs:
        selection = selection.select_values(*args, **kwargs)
    return selection
def copy(self):
    """Make a copy of the Measurement via its dictionary representation"""
    self_as_dict = self.as_dict()
    return self.__class__.from_dict(self_as_dict)
def __add__(self, other):
    """Addition of measurements appends the series and component measurements lists.

    Adding results in a new Measurement. If the combination of the two measurements'
    techniques is a recognized hyphenated technique, it returns an object of that
    technique's measurement class. Otherwise it returns an object of Measurement.
    metadata, sample, and logentry come from the first measurement.

    An important point about addition is that it is almost but not quite associative
    and commutative i.e.
    A + (B + C) == (A + B) + C == C + B + A is not quite true
    Each one results in the same series and component measurements. They will even
    appear in the same order in A + (B + C) and (A + B) + C. However, the technique
    might be different, as a new technique might be determined each time.

    Note also that there is no difference between hyphenating (simultaneous EC and
    MS datasets, for example) and appending (sequential EC datasets). Either way,
    all the raw series (or their placeholders) are just stored in the lists.

    Args:
        other (Measurement or SpectrumSeries): the object to append to self

    Returns:
        Measurement: a new measurement with the combined data
    """
    from .spectra import SpectrumSeries, add_spectrum_series_to_measurement

    # adding a SpectrumSeries is handled separately by a dedicated helper:
    if isinstance(other, SpectrumSeries):
        return add_spectrum_series_to_measurement(self, other)
    new_name = self.name + " AND " + other.name
    new_technique = get_combined_technique(self.technique, other.technique)
    # TODO: see if there isn't a way to put the import at the top of the module.
    #  see: https://github.com/ixdat/ixdat/pull/1#discussion_r546437410
    from .techniques import TECHNIQUE_CLASSES

    # determine the class of the result from the combined technique:
    if new_technique in TECHNIQUE_CLASSES:
        cls = TECHNIQUE_CLASSES[new_technique]
    elif self.__class__ is other.__class__:
        cls = self.__class__
    else:
        cls = Measurement
    # combine series, component measurements, and calibrations, deduplicated
    # via set (NOTE: this does not preserve order):
    new_series_list = list(set(self.series_list + other.series_list))
    new_component_measurements = list(
        set(
            (self.component_measurements or [self])
            + (other.component_measurements or [other])
        )
    )
    new_calibration_list = list(
        set(self._calibration_list + other._calibration_list)
    )
    # merge the aliases, combining the lists of names where keys coincide:
    new_aliases = self.aliases.copy()
    for key, names in other.aliases.items():
        if key in new_aliases:
            new_aliases[key] = list(set(new_aliases[key] + other.aliases[key]))
        else:
            new_aliases[key] = other.aliases[key]
    obj_as_dict = self.as_dict()
    other_as_dict = other.as_dict()
    # fill in anything in other's dictionary representation that self lacks;
    # on shared keys, self's value wins:
    for k, v in other_as_dict.items():
        # Looking forward to the "|" operator!
        if k not in obj_as_dict:
            obj_as_dict[k] = v
    obj_as_dict.update(
        name=new_name,
        technique=new_technique,
        series_list=new_series_list,
        component_measurements=new_component_measurements,
        calibration_list=new_calibration_list,
        aliases=new_aliases,
    )
    # don't want the original calibrations, component measurements, or series:
    del obj_as_dict["c_ids"]
    del obj_as_dict["m_ids"]
    del obj_as_dict["s_ids"]
    return cls.from_dict(obj_as_dict)
def join(self, other, join_on=None):
    """Join two measurements based on a shared data series

    This involves projecting all timeseries from other's data series so that the
    variable named by `join_on` is shared between all data series.
    This is analogous to an explicit inner join.

    Args:
        other (Measurement): a second measurement to join to self
        join_on (str or tuple): Either a string, if the value to join on is called
            the same thing in both measurements, or a tuple of two strings where
            the first is the name of the variable in self and the second in other.
            The variable described by join_on must be monotonically increasing in
            both measurements.

    Raises:
        NotImplementedError: always, for now — this is a planned feature that
            has not yet been implemented.
    """
    raise NotImplementedError
class Calibration(Saveable):
    """Base class for calibrations."""

    # ------ table description class attributes --------
    table_name = "calibration"
    column_attrs = {
        "name",
        "technique",
        "tstamp",
    }

    def __init__(self, *, name=None, technique=None, tstamp=None, measurement=None):
        """Initiate a Calibration

        Args:
            name (str): The name of the calibration
            technique (str): The technique of the calibration
            tstamp (float): The time at which the calibration took place or is valid
            measurement (Measurement): Optional. A measurement to calibrate by default.
        """
        super().__init__()
        # NOTE: the `!r` conversion in f-strings doesn't work on None, so repr():
        self.name = name or f"{self.__class__.__name__}({repr(measurement)})"
        self.technique = technique
        # default to the measurement's tstamp if a measurement is given:
        self.tstamp = tstamp or (measurement.tstamp if measurement else None)
        self.measurement = measurement

    @classmethod
    def from_dict(cls, obj_as_dict):
        """Return an object of the Calibration class of the right technique

        Args:
            obj_as_dict (dict): The full serialization (rows from table and aux
                tables) of the calibration. obj_as_dict["technique"] specifies
                the calibration class to use, from CALIBRATION_CLASSES
        """
        # TODO: see if there isn't a way to put the import at the top of the module.
        #   see: https://github.com/ixdat/ixdat/pull/1#discussion_r546437410
        from .techniques import CALIBRATION_CLASSES

        if obj_as_dict["technique"] in CALIBRATION_CLASSES:
            calibration_class = CALIBRATION_CLASSES[obj_as_dict["technique"]]
        else:
            calibration_class = cls
        # (a pointless `try: ... except Exception: raise` was removed here)
        return calibration_class(**obj_as_dict)

    def export(self, path_to_file=None):
        """Export the Calibration as a json-formatted text file

        Args:
            path_to_file (str or Path): Optional. The file to write to. Defaults
                to the calibration's name with an ".ix" suffix.
        """
        path_to_file = path_to_file or (self.name + ".ix")
        self_as_dict = self.as_dict()
        with open(path_to_file, "w") as f:
            json.dump(self_as_dict, f, indent=4)

    @classmethod
    def read(cls, path_to_file):
        """Read a Calibration from a json-formatted text file"""
        with open(path_to_file) as f:
            obj_as_dict = json.load(f)
        return cls.from_dict(obj_as_dict)

    def calibrate_series(self, key, measurement=None):
        """This should be overwritten in real calibration classes.

        FIXME: Add more documentation about how to write this in inheriting classes.
        """
        raise NotImplementedError
def get_combined_technique(technique_1, technique_2):
    """Return the name of the technique resulting from adding two techniques"""
    # TODO: see if there isn't a way to put the import at the top of the module.
    #   see: https://github.com/ixdat/ixdat/pull/1#discussion_r546437410
    if technique_1 == technique_2:
        # adding a technique to itself changes nothing:
        return technique_1

    # Adding a component technique of a hyphenated technique to that hyphenated
    # technique keeps the hyphenated one, e.g. EC-MS + MS = EC-MS:
    if "-" in technique_1 and technique_2 in technique_1.split("-"):
        return technique_1
    if "-" in technique_2 and technique_1 in technique_2.split("-"):
        return technique_2

    # Adding two independent techniques which combine into a recognized
    # hyphenated technique gives that hyphenated technique, e.g. EC + MS = EC-MS:
    from .techniques import TECHNIQUE_CLASSES

    for hyphenated in (
        technique_1 + "-" + technique_2,
        technique_2 + "-" + technique_1,
    ):
        if hyphenated in TECHNIQUE_CLASSES:
            return hyphenated

    # If all else fails, just join them with " and ", e.g. MS + XRD = MS and XRD:
    return technique_1 + " and " + technique_2