# plot_meditation_data.py
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt


def ensure_dir(path: Path) -> None:
    path.mkdir(parents=True, exist_ok=True)


def read_signal_csv(csv_path: Path, value_column: str) -> pd.DataFrame:
    """Read a CSV with columns ['timestamp', value_column] into a DataFrame with a DatetimeIndex.

    Returns an empty DataFrame if the file does not exist or lacks the expected columns.
    """
    if not csv_path.exists():
        return pd.DataFrame(columns=[value_column])
    df = pd.read_csv(csv_path)
    if 'timestamp' not in df.columns or value_column not in df.columns:
        return pd.DataFrame(columns=[value_column])
    # Convert timestamp (ms) to datetime and set as index
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', errors='coerce')
    df = df.dropna(subset=['timestamp'])
    df = df.set_index('timestamp').sort_index()
    # Coerce values to numeric
    df[value_column] = pd.to_numeric(df[value_column], errors='coerce')
    df = df.dropna(subset=[value_column])
    return df
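
# Example (illustrative values, not from the real recordings): hr.csv is expected to look like
#
#   timestamp,hr
#   1695555600000,62
#   1695555601000,63
#
# where 'timestamp' is epoch milliseconds and 'hr' is heart rate in bpm; rr.csv uses the
# same layout with an 'rr_ms' column (RR interval in ms) instead of 'hr'.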


def compute_time_based_moving_averages(df: pd.DataFrame, value_col: str) -> pd.DataFrame:
    """Add 5s, 10s, 30s, and 60s time-based rolling means for an irregularly sampled series.

    Requires a DatetimeIndex.
    """
    if df.empty:
        return df
    # Ensure index is datetime for time-based rolling windows
    if not isinstance(df.index, pd.DatetimeIndex):
        raise ValueError('DataFrame index must be a DatetimeIndex for time-based rolling windows')
    df = df.copy()
    for seconds in (5, 10, 30, 60):
        window = f'{seconds}s'
        df[f'{value_col}_ma_{seconds}s'] = (
            df[value_col].rolling(window=window, min_periods=1).mean()
        )
    return df
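
# Minimal sketch of what the time-based window does (illustrative values): with a '10s'
# window, each output row averages all samples whose timestamps fall within the preceding
# 10 seconds, so gaps in an irregular series simply contribute fewer samples rather than
# shifting the window by a fixed number of rows.
#
#   s = pd.Series([60, 62, 70],
#                 index=pd.to_datetime([0, 4_000, 15_000], unit='ms'))
#   s.rolling('10s', min_periods=1).mean()   # -> 60.0, 61.0, 70.0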


def add_seconds_from_start(df: pd.DataFrame, start_ts: pd.Timestamp) -> pd.DataFrame:
    if df.empty:
        return df
    df = df.copy()
    df['seconds'] = (df.index - start_ts).total_seconds()
    return df


def read_marks(csv_path: Path) -> pd.Series:
    """Read marked timestamps as a pandas Series of pd.Timestamp, sorted ascending.

    Returns an empty Series if the file is missing or contains no valid rows.
    """
    if not csv_path.exists():
        return pd.Series([], dtype='datetime64[ns]')
    df = pd.read_csv(csv_path)
    if 'timestamp' not in df.columns:
        return pd.Series([], dtype='datetime64[ns]')
    # Convert to datetime
    ts = pd.to_datetime(df['timestamp'], unit='ms', errors='coerce').dropna().sort_values()
    return ts
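
# Assumed timestamps.csv layout (matching the reader above, values illustrative): a single
# 'timestamp' column of epoch milliseconds, one row per marked moment in the session.
#
#   timestamp
#   1695555720000
#   1695556020000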


def clean_rr_ms(rr_df: pd.DataFrame, col: str = 'rr_ms', source_name: str | None = None) -> pd.DataFrame:
    """Basic NN editing for RR intervals in ms, with interpolation and reporting.

    Steps (threshold filtering only, per request):
    - Coerce to numeric and flag non-finite values (counted)
    - Flag values outside the [300, 2000] ms range (counted)
    - Time-based interpolation to fill flagged values, then ffill/bfill for the edges
    - Print a summary of the counts
    """
    if rr_df is None or rr_df.empty or col not in rr_df.columns:
        return rr_df
    df = rr_df.copy()
    df[col] = pd.to_numeric(df[col], errors='coerce')
    # Track flags (only threshold filtering per request)
    nonfinite_mask = df[col].isna()
    range_mask = (df[col] < 300) | (df[col] > 2000)
    # Combine flags: non-finite or out-of-range
    flagged = nonfinite_mask | range_mask
    # Set flagged values to NaN for interpolation
    df.loc[flagged, col] = np.nan
    # Interpolate in time, then ffill/bfill for the edges
    if isinstance(df.index, pd.DatetimeIndex):
        df[col] = df[col].interpolate(method='time', limit_direction='both')
    else:
        df[col] = df[col].interpolate(limit_direction='both')
    df[col] = df[col].ffill().bfill()
    # Reporting
    if source_name is None:
        source_name = 'RR cleaning'
    print(
        f"{source_name} - RR filter: nonfinite={int(nonfinite_mask.sum())}, "
        f"out_of_range={int(range_mask.sum())}, total_flagged={int(flagged.sum())}"
    )
    return df
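
# Note on the thresholds above: an RR interval of 300 ms corresponds to 200 bpm and
# 2000 ms to 30 bpm (bpm = 60000 / RR_ms), so the [300, 2000] ms window keeps anything
# physiologically plausible at rest and flags values that are very likely sensor artifacts.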


def segment_bounds_from_marks(marks: pd.Series, start_ts: pd.Timestamp, end_ts: pd.Timestamp) -> list[tuple[pd.Timestamp, pd.Timestamp]]:
    """Create segments between consecutive marks, plus the final segment from the last mark to the end.

    Does NOT include the segment before the first mark.
    If there are no marks, returns a single segment from start to end.
    """
    segments: list[tuple[pd.Timestamp, pd.Timestamp]] = []
    if marks is None or len(marks) == 0:
        return [(start_ts, end_ts)]
    # Between consecutive marks
    marks_list = list(marks)
    for i in range(len(marks_list) - 1):
        a = marks_list[i]
        b = marks_list[i + 1]
        if a < b and a < end_ts:
            segments.append((max(a, start_ts), min(b, end_ts)))
    # From last mark to end
    last = marks_list[-1]
    if last < end_ts:
        segments.append((max(last, start_ts), end_ts))
    return segments
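
# Worked example (illustrative): for a recording running from t=0s to t=600s with marks at
# 120s and 300s, this yields two segments, (120s, 300s) and (300s, 600s); the 0-120s
# stretch before the first mark is deliberately excluded, and with no marks at all the
# whole recording (0s, 600s) is returned as a single segment.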


def plot_timeseries_and_mas(out_dir: Path, rec_name: str, hr_df: pd.DataFrame, rr_df: pd.DataFrame, marks_sec: list[float]) -> None:
    ensure_dir(out_dir)
    fig, axes = plt.subplots(2, 1, figsize=(14, 8), sharex=True)
    # HR plot
    ax = axes[0]
    if not hr_df.empty:
        ax.plot(hr_df['seconds'], hr_df['hr'], label='HR', color='tab:blue', linewidth=0.8, alpha=0.7)
        for seconds, color in zip((5, 10, 30, 60), ('tab:orange', 'tab:green', 'tab:red', 'tab:purple')):
            col = f'hr_ma_{seconds}s'
            if col in hr_df.columns:
                ax.plot(hr_df['seconds'], hr_df[col], label=f'HR MA {seconds}s', color=color, linewidth=1.5)
    ax.set_ylabel('HR (bpm)')
    ax.grid(True, alpha=0.3)
    # RR plot
    ax = axes[1]
    if not rr_df.empty:
        ax.plot(rr_df['seconds'], rr_df['rr_ms'], label='RR (ms)', color='tab:blue', linewidth=0.8, alpha=0.7)
        for seconds, color in zip((5, 10, 30, 60), ('tab:orange', 'tab:green', 'tab:red', 'tab:purple')):
            col = f'rr_ms_ma_{seconds}s'
            if col in rr_df.columns:
                ax.plot(rr_df['seconds'], rr_df[col], label=f'RR MA {seconds}s', color=color, linewidth=1.5)
    ax.set_ylabel('RR (ms)')
    ax.grid(True, alpha=0.3)
    # Vertical lines for marks on both subplots
    for ax in axes:
        for s in marks_sec:
            ax.axvline(s, color='k', linestyle='--', linewidth=0.8, alpha=0.7)
        ax.legend(loc='upper right', ncol=2, fontsize=8)
    axes[-1].set_xlabel('Time (s)')
    fig.suptitle(f'{rec_name} - HR and RR with Moving Averages')
    fig.tight_layout(rect=(0, 0, 1, 0.97))
    out_path = out_dir / f'{rec_name}_timeseries.png'
    fig.savefig(out_path, dpi=150)
    plt.close(fig)


def plot_separate_moving_averages(out_dir: Path, rec_name: str, hr_df: pd.DataFrame, rr_df: pd.DataFrame, marks_sec: list[float]) -> None:
    """Save separate plots for each moving average window (5s, 10s, 30s, 60s)
    into a dedicated subdirectory per recording.

    Each figure contains two subplots (HR MA and RR MA) sharing the x-axis in seconds.
    """
    if (hr_df is None or hr_df.empty) and (rr_df is None or rr_df.empty):
        return
    ma_dir = out_dir / 'MovingAverages'
    ensure_dir(ma_dir)
    windows = (5, 10, 30, 60)
    for seconds in windows:
        fig, axes = plt.subplots(2, 1, figsize=(14, 8), sharex=True)
        # HR MA subplot
        ax = axes[0]
        plotted_any = False
        if hr_df is not None and not hr_df.empty:
            col = f'hr_ma_{seconds}s'
            if col in hr_df.columns:
                ax.plot(hr_df['seconds'], hr_df[col], label=f'HR MA {seconds}s', color='tab:orange', linewidth=1.5)
                plotted_any = True
        if plotted_any:
            for s in marks_sec:
                ax.axvline(s, color='k', linestyle='--', linewidth=0.8, alpha=0.7)
        ax.set_ylabel('HR (bpm)')
        ax.grid(True, alpha=0.3)
        ax.legend(loc='upper right', fontsize=9)
        # RR MA subplot
        ax = axes[1]
        plotted_any = False
        if rr_df is not None and not rr_df.empty:
            col = f'rr_ms_ma_{seconds}s'
            if col in rr_df.columns:
                ax.plot(rr_df['seconds'], rr_df[col], label=f'RR MA {seconds}s', color='tab:green', linewidth=1.5)
                plotted_any = True
        if plotted_any:
            for s in marks_sec:
                ax.axvline(s, color='k', linestyle='--', linewidth=0.8, alpha=0.7)
        ax.set_ylabel('RR (ms)')
        ax.grid(True, alpha=0.3)
        ax.legend(loc='upper right', fontsize=9)
        axes[-1].set_xlabel('Time (s)')
        fig.suptitle(f'{rec_name} - Moving Average {seconds}s')
        fig.tight_layout(rect=(0, 0, 1, 0.96))
        out_path = ma_dir / f'{rec_name}_ma_{seconds}s.png'
        fig.savefig(out_path, dpi=150)
        plt.close(fig)


def plot_segment_boxplots(out_dir: Path, rec_name: str, signal_df: pd.DataFrame, value_col: str, segments: list[tuple[pd.Timestamp, pd.Timestamp]], label_prefix: str) -> None:
    """Create boxplots of the values within each [start, end) segment.

    Skips plotting if there are no non-empty segments.
    """
    if signal_df.empty or not segments:
        return
    # Collect data per segment
    data = []
    labels = []
    start_ts = signal_df.index.min()
    for (a, b) in segments:
        seg = signal_df.loc[(signal_df.index >= a) & (signal_df.index < b), value_col]
        if seg.empty:
            continue
        data.append(seg.values)
        labels.append(f'{label_prefix} {int((a - start_ts).total_seconds())}-{int((b - start_ts).total_seconds())}s')
    if not data:
        return
    fig, ax = plt.subplots(figsize=(max(8, len(data) * 1.2), 5))
    ax.boxplot(data, tick_labels=labels, vert=True, patch_artist=True)
    ax.set_ylabel(value_col)
    ax.set_title(f'{rec_name} - {value_col} Boxplots by Marked Segments')
    ax.grid(True, axis='y', alpha=0.3)
    fig.tight_layout()
    out_path = out_dir / f'{rec_name}_{value_col}_boxplots.png'
    fig.savefig(out_path, dpi=150)
    plt.close(fig)


def process_recording(rec_dir: Path, plots_root: Path) -> None:
    rec_name = rec_dir.name
    hr_csv = rec_dir / 'hr.csv'
    rr_csv = rec_dir / 'rr.csv'
    ts_csv = rec_dir / 'timestamps.csv'
    hr_df = read_signal_csv(hr_csv, 'hr')
    rr_df = read_signal_csv(rr_csv, 'rr_ms')
    rr_df = clean_rr_ms(rr_df, 'rr_ms')
    marks = read_marks(ts_csv)
    if hr_df.empty and rr_df.empty:
        return
    # Establish global start and end for the recording (across HR and RR)
    candidates = []
    if not hr_df.empty:
        candidates.append(hr_df.index.min())
    if not rr_df.empty:
        candidates.append(rr_df.index.min())
    start_ts = min(candidates)
    end_candidates = []
    if not hr_df.empty:
        end_candidates.append(hr_df.index.max())
    if not rr_df.empty:
        end_candidates.append(rr_df.index.max())
    end_ts = max(end_candidates)
    # Compute moving averages
    if not hr_df.empty:
        hr_df = compute_time_based_moving_averages(hr_df, 'hr')
        hr_df = add_seconds_from_start(hr_df, start_ts)
    if not rr_df.empty:
        rr_df = compute_time_based_moving_averages(rr_df, 'rr_ms')
        rr_df = add_seconds_from_start(rr_df, start_ts)
    # Marks in seconds relative to start
    marks_sec = []
    if marks is not None and len(marks) > 0:
        marks_sec = [max(0.0, (t - start_ts).total_seconds()) for t in marks]
    # Output directory for this recording
    out_dir = plots_root / rec_name
    ensure_dir(out_dir)
    # Time series plots
    plot_timeseries_and_mas(out_dir, rec_name, hr_df, rr_df, marks_sec)
    # Separate moving average plots per window
    plot_separate_moving_averages(out_dir, rec_name, hr_df, rr_df, marks_sec)
    # Segments and boxplots (by instruction: between marked timestamps, plus last->end)
    segments = segment_bounds_from_marks(marks, start_ts, end_ts)
    if not hr_df.empty:
        plot_segment_boxplots(out_dir, rec_name, hr_df, 'hr', segments, label_prefix='t')
    if not rr_df.empty:
        plot_segment_boxplots(out_dir, rec_name, rr_df, 'rr_ms', segments, label_prefix='t')


def main() -> None:
    root = Path(__file__).resolve().parent
    recordings_root = root / 'SingleRecordings'
    plots_root = root / 'Plots' / 'SingleRecordings'
    ensure_dir(plots_root)
    if not recordings_root.exists():
        print(f'No SingleRecordings directory found at: {recordings_root}')
        return
    # Iterate through subdirectories that look like recordings
    rec_dirs = sorted([p for p in recordings_root.iterdir() if p.is_dir()])
    if not rec_dirs:
        print('No recording subdirectories found in SingleRecordings/')
        return
    for rec_dir in rec_dirs:
        # Only process if at least one of the expected CSVs exists
        if any((rec_dir / f).exists() for f in ('hr.csv', 'rr.csv')):
            print(f'Processing {rec_dir.name}...')
            try:
                process_recording(rec_dir, plots_root)
            except Exception as e:
                print(f'Failed to process {rec_dir.name}: {e}')
    print(f'All done. Plots saved under: {plots_root}')


if __name__ == '__main__':
    main()
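
# Usage sketch (assumed layout, derived from main() above): place this script next to a
# SingleRecordings/ directory containing one subfolder per recording, each with hr.csv,
# rr.csv and optionally timestamps.csv, then run `python plot_meditation_data.py`.
# Plots are written to Plots/SingleRecordings/<recording name>/.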