"""Module defining the ixdat csv reader, so ixdat can read the files it exports."""
from pathlib import Path
import json
import numpy as np
import re
import pandas as pd
from .. import Measurement
from ..exceptions import ReadError
from ..data_series import ValueSeries, TimeSeries, DataSeries, Field
from ..spectra import Spectrum, SpectrumSeries
from ..techniques import TECHNIQUE_CLASSES
regular_expressions = {
"name": r"^name = (.+)\n",
"tstamp": r"tstamp = ([0-9\.]+)\n",
"technique": r"technique = (.+)\n",
"N_header_lines": r"N_header_lines = ([0-9]+)\n",
"continuous": r"continuous = (.+)\n",
"backend_name": r"backend_name = (.+)\n",
"id": r"id = ([0-9]+)",
"timecol": r"timecol '(.+)' for: (?:'(.+)')$",
"unit": r"/ \[(.+)\]",
"aux_file": r"'(.*)' in file: '(.*)'",
}
bad_keys = ("time_step",)
[docs]class IxdatCSVReader:
"""A class that reads the csv's made by ixdat.exporters.csv_exporter.CSVExporter
read() is the important method - it takes the path to the mpt file as argument
and returns an ECMeasurement object (ec_measurement) representing that file.
The ECMeasurement contains a reference to the BiologicMPTReader object, as
ec_measurement.reader. This makes available all the following stuff, likely
useful for debugging.
Attributes:
path_to_file (Path): the location and name of the file read by the reader
n_line (int): the number of the last line read by the reader
place_in_file (str): The last location in the file read by the reader. This
is used internally to tell the reader how to parse each line. Options are:
"header", "column names", and "data".
header_lines (list of str): a list of the header lines of the files. This
includes the column name line. The header can be nicely viewed with the
print_header() function.
tstamp (str): The unix time corresponding to t=0
technique (str): The name of the technique
N_header_lines (int): The number of lines in the header of the file
column_names (list of str): The names of the data columns in the file
column_data (dict of str: np.array): The data in the file as a dict.
Note that the np arrays are the same ones as in the measurement's DataSeries,
so this does not waste memory.
file_has_been_read (bool): This is used to make sure read() is only successfully
called once by the Reader. False until read() is called, then True.
measurement (Measurement): The measurement returned by read() when the file is
read. self.measureemnt is None before read() is called.
"""
delim = ","
def __init__(self):
"""Initialize a Reader for ixdat-exported .csv files. See class docstring."""
self.name = None
self.path_to_file = None
self.n_line = 0 # TODO: decide if this is part of API.
# as per https://github.com/ixdat/ixdat/pull/30/files#r816204939
self.place_in_file = "header"
self.header_lines = []
self.tstamp = None
self.N_header_lines = None
self.timecols = {}
self.column_names = []
self.column_data = {}
self.technique = None
self.aux_file_objects = {}
self.measurement_class = Measurement
self.file_has_been_read = False
self.measurement = None
self.meas_as_dict = {}
[docs] def read(self, path_to_file, name=None, cls=None, **kwargs):
"""Return a Measurement with the data and metadata recorded in path_to_file
This loops through the lines of the file, processing one at a time. For header
lines, this involves searching for metadata. For the column name line, this
involves creating empty arrays for each data series. For the data lines, this
involves appending to these arrays. After going through all the lines, it
converts the arrays to DataSeries.
The technique is specified in the header, and used to pick the
TechniqueMeasurement class.
Finally, the method returns a TechniqueMeasurement object `measurement`
with these DataSeries. All attributes of this reader can be accessed from the
measurement as `measurement.reader.attribute_name`.
Args:
path_to_file (Path): The full abs or rel path including the ".mpt" extension
name (str): The name of the measurement to return (defaults to path_to_file)
cls (Measurement subclass): The class of measurement to return. By default,
cls will be determined from the technique specified in the header of
path_to_file.
**kwargs (dict): Key-word arguments are passed to ECMeasurement.__init__
Returns cls: a Measurement of type cls
"""
path_to_file = Path(path_to_file) if path_to_file else self.path_to_file
if self.file_has_been_read:
print(
f"This {self.__class__.__name__} has already read {self.path_to_file}."
" Returning the measurement resulting from the original read. "
"Use a new Reader if you want to read another file."
)
return self.measurement
self.name = name or path_to_file.name
self.path_to_file = path_to_file
with open(self.path_to_file, "r") as f:
for line in f:
self.process_line(line)
for name in self.column_names:
self.column_data[name] = np.array(self.column_data[name])
data_series_dict = {}
for tcol_name in self.timecols: # then it's time!
data_series_dict[tcol_name] = TimeSeries(
name=tcol_name,
unit_name=get_column_unit(tcol_name) or "s",
data=self.column_data[tcol_name],
tstamp=self.tstamp,
)
for column_name, data in self.column_data.items():
if column_name in self.timecols:
continue
try:
tcol_name = next(
tcol_name
for tcol_name in self.timecols
if column_name in self.timecols[tcol_name]
)
except StopIteration: # debugging
raise ReadError(
f"can't find tcol for {column_name}. timecols={self.timecols}"
)
tseries = data_series_dict[tcol_name]
vseries = ValueSeries(
name=column_name,
data=data,
tseries=tseries,
unit_name=get_column_unit(column_name),
)
data_series_dict[column_name] = vseries
data_series_list = list(data_series_dict.values())
self.meas_as_dict.update(
name=self.name,
technique=self.technique,
reader=self,
series_list=data_series_list,
tstamp=self.tstamp,
)
self.meas_as_dict.update(self.aux_file_objects)
self.meas_as_dict.update(kwargs)
if issubclass(cls, self.measurement_class):
self.measurement_class = cls
self.measurement = self.measurement_class.from_dict(self.meas_as_dict)
self.file_has_been_read = True
return self.measurement
[docs] def process_line(self, line):
"""Call the correct line processing method depending on self.place_in_file"""
if self.place_in_file == "header":
self.process_header_line(line)
elif self.place_in_file == "column names":
self.process_column_line(line)
elif self.place_in_file == "data":
self.process_data_line(line)
else: # just for debugging
raise ReadError(f"place_in_file = {self.place_in_file}")
self.n_line += 1
[docs] def process_column_line(self, line):
"""Split the line to get the names of the file's data columns"""
self.header_lines.append(line)
self.column_names = [name.strip() for name in line.split(self.delim)]
self.column_data.update({name: [] for name in self.column_names})
self.place_in_file = "data"
[docs] def process_data_line(self, line):
"""Split the line and append the numbers the corresponding data column arrays"""
data_strings_from_line = line.strip().split(self.delim)
for name, value_string in zip(self.column_names, data_strings_from_line):
if value_string:
try:
value = float(value_string)
except ValueError:
# That is probably because different columns are different length.
# so we just skip it!
continue
# raise ReadError(f"can't parse value string '{value_string}'")
self.column_data[name].append(value)
[docs] def read_aux_file(self, path_to_aux_file, name):
"""Read an auxiliary file and include its series list in the measurement"""
spec = IxdatSpectrumReader().read(path_to_aux_file, name=name)
self.aux_file_objects[name] = spec
[docs]def get_column_unit(column_name):
"""Return the unit name of an ixdat column, i.e the part of the name after the '/'"""
unit_match = re.search(regular_expressions["unit"], column_name)
if unit_match:
unit_name = unit_match.group(1)
else:
unit_name = None
return unit_name
[docs]class IxdatSpectrumReader(IxdatCSVReader):
"""A reader for ixdat spectra."""
[docs] def read(self, path_to_file, name=None, cls=Spectrum, **kwargs):
"""Read an ixdat spectrum.
This reads the header with the process_line() function inherited from
IxdatCSVReader. Then it uses pandas to read the data.
Args:
path_to_file (Path): The full absolute or relative path including extension
name (str): The name of the measurement to return (defaults to path_to_file)
cls (Spectrum subclass): The class of measurement to return. By default,
cls will be determined from the technique specified in the header of
path_to_file.
**kwargs (dict): Key-word arguments are passed to ECMeasurement.__init__
Returns cls: a Spectrum of type cls
"""
path_to_file = Path(path_to_file)
self.name = name or path_to_file.name
with open(path_to_file, "r") as f:
for line in f:
if self.place_in_file == "header":
self.process_line(line)
else:
break
df = pd.read_csv(path_to_file, sep=",", header=self.N_header_lines - 2)
if issubclass(TECHNIQUE_CLASSES.get(self.technique, int), Spectrum):
cls = TECHNIQUE_CLASSES[self.technique]
elif self.technique.endswith("spectrum"):
cls = cls if issubclass(cls, Spectrum) else Spectrum
elif self.technique.endswith("spectra"):
cls = cls if issubclass(cls, SpectrumSeries) else SpectrumSeries
if issubclass(cls, SpectrumSeries): # return a SpectrumSeries object
names = {}
units = {}
swap_axes = False
for line in self.header_lines:
for line_start in ("values", "first row", "first column"):
if line.startswith(line_start):
t_x_or_y = re.search("([yxt])=", line).group(1)
names[t_x_or_y] = re.search(r"\'(.*)\'", line).group(1)
units[t_x_or_y] = re.search(r"\[(.*)\]", line).group(1)
if "row" in line_start and t_x_or_y == "t": # check!
swap_axes = True
z1 = np.array([float(key) for key in list(df.keys())[1:]])
z1_and_y = df.to_numpy()
z0 = z1_and_y[:, 0]
y = z1_and_y[:, 1:]
if swap_axes:
# This is the case if the file was export with spectra_as_rows = False.
t = z1
x = z0
y = y.swapaxes(0, 1)
else:
t = z0
x = z1
tseries = TimeSeries(
name=names["t"], unit_name=units["t"], data=t, tstamp=self.tstamp
)
xseries = DataSeries(name=names["x"], unit_name=units["x"], data=x)
field = Field(
name=names["y"],
unit_name=units["y"],
data=y,
axes_series=[tseries, xseries],
)
self.meas_as_dict.update(
field=field, name=self.name, technique=self.technique, tstamp=self.tstamp
)
return cls.from_dict(self.meas_as_dict)
else: # return a simple Spectrum object
x_name, y_name = tuple(df.keys())
x = df[x_name].to_numpy()
y = df[y_name].to_numpy()
return cls.from_data( # see Spectrum.from_data()
x,
y,
self.tstamp,
x_name,
y_name,
name=self.name,
technique=self.technique,
reader=self,
)