initial upload
This commit is contained in:
328
plot_meditation_data.py
Normal file
328
plot_meditation_data.py
Normal file
@ -0,0 +1,328 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
|
||||
def ensure_dir(path: Path) -> None:
|
||||
path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
def read_signal_csv(csv_path: Path, value_column: str) -> pd.DataFrame:
|
||||
"""Read a CSV with columns ['timestamp', value_column] into a DataFrame with datetime index.
|
||||
|
||||
Returns empty DataFrame if file does not exist.
|
||||
"""
|
||||
if not csv_path.exists():
|
||||
return pd.DataFrame(columns=[value_column])
|
||||
|
||||
df = pd.read_csv(csv_path)
|
||||
if 'timestamp' not in df.columns or value_column not in df.columns:
|
||||
return pd.DataFrame(columns=[value_column])
|
||||
|
||||
# Convert timestamp (ms) to datetime and set as index
|
||||
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', errors='coerce')
|
||||
df = df.dropna(subset=['timestamp'])
|
||||
df = df.set_index('timestamp').sort_index()
|
||||
# Coerce values to numeric
|
||||
df[value_column] = pd.to_numeric(df[value_column], errors='coerce')
|
||||
df = df.dropna(subset=[value_column])
|
||||
return df
|
||||
|
||||
|
||||
def compute_time_based_moving_averages(df: pd.DataFrame, value_col: str) -> pd.DataFrame:
|
||||
"""Add 5s, 10s, 30s, 60s time-based rolling means for an irregularly sampled series.
|
||||
|
||||
Requires a DatetimeIndex.
|
||||
"""
|
||||
if df.empty:
|
||||
return df
|
||||
|
||||
# Ensure index is datetime for time-based rolling windows
|
||||
if not isinstance(df.index, pd.DatetimeIndex):
|
||||
raise ValueError('DataFrame index must be a DatetimeIndex for time-based rolling windows')
|
||||
|
||||
df = df.copy()
|
||||
for seconds in (5, 10, 30, 60):
|
||||
window = f'{seconds}s'
|
||||
df[f'{value_col}_ma_{seconds}s'] = (
|
||||
df[value_col].rolling(window=window, min_periods=1).mean()
|
||||
)
|
||||
return df
|
||||
|
||||
|
||||
def add_seconds_from_start(df: pd.DataFrame, start_ts: pd.Timestamp) -> pd.DataFrame:
|
||||
if df.empty:
|
||||
return df
|
||||
df = df.copy()
|
||||
df['seconds'] = (df.index - start_ts).total_seconds()
|
||||
return df
|
||||
|
||||
|
||||
def read_marks(csv_path: Path) -> pd.Series:
|
||||
"""Read marked timestamps as a pandas Series of pd.Timestamp, sorted ascending.
|
||||
|
||||
Returns empty Series if file missing or no valid rows.
|
||||
"""
|
||||
if not csv_path.exists():
|
||||
return pd.Series([], dtype='datetime64[ns]')
|
||||
|
||||
df = pd.read_csv(csv_path)
|
||||
if 'timestamp' not in df.columns:
|
||||
return pd.Series([], dtype='datetime64[ns]')
|
||||
# Convert to datetime
|
||||
ts = pd.to_datetime(df['timestamp'], unit='ms', errors='coerce').dropna().sort_values()
|
||||
return ts
|
||||
|
||||
|
||||
def segment_bounds_from_marks(marks: pd.Series, start_ts: pd.Timestamp, end_ts: pd.Timestamp) -> list[tuple[pd.Timestamp, pd.Timestamp]]:
|
||||
"""Create segments between consecutive marks, plus the final segment from last mark to end.
|
||||
|
||||
Does NOT include the pre-first mark segment.
|
||||
If there are no marks, returns a single segment from start to end.
|
||||
"""
|
||||
segments: list[tuple[pd.Timestamp, pd.Timestamp]] = []
|
||||
if marks is None or len(marks) == 0:
|
||||
return [(start_ts, end_ts)]
|
||||
|
||||
# Between consecutive marks
|
||||
marks_list = list(marks)
|
||||
for i in range(len(marks_list) - 1):
|
||||
a = marks_list[i]
|
||||
b = marks_list[i + 1]
|
||||
if a < b and a < end_ts:
|
||||
segments.append((max(a, start_ts), min(b, end_ts)))
|
||||
|
||||
# From last mark to end
|
||||
last = marks_list[-1]
|
||||
if last < end_ts:
|
||||
segments.append((max(last, start_ts), end_ts))
|
||||
return segments
|
||||
|
||||
|
||||
def plot_timeseries_and_mas(out_dir: Path, rec_name: str, hr_df: pd.DataFrame, rr_df: pd.DataFrame, marks_sec: list[float]) -> None:
|
||||
ensure_dir(out_dir)
|
||||
|
||||
fig, axes = plt.subplots(2, 1, figsize=(14, 8), sharex=True)
|
||||
|
||||
# HR plot
|
||||
ax = axes[0]
|
||||
if not hr_df.empty:
|
||||
ax.plot(hr_df['seconds'], hr_df['hr'], label='HR', color='tab:blue', linewidth=0.8, alpha=0.7)
|
||||
for seconds, color in zip((5, 10, 30, 60), ('tab:orange', 'tab:green', 'tab:red', 'tab:purple')):
|
||||
col = f'hr_ma_{seconds}s'
|
||||
if col in hr_df.columns:
|
||||
ax.plot(hr_df['seconds'], hr_df[col], label=f'HR MA {seconds}s', color=color, linewidth=1.5)
|
||||
ax.set_ylabel('HR (bpm)')
|
||||
ax.grid(True, alpha=0.3)
|
||||
|
||||
# RR plot
|
||||
ax = axes[1]
|
||||
if not rr_df.empty:
|
||||
ax.plot(rr_df['seconds'], rr_df['rr_ms'], label='RR (ms)', color='tab:blue', linewidth=0.8, alpha=0.7)
|
||||
for seconds, color in zip((5, 10, 30, 60), ('tab:orange', 'tab:green', 'tab:red', 'tab:purple')):
|
||||
col = f'rr_ms_ma_{seconds}s'
|
||||
if col in rr_df.columns:
|
||||
ax.plot(rr_df['seconds'], rr_df[col], label=f'RR MA {seconds}s', color=color, linewidth=1.5)
|
||||
ax.set_ylabel('RR (ms)')
|
||||
ax.grid(True, alpha=0.3)
|
||||
|
||||
# Vertical lines for marks on both subplots
|
||||
for ax in axes:
|
||||
for s in marks_sec:
|
||||
ax.axvline(s, color='k', linestyle='--', linewidth=0.8, alpha=0.7)
|
||||
ax.legend(loc='upper right', ncol=2, fontsize=8)
|
||||
|
||||
axes[-1].set_xlabel('Time (s)')
|
||||
fig.suptitle(f'{rec_name} - HR and RR with Moving Averages')
|
||||
fig.tight_layout(rect=(0, 0, 1, 0.97))
|
||||
|
||||
out_path = out_dir / f'{rec_name}_timeseries.png'
|
||||
fig.savefig(out_path, dpi=150)
|
||||
plt.close(fig)
|
||||
|
||||
|
||||
def plot_separate_moving_averages(out_dir: Path, rec_name: str, hr_df: pd.DataFrame, rr_df: pd.DataFrame, marks_sec: list[float]) -> None:
|
||||
"""Save separate plots for each moving average window (5s, 10s, 30s, 60s)
|
||||
into a dedicated subdirectory per recording.
|
||||
|
||||
Each figure contains two subplots (HR MA and RR MA) sharing the x-axis in seconds.
|
||||
"""
|
||||
if (hr_df is None or hr_df.empty) and (rr_df is None or rr_df.empty):
|
||||
return
|
||||
|
||||
ma_dir = out_dir / 'MovingAverages'
|
||||
ensure_dir(ma_dir)
|
||||
|
||||
windows = (5, 10, 30, 60)
|
||||
for seconds in windows:
|
||||
fig, axes = plt.subplots(2, 1, figsize=(14, 8), sharex=True)
|
||||
|
||||
# HR MA subplot
|
||||
ax = axes[0]
|
||||
plotted_any = False
|
||||
if hr_df is not None and not hr_df.empty:
|
||||
col = f'hr_ma_{seconds}s'
|
||||
if col in hr_df.columns:
|
||||
ax.plot(hr_df['seconds'], hr_df[col], label=f'HR MA {seconds}s', color='tab:orange', linewidth=1.5)
|
||||
plotted_any = True
|
||||
if plotted_any:
|
||||
for s in marks_sec:
|
||||
ax.axvline(s, color='k', linestyle='--', linewidth=0.8, alpha=0.7)
|
||||
ax.set_ylabel('HR (bpm)')
|
||||
ax.grid(True, alpha=0.3)
|
||||
ax.legend(loc='upper right', fontsize=9)
|
||||
|
||||
# RR MA subplot
|
||||
ax = axes[1]
|
||||
plotted_any = False
|
||||
if rr_df is not None and not rr_df.empty:
|
||||
col = f'rr_ms_ma_{seconds}s'
|
||||
if col in rr_df.columns:
|
||||
ax.plot(rr_df['seconds'], rr_df[col], label=f'RR MA {seconds}s', color='tab:green', linewidth=1.5)
|
||||
plotted_any = True
|
||||
if plotted_any:
|
||||
for s in marks_sec:
|
||||
ax.axvline(s, color='k', linestyle='--', linewidth=0.8, alpha=0.7)
|
||||
ax.set_ylabel('RR (ms)')
|
||||
ax.grid(True, alpha=0.3)
|
||||
ax.legend(loc='upper right', fontsize=9)
|
||||
|
||||
axes[-1].set_xlabel('Time (s)')
|
||||
fig.suptitle(f'{rec_name} - Moving Average {seconds}s')
|
||||
fig.tight_layout(rect=(0, 0, 1, 0.96))
|
||||
|
||||
out_path = ma_dir / f'{rec_name}_ma_{seconds}s.png'
|
||||
fig.savefig(out_path, dpi=150)
|
||||
plt.close(fig)
|
||||
|
||||
|
||||
def plot_segment_boxplots(out_dir: Path, rec_name: str, signal_df: pd.DataFrame, value_col: str, segments: list[tuple[pd.Timestamp, pd.Timestamp]], label_prefix: str) -> None:
|
||||
"""Create boxplots of values within each [start, end) segment.
|
||||
|
||||
If no non-empty segments, skips plotting.
|
||||
"""
|
||||
if signal_df.empty or not segments:
|
||||
return
|
||||
|
||||
# Collect data per segment
|
||||
data = []
|
||||
labels = []
|
||||
start_ts = signal_df.index.min()
|
||||
for (a, b) in segments:
|
||||
seg = signal_df.loc[(signal_df.index >= a) & (signal_df.index < b), value_col]
|
||||
if seg.empty:
|
||||
continue
|
||||
data.append(seg.values)
|
||||
labels.append(f'{label_prefix} {int((a - start_ts).total_seconds())}-{int((b - start_ts).total_seconds())}s')
|
||||
|
||||
if not data:
|
||||
return
|
||||
|
||||
fig, ax = plt.subplots(figsize=(max(8, len(data) * 1.2), 5))
|
||||
ax.boxplot(data, tick_labels=labels, vert=True, patch_artist=True)
|
||||
ax.set_ylabel(value_col)
|
||||
ax.set_title(f'{rec_name} - {value_col} Boxplots by Marked Segments')
|
||||
ax.grid(True, axis='y', alpha=0.3)
|
||||
fig.tight_layout()
|
||||
|
||||
out_path = out_dir / f'{rec_name}_{value_col}_boxplots.png'
|
||||
fig.savefig(out_path, dpi=150)
|
||||
plt.close(fig)
|
||||
|
||||
|
||||
def process_recording(rec_dir: Path, plots_root: Path) -> None:
|
||||
rec_name = rec_dir.name
|
||||
|
||||
hr_csv = rec_dir / 'hr.csv'
|
||||
rr_csv = rec_dir / 'rr.csv'
|
||||
ts_csv = rec_dir / 'timestamps.csv'
|
||||
|
||||
hr_df = read_signal_csv(hr_csv, 'hr')
|
||||
rr_df = read_signal_csv(rr_csv, 'rr_ms')
|
||||
marks = read_marks(ts_csv)
|
||||
|
||||
if hr_df.empty and rr_df.empty:
|
||||
return
|
||||
|
||||
# Establish global start and end for the recording (across HR and RR)
|
||||
candidates = []
|
||||
if not hr_df.empty:
|
||||
candidates.append(hr_df.index.min())
|
||||
if not rr_df.empty:
|
||||
candidates.append(rr_df.index.min())
|
||||
start_ts = min(candidates)
|
||||
|
||||
end_candidates = []
|
||||
if not hr_df.empty:
|
||||
end_candidates.append(hr_df.index.max())
|
||||
if not rr_df.empty:
|
||||
end_candidates.append(rr_df.index.max())
|
||||
end_ts = max(end_candidates)
|
||||
|
||||
# Compute moving averages
|
||||
if not hr_df.empty:
|
||||
hr_df = compute_time_based_moving_averages(hr_df, 'hr')
|
||||
hr_df = add_seconds_from_start(hr_df, start_ts)
|
||||
if not rr_df.empty:
|
||||
rr_df = compute_time_based_moving_averages(rr_df, 'rr_ms')
|
||||
rr_df = add_seconds_from_start(rr_df, start_ts)
|
||||
|
||||
# Marks in seconds relative to start
|
||||
marks_sec = []
|
||||
if marks is not None and len(marks) > 0:
|
||||
marks_sec = [max(0.0, (t - start_ts).total_seconds()) for t in marks]
|
||||
|
||||
# Output directory for this recording
|
||||
out_dir = plots_root / rec_name
|
||||
ensure_dir(out_dir)
|
||||
|
||||
# Time series plots
|
||||
plot_timeseries_and_mas(out_dir, rec_name, hr_df, rr_df, marks_sec)
|
||||
|
||||
# Separate moving average plots per window
|
||||
plot_separate_moving_averages(out_dir, rec_name, hr_df, rr_df, marks_sec)
|
||||
|
||||
# Segments and boxplots (by instruction: between marked timestamps, plus last->end)
|
||||
segments = segment_bounds_from_marks(marks, start_ts, end_ts)
|
||||
|
||||
if not hr_df.empty:
|
||||
plot_segment_boxplots(out_dir, rec_name, hr_df, 'hr', segments, label_prefix='t')
|
||||
if not rr_df.empty:
|
||||
plot_segment_boxplots(out_dir, rec_name, rr_df, 'rr_ms', segments, label_prefix='t')
|
||||
|
||||
|
||||
def main() -> None:
|
||||
root = Path(__file__).resolve().parent
|
||||
recordings_root = root / 'SingleRecordings'
|
||||
plots_root = root / 'Plots' / 'SingleRecordings'
|
||||
ensure_dir(plots_root)
|
||||
|
||||
if not recordings_root.exists():
|
||||
print(f'No SingleRecordings directory found at: {recordings_root}')
|
||||
return
|
||||
|
||||
# Iterate through subdirectories that look like recordings
|
||||
rec_dirs = sorted([p for p in recordings_root.iterdir() if p.is_dir()])
|
||||
if not rec_dirs:
|
||||
print('No recording subdirectories found in SingleRecordings/')
|
||||
return
|
||||
|
||||
for rec_dir in rec_dirs:
|
||||
# Only process if at least one of the expected CSVs exists
|
||||
if any((rec_dir / f).exists() for f in ('hr.csv', 'rr.csv')):
|
||||
print(f'Processing {rec_dir.name}...')
|
||||
try:
|
||||
process_recording(rec_dir, plots_root)
|
||||
except Exception as e:
|
||||
print(f'Failed to process {rec_dir.name}: {e}')
|
||||
|
||||
print(f'All done. Plots saved under: {plots_root}')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user