From c332161a5edc9d12a52f628ef1602ffffe9af950 Mon Sep 17 00:00:00 2001 From: "tom.hempel" Date: Wed, 24 Sep 2025 17:35:06 +0200 Subject: [PATCH] added simple outlier detection --- aggregate_segments_analysis.py | 46 +++++++++++++++++++++++++++++----- plot_meditation_data.py | 37 +++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 6 deletions(-) diff --git a/aggregate_segments_analysis.py b/aggregate_segments_analysis.py index ec8a9b6..2de6f08 100644 --- a/aggregate_segments_analysis.py +++ b/aggregate_segments_analysis.py @@ -23,6 +23,34 @@ def read_signal_csv(csv_path: Path, value_column: str) -> pd.DataFrame: return df +def clean_rr_ms(rr_df: pd.DataFrame, col: str = 'rr_ms', source_name: str | None = None) -> pd.DataFrame: + """RR cleaning with simple thresholding + interpolation and reporting. + + - Coerce to numeric; mark non-finite as NaN (count) + - Mark out-of-range [300, 2000] ms as NaN (count) + - Interpolate in time; ffill/bfill edges + - Print counts + """ + if rr_df is None or rr_df.empty or col not in rr_df.columns: + return rr_df + df = rr_df.copy() + df[col] = pd.to_numeric(df[col], errors='coerce') + nonfinite_mask = ~pd.notna(df[col]) + range_mask = (df[col] < 300) | (df[col] > 2000) + flagged = nonfinite_mask | range_mask + df.loc[flagged, col] = np.nan + + if isinstance(df.index, pd.DatetimeIndex): + df[col] = df[col].interpolate(method='time', limit_direction='both') + else: + df[col] = df[col].interpolate(limit_direction='both') + df[col] = df[col].ffill().bfill() + + if source_name is None: + source_name = 'RR cleaning' + print(f"{source_name} - RR filter: nonfinite={int(nonfinite_mask.sum())}, out_of_range={int(range_mask.sum())}, total_flagged={int(flagged.sum())}") + return df + def read_marks(csv_path: Path) -> pd.Series: if not csv_path.exists(): return pd.Series([], dtype='datetime64[ns]') @@ -288,9 +316,10 @@ def compute_and_plot_aligned_hr(recordings_root: Path, out_root: Path) -> None: return Y = np.vstack(curves) - mean = np.nanmean(Y, axis=0) - std = np.nanstd(Y, axis=0) n = np.sum(~np.isnan(Y), axis=0) + marr = np.ma.array(Y, mask=np.isnan(Y)) + mean = marr.mean(axis=0).filled(np.nan) + std = marr.std(axis=0).filled(np.nan) # Plot fig, ax = plt.subplots(figsize=(12, 5)) @@ -329,6 +358,7 @@ def compute_and_plot_aligned_rr(recordings_root: Path, out_root: Path) -> None: if not rr_csv.exists() or not ts_csv.exists(): continue rr_df = read_signal_csv(rr_csv, 'rr_ms') + rr_df = clean_rr_ms(rr_df, 'rr_ms', source_name=f'{rec_dir.name} (aligned RR)') marks = read_marks(ts_csv) if rr_df.empty or marks is None or len(marks) != 4: continue @@ -411,9 +441,10 @@ def compute_and_plot_aligned_rr(recordings_root: Path, out_root: Path) -> None: return Y = np.vstack(curves) - mean = np.nanmean(Y, axis=0) - std = np.nanstd(Y, axis=0) n = np.sum(~np.isnan(Y), axis=0) + marr = np.ma.array(Y, mask=np.isnan(Y)) + mean = marr.mean(axis=0).filled(np.nan) + std = marr.std(axis=0).filled(np.nan) fig, ax = plt.subplots(figsize=(12, 5)) ax.plot(x_sec, mean, color='tab:green', label='Mean RR') @@ -457,6 +488,7 @@ def compute_and_plot_aligned_rmssd(recordings_root: Path, out_root: Path, window if not rr_csv.exists() or not ts_csv.exists(): continue rr_df = read_signal_csv(rr_csv, 'rr_ms') + rr_df = clean_rr_ms(rr_df, 'rr_ms', source_name=f'{rec_dir.name} (aligned RMSSD)') marks = read_marks(ts_csv) if rr_df.empty or marks is None or len(marks) != 4: continue @@ -545,9 +577,10 @@ def compute_and_plot_aligned_rmssd(recordings_root: Path, out_root: Path, window return Y = np.vstack(curves) - mean = np.nanmean(Y, axis=0) - std = np.nanstd(Y, axis=0) n = np.sum(~np.isnan(Y), axis=0) + marr = np.ma.array(Y, mask=np.isnan(Y)) + mean = marr.mean(axis=0).filled(np.nan) + std = marr.std(axis=0).filled(np.nan) fig, ax = plt.subplots(figsize=(12, 5)) ax.plot(x_sec, mean, color='tab:red', label=f'Mean RMSSD ({window_seconds}s window)') @@ -578,6 +611,7 @@ def main() -> None: rec_name = rec_dir.name hr_df = read_signal_csv(rec_dir / 'hr.csv', 'hr') rr_df = read_signal_csv(rec_dir / 'rr.csv', 'rr_ms') + rr_df = clean_rr_ms(rr_df, 'rr_ms', source_name=f'{rec_name} (summary)') marks = read_marks(rec_dir / 'timestamps.csv') if marks is None or len(marks) != 4: diff --git a/plot_meditation_data.py b/plot_meditation_data.py index f713e7d..9a107cc 100644 --- a/plot_meditation_data.py +++ b/plot_meditation_data.py @@ -77,6 +77,42 @@ def read_marks(csv_path: Path) -> pd.Series: return ts +def clean_rr_ms(rr_df: pd.DataFrame, col: str = 'rr_ms', source_name: str | None = None) -> pd.DataFrame: + """Basic NN editing for RR in ms with interpolation and reporting. + + Steps: + - Coerce to numeric and mark non-finite as NaN (count) + - Mark out-of-range [300, 2000] ms as NaN (count) + - Mark robust outliers via 15s rolling median/MAD (z > 3.5) as NaN (count) + - Time-based interpolation to fill flagged values (then ffill/bfill) + - Print counts summary + """ + if rr_df is None or rr_df.empty or col not in rr_df.columns: + return rr_df + df = rr_df.copy() + df[col] = pd.to_numeric(df[col], errors='coerce') + # Track flags (only threshold filtering per request) + nonfinite_mask = ~pd.notna(df[col]) + range_mask = (df[col] < 300) | (df[col] > 2000) + # Combine flags: non-finite or out-of-range + flagged = nonfinite_mask | range_mask + # Set flagged to NaN for interpolation + df.loc[flagged, col] = np.nan + + # Interpolate in time, then ffill/bfill for edges + if isinstance(df.index, pd.DatetimeIndex): + df[col] = df[col].interpolate(method='time', limit_direction='both') + else: + df[col] = df[col].interpolate(limit_direction='both') + df[col] = df[col].ffill().bfill() + + # Reporting + if source_name is None: + source_name = 'RR cleaning' + print(f"{source_name} - RR filter: nonfinite={int(nonfinite_mask.sum())}, out_of_range={int(range_mask.sum())}, total_flagged={int(flagged.sum())}") + return df + + def segment_bounds_from_marks(marks: pd.Series, start_ts: pd.Timestamp, end_ts: pd.Timestamp) -> list[tuple[pd.Timestamp, pd.Timestamp]]: """Create segments between consecutive marks, plus the final segment from last mark to end. @@ -242,6 +278,7 @@ def process_recording(rec_dir: Path, plots_root: Path) -> None: hr_df = read_signal_csv(hr_csv, 'hr') rr_df = read_signal_csv(rr_csv, 'rr_ms') + rr_df = clean_rr_ms(rr_df, 'rr_ms') marks = read_marks(ts_csv) if hr_df.empty and rr_df.empty: