import math from pathlib import Path import numpy as np import pandas as pd import matplotlib.pyplot as plt def ensure_dir(path: Path) -> None: path.mkdir(parents=True, exist_ok=True) def read_signal_csv(csv_path: Path, value_column: str) -> pd.DataFrame: if not csv_path.exists(): return pd.DataFrame(columns=[value_column]) df = pd.read_csv(csv_path) if 'timestamp' not in df.columns or value_column not in df.columns: return pd.DataFrame(columns=[value_column]) df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', errors='coerce') df = df.dropna(subset=['timestamp']).set_index('timestamp').sort_index() df[value_column] = pd.to_numeric(df[value_column], errors='coerce') df = df.dropna(subset=[value_column]) return df def clean_rr_ms(rr_df: pd.DataFrame, col: str = 'rr_ms', source_name: str | None = None) -> pd.DataFrame: """RR cleaning with simple thresholding + interpolation and reporting. - Coerce to numeric; mark non-finite as NaN (count) - Mark out-of-range [300, 2000] ms as NaN (count) - Interpolate in time; ffill/bfill edges - Print counts """ if rr_df is None or rr_df.empty or col not in rr_df.columns: return rr_df df = rr_df.copy() df[col] = pd.to_numeric(df[col], errors='coerce') nonfinite_mask = ~pd.notna(df[col]) range_mask = (df[col] < 300) | (df[col] > 2000) flagged = nonfinite_mask | range_mask df.loc[flagged, col] = np.nan if isinstance(df.index, pd.DatetimeIndex): df[col] = df[col].interpolate(method='time', limit_direction='both') else: df[col] = df[col].interpolate(limit_direction='both') df[col] = df[col].ffill().bfill() if source_name is None: source_name = 'RR cleaning' print(f"{source_name} - RR filter: nonfinite={int(nonfinite_mask.sum())}, out_of_range={int(range_mask.sum())}, total_flagged={int(flagged.sum())}") return df def read_marks(csv_path: Path) -> pd.Series: if not csv_path.exists(): return pd.Series([], dtype='datetime64[ns]') df = pd.read_csv(csv_path) if 'timestamp' not in df.columns: return pd.Series([], dtype='datetime64[ns]') ts = pd.to_datetime(df['timestamp'], unit='ms', errors='coerce').dropna().sort_values() return ts def get_recording_bounds(hr_df: pd.DataFrame, rr_df: pd.DataFrame) -> tuple[pd.Timestamp, pd.Timestamp] | tuple[None, None]: if (hr_df is None or hr_df.empty) and (rr_df is None or rr_df.empty): return None, None starts = [] ends = [] if hr_df is not None and not hr_df.empty: starts.append(hr_df.index.min()) ends.append(hr_df.index.max()) if rr_df is not None and not rr_df.empty: starts.append(rr_df.index.min()) ends.append(rr_df.index.max()) return min(starts), max(ends) def segment_slices(marks: list[pd.Timestamp], start_ts: pd.Timestamp, end_ts: pd.Timestamp) -> dict: """Return dict with segments: pre, betweens (list of (a,b)), post. Assumes exactly 4 marks provided. """ assert len(marks) == 4 a, b, c, d = marks pre = (start_ts, a) betweens = [(a, b), (b, c), (c, d)] post = (d, end_ts) return {'pre': pre, 'betweens': betweens, 'post': post} def rmssd(rr_ms_series: pd.Series) -> float | None: """Compute RMSSD (ms) for a series of RR values in milliseconds. Returns None if insufficient data. """ if rr_ms_series is None or len(rr_ms_series) < 2: return None rr = rr_ms_series.values.astype(float) diff = np.diff(rr) if diff.size == 0: return None return float(np.sqrt(np.mean(np.square(diff)))) def summarize_segments_for_recording(rec_name: str, hr_df: pd.DataFrame, rr_df: pd.DataFrame, marks: pd.Series) -> dict | None: # Only include recordings with exactly 4 marks if marks is None or len(marks) != 4: return None start_ts, end_ts = get_recording_bounds(hr_df, rr_df) if start_ts is None or end_ts is None: return None segs = segment_slices(list(marks), start_ts, end_ts) # Helper to subset by time range def subset(df: pd.DataFrame, value_col: str, a: pd.Timestamp, b: pd.Timestamp) -> pd.Series: if df is None or df.empty: return pd.Series([], dtype=float) return df.loc[(df.index >= a) & (df.index < b), value_col] # Pre segment pre_hr = subset(hr_df, 'hr', *segs['pre']) pre_rr = subset(rr_df, 'rr_ms', *segs['pre']) pre_rmssd = rmssd(pre_rr) if len(pre_rr) >= 2 else None # Three between segments: compute individually between_vals = [] # list of dicts with hr, rr, rmssd for a, b in segs['betweens']: bh = subset(hr_df, 'hr', a, b) br = subset(rr_df, 'rr_ms', a, b) between_vals.append({ 'hr': bh.values if len(bh) > 0 else None, 'rr': br.values if len(br) > 0 else None, 'rmssd': rmssd(br) if len(br) >= 2 else None, }) # Post segment post_hr = subset(hr_df, 'hr', *segs['post']) post_rr = subset(rr_df, 'rr_ms', *segs['post']) post_rmssd = rmssd(post_rr) if len(post_rr) >= 2 else None # Build summary dict (use medians for HR/RR to be robust) for five named conditions def safe_median(arr_like) -> float | None: if arr_like is None: return None vals = np.asarray(arr_like, dtype=float) vals = vals[~np.isnan(vals)] if vals.size == 0: return None return float(np.median(vals)) summary = { 'recording': rec_name, # HR medians 'hr_breathing1': safe_median(pre_hr.values if len(pre_hr) > 0 else None), 'hr_spring': safe_median(between_vals[0]['hr'] if between_vals and between_vals[0]['hr'] is not None else None), 'hr_summer': safe_median(between_vals[1]['hr'] if len(between_vals) > 1 and between_vals[1]['hr'] is not None else None), 'hr_autumn': safe_median(between_vals[2]['hr'] if len(between_vals) > 2 and between_vals[2]['hr'] is not None else None), 'hr_breathing2': safe_median(post_hr.values if len(post_hr) > 0 else None), # RR medians 'rr_breathing1': safe_median(pre_rr.values if len(pre_rr) > 0 else None), 'rr_spring': safe_median(between_vals[0]['rr'] if between_vals and between_vals[0]['rr'] is not None else None), 'rr_summer': safe_median(between_vals[1]['rr'] if len(between_vals) > 1 and between_vals[1]['rr'] is not None else None), 'rr_autumn': safe_median(between_vals[2]['rr'] if len(between_vals) > 2 and between_vals[2]['rr'] is not None else None), 'rr_breathing2': safe_median(post_rr.values if len(post_rr) > 0 else None), # RMSSD ms 'rmssd_breathing1_ms': pre_rmssd if pre_rmssd is not None and not math.isnan(pre_rmssd) else None, 'rmssd_spring_ms': between_vals[0]['rmssd'] if between_vals and between_vals[0]['rmssd'] is not None and not math.isnan(between_vals[0]['rmssd']) else None, 'rmssd_summer_ms': between_vals[1]['rmssd'] if len(between_vals) > 1 and between_vals[1]['rmssd'] is not None and not math.isnan(between_vals[1]['rmssd']) else None, 'rmssd_autumn_ms': between_vals[2]['rmssd'] if len(between_vals) > 2 and between_vals[2]['rmssd'] is not None and not math.isnan(between_vals[2]['rmssd']) else None, 'rmssd_breathing2_ms': post_rmssd if post_rmssd is not None and not math.isnan(post_rmssd) else None, } return summary def boxplot_five_categories(values_df: pd.DataFrame, value_cols: tuple[str, str, str, str, str], title: str, ylabel: str, out_path: Path) -> dict: """Create a boxplot comparing five named conditions across recordings. value_cols is a tuple of (breathing1, spring, summer, autumn, breathing2) """ b1, sp, su, au, b2 = value_cols df = values_df[[b1, sp, su, au, b2]].dropna(how='all') if df.empty: return {} data = [df[b1].dropna().values, df[sp].dropna().values, df[su].dropna().values, df[au].dropna().values, df[b2].dropna().values] labels = ['Breathing Scene 1', 'Spring Scene', 'Summer Scene', 'Autumn Scene', 'Breathing Scene 2'] fig, ax = plt.subplots(figsize=(10, 5)) ax.boxplot(data, tick_labels=labels, vert=True, patch_artist=True) # Overlay small dot for the mean per category means = [float(np.mean(d)) if len(d) > 0 else np.nan for d in data] ax.scatter(range(1, 6), means, color='black', s=20, zorder=3) ax.set_title(title) ax.set_ylabel(ylabel) ax.grid(True, axis='y', alpha=0.3) fig.tight_layout() fig.savefig(out_path, dpi=150) plt.close(fig) # Return mapping of label -> mean return {label: mean for label, mean in zip(labels, means)} def compute_and_plot_aligned_hr(recordings_root: Path, out_root: Path) -> None: """Compute an across-recordings averaged HR curve aligned by the 4 timestamps. Approach: - Use recordings with exactly 4 marks and available HR - Compute median durations for the 5 segments (pre, 3 betweens, post) - Map each recording piecewise-linearly so marks align to the same normalized positions based on the median segment-length proportions - Interpolate each recording to a common grid and average """ recs: list[dict] = [] seg_durations: list[tuple[float, float, float, float, float]] = [] for rec_dir in sorted([p for p in recordings_root.iterdir() if p.is_dir()]): rec_name = rec_dir.name hr_csv = rec_dir / 'hr.csv' ts_csv = rec_dir / 'timestamps.csv' if not hr_csv.exists() or not ts_csv.exists(): continue hr_df = read_signal_csv(hr_csv, 'hr') marks = read_marks(ts_csv) if hr_df.empty or marks is None or len(marks) != 4: continue # Determine bounds start_ts = hr_df.index.min() end_ts = hr_df.index.max() a, b, c, d = list(marks) # Ensure marks fall within HR bounds if not (start_ts < a < b < c < d < end_ts): # Skip if pathological continue # Segment durations in seconds pre_d = (a - start_ts).total_seconds() d12 = (b - a).total_seconds() d23 = (c - b).total_seconds() d34 = (d - c).total_seconds() post_d = (end_ts - d).total_seconds() seg_durations.append((pre_d, d12, d23, d34, post_d)) # Store for second pass recs.append({'name': rec_name, 'hr': hr_df, 'start': start_ts, 'marks': (a, b, c, d), 'end': end_ts}) if not recs: return # Median segment durations and normalized cumulative boundaries med = np.nanmedian(np.array(seg_durations), axis=0) total = float(np.sum(med)) if total <= 0: return proportions = med / total boundaries = np.concatenate(([0.0], np.cumsum(proportions))) # length 6, last == 1.0 # Common grid (normalized [0,1]) and scaled seconds by median total duration N = 1000 x_norm = np.linspace(0.0, 1.0, N) x_sec = x_norm * total # Helper: piecewise-linear map original timestamp to normalized coordinate def map_times_to_norm(hr_df: pd.DataFrame, start: pd.Timestamp, marks: tuple[pd.Timestamp, pd.Timestamp, pd.Timestamp, pd.Timestamp], end: pd.Timestamp) -> tuple[np.ndarray, np.ndarray]: a, b, c, d = marks segments = [ (start, a, boundaries[0], boundaries[1]), (a, b, boundaries[1], boundaries[2]), (b, c, boundaries[2], boundaries[3]), (c, d, boundaries[3], boundaries[4]), (d, end, boundaries[4], boundaries[5]), ] u_list = [] y_list = [] for seg_start, seg_end, u0, u1 in segments: seg = hr_df.loc[(hr_df.index >= seg_start) & (hr_df.index <= seg_end), 'hr'] if seg.empty: continue dur_ns = (seg_end.value - seg_start.value) if dur_ns <= 0: continue rel = (seg.index.view('int64') - np.int64(seg_start.value)) / np.float64(dur_ns) rel = np.clip(rel.astype(float), 0.0, 1.0) u = u0 + rel * (u1 - u0) u_list.append(u) y_list.append(seg.values.astype(float)) if not u_list: return np.array([]), np.array([]) u_all = np.concatenate(u_list) y_all = np.concatenate(y_list) order = np.argsort(u_all) u_sorted = u_all[order] y_sorted = y_all[order] uniq_u, idx_start = np.unique(u_sorted, return_index=True) counts = np.diff(np.append(idx_start, len(u_sorted))) y_avg = [] for i, cnt in enumerate(counts): start_idx = idx_start[i] y_avg.append(np.mean(y_sorted[start_idx:start_idx + cnt])) y_avg = np.asarray(y_avg) return uniq_u, y_avg curves = [] for rec in recs: u, y = map_times_to_norm(rec['hr'], rec['start'], rec['marks'], rec['end']) if u.size < 2: continue y_grid = np.interp(x_norm, u, y, left=np.nan, right=np.nan) y_grid[x_norm < u.min()] = np.nan y_grid[x_norm > u.max()] = np.nan curves.append(y_grid) if not curves: return Y = np.vstack(curves) n = np.sum(~np.isnan(Y), axis=0) marr = np.ma.array(Y, mask=np.isnan(Y)) mean = marr.mean(axis=0).filled(np.nan) std = marr.std(axis=0).filled(np.nan) # Plot fig, ax = plt.subplots(figsize=(12, 5)) ax.plot(x_sec, mean, color='tab:blue', label='Mean HR') ax.fill_between(x_sec, mean - std, mean + std, color='tab:blue', alpha=0.2, label='±1 SD') for xb in boundaries * total: ax.axvline(xb, color='k', linestyle='--', linewidth=0.8, alpha=0.5) ax.set_xlabel('Aligned time (s)') ax.set_ylabel('HR (bpm)') ax.set_title('Aligned Average HR Across Recordings (4 marks)') ax.grid(True, alpha=0.3) ax.legend(loc='upper right') fig.tight_layout() fig.savefig(out_root / 'HR_average_aligned.png', dpi=150) plt.close(fig) out_csv = out_root / 'HR_average_aligned.csv' out_df = pd.DataFrame({ 'normalized_time': x_norm, 'aligned_seconds': x_sec, 'hr_mean': mean, 'hr_std': std, 'n': n, }) out_df.to_csv(out_csv, index=False) def compute_and_plot_aligned_rr(recordings_root: Path, out_root: Path) -> None: """Aligned average RR (ms) across recordings with 4 marks (same method as HR).""" recs: list[dict] = [] seg_durations: list[tuple[float, float, float, float, float]] = [] for rec_dir in sorted([p for p in recordings_root.iterdir() if p.is_dir()]): rr_csv = rec_dir / 'rr.csv' ts_csv = rec_dir / 'timestamps.csv' if not rr_csv.exists() or not ts_csv.exists(): continue rr_df = read_signal_csv(rr_csv, 'rr_ms') rr_df = clean_rr_ms(rr_df, 'rr_ms', source_name=f'{rec_dir.name} (aligned RR)') marks = read_marks(ts_csv) if rr_df.empty or marks is None or len(marks) != 4: continue start_ts = rr_df.index.min() end_ts = rr_df.index.max() a, b, c, d = list(marks) if not (start_ts < a < b < c < d < end_ts): continue pre_d = (a - start_ts).total_seconds() d12 = (b - a).total_seconds() d23 = (c - b).total_seconds() d34 = (d - c).total_seconds() post_d = (end_ts - d).total_seconds() seg_durations.append((pre_d, d12, d23, d34, post_d)) recs.append({'df': rr_df, 'start': start_ts, 'marks': (a, b, c, d), 'end': end_ts}) if not recs: return med = np.nanmedian(np.array(seg_durations), axis=0) total = float(np.sum(med)) if total <= 0: return boundaries = np.concatenate(([0.0], np.cumsum(med / total))) N = 1000 x_norm = np.linspace(0.0, 1.0, N) x_sec = x_norm * total def map_times_to_norm(df: pd.DataFrame, start: pd.Timestamp, marks: tuple[pd.Timestamp, pd.Timestamp, pd.Timestamp, pd.Timestamp], end: pd.Timestamp) -> tuple[np.ndarray, np.ndarray]: a, b, c, d = marks segments = [ (start, a, boundaries[0], boundaries[1]), (a, b, boundaries[1], boundaries[2]), (b, c, boundaries[2], boundaries[3]), (c, d, boundaries[3], boundaries[4]), (d, end, boundaries[4], boundaries[5]), ] u_list = [] y_list = [] for seg_start, seg_end, u0, u1 in segments: seg = df.loc[(df.index >= seg_start) & (df.index <= seg_end), 'rr_ms'] if seg.empty: continue dur_ns = (seg_end.value - seg_start.value) if dur_ns <= 0: continue rel = (seg.index.view('int64') - np.int64(seg_start.value)) / np.float64(dur_ns) rel = np.clip(rel.astype(float), 0.0, 1.0) u = u0 + rel * (u1 - u0) u_list.append(u) y_list.append(seg.values.astype(float)) if not u_list: return np.array([]), np.array([]) u_all = np.concatenate(u_list) y_all = np.concatenate(y_list) order = np.argsort(u_all) u_sorted = u_all[order] y_sorted = y_all[order] uniq_u, idx_start = np.unique(u_sorted, return_index=True) counts = np.diff(np.append(idx_start, len(u_sorted))) y_avg = [] for i, cnt in enumerate(counts): start_idx = idx_start[i] y_avg.append(np.mean(y_sorted[start_idx:start_idx + cnt])) y_avg = np.asarray(y_avg) return uniq_u, y_avg curves = [] for rec in recs: u, y = map_times_to_norm(rec['df'], rec['start'], rec['marks'], rec['end']) if u.size < 2: continue y_grid = np.interp(x_norm, u, y, left=np.nan, right=np.nan) y_grid[x_norm < u.min()] = np.nan y_grid[x_norm > u.max()] = np.nan curves.append(y_grid) if not curves: return Y = np.vstack(curves) n = np.sum(~np.isnan(Y), axis=0) marr = np.ma.array(Y, mask=np.isnan(Y)) mean = marr.mean(axis=0).filled(np.nan) std = marr.std(axis=0).filled(np.nan) fig, ax = plt.subplots(figsize=(12, 5)) ax.plot(x_sec, mean, color='tab:green', label='Mean RR') ax.fill_between(x_sec, mean - std, mean + std, color='tab:green', alpha=0.2, label='±1 SD') for xb in boundaries * total: ax.axvline(xb, color='k', linestyle='--', linewidth=0.8, alpha=0.5) ax.set_xlabel('Aligned time (s)') ax.set_ylabel('RR (ms)') ax.set_title('Aligned Average RR Across Recordings (4 marks)') ax.grid(True, alpha=0.3) ax.legend(loc='upper right') fig.tight_layout() fig.savefig(out_root / 'RR_average_aligned.png', dpi=150) plt.close(fig) out_csv = out_root / 'RR_average_aligned.csv' pd.DataFrame({'normalized_time': x_norm, 'aligned_seconds': x_sec, 'rr_mean': mean, 'rr_std': std, 'n': n}).to_csv(out_csv, index=False) def compute_and_plot_aligned_rmssd(recordings_root: Path, out_root: Path, window_seconds: int = 30) -> None: """Aligned average rolling RMSSD (ms) across recordings with 4 marks. RMSSD is computed as a time-based rolling window over RR (default 30s). """ def rolling_rmssd(rr: pd.Series) -> pd.Series: def rmssd_func(x: np.ndarray) -> float: if x.size < 2: return np.nan d = np.diff(x.astype(float)) if d.size == 0: return np.nan return float(np.sqrt(np.mean(d * d))) return rr.rolling(f'{window_seconds}s', min_periods=5).apply(rmssd_func, raw=True) recs: list[dict] = [] seg_durations: list[tuple[float, float, float, float, float]] = [] for rec_dir in sorted([p for p in recordings_root.iterdir() if p.is_dir()]): rr_csv = rec_dir / 'rr.csv' ts_csv = rec_dir / 'timestamps.csv' if not rr_csv.exists() or not ts_csv.exists(): continue rr_df = read_signal_csv(rr_csv, 'rr_ms') rr_df = clean_rr_ms(rr_df, 'rr_ms', source_name=f'{rec_dir.name} (aligned RMSSD)') marks = read_marks(ts_csv) if rr_df.empty or marks is None or len(marks) != 4: continue # Compute rolling RMSSD series rmssd_series = rolling_rmssd(rr_df['rr_ms']).dropna() if rmssd_series.empty: continue rmssd_df = rmssd_series.to_frame(name='rmssd') start_ts = rmssd_df.index.min() end_ts = rmssd_df.index.max() a, b, c, d = list(marks) if not (start_ts < a < b < c < d < end_ts): continue pre_d = (a - start_ts).total_seconds() d12 = (b - a).total_seconds() d23 = (c - b).total_seconds() d34 = (d - c).total_seconds() post_d = (end_ts - d).total_seconds() seg_durations.append((pre_d, d12, d23, d34, post_d)) recs.append({'df': rmssd_df, 'start': start_ts, 'marks': (a, b, c, d), 'end': end_ts}) if not recs: return med = np.nanmedian(np.array(seg_durations), axis=0) total = float(np.sum(med)) if total <= 0: return boundaries = np.concatenate(([0.0], np.cumsum(med / total))) N = 1000 x_norm = np.linspace(0.0, 1.0, N) x_sec = x_norm * total def map_times_to_norm(df: pd.DataFrame, start: pd.Timestamp, marks: tuple[pd.Timestamp, pd.Timestamp, pd.Timestamp, pd.Timestamp], end: pd.Timestamp) -> tuple[np.ndarray, np.ndarray]: a, b, c, d = marks segments = [ (start, a, boundaries[0], boundaries[1]), (a, b, boundaries[1], boundaries[2]), (b, c, boundaries[2], boundaries[3]), (c, d, boundaries[3], boundaries[4]), (d, end, boundaries[4], boundaries[5]), ] u_list = [] y_list = [] for seg_start, seg_end, u0, u1 in segments: seg = df.loc[(df.index >= seg_start) & (df.index <= seg_end), 'rmssd'] if seg.empty: continue dur_ns = (seg_end.value - seg_start.value) if dur_ns <= 0: continue rel = (seg.index.view('int64') - np.int64(seg_start.value)) / np.float64(dur_ns) rel = np.clip(rel.astype(float), 0.0, 1.0) u = u0 + rel * (u1 - u0) u_list.append(u) y_list.append(seg.values.astype(float)) if not u_list: return np.array([]), np.array([]) u_all = np.concatenate(u_list) y_all = np.concatenate(y_list) order = np.argsort(u_all) u_sorted = u_all[order] y_sorted = y_all[order] uniq_u, idx_start = np.unique(u_sorted, return_index=True) counts = np.diff(np.append(idx_start, len(u_sorted))) y_avg = [] for i, cnt in enumerate(counts): start_idx = idx_start[i] y_avg.append(np.mean(y_sorted[start_idx:start_idx + cnt])) y_avg = np.asarray(y_avg) return uniq_u, y_avg curves = [] for rec in recs: u, y = map_times_to_norm(rec['df'], rec['start'], rec['marks'], rec['end']) if u.size < 2: continue y_grid = np.interp(x_norm, u, y, left=np.nan, right=np.nan) y_grid[x_norm < u.min()] = np.nan y_grid[x_norm > u.max()] = np.nan curves.append(y_grid) if not curves: return Y = np.vstack(curves) n = np.sum(~np.isnan(Y), axis=0) marr = np.ma.array(Y, mask=np.isnan(Y)) mean = marr.mean(axis=0).filled(np.nan) std = marr.std(axis=0).filled(np.nan) fig, ax = plt.subplots(figsize=(12, 5)) ax.plot(x_sec, mean, color='tab:red', label=f'Mean RMSSD ({window_seconds}s window)') ax.fill_between(x_sec, mean - std, mean + std, color='tab:red', alpha=0.2, label='±1 SD') for xb in boundaries * total: ax.axvline(xb, color='k', linestyle='--', linewidth=0.8, alpha=0.5) ax.set_xlabel('Aligned time (s)') ax.set_ylabel('RMSSD (ms)') ax.set_title('Aligned Average RMSSD Across Recordings (4 marks)') ax.grid(True, alpha=0.3) ax.legend(loc='upper right') fig.tight_layout() fig.savefig(out_root / 'RMSSD_average_aligned.png', dpi=150) plt.close(fig) out_csv = out_root / 'RMSSD_average_aligned.csv' pd.DataFrame({'normalized_time': x_norm, 'aligned_seconds': x_sec, 'rmssd_mean': mean, 'rmssd_std': std, 'n': n}).to_csv(out_csv, index=False) def main() -> None: root = Path(__file__).resolve().parent recordings_root = root / 'SingleRecordings' out_root = root / 'Plots' / 'Aggregate' ensure_dir(out_root) summaries: list[dict] = [] for rec_dir in sorted([p for p in recordings_root.iterdir() if p.is_dir()]): rec_name = rec_dir.name hr_df = read_signal_csv(rec_dir / 'hr.csv', 'hr') rr_df = read_signal_csv(rec_dir / 'rr.csv', 'rr_ms') rr_df = clean_rr_ms(rr_df, 'rr_ms', source_name=f'{rec_name} (summary)') marks = read_marks(rec_dir / 'timestamps.csv') if marks is None or len(marks) != 4: # Ignore recordings not having exactly 4 timestamps continue s = summarize_segments_for_recording(rec_name, hr_df, rr_df, marks) if s is not None: summaries.append(s) if not summaries: print('No qualifying recordings (exactly 4 timestamps) found for aggregation.') return df = pd.DataFrame(summaries) df.to_csv(out_root / 'summary_metrics.csv', index=False) # Generate aggregate boxplots for HR, RR, RMSSD with five categories hr_means = boxplot_five_categories( df, ('hr_breathing1', 'hr_spring', 'hr_summer', 'hr_autumn', 'hr_breathing2'), 'HR Median Across Conditions (All Recordings)', 'HR (bpm)', out_root / 'HR_boxplot_conditions.png', ) rr_means = boxplot_five_categories( df, ('rr_breathing1', 'rr_spring', 'rr_summer', 'rr_autumn', 'rr_breathing2'), 'RR Median Across Conditions (All Recordings)', 'RR (ms)', out_root / 'RR_boxplot_conditions.png', ) rmssd_means = boxplot_five_categories( df, ('rmssd_breathing1_ms', 'rmssd_spring_ms', 'rmssd_summer_ms', 'rmssd_autumn_ms', 'rmssd_breathing2_ms'), 'RMSSD Across Conditions (All Recordings)', 'RMSSD (ms)', out_root / 'RMSSD_boxplot_conditions.png', ) # Write means to a txt file means_txt = out_root / 'condition_means.txt' with means_txt.open('w', encoding='utf-8') as f: def write_block(name: str, mapping: dict): if not mapping: return f.write(f'{name} means by condition\n') for label, val in mapping.items(): if val is None or np.isnan(val): f.write(f' {label}: NA\n') else: f.write(f' {label}: {val:.2f}\n') f.write('\n') write_block('HR (bpm)', hr_means) write_block('RR (ms)', rr_means) write_block('RMSSD (ms)', rmssd_means) print(f'Aggregation complete. Outputs at: {out_root}') # Also compute HR average over aligned time across recordings with 4 marks compute_and_plot_aligned_hr(recordings_root, out_root) compute_and_plot_aligned_rr(recordings_root, out_root) compute_and_plot_aligned_rmssd(recordings_root, out_root) if __name__ == '__main__': main()