366 lines
13 KiB
Python
366 lines
13 KiB
Python
import os
|
|
from pathlib import Path
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
import matplotlib.pyplot as plt
|
|
|
|
|
|
def ensure_dir(path: Path) -> None:
|
|
path.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def read_signal_csv(csv_path: Path, value_column: str) -> pd.DataFrame:
|
|
"""Read a CSV with columns ['timestamp', value_column] into a DataFrame with datetime index.
|
|
|
|
Returns empty DataFrame if file does not exist.
|
|
"""
|
|
if not csv_path.exists():
|
|
return pd.DataFrame(columns=[value_column])
|
|
|
|
df = pd.read_csv(csv_path)
|
|
if 'timestamp' not in df.columns or value_column not in df.columns:
|
|
return pd.DataFrame(columns=[value_column])
|
|
|
|
# Convert timestamp (ms) to datetime and set as index
|
|
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', errors='coerce')
|
|
df = df.dropna(subset=['timestamp'])
|
|
df = df.set_index('timestamp').sort_index()
|
|
# Coerce values to numeric
|
|
df[value_column] = pd.to_numeric(df[value_column], errors='coerce')
|
|
df = df.dropna(subset=[value_column])
|
|
return df
|
|
|
|
|
|
def compute_time_based_moving_averages(df: pd.DataFrame, value_col: str) -> pd.DataFrame:
|
|
"""Add 5s, 10s, 30s, 60s time-based rolling means for an irregularly sampled series.
|
|
|
|
Requires a DatetimeIndex.
|
|
"""
|
|
if df.empty:
|
|
return df
|
|
|
|
# Ensure index is datetime for time-based rolling windows
|
|
if not isinstance(df.index, pd.DatetimeIndex):
|
|
raise ValueError('DataFrame index must be a DatetimeIndex for time-based rolling windows')
|
|
|
|
df = df.copy()
|
|
for seconds in (5, 10, 30, 60):
|
|
window = f'{seconds}s'
|
|
df[f'{value_col}_ma_{seconds}s'] = (
|
|
df[value_col].rolling(window=window, min_periods=1).mean()
|
|
)
|
|
return df
|
|
|
|
|
|
def add_seconds_from_start(df: pd.DataFrame, start_ts: pd.Timestamp) -> pd.DataFrame:
|
|
if df.empty:
|
|
return df
|
|
df = df.copy()
|
|
df['seconds'] = (df.index - start_ts).total_seconds()
|
|
return df
|
|
|
|
|
|
def read_marks(csv_path: Path) -> pd.Series:
|
|
"""Read marked timestamps as a pandas Series of pd.Timestamp, sorted ascending.
|
|
|
|
Returns empty Series if file missing or no valid rows.
|
|
"""
|
|
if not csv_path.exists():
|
|
return pd.Series([], dtype='datetime64[ns]')
|
|
|
|
df = pd.read_csv(csv_path)
|
|
if 'timestamp' not in df.columns:
|
|
return pd.Series([], dtype='datetime64[ns]')
|
|
# Convert to datetime
|
|
ts = pd.to_datetime(df['timestamp'], unit='ms', errors='coerce').dropna().sort_values()
|
|
return ts
|
|
|
|
|
|
def clean_rr_ms(rr_df: pd.DataFrame, col: str = 'rr_ms', source_name: str | None = None) -> pd.DataFrame:
|
|
"""Basic NN editing for RR in ms with interpolation and reporting.
|
|
|
|
Steps:
|
|
- Coerce to numeric and mark non-finite as NaN (count)
|
|
- Mark out-of-range [300, 2000] ms as NaN (count)
|
|
- Mark robust outliers via 15s rolling median/MAD (z > 3.5) as NaN (count)
|
|
- Time-based interpolation to fill flagged values (then ffill/bfill)
|
|
- Print counts summary
|
|
"""
|
|
if rr_df is None or rr_df.empty or col not in rr_df.columns:
|
|
return rr_df
|
|
df = rr_df.copy()
|
|
df[col] = pd.to_numeric(df[col], errors='coerce')
|
|
# Track flags (only threshold filtering per request)
|
|
nonfinite_mask = ~pd.notna(df[col])
|
|
range_mask = (df[col] < 300) | (df[col] > 2000)
|
|
# Combine flags: non-finite or out-of-range
|
|
flagged = nonfinite_mask | range_mask
|
|
# Set flagged to NaN for interpolation
|
|
df.loc[flagged, col] = np.nan
|
|
|
|
# Interpolate in time, then ffill/bfill for edges
|
|
if isinstance(df.index, pd.DatetimeIndex):
|
|
df[col] = df[col].interpolate(method='time', limit_direction='both')
|
|
else:
|
|
df[col] = df[col].interpolate(limit_direction='both')
|
|
df[col] = df[col].ffill().bfill()
|
|
|
|
# Reporting
|
|
if source_name is None:
|
|
source_name = 'RR cleaning'
|
|
print(f"{source_name} - RR filter: nonfinite={int(nonfinite_mask.sum())}, out_of_range={int(range_mask.sum())}, total_flagged={int(flagged.sum())}")
|
|
return df
|
|
|
|
|
|
def segment_bounds_from_marks(marks: pd.Series, start_ts: pd.Timestamp, end_ts: pd.Timestamp) -> list[tuple[pd.Timestamp, pd.Timestamp]]:
|
|
"""Create segments between consecutive marks, plus the final segment from last mark to end.
|
|
|
|
Does NOT include the pre-first mark segment.
|
|
If there are no marks, returns a single segment from start to end.
|
|
"""
|
|
segments: list[tuple[pd.Timestamp, pd.Timestamp]] = []
|
|
if marks is None or len(marks) == 0:
|
|
return [(start_ts, end_ts)]
|
|
|
|
# Between consecutive marks
|
|
marks_list = list(marks)
|
|
for i in range(len(marks_list) - 1):
|
|
a = marks_list[i]
|
|
b = marks_list[i + 1]
|
|
if a < b and a < end_ts:
|
|
segments.append((max(a, start_ts), min(b, end_ts)))
|
|
|
|
# From last mark to end
|
|
last = marks_list[-1]
|
|
if last < end_ts:
|
|
segments.append((max(last, start_ts), end_ts))
|
|
return segments
|
|
|
|
|
|
def plot_timeseries_and_mas(out_dir: Path, rec_name: str, hr_df: pd.DataFrame, rr_df: pd.DataFrame, marks_sec: list[float]) -> None:
|
|
ensure_dir(out_dir)
|
|
|
|
fig, axes = plt.subplots(2, 1, figsize=(14, 8), sharex=True)
|
|
|
|
# HR plot
|
|
ax = axes[0]
|
|
if not hr_df.empty:
|
|
ax.plot(hr_df['seconds'], hr_df['hr'], label='HR', color='tab:blue', linewidth=0.8, alpha=0.7)
|
|
for seconds, color in zip((5, 10, 30, 60), ('tab:orange', 'tab:green', 'tab:red', 'tab:purple')):
|
|
col = f'hr_ma_{seconds}s'
|
|
if col in hr_df.columns:
|
|
ax.plot(hr_df['seconds'], hr_df[col], label=f'HR MA {seconds}s', color=color, linewidth=1.5)
|
|
ax.set_ylabel('HR (bpm)')
|
|
ax.grid(True, alpha=0.3)
|
|
|
|
# RR plot
|
|
ax = axes[1]
|
|
if not rr_df.empty:
|
|
ax.plot(rr_df['seconds'], rr_df['rr_ms'], label='RR (ms)', color='tab:blue', linewidth=0.8, alpha=0.7)
|
|
for seconds, color in zip((5, 10, 30, 60), ('tab:orange', 'tab:green', 'tab:red', 'tab:purple')):
|
|
col = f'rr_ms_ma_{seconds}s'
|
|
if col in rr_df.columns:
|
|
ax.plot(rr_df['seconds'], rr_df[col], label=f'RR MA {seconds}s', color=color, linewidth=1.5)
|
|
ax.set_ylabel('RR (ms)')
|
|
ax.grid(True, alpha=0.3)
|
|
|
|
# Vertical lines for marks on both subplots
|
|
for ax in axes:
|
|
for s in marks_sec:
|
|
ax.axvline(s, color='k', linestyle='--', linewidth=0.8, alpha=0.7)
|
|
ax.legend(loc='upper right', ncol=2, fontsize=8)
|
|
|
|
axes[-1].set_xlabel('Time (s)')
|
|
fig.suptitle(f'{rec_name} - HR and RR with Moving Averages')
|
|
fig.tight_layout(rect=(0, 0, 1, 0.97))
|
|
|
|
out_path = out_dir / f'{rec_name}_timeseries.png'
|
|
fig.savefig(out_path, dpi=150)
|
|
plt.close(fig)
|
|
|
|
|
|
def plot_separate_moving_averages(out_dir: Path, rec_name: str, hr_df: pd.DataFrame, rr_df: pd.DataFrame, marks_sec: list[float]) -> None:
|
|
"""Save separate plots for each moving average window (5s, 10s, 30s, 60s)
|
|
into a dedicated subdirectory per recording.
|
|
|
|
Each figure contains two subplots (HR MA and RR MA) sharing the x-axis in seconds.
|
|
"""
|
|
if (hr_df is None or hr_df.empty) and (rr_df is None or rr_df.empty):
|
|
return
|
|
|
|
ma_dir = out_dir / 'MovingAverages'
|
|
ensure_dir(ma_dir)
|
|
|
|
windows = (5, 10, 30, 60)
|
|
for seconds in windows:
|
|
fig, axes = plt.subplots(2, 1, figsize=(14, 8), sharex=True)
|
|
|
|
# HR MA subplot
|
|
ax = axes[0]
|
|
plotted_any = False
|
|
if hr_df is not None and not hr_df.empty:
|
|
col = f'hr_ma_{seconds}s'
|
|
if col in hr_df.columns:
|
|
ax.plot(hr_df['seconds'], hr_df[col], label=f'HR MA {seconds}s', color='tab:orange', linewidth=1.5)
|
|
plotted_any = True
|
|
if plotted_any:
|
|
for s in marks_sec:
|
|
ax.axvline(s, color='k', linestyle='--', linewidth=0.8, alpha=0.7)
|
|
ax.set_ylabel('HR (bpm)')
|
|
ax.grid(True, alpha=0.3)
|
|
ax.legend(loc='upper right', fontsize=9)
|
|
|
|
# RR MA subplot
|
|
ax = axes[1]
|
|
plotted_any = False
|
|
if rr_df is not None and not rr_df.empty:
|
|
col = f'rr_ms_ma_{seconds}s'
|
|
if col in rr_df.columns:
|
|
ax.plot(rr_df['seconds'], rr_df[col], label=f'RR MA {seconds}s', color='tab:green', linewidth=1.5)
|
|
plotted_any = True
|
|
if plotted_any:
|
|
for s in marks_sec:
|
|
ax.axvline(s, color='k', linestyle='--', linewidth=0.8, alpha=0.7)
|
|
ax.set_ylabel('RR (ms)')
|
|
ax.grid(True, alpha=0.3)
|
|
ax.legend(loc='upper right', fontsize=9)
|
|
|
|
axes[-1].set_xlabel('Time (s)')
|
|
fig.suptitle(f'{rec_name} - Moving Average {seconds}s')
|
|
fig.tight_layout(rect=(0, 0, 1, 0.96))
|
|
|
|
out_path = ma_dir / f'{rec_name}_ma_{seconds}s.png'
|
|
fig.savefig(out_path, dpi=150)
|
|
plt.close(fig)
|
|
|
|
|
|
def plot_segment_boxplots(out_dir: Path, rec_name: str, signal_df: pd.DataFrame, value_col: str, segments: list[tuple[pd.Timestamp, pd.Timestamp]], label_prefix: str) -> None:
|
|
"""Create boxplots of values within each [start, end) segment.
|
|
|
|
If no non-empty segments, skips plotting.
|
|
"""
|
|
if signal_df.empty or not segments:
|
|
return
|
|
|
|
# Collect data per segment
|
|
data = []
|
|
labels = []
|
|
start_ts = signal_df.index.min()
|
|
for (a, b) in segments:
|
|
seg = signal_df.loc[(signal_df.index >= a) & (signal_df.index < b), value_col]
|
|
if seg.empty:
|
|
continue
|
|
data.append(seg.values)
|
|
labels.append(f'{label_prefix} {int((a - start_ts).total_seconds())}-{int((b - start_ts).total_seconds())}s')
|
|
|
|
if not data:
|
|
return
|
|
|
|
fig, ax = plt.subplots(figsize=(max(8, len(data) * 1.2), 5))
|
|
ax.boxplot(data, tick_labels=labels, vert=True, patch_artist=True)
|
|
ax.set_ylabel(value_col)
|
|
ax.set_title(f'{rec_name} - {value_col} Boxplots by Marked Segments')
|
|
ax.grid(True, axis='y', alpha=0.3)
|
|
fig.tight_layout()
|
|
|
|
out_path = out_dir / f'{rec_name}_{value_col}_boxplots.png'
|
|
fig.savefig(out_path, dpi=150)
|
|
plt.close(fig)
|
|
|
|
|
|
def process_recording(rec_dir: Path, plots_root: Path) -> None:
|
|
rec_name = rec_dir.name
|
|
|
|
hr_csv = rec_dir / 'hr.csv'
|
|
rr_csv = rec_dir / 'rr.csv'
|
|
ts_csv = rec_dir / 'timestamps.csv'
|
|
|
|
hr_df = read_signal_csv(hr_csv, 'hr')
|
|
rr_df = read_signal_csv(rr_csv, 'rr_ms')
|
|
rr_df = clean_rr_ms(rr_df, 'rr_ms')
|
|
marks = read_marks(ts_csv)
|
|
|
|
if hr_df.empty and rr_df.empty:
|
|
return
|
|
|
|
# Establish global start and end for the recording (across HR and RR)
|
|
candidates = []
|
|
if not hr_df.empty:
|
|
candidates.append(hr_df.index.min())
|
|
if not rr_df.empty:
|
|
candidates.append(rr_df.index.min())
|
|
start_ts = min(candidates)
|
|
|
|
end_candidates = []
|
|
if not hr_df.empty:
|
|
end_candidates.append(hr_df.index.max())
|
|
if not rr_df.empty:
|
|
end_candidates.append(rr_df.index.max())
|
|
end_ts = max(end_candidates)
|
|
|
|
# Compute moving averages
|
|
if not hr_df.empty:
|
|
hr_df = compute_time_based_moving_averages(hr_df, 'hr')
|
|
hr_df = add_seconds_from_start(hr_df, start_ts)
|
|
if not rr_df.empty:
|
|
rr_df = compute_time_based_moving_averages(rr_df, 'rr_ms')
|
|
rr_df = add_seconds_from_start(rr_df, start_ts)
|
|
|
|
# Marks in seconds relative to start
|
|
marks_sec = []
|
|
if marks is not None and len(marks) > 0:
|
|
marks_sec = [max(0.0, (t - start_ts).total_seconds()) for t in marks]
|
|
|
|
# Output directory for this recording
|
|
out_dir = plots_root / rec_name
|
|
ensure_dir(out_dir)
|
|
|
|
# Time series plots
|
|
plot_timeseries_and_mas(out_dir, rec_name, hr_df, rr_df, marks_sec)
|
|
|
|
# Separate moving average plots per window
|
|
plot_separate_moving_averages(out_dir, rec_name, hr_df, rr_df, marks_sec)
|
|
|
|
# Segments and boxplots (by instruction: between marked timestamps, plus last->end)
|
|
segments = segment_bounds_from_marks(marks, start_ts, end_ts)
|
|
|
|
if not hr_df.empty:
|
|
plot_segment_boxplots(out_dir, rec_name, hr_df, 'hr', segments, label_prefix='t')
|
|
if not rr_df.empty:
|
|
plot_segment_boxplots(out_dir, rec_name, rr_df, 'rr_ms', segments, label_prefix='t')
|
|
|
|
|
|
def main() -> None:
|
|
root = Path(__file__).resolve().parent
|
|
recordings_root = root / 'SingleRecordings'
|
|
plots_root = root / 'Plots' / 'SingleRecordings'
|
|
ensure_dir(plots_root)
|
|
|
|
if not recordings_root.exists():
|
|
print(f'No SingleRecordings directory found at: {recordings_root}')
|
|
return
|
|
|
|
# Iterate through subdirectories that look like recordings
|
|
rec_dirs = sorted([p for p in recordings_root.iterdir() if p.is_dir()])
|
|
if not rec_dirs:
|
|
print('No recording subdirectories found in SingleRecordings/')
|
|
return
|
|
|
|
for rec_dir in rec_dirs:
|
|
# Only process if at least one of the expected CSVs exists
|
|
if any((rec_dir / f).exists() for f in ('hr.csv', 'rr.csv')):
|
|
print(f'Processing {rec_dir.name}...')
|
|
try:
|
|
process_recording(rec_dir, plots_root)
|
|
except Exception as e:
|
|
print(f'Failed to process {rec_dir.name}: {e}')
|
|
|
|
print(f'All done. Plots saved under: {plots_root}')
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|
|
|
|
|