import os from pathlib import Path import numpy as np import pandas as pd import matplotlib.pyplot as plt def ensure_dir(path: Path) -> None: path.mkdir(parents=True, exist_ok=True) def read_signal_csv(csv_path: Path, value_column: str) -> pd.DataFrame: """Read a CSV with columns ['timestamp', value_column] into a DataFrame with datetime index. Returns empty DataFrame if file does not exist. """ if not csv_path.exists(): return pd.DataFrame(columns=[value_column]) df = pd.read_csv(csv_path) if 'timestamp' not in df.columns or value_column not in df.columns: return pd.DataFrame(columns=[value_column]) # Convert timestamp (ms) to datetime and set as index df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', errors='coerce') df = df.dropna(subset=['timestamp']) df = df.set_index('timestamp').sort_index() # Coerce values to numeric df[value_column] = pd.to_numeric(df[value_column], errors='coerce') df = df.dropna(subset=[value_column]) return df def compute_time_based_moving_averages(df: pd.DataFrame, value_col: str) -> pd.DataFrame: """Add 5s, 10s, 30s, 60s time-based rolling means for an irregularly sampled series. Requires a DatetimeIndex. """ if df.empty: return df # Ensure index is datetime for time-based rolling windows if not isinstance(df.index, pd.DatetimeIndex): raise ValueError('DataFrame index must be a DatetimeIndex for time-based rolling windows') df = df.copy() for seconds in (5, 10, 30, 60): window = f'{seconds}s' df[f'{value_col}_ma_{seconds}s'] = ( df[value_col].rolling(window=window, min_periods=1).mean() ) return df def add_seconds_from_start(df: pd.DataFrame, start_ts: pd.Timestamp) -> pd.DataFrame: if df.empty: return df df = df.copy() df['seconds'] = (df.index - start_ts).total_seconds() return df def read_marks(csv_path: Path) -> pd.Series: """Read marked timestamps as a pandas Series of pd.Timestamp, sorted ascending. Returns empty Series if file missing or no valid rows. """ if not csv_path.exists(): return pd.Series([], dtype='datetime64[ns]') df = pd.read_csv(csv_path) if 'timestamp' not in df.columns: return pd.Series([], dtype='datetime64[ns]') # Convert to datetime ts = pd.to_datetime(df['timestamp'], unit='ms', errors='coerce').dropna().sort_values() return ts def clean_rr_ms(rr_df: pd.DataFrame, col: str = 'rr_ms', source_name: str | None = None) -> pd.DataFrame: """Basic NN editing for RR in ms with interpolation and reporting. Steps: - Coerce to numeric and mark non-finite as NaN (count) - Mark out-of-range [300, 2000] ms as NaN (count) - Mark robust outliers via 15s rolling median/MAD (z > 3.5) as NaN (count) - Time-based interpolation to fill flagged values (then ffill/bfill) - Print counts summary """ if rr_df is None or rr_df.empty or col not in rr_df.columns: return rr_df df = rr_df.copy() df[col] = pd.to_numeric(df[col], errors='coerce') # Track flags (only threshold filtering per request) nonfinite_mask = ~pd.notna(df[col]) range_mask = (df[col] < 300) | (df[col] > 2000) # Combine flags: non-finite or out-of-range flagged = nonfinite_mask | range_mask # Set flagged to NaN for interpolation df.loc[flagged, col] = np.nan # Interpolate in time, then ffill/bfill for edges if isinstance(df.index, pd.DatetimeIndex): df[col] = df[col].interpolate(method='time', limit_direction='both') else: df[col] = df[col].interpolate(limit_direction='both') df[col] = df[col].ffill().bfill() # Reporting if source_name is None: source_name = 'RR cleaning' print(f"{source_name} - RR filter: nonfinite={int(nonfinite_mask.sum())}, out_of_range={int(range_mask.sum())}, total_flagged={int(flagged.sum())}") return df def segment_bounds_from_marks(marks: pd.Series, start_ts: pd.Timestamp, end_ts: pd.Timestamp) -> list[tuple[pd.Timestamp, pd.Timestamp]]: """Create segments between consecutive marks, plus the final segment from last mark to end. Does NOT include the pre-first mark segment. If there are no marks, returns a single segment from start to end. """ segments: list[tuple[pd.Timestamp, pd.Timestamp]] = [] if marks is None or len(marks) == 0: return [(start_ts, end_ts)] # Between consecutive marks marks_list = list(marks) for i in range(len(marks_list) - 1): a = marks_list[i] b = marks_list[i + 1] if a < b and a < end_ts: segments.append((max(a, start_ts), min(b, end_ts))) # From last mark to end last = marks_list[-1] if last < end_ts: segments.append((max(last, start_ts), end_ts)) return segments def plot_timeseries_and_mas(out_dir: Path, rec_name: str, hr_df: pd.DataFrame, rr_df: pd.DataFrame, marks_sec: list[float]) -> None: ensure_dir(out_dir) fig, axes = plt.subplots(2, 1, figsize=(14, 8), sharex=True) # HR plot ax = axes[0] if not hr_df.empty: ax.plot(hr_df['seconds'], hr_df['hr'], label='HR', color='tab:blue', linewidth=0.8, alpha=0.7) for seconds, color in zip((5, 10, 30, 60), ('tab:orange', 'tab:green', 'tab:red', 'tab:purple')): col = f'hr_ma_{seconds}s' if col in hr_df.columns: ax.plot(hr_df['seconds'], hr_df[col], label=f'HR MA {seconds}s', color=color, linewidth=1.5) ax.set_ylabel('HR (bpm)') ax.grid(True, alpha=0.3) # RR plot ax = axes[1] if not rr_df.empty: ax.plot(rr_df['seconds'], rr_df['rr_ms'], label='RR (ms)', color='tab:blue', linewidth=0.8, alpha=0.7) for seconds, color in zip((5, 10, 30, 60), ('tab:orange', 'tab:green', 'tab:red', 'tab:purple')): col = f'rr_ms_ma_{seconds}s' if col in rr_df.columns: ax.plot(rr_df['seconds'], rr_df[col], label=f'RR MA {seconds}s', color=color, linewidth=1.5) ax.set_ylabel('RR (ms)') ax.grid(True, alpha=0.3) # Vertical lines for marks on both subplots for ax in axes: for s in marks_sec: ax.axvline(s, color='k', linestyle='--', linewidth=0.8, alpha=0.7) ax.legend(loc='upper right', ncol=2, fontsize=8) axes[-1].set_xlabel('Time (s)') fig.suptitle(f'{rec_name} - HR and RR with Moving Averages') fig.tight_layout(rect=(0, 0, 1, 0.97)) out_path = out_dir / f'{rec_name}_timeseries.png' fig.savefig(out_path, dpi=150) plt.close(fig) def plot_separate_moving_averages(out_dir: Path, rec_name: str, hr_df: pd.DataFrame, rr_df: pd.DataFrame, marks_sec: list[float]) -> None: """Save separate plots for each moving average window (5s, 10s, 30s, 60s) into a dedicated subdirectory per recording. Each figure contains two subplots (HR MA and RR MA) sharing the x-axis in seconds. """ if (hr_df is None or hr_df.empty) and (rr_df is None or rr_df.empty): return ma_dir = out_dir / 'MovingAverages' ensure_dir(ma_dir) windows = (5, 10, 30, 60) for seconds in windows: fig, axes = plt.subplots(2, 1, figsize=(14, 8), sharex=True) # HR MA subplot ax = axes[0] plotted_any = False if hr_df is not None and not hr_df.empty: col = f'hr_ma_{seconds}s' if col in hr_df.columns: ax.plot(hr_df['seconds'], hr_df[col], label=f'HR MA {seconds}s', color='tab:orange', linewidth=1.5) plotted_any = True if plotted_any: for s in marks_sec: ax.axvline(s, color='k', linestyle='--', linewidth=0.8, alpha=0.7) ax.set_ylabel('HR (bpm)') ax.grid(True, alpha=0.3) ax.legend(loc='upper right', fontsize=9) # RR MA subplot ax = axes[1] plotted_any = False if rr_df is not None and not rr_df.empty: col = f'rr_ms_ma_{seconds}s' if col in rr_df.columns: ax.plot(rr_df['seconds'], rr_df[col], label=f'RR MA {seconds}s', color='tab:green', linewidth=1.5) plotted_any = True if plotted_any: for s in marks_sec: ax.axvline(s, color='k', linestyle='--', linewidth=0.8, alpha=0.7) ax.set_ylabel('RR (ms)') ax.grid(True, alpha=0.3) ax.legend(loc='upper right', fontsize=9) axes[-1].set_xlabel('Time (s)') fig.suptitle(f'{rec_name} - Moving Average {seconds}s') fig.tight_layout(rect=(0, 0, 1, 0.96)) out_path = ma_dir / f'{rec_name}_ma_{seconds}s.png' fig.savefig(out_path, dpi=150) plt.close(fig) def plot_segment_boxplots(out_dir: Path, rec_name: str, signal_df: pd.DataFrame, value_col: str, segments: list[tuple[pd.Timestamp, pd.Timestamp]], label_prefix: str) -> None: """Create boxplots of values within each [start, end) segment. If no non-empty segments, skips plotting. """ if signal_df.empty or not segments: return # Collect data per segment data = [] labels = [] start_ts = signal_df.index.min() for (a, b) in segments: seg = signal_df.loc[(signal_df.index >= a) & (signal_df.index < b), value_col] if seg.empty: continue data.append(seg.values) labels.append(f'{label_prefix} {int((a - start_ts).total_seconds())}-{int((b - start_ts).total_seconds())}s') if not data: return fig, ax = plt.subplots(figsize=(max(8, len(data) * 1.2), 5)) ax.boxplot(data, tick_labels=labels, vert=True, patch_artist=True) ax.set_ylabel(value_col) ax.set_title(f'{rec_name} - {value_col} Boxplots by Marked Segments') ax.grid(True, axis='y', alpha=0.3) fig.tight_layout() out_path = out_dir / f'{rec_name}_{value_col}_boxplots.png' fig.savefig(out_path, dpi=150) plt.close(fig) def process_recording(rec_dir: Path, plots_root: Path) -> None: rec_name = rec_dir.name hr_csv = rec_dir / 'hr.csv' rr_csv = rec_dir / 'rr.csv' ts_csv = rec_dir / 'timestamps.csv' hr_df = read_signal_csv(hr_csv, 'hr') rr_df = read_signal_csv(rr_csv, 'rr_ms') rr_df = clean_rr_ms(rr_df, 'rr_ms') marks = read_marks(ts_csv) if hr_df.empty and rr_df.empty: return # Establish global start and end for the recording (across HR and RR) candidates = [] if not hr_df.empty: candidates.append(hr_df.index.min()) if not rr_df.empty: candidates.append(rr_df.index.min()) start_ts = min(candidates) end_candidates = [] if not hr_df.empty: end_candidates.append(hr_df.index.max()) if not rr_df.empty: end_candidates.append(rr_df.index.max()) end_ts = max(end_candidates) # Compute moving averages if not hr_df.empty: hr_df = compute_time_based_moving_averages(hr_df, 'hr') hr_df = add_seconds_from_start(hr_df, start_ts) if not rr_df.empty: rr_df = compute_time_based_moving_averages(rr_df, 'rr_ms') rr_df = add_seconds_from_start(rr_df, start_ts) # Marks in seconds relative to start marks_sec = [] if marks is not None and len(marks) > 0: marks_sec = [max(0.0, (t - start_ts).total_seconds()) for t in marks] # Output directory for this recording out_dir = plots_root / rec_name ensure_dir(out_dir) # Time series plots plot_timeseries_and_mas(out_dir, rec_name, hr_df, rr_df, marks_sec) # Separate moving average plots per window plot_separate_moving_averages(out_dir, rec_name, hr_df, rr_df, marks_sec) # Segments and boxplots (by instruction: between marked timestamps, plus last->end) segments = segment_bounds_from_marks(marks, start_ts, end_ts) if not hr_df.empty: plot_segment_boxplots(out_dir, rec_name, hr_df, 'hr', segments, label_prefix='t') if not rr_df.empty: plot_segment_boxplots(out_dir, rec_name, rr_df, 'rr_ms', segments, label_prefix='t') def main() -> None: root = Path(__file__).resolve().parent recordings_root = root / 'SingleRecordings' plots_root = root / 'Plots' / 'SingleRecordings' ensure_dir(plots_root) if not recordings_root.exists(): print(f'No SingleRecordings directory found at: {recordings_root}') return # Iterate through subdirectories that look like recordings rec_dirs = sorted([p for p in recordings_root.iterdir() if p.is_dir()]) if not rec_dirs: print('No recording subdirectories found in SingleRecordings/') return for rec_dir in rec_dirs: # Only process if at least one of the expected CSVs exists if any((rec_dir / f).exists() for f in ('hr.csv', 'rr.csv')): print(f'Processing {rec_dir.name}...') try: process_recording(rec_dir, plots_root) except Exception as e: print(f'Failed to process {rec_dir.name}: {e}') print(f'All done. Plots saved under: {plots_root}') if __name__ == '__main__': main()