Files
Bachelor-Arbeit-Sophia-Habe…/aggregate_segments_analysis.py
2025-09-24 17:35:06 +02:00

682 lines
27 KiB
Python

import math
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
def ensure_dir(path: Path) -> None:
path.mkdir(parents=True, exist_ok=True)
def read_signal_csv(csv_path: Path, value_column: str) -> pd.DataFrame:
if not csv_path.exists():
return pd.DataFrame(columns=[value_column])
df = pd.read_csv(csv_path)
if 'timestamp' not in df.columns or value_column not in df.columns:
return pd.DataFrame(columns=[value_column])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', errors='coerce')
df = df.dropna(subset=['timestamp']).set_index('timestamp').sort_index()
df[value_column] = pd.to_numeric(df[value_column], errors='coerce')
df = df.dropna(subset=[value_column])
return df
def clean_rr_ms(rr_df: pd.DataFrame, col: str = 'rr_ms', source_name: str | None = None) -> pd.DataFrame:
"""RR cleaning with simple thresholding + interpolation and reporting.
- Coerce to numeric; mark non-finite as NaN (count)
- Mark out-of-range [300, 2000] ms as NaN (count)
- Interpolate in time; ffill/bfill edges
- Print counts
"""
if rr_df is None or rr_df.empty or col not in rr_df.columns:
return rr_df
df = rr_df.copy()
df[col] = pd.to_numeric(df[col], errors='coerce')
nonfinite_mask = ~pd.notna(df[col])
range_mask = (df[col] < 300) | (df[col] > 2000)
flagged = nonfinite_mask | range_mask
df.loc[flagged, col] = np.nan
if isinstance(df.index, pd.DatetimeIndex):
df[col] = df[col].interpolate(method='time', limit_direction='both')
else:
df[col] = df[col].interpolate(limit_direction='both')
df[col] = df[col].ffill().bfill()
if source_name is None:
source_name = 'RR cleaning'
print(f"{source_name} - RR filter: nonfinite={int(nonfinite_mask.sum())}, out_of_range={int(range_mask.sum())}, total_flagged={int(flagged.sum())}")
return df
def read_marks(csv_path: Path) -> pd.Series:
if not csv_path.exists():
return pd.Series([], dtype='datetime64[ns]')
df = pd.read_csv(csv_path)
if 'timestamp' not in df.columns:
return pd.Series([], dtype='datetime64[ns]')
ts = pd.to_datetime(df['timestamp'], unit='ms', errors='coerce').dropna().sort_values()
return ts
def get_recording_bounds(hr_df: pd.DataFrame, rr_df: pd.DataFrame) -> tuple[pd.Timestamp, pd.Timestamp] | tuple[None, None]:
if (hr_df is None or hr_df.empty) and (rr_df is None or rr_df.empty):
return None, None
starts = []
ends = []
if hr_df is not None and not hr_df.empty:
starts.append(hr_df.index.min())
ends.append(hr_df.index.max())
if rr_df is not None and not rr_df.empty:
starts.append(rr_df.index.min())
ends.append(rr_df.index.max())
return min(starts), max(ends)
def segment_slices(marks: list[pd.Timestamp], start_ts: pd.Timestamp, end_ts: pd.Timestamp) -> dict:
"""Return dict with segments: pre, betweens (list of (a,b)), post.
Assumes exactly 4 marks provided.
"""
assert len(marks) == 4
a, b, c, d = marks
pre = (start_ts, a)
betweens = [(a, b), (b, c), (c, d)]
post = (d, end_ts)
return {'pre': pre, 'betweens': betweens, 'post': post}
def rmssd(rr_ms_series: pd.Series) -> float | None:
"""Compute RMSSD (ms) for a series of RR values in milliseconds.
Returns None if insufficient data.
"""
if rr_ms_series is None or len(rr_ms_series) < 2:
return None
rr = rr_ms_series.values.astype(float)
diff = np.diff(rr)
if diff.size == 0:
return None
return float(np.sqrt(np.mean(np.square(diff))))
def summarize_segments_for_recording(rec_name: str, hr_df: pd.DataFrame, rr_df: pd.DataFrame, marks: pd.Series) -> dict | None:
# Only include recordings with exactly 4 marks
if marks is None or len(marks) != 4:
return None
start_ts, end_ts = get_recording_bounds(hr_df, rr_df)
if start_ts is None or end_ts is None:
return None
segs = segment_slices(list(marks), start_ts, end_ts)
# Helper to subset by time range
def subset(df: pd.DataFrame, value_col: str, a: pd.Timestamp, b: pd.Timestamp) -> pd.Series:
if df is None or df.empty:
return pd.Series([], dtype=float)
return df.loc[(df.index >= a) & (df.index < b), value_col]
# Pre segment
pre_hr = subset(hr_df, 'hr', *segs['pre'])
pre_rr = subset(rr_df, 'rr_ms', *segs['pre'])
pre_rmssd = rmssd(pre_rr) if len(pre_rr) >= 2 else None
# Three between segments: compute individually
between_vals = [] # list of dicts with hr, rr, rmssd
for a, b in segs['betweens']:
bh = subset(hr_df, 'hr', a, b)
br = subset(rr_df, 'rr_ms', a, b)
between_vals.append({
'hr': bh.values if len(bh) > 0 else None,
'rr': br.values if len(br) > 0 else None,
'rmssd': rmssd(br) if len(br) >= 2 else None,
})
# Post segment
post_hr = subset(hr_df, 'hr', *segs['post'])
post_rr = subset(rr_df, 'rr_ms', *segs['post'])
post_rmssd = rmssd(post_rr) if len(post_rr) >= 2 else None
# Build summary dict (use medians for HR/RR to be robust) for five named conditions
def safe_median(arr_like) -> float | None:
if arr_like is None:
return None
vals = np.asarray(arr_like, dtype=float)
vals = vals[~np.isnan(vals)]
if vals.size == 0:
return None
return float(np.median(vals))
summary = {
'recording': rec_name,
# HR medians
'hr_breathing1': safe_median(pre_hr.values if len(pre_hr) > 0 else None),
'hr_spring': safe_median(between_vals[0]['hr'] if between_vals and between_vals[0]['hr'] is not None else None),
'hr_summer': safe_median(between_vals[1]['hr'] if len(between_vals) > 1 and between_vals[1]['hr'] is not None else None),
'hr_autumn': safe_median(between_vals[2]['hr'] if len(between_vals) > 2 and between_vals[2]['hr'] is not None else None),
'hr_breathing2': safe_median(post_hr.values if len(post_hr) > 0 else None),
# RR medians
'rr_breathing1': safe_median(pre_rr.values if len(pre_rr) > 0 else None),
'rr_spring': safe_median(between_vals[0]['rr'] if between_vals and between_vals[0]['rr'] is not None else None),
'rr_summer': safe_median(between_vals[1]['rr'] if len(between_vals) > 1 and between_vals[1]['rr'] is not None else None),
'rr_autumn': safe_median(between_vals[2]['rr'] if len(between_vals) > 2 and between_vals[2]['rr'] is not None else None),
'rr_breathing2': safe_median(post_rr.values if len(post_rr) > 0 else None),
# RMSSD ms
'rmssd_breathing1_ms': pre_rmssd if pre_rmssd is not None and not math.isnan(pre_rmssd) else None,
'rmssd_spring_ms': between_vals[0]['rmssd'] if between_vals and between_vals[0]['rmssd'] is not None and not math.isnan(between_vals[0]['rmssd']) else None,
'rmssd_summer_ms': between_vals[1]['rmssd'] if len(between_vals) > 1 and between_vals[1]['rmssd'] is not None and not math.isnan(between_vals[1]['rmssd']) else None,
'rmssd_autumn_ms': between_vals[2]['rmssd'] if len(between_vals) > 2 and between_vals[2]['rmssd'] is not None and not math.isnan(between_vals[2]['rmssd']) else None,
'rmssd_breathing2_ms': post_rmssd if post_rmssd is not None and not math.isnan(post_rmssd) else None,
}
return summary
def boxplot_five_categories(values_df: pd.DataFrame, value_cols: tuple[str, str, str, str, str], title: str, ylabel: str, out_path: Path) -> dict:
"""Create a boxplot comparing five named conditions across recordings.
value_cols is a tuple of (breathing1, spring, summer, autumn, breathing2)
"""
b1, sp, su, au, b2 = value_cols
df = values_df[[b1, sp, su, au, b2]].dropna(how='all')
if df.empty:
return {}
data = [df[b1].dropna().values, df[sp].dropna().values, df[su].dropna().values, df[au].dropna().values, df[b2].dropna().values]
labels = ['Breathing Scene 1', 'Spring Scene', 'Summer Scene', 'Autumn Scene', 'Breathing Scene 2']
fig, ax = plt.subplots(figsize=(10, 5))
ax.boxplot(data, tick_labels=labels, vert=True, patch_artist=True)
# Overlay small dot for the mean per category
means = [float(np.mean(d)) if len(d) > 0 else np.nan for d in data]
ax.scatter(range(1, 6), means, color='black', s=20, zorder=3)
ax.set_title(title)
ax.set_ylabel(ylabel)
ax.grid(True, axis='y', alpha=0.3)
fig.tight_layout()
fig.savefig(out_path, dpi=150)
plt.close(fig)
# Return mapping of label -> mean
return {label: mean for label, mean in zip(labels, means)}
def compute_and_plot_aligned_hr(recordings_root: Path, out_root: Path) -> None:
"""Compute an across-recordings averaged HR curve aligned by the 4 timestamps.
Approach:
- Use recordings with exactly 4 marks and available HR
- Compute median durations for the 5 segments (pre, 3 betweens, post)
- Map each recording piecewise-linearly so marks align to the same normalized positions
based on the median segment-length proportions
- Interpolate each recording to a common grid and average
"""
recs: list[dict] = []
seg_durations: list[tuple[float, float, float, float, float]] = []
for rec_dir in sorted([p for p in recordings_root.iterdir() if p.is_dir()]):
rec_name = rec_dir.name
hr_csv = rec_dir / 'hr.csv'
ts_csv = rec_dir / 'timestamps.csv'
if not hr_csv.exists() or not ts_csv.exists():
continue
hr_df = read_signal_csv(hr_csv, 'hr')
marks = read_marks(ts_csv)
if hr_df.empty or marks is None or len(marks) != 4:
continue
# Determine bounds
start_ts = hr_df.index.min()
end_ts = hr_df.index.max()
a, b, c, d = list(marks)
# Ensure marks fall within HR bounds
if not (start_ts < a < b < c < d < end_ts):
# Skip if pathological
continue
# Segment durations in seconds
pre_d = (a - start_ts).total_seconds()
d12 = (b - a).total_seconds()
d23 = (c - b).total_seconds()
d34 = (d - c).total_seconds()
post_d = (end_ts - d).total_seconds()
seg_durations.append((pre_d, d12, d23, d34, post_d))
# Store for second pass
recs.append({'name': rec_name, 'hr': hr_df, 'start': start_ts, 'marks': (a, b, c, d), 'end': end_ts})
if not recs:
return
# Median segment durations and normalized cumulative boundaries
med = np.nanmedian(np.array(seg_durations), axis=0)
total = float(np.sum(med))
if total <= 0:
return
proportions = med / total
boundaries = np.concatenate(([0.0], np.cumsum(proportions))) # length 6, last == 1.0
# Common grid (normalized [0,1]) and scaled seconds by median total duration
N = 1000
x_norm = np.linspace(0.0, 1.0, N)
x_sec = x_norm * total
# Helper: piecewise-linear map original timestamp to normalized coordinate
def map_times_to_norm(hr_df: pd.DataFrame, start: pd.Timestamp, marks: tuple[pd.Timestamp, pd.Timestamp, pd.Timestamp, pd.Timestamp], end: pd.Timestamp) -> tuple[np.ndarray, np.ndarray]:
a, b, c, d = marks
segments = [
(start, a, boundaries[0], boundaries[1]),
(a, b, boundaries[1], boundaries[2]),
(b, c, boundaries[2], boundaries[3]),
(c, d, boundaries[3], boundaries[4]),
(d, end, boundaries[4], boundaries[5]),
]
u_list = []
y_list = []
for seg_start, seg_end, u0, u1 in segments:
seg = hr_df.loc[(hr_df.index >= seg_start) & (hr_df.index <= seg_end), 'hr']
if seg.empty:
continue
dur_ns = (seg_end.value - seg_start.value)
if dur_ns <= 0:
continue
rel = (seg.index.view('int64') - np.int64(seg_start.value)) / np.float64(dur_ns)
rel = np.clip(rel.astype(float), 0.0, 1.0)
u = u0 + rel * (u1 - u0)
u_list.append(u)
y_list.append(seg.values.astype(float))
if not u_list:
return np.array([]), np.array([])
u_all = np.concatenate(u_list)
y_all = np.concatenate(y_list)
order = np.argsort(u_all)
u_sorted = u_all[order]
y_sorted = y_all[order]
uniq_u, idx_start = np.unique(u_sorted, return_index=True)
counts = np.diff(np.append(idx_start, len(u_sorted)))
y_avg = []
for i, cnt in enumerate(counts):
start_idx = idx_start[i]
y_avg.append(np.mean(y_sorted[start_idx:start_idx + cnt]))
y_avg = np.asarray(y_avg)
return uniq_u, y_avg
curves = []
for rec in recs:
u, y = map_times_to_norm(rec['hr'], rec['start'], rec['marks'], rec['end'])
if u.size < 2:
continue
y_grid = np.interp(x_norm, u, y, left=np.nan, right=np.nan)
y_grid[x_norm < u.min()] = np.nan
y_grid[x_norm > u.max()] = np.nan
curves.append(y_grid)
if not curves:
return
Y = np.vstack(curves)
n = np.sum(~np.isnan(Y), axis=0)
marr = np.ma.array(Y, mask=np.isnan(Y))
mean = marr.mean(axis=0).filled(np.nan)
std = marr.std(axis=0).filled(np.nan)
# Plot
fig, ax = plt.subplots(figsize=(12, 5))
ax.plot(x_sec, mean, color='tab:blue', label='Mean HR')
ax.fill_between(x_sec, mean - std, mean + std, color='tab:blue', alpha=0.2, label='±1 SD')
for xb in boundaries * total:
ax.axvline(xb, color='k', linestyle='--', linewidth=0.8, alpha=0.5)
ax.set_xlabel('Aligned time (s)')
ax.set_ylabel('HR (bpm)')
ax.set_title('Aligned Average HR Across Recordings (4 marks)')
ax.grid(True, alpha=0.3)
ax.legend(loc='upper right')
fig.tight_layout()
fig.savefig(out_root / 'HR_average_aligned.png', dpi=150)
plt.close(fig)
out_csv = out_root / 'HR_average_aligned.csv'
out_df = pd.DataFrame({
'normalized_time': x_norm,
'aligned_seconds': x_sec,
'hr_mean': mean,
'hr_std': std,
'n': n,
})
out_df.to_csv(out_csv, index=False)
def compute_and_plot_aligned_rr(recordings_root: Path, out_root: Path) -> None:
"""Aligned average RR (ms) across recordings with 4 marks (same method as HR)."""
recs: list[dict] = []
seg_durations: list[tuple[float, float, float, float, float]] = []
for rec_dir in sorted([p for p in recordings_root.iterdir() if p.is_dir()]):
rr_csv = rec_dir / 'rr.csv'
ts_csv = rec_dir / 'timestamps.csv'
if not rr_csv.exists() or not ts_csv.exists():
continue
rr_df = read_signal_csv(rr_csv, 'rr_ms')
rr_df = clean_rr_ms(rr_df, 'rr_ms', source_name=f'{rec_dir.name} (aligned RR)')
marks = read_marks(ts_csv)
if rr_df.empty or marks is None or len(marks) != 4:
continue
start_ts = rr_df.index.min()
end_ts = rr_df.index.max()
a, b, c, d = list(marks)
if not (start_ts < a < b < c < d < end_ts):
continue
pre_d = (a - start_ts).total_seconds()
d12 = (b - a).total_seconds()
d23 = (c - b).total_seconds()
d34 = (d - c).total_seconds()
post_d = (end_ts - d).total_seconds()
seg_durations.append((pre_d, d12, d23, d34, post_d))
recs.append({'df': rr_df, 'start': start_ts, 'marks': (a, b, c, d), 'end': end_ts})
if not recs:
return
med = np.nanmedian(np.array(seg_durations), axis=0)
total = float(np.sum(med))
if total <= 0:
return
boundaries = np.concatenate(([0.0], np.cumsum(med / total)))
N = 1000
x_norm = np.linspace(0.0, 1.0, N)
x_sec = x_norm * total
def map_times_to_norm(df: pd.DataFrame, start: pd.Timestamp, marks: tuple[pd.Timestamp, pd.Timestamp, pd.Timestamp, pd.Timestamp], end: pd.Timestamp) -> tuple[np.ndarray, np.ndarray]:
a, b, c, d = marks
segments = [
(start, a, boundaries[0], boundaries[1]),
(a, b, boundaries[1], boundaries[2]),
(b, c, boundaries[2], boundaries[3]),
(c, d, boundaries[3], boundaries[4]),
(d, end, boundaries[4], boundaries[5]),
]
u_list = []
y_list = []
for seg_start, seg_end, u0, u1 in segments:
seg = df.loc[(df.index >= seg_start) & (df.index <= seg_end), 'rr_ms']
if seg.empty:
continue
dur_ns = (seg_end.value - seg_start.value)
if dur_ns <= 0:
continue
rel = (seg.index.view('int64') - np.int64(seg_start.value)) / np.float64(dur_ns)
rel = np.clip(rel.astype(float), 0.0, 1.0)
u = u0 + rel * (u1 - u0)
u_list.append(u)
y_list.append(seg.values.astype(float))
if not u_list:
return np.array([]), np.array([])
u_all = np.concatenate(u_list)
y_all = np.concatenate(y_list)
order = np.argsort(u_all)
u_sorted = u_all[order]
y_sorted = y_all[order]
uniq_u, idx_start = np.unique(u_sorted, return_index=True)
counts = np.diff(np.append(idx_start, len(u_sorted)))
y_avg = []
for i, cnt in enumerate(counts):
start_idx = idx_start[i]
y_avg.append(np.mean(y_sorted[start_idx:start_idx + cnt]))
y_avg = np.asarray(y_avg)
return uniq_u, y_avg
curves = []
for rec in recs:
u, y = map_times_to_norm(rec['df'], rec['start'], rec['marks'], rec['end'])
if u.size < 2:
continue
y_grid = np.interp(x_norm, u, y, left=np.nan, right=np.nan)
y_grid[x_norm < u.min()] = np.nan
y_grid[x_norm > u.max()] = np.nan
curves.append(y_grid)
if not curves:
return
Y = np.vstack(curves)
n = np.sum(~np.isnan(Y), axis=0)
marr = np.ma.array(Y, mask=np.isnan(Y))
mean = marr.mean(axis=0).filled(np.nan)
std = marr.std(axis=0).filled(np.nan)
fig, ax = plt.subplots(figsize=(12, 5))
ax.plot(x_sec, mean, color='tab:green', label='Mean RR')
ax.fill_between(x_sec, mean - std, mean + std, color='tab:green', alpha=0.2, label='±1 SD')
for xb in boundaries * total:
ax.axvline(xb, color='k', linestyle='--', linewidth=0.8, alpha=0.5)
ax.set_xlabel('Aligned time (s)')
ax.set_ylabel('RR (ms)')
ax.set_title('Aligned Average RR Across Recordings (4 marks)')
ax.grid(True, alpha=0.3)
ax.legend(loc='upper right')
fig.tight_layout()
fig.savefig(out_root / 'RR_average_aligned.png', dpi=150)
plt.close(fig)
out_csv = out_root / 'RR_average_aligned.csv'
pd.DataFrame({'normalized_time': x_norm, 'aligned_seconds': x_sec, 'rr_mean': mean, 'rr_std': std, 'n': n}).to_csv(out_csv, index=False)
def compute_and_plot_aligned_rmssd(recordings_root: Path, out_root: Path, window_seconds: int = 30) -> None:
"""Aligned average rolling RMSSD (ms) across recordings with 4 marks.
RMSSD is computed as a time-based rolling window over RR (default 30s).
"""
def rolling_rmssd(rr: pd.Series) -> pd.Series:
def rmssd_func(x: np.ndarray) -> float:
if x.size < 2:
return np.nan
d = np.diff(x.astype(float))
if d.size == 0:
return np.nan
return float(np.sqrt(np.mean(d * d)))
return rr.rolling(f'{window_seconds}s', min_periods=5).apply(rmssd_func, raw=True)
recs: list[dict] = []
seg_durations: list[tuple[float, float, float, float, float]] = []
for rec_dir in sorted([p for p in recordings_root.iterdir() if p.is_dir()]):
rr_csv = rec_dir / 'rr.csv'
ts_csv = rec_dir / 'timestamps.csv'
if not rr_csv.exists() or not ts_csv.exists():
continue
rr_df = read_signal_csv(rr_csv, 'rr_ms')
rr_df = clean_rr_ms(rr_df, 'rr_ms', source_name=f'{rec_dir.name} (aligned RMSSD)')
marks = read_marks(ts_csv)
if rr_df.empty or marks is None or len(marks) != 4:
continue
# Compute rolling RMSSD series
rmssd_series = rolling_rmssd(rr_df['rr_ms']).dropna()
if rmssd_series.empty:
continue
rmssd_df = rmssd_series.to_frame(name='rmssd')
start_ts = rmssd_df.index.min()
end_ts = rmssd_df.index.max()
a, b, c, d = list(marks)
if not (start_ts < a < b < c < d < end_ts):
continue
pre_d = (a - start_ts).total_seconds()
d12 = (b - a).total_seconds()
d23 = (c - b).total_seconds()
d34 = (d - c).total_seconds()
post_d = (end_ts - d).total_seconds()
seg_durations.append((pre_d, d12, d23, d34, post_d))
recs.append({'df': rmssd_df, 'start': start_ts, 'marks': (a, b, c, d), 'end': end_ts})
if not recs:
return
med = np.nanmedian(np.array(seg_durations), axis=0)
total = float(np.sum(med))
if total <= 0:
return
boundaries = np.concatenate(([0.0], np.cumsum(med / total)))
N = 1000
x_norm = np.linspace(0.0, 1.0, N)
x_sec = x_norm * total
def map_times_to_norm(df: pd.DataFrame, start: pd.Timestamp, marks: tuple[pd.Timestamp, pd.Timestamp, pd.Timestamp, pd.Timestamp], end: pd.Timestamp) -> tuple[np.ndarray, np.ndarray]:
a, b, c, d = marks
segments = [
(start, a, boundaries[0], boundaries[1]),
(a, b, boundaries[1], boundaries[2]),
(b, c, boundaries[2], boundaries[3]),
(c, d, boundaries[3], boundaries[4]),
(d, end, boundaries[4], boundaries[5]),
]
u_list = []
y_list = []
for seg_start, seg_end, u0, u1 in segments:
seg = df.loc[(df.index >= seg_start) & (df.index <= seg_end), 'rmssd']
if seg.empty:
continue
dur_ns = (seg_end.value - seg_start.value)
if dur_ns <= 0:
continue
rel = (seg.index.view('int64') - np.int64(seg_start.value)) / np.float64(dur_ns)
rel = np.clip(rel.astype(float), 0.0, 1.0)
u = u0 + rel * (u1 - u0)
u_list.append(u)
y_list.append(seg.values.astype(float))
if not u_list:
return np.array([]), np.array([])
u_all = np.concatenate(u_list)
y_all = np.concatenate(y_list)
order = np.argsort(u_all)
u_sorted = u_all[order]
y_sorted = y_all[order]
uniq_u, idx_start = np.unique(u_sorted, return_index=True)
counts = np.diff(np.append(idx_start, len(u_sorted)))
y_avg = []
for i, cnt in enumerate(counts):
start_idx = idx_start[i]
y_avg.append(np.mean(y_sorted[start_idx:start_idx + cnt]))
y_avg = np.asarray(y_avg)
return uniq_u, y_avg
curves = []
for rec in recs:
u, y = map_times_to_norm(rec['df'], rec['start'], rec['marks'], rec['end'])
if u.size < 2:
continue
y_grid = np.interp(x_norm, u, y, left=np.nan, right=np.nan)
y_grid[x_norm < u.min()] = np.nan
y_grid[x_norm > u.max()] = np.nan
curves.append(y_grid)
if not curves:
return
Y = np.vstack(curves)
n = np.sum(~np.isnan(Y), axis=0)
marr = np.ma.array(Y, mask=np.isnan(Y))
mean = marr.mean(axis=0).filled(np.nan)
std = marr.std(axis=0).filled(np.nan)
fig, ax = plt.subplots(figsize=(12, 5))
ax.plot(x_sec, mean, color='tab:red', label=f'Mean RMSSD ({window_seconds}s window)')
ax.fill_between(x_sec, mean - std, mean + std, color='tab:red', alpha=0.2, label='±1 SD')
for xb in boundaries * total:
ax.axvline(xb, color='k', linestyle='--', linewidth=0.8, alpha=0.5)
ax.set_xlabel('Aligned time (s)')
ax.set_ylabel('RMSSD (ms)')
ax.set_title('Aligned Average RMSSD Across Recordings (4 marks)')
ax.grid(True, alpha=0.3)
ax.legend(loc='upper right')
fig.tight_layout()
fig.savefig(out_root / 'RMSSD_average_aligned.png', dpi=150)
plt.close(fig)
out_csv = out_root / 'RMSSD_average_aligned.csv'
pd.DataFrame({'normalized_time': x_norm, 'aligned_seconds': x_sec, 'rmssd_mean': mean, 'rmssd_std': std, 'n': n}).to_csv(out_csv, index=False)
def main() -> None:
root = Path(__file__).resolve().parent
recordings_root = root / 'SingleRecordings'
out_root = root / 'Plots' / 'Aggregate'
ensure_dir(out_root)
summaries: list[dict] = []
for rec_dir in sorted([p for p in recordings_root.iterdir() if p.is_dir()]):
rec_name = rec_dir.name
hr_df = read_signal_csv(rec_dir / 'hr.csv', 'hr')
rr_df = read_signal_csv(rec_dir / 'rr.csv', 'rr_ms')
rr_df = clean_rr_ms(rr_df, 'rr_ms', source_name=f'{rec_name} (summary)')
marks = read_marks(rec_dir / 'timestamps.csv')
if marks is None or len(marks) != 4:
# Ignore recordings not having exactly 4 timestamps
continue
s = summarize_segments_for_recording(rec_name, hr_df, rr_df, marks)
if s is not None:
summaries.append(s)
if not summaries:
print('No qualifying recordings (exactly 4 timestamps) found for aggregation.')
return
df = pd.DataFrame(summaries)
df.to_csv(out_root / 'summary_metrics.csv', index=False)
# Generate aggregate boxplots for HR, RR, RMSSD with five categories
hr_means = boxplot_five_categories(
df,
('hr_breathing1', 'hr_spring', 'hr_summer', 'hr_autumn', 'hr_breathing2'),
'HR Median Across Conditions (All Recordings)',
'HR (bpm)',
out_root / 'HR_boxplot_conditions.png',
)
rr_means = boxplot_five_categories(
df,
('rr_breathing1', 'rr_spring', 'rr_summer', 'rr_autumn', 'rr_breathing2'),
'RR Median Across Conditions (All Recordings)',
'RR (ms)',
out_root / 'RR_boxplot_conditions.png',
)
rmssd_means = boxplot_five_categories(
df,
('rmssd_breathing1_ms', 'rmssd_spring_ms', 'rmssd_summer_ms', 'rmssd_autumn_ms', 'rmssd_breathing2_ms'),
'RMSSD Across Conditions (All Recordings)',
'RMSSD (ms)',
out_root / 'RMSSD_boxplot_conditions.png',
)
# Write means to a txt file
means_txt = out_root / 'condition_means.txt'
with means_txt.open('w', encoding='utf-8') as f:
def write_block(name: str, mapping: dict):
if not mapping:
return
f.write(f'{name} means by condition\n')
for label, val in mapping.items():
if val is None or np.isnan(val):
f.write(f' {label}: NA\n')
else:
f.write(f' {label}: {val:.2f}\n')
f.write('\n')
write_block('HR (bpm)', hr_means)
write_block('RR (ms)', rr_means)
write_block('RMSSD (ms)', rmssd_means)
print(f'Aggregation complete. Outputs at: {out_root}')
# Also compute HR average over aligned time across recordings with 4 marks
compute_and_plot_aligned_hr(recordings_root, out_root)
compute_and_plot_aligned_rr(recordings_root, out_root)
compute_and_plot_aligned_rmssd(recordings_root, out_root)
if __name__ == '__main__':
main()