calculation for convergence with average HR

This commit is contained in:
tom.hempel
2025-09-25 15:06:30 +02:00
parent be64562da1
commit 3c6ca84c6e
3 changed files with 1326 additions and 0 deletions

File diff suppressed because it is too large Load Diff

Binary file not shown.

After

Width:  |  Height:  |  Size: 239 KiB

325
combined_hr_analysis.py Normal file
View File

@ -0,0 +1,325 @@
#!/usr/bin/env python3
"""
Combined script that merges aggregate_segments_analysis.py and hr_convergence_analysis.py
functionality to compute HR alignment across recordings and mark the first convergence point.
"""
import math
from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
def ensure_dir(path: Path) -> None:
    """Create ``path`` as a directory, together with any missing parents.

    Calling this on a directory that already exists is a no-op.
    """
    path.mkdir(exist_ok=True, parents=True)
def read_signal_csv(csv_path: Path, value_column: str) -> pd.DataFrame:
    """Load a timestamped signal CSV into a time-indexed DataFrame.

    The file is expected to contain a ``timestamp`` column (parsed as epoch
    milliseconds) and the column named by ``value_column``.

    Args:
        csv_path: Location of the CSV file.
        value_column: Name of the signal column to keep numeric (e.g. ``'hr'``).

    Returns:
        A DataFrame indexed by the parsed timestamp, sorted ascending, with
        rows dropped where the timestamp could not be parsed or the value is
        not numeric. When the file is missing, completely empty, or lacks one
        of the required columns, an empty DataFrame with only
        ``value_column`` is returned so callers can simply test ``.empty``.
    """
    if not csv_path.exists():
        return pd.DataFrame(columns=[value_column])
    try:
        df = pd.read_csv(csv_path)
    except pd.errors.EmptyDataError:
        # A zero-byte file has no header to parse; treat it like a missing
        # file instead of aborting the whole analysis run.
        return pd.DataFrame(columns=[value_column])
    if 'timestamp' not in df.columns or value_column not in df.columns:
        return pd.DataFrame(columns=[value_column])

    # Unparseable timestamps become NaT and are discarded before indexing.
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms', errors='coerce')
    df = df.dropna(subset=['timestamp']).set_index('timestamp').sort_index()

    # Non-numeric readings become NaN and are discarded as well.
    df[value_column] = pd.to_numeric(df[value_column], errors='coerce')
    return df.dropna(subset=[value_column])
def read_marks(csv_path: Path) -> pd.Series:
    """Read the marker timestamps of a recording.

    Args:
        csv_path: CSV file with a ``timestamp`` column (parsed as epoch
            milliseconds).

    Returns:
        The parsed timestamps as a Series sorted ascending, with unparseable
        entries removed. An empty ``datetime64[ns]`` Series is returned when
        the file is missing, completely empty, or has no ``timestamp`` column.
    """
    if not csv_path.exists():
        return pd.Series([], dtype='datetime64[ns]')
    try:
        df = pd.read_csv(csv_path)
    except pd.errors.EmptyDataError:
        # A zero-byte marker file means "no marks", not a fatal error.
        return pd.Series([], dtype='datetime64[ns]')
    if 'timestamp' not in df.columns:
        return pd.Series([], dtype='datetime64[ns]')
    parsed = pd.to_datetime(df['timestamp'], unit='ms', errors='coerce')
    return parsed.dropna().sort_values()
def find_hr_convergence(df: pd.DataFrame) -> "dict | None":
    """Find when the mean HR curve reaches its own overall average.

    The target is the average of ``hr_mean`` weighted by ``n`` when an ``n``
    column with a positive total is present, otherwise the plain average.
    Every pair of consecutive samples whose values bracket the target yields
    one crossing; the crossing time is linearly interpolated between them.

    Args:
        df: DataFrame with columns ``hr_mean`` and ``aligned_seconds`` and,
            optionally, ``n`` (number of recordings contributing to each
            point). Rows are assumed to be ordered by time.

    Returns:
        ``None`` when no row has a valid ``hr_mean``. Otherwise a dict with:
        ``target_hr``, ``overall_avg_hr``, ``weighted_avg_hr`` (``None`` when
        no usable weights exist), ``crossings`` (list of dicts with ``time``,
        ``direction`` (``'up'``/``'down'``), ``hr_before``, ``hr_after``),
        ``total_duration`` (last ``aligned_seconds`` value) and
        ``data_points``.
    """
    # Rows without a mean cannot take part in the average or the crossings.
    df_clean = df.dropna(subset=['hr_mean'])
    if df_clean.empty:
        return None

    overall_avg_hr = df_clean['hr_mean'].mean()

    # Prefer the sample-size weighted average; np.average raises
    # ZeroDivisionError when the weights sum to zero, so fall back to the
    # unweighted mean in that case (a NaN sum also fails the > 0 test).
    weighted_avg_hr = None
    if 'n' in df_clean.columns:
        weights = df_clean['n'].to_numpy(dtype=float)
        if weights.sum() > 0:
            weighted_avg_hr = np.average(df_clean['hr_mean'], weights=weights)
    target_hr = overall_avg_hr if weighted_avg_hr is None else weighted_avg_hr

    hr_values = df_clean['hr_mean'].values
    time_values = df_clean['aligned_seconds'].values

    crossings = []
    # Index of the last sample that sat exactly on the target and was already
    # reported as the END of a segment. Without this, the same instant would
    # be reported again as the START of the following segment.
    reported_idx = -1
    for i in range(1, len(hr_values)):
        prev_hr = hr_values[i - 1]
        curr_hr = hr_values[i]
        if curr_hr == prev_hr:
            # Flat segment: no unique crossing time (and no division by zero).
            continue
        if not (min(prev_hr, curr_hr) <= target_hr <= max(prev_hr, curr_hr)):
            continue

        t_prev = time_values[i - 1]
        t_curr = time_values[i]
        if prev_hr == target_hr:
            if reported_idx == i - 1:
                continue
            t_cross = t_prev
        elif curr_hr == target_hr:
            t_cross = t_curr
            reported_idx = i
        else:
            # Linear interpolation between the two bracketing samples.
            t_cross = t_prev + (target_hr - prev_hr) * (t_curr - t_prev) / (curr_hr - prev_hr)

        crossings.append({
            'time': t_cross,
            'direction': 'up' if curr_hr > prev_hr else 'down',
            'hr_before': prev_hr,
            'hr_after': curr_hr,
        })

    return {
        'target_hr': target_hr,
        'overall_avg_hr': overall_avg_hr,
        'weighted_avg_hr': weighted_avg_hr,
        'crossings': crossings,
        'total_duration': time_values[-1] if len(time_values) > 0 else 0,
        'data_points': len(df_clean),
    }
def _load_alignable_recordings(recordings_root: Path) -> list[dict]:
    """Load every recording that has HR data and exactly four ordered marks inside the HR range."""
    recs: list[dict] = []
    if not recordings_root.is_dir():
        # Nothing to iterate; the caller reports "no valid recordings".
        return recs
    for rec_dir in sorted(p for p in recordings_root.iterdir() if p.is_dir()):
        hr_csv = rec_dir / 'hr.csv'
        ts_csv = rec_dir / 'timestamps.csv'
        if not hr_csv.exists() or not ts_csv.exists():
            continue
        hr_df = read_signal_csv(hr_csv, 'hr')
        marks = read_marks(ts_csv)
        if hr_df.empty or marks is None or len(marks) != 4:
            continue
        start_ts = hr_df.index.min()
        end_ts = hr_df.index.max()
        a, b, c, d = list(marks)
        # Skip pathological recordings whose marks are not strictly ordered
        # inside the span covered by HR samples.
        if not (start_ts < a < b < c < d < end_ts):
            continue
        recs.append({'name': rec_dir.name, 'hr': hr_df, 'start': start_ts, 'marks': (a, b, c, d), 'end': end_ts})
    return recs


def _map_times_to_norm(hr_df: pd.DataFrame, start: pd.Timestamp, marks: tuple, end: pd.Timestamp,
                       boundaries: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Piecewise-linearly map one recording's HR samples onto the normalized [0, 1] timeline.

    Each of the five segments (start→a, a→b, b→c, c→d, d→end) is stretched or
    compressed onto ``[boundaries[k], boundaries[k + 1]]``. Returns the sorted
    unique normalized coordinates and the HR value at each (duplicates, e.g.
    samples that fall exactly on a mark and so belong to two segments, are
    averaged). Two empty arrays are returned when no segment has samples.
    """
    edges = (start, *marks, end)
    u_parts = []
    y_parts = []
    for k, (seg_start, seg_end) in enumerate(zip(edges, edges[1:])):
        if seg_end <= seg_start:
            continue
        in_segment = (hr_df.index >= seg_start) & (hr_df.index <= seg_end)
        seg = hr_df.loc[in_segment, 'hr']
        if seg.empty:
            continue
        # Timedelta division gives the relative position within the segment
        # without assuming a particular datetime resolution of the index.
        rel = np.asarray((seg.index - seg_start) / (seg_end - seg_start), dtype=float)
        rel = np.clip(rel, 0.0, 1.0)
        u0, u1 = boundaries[k], boundaries[k + 1]
        u_parts.append(u0 + rel * (u1 - u0))
        y_parts.append(seg.values.astype(float))
    if not u_parts:
        return np.array([]), np.array([])

    u_all = np.concatenate(u_parts)
    y_all = np.concatenate(y_parts)
    # np.interp needs strictly usable x coordinates: collapse duplicates by
    # averaging their HR values (sum per unique coordinate / count).
    uniq_u, inverse = np.unique(u_all, return_inverse=True)
    y_avg = np.bincount(inverse, weights=y_all) / np.bincount(inverse)
    return uniq_u, y_avg


def _plot_aligned_hr(x_sec: np.ndarray, mean: np.ndarray, std: np.ndarray, boundary_secs: np.ndarray,
                     target_hr, first_convergence_time, out_path: Path) -> None:
    """Draw the mean ± SD HR curve with segment boundaries and the convergence marker, then save it."""
    fig, ax = plt.subplots(figsize=(14, 6))
    # Main HR plot
    ax.plot(x_sec, mean, color='tab:blue', linewidth=2, label='Mean HR')
    ax.fill_between(x_sec, mean - std, mean + std, color='tab:blue', alpha=0.2, label='±1 SD')
    # Segment boundaries (dashed vertical lines)
    for xb in boundary_secs:
        ax.axvline(xb, color='k', linestyle='--', linewidth=0.8, alpha=0.5)
    # Target HR line (overall average)
    if target_hr is not None:
        ax.axhline(target_hr, color='red', linestyle=':', linewidth=1.5, alpha=0.7,
                   label=f'Overall Average ({target_hr:.1f} BPM)')
    # Convergence marker with annotation
    if first_convergence_time is not None:
        ax.axvline(first_convergence_time, color='red', linestyle='-', linewidth=3, alpha=0.8,
                   label=f'First Convergence ({first_convergence_time:.1f}s)')
        ax.annotate(f'First Convergence\n{first_convergence_time:.1f}s',
                    xy=(first_convergence_time, target_hr),
                    xytext=(first_convergence_time + 50, target_hr + 2),
                    arrowprops=dict(arrowstyle='->', color='red', alpha=0.8),
                    fontsize=10, ha='center', va='bottom',
                    bbox=dict(boxstyle='round,pad=0.3', facecolor='white', alpha=0.8))
    ax.set_xlabel('Aligned time (s)')
    ax.set_ylabel('HR (bpm)')
    ax.set_title('Aligned Average HR Across Recordings with Convergence Marker')
    ax.grid(True, alpha=0.3)
    ax.legend(loc='upper right')
    fig.tight_layout()
    fig.savefig(out_path, dpi=150, bbox_inches='tight')
    plt.close(fig)


def compute_and_plot_aligned_hr_with_convergence(recordings_root: Path, out_root: Path) -> None:
    """Average HR across recordings on a mark-aligned timeline and mark the first convergence.

    Every sub-directory of ``recordings_root`` containing ``hr.csv`` and
    ``timestamps.csv`` with exactly four marks strictly inside the HR time
    range is used. The five segments delimited by the marks are rescaled so
    that each has the median duration observed across recordings, the curves
    are resampled on a common 1000-point grid and averaged, and the first time
    the averaged curve reaches its overall average (see
    ``find_hr_convergence``) is marked on the plot.

    Args:
        recordings_root: Directory holding one sub-directory per recording.
        out_root: Directory receiving ``HR_average_aligned_with_convergence.png``
            and ``HR_average_aligned_with_convergence.csv``; created if missing.

    Returns:
        None. Prints a message and returns early when no usable recording or
        curve is available.
    """
    recs = _load_alignable_recordings(recordings_root)
    if not recs:
        print("No valid recordings found for HR alignment")
        return

    # Per-recording durations (seconds) of: pre, a→b, b→c, c→d, post.
    seg_durations = []
    for rec in recs:
        edges = (rec['start'], *rec['marks'], rec['end'])
        seg_durations.append(tuple((hi - lo).total_seconds() for lo, hi in zip(edges, edges[1:])))

    # Median segment durations and normalized cumulative boundaries
    med = np.nanmedian(np.array(seg_durations), axis=0)
    total = float(np.sum(med))
    if total <= 0:
        print("Median segment durations are not positive; cannot build an aligned HR timeline")
        return
    proportions = med / total
    boundaries = np.concatenate(([0.0], np.cumsum(proportions)))  # length 6, last == 1.0

    # Common grid (normalized [0,1]) and scaled seconds by median total duration
    n_grid = 1000
    x_norm = np.linspace(0.0, 1.0, n_grid)
    x_sec = x_norm * total

    curves = []
    for rec in recs:
        u, y = _map_times_to_norm(rec['hr'], rec['start'], rec['marks'], rec['end'], boundaries)
        if u.size < 2:
            continue
        # u is sorted, so left/right=NaN already blanks every grid point
        # outside the range this recording covers.
        curves.append(np.interp(x_norm, u, y, left=np.nan, right=np.nan))
    if not curves:
        print("No valid HR curves generated")
        return

    # NaN-aware mean/SD across recordings; n counts contributors per grid point.
    Y = np.vstack(curves)
    n = np.sum(~np.isnan(Y), axis=0)
    marr = np.ma.array(Y, mask=np.isnan(Y))
    mean = marr.mean(axis=0).filled(np.nan)
    std = marr.std(axis=0).filled(np.nan)

    # Create DataFrame for convergence analysis
    hr_df_analysis = pd.DataFrame({
        'normalized_time': x_norm,
        'aligned_seconds': x_sec,
        'hr_mean': mean,
        'hr_std': std,
        'n': n,
    })

    # Find convergence point
    convergence_results = find_hr_convergence(hr_df_analysis)
    first_convergence_time = None
    target_hr = None
    if convergence_results and convergence_results['crossings']:
        first_convergence_time = convergence_results['crossings'][0]['time']
        target_hr = convergence_results['target_hr']
        print(f"\nFirst HR convergence found at: {first_convergence_time:.1f} seconds")
        print(f"Target HR (overall average): {target_hr:.3f} BPM")
    else:
        print("\nNo HR convergence found")

    # savefig/to_csv do not create directories, so make sure the target exists
    # even when this function is called without going through main().
    out_root.mkdir(parents=True, exist_ok=True)

    # Save plot
    _plot_aligned_hr(x_sec, mean, std, boundaries * total, target_hr, first_convergence_time,
                     out_root / 'HR_average_aligned_with_convergence.png')

    # Save CSV data
    out_csv = out_root / 'HR_average_aligned_with_convergence.csv'
    hr_df_analysis.to_csv(out_csv, index=False)

    print("\nHR alignment with convergence analysis complete.")
    print(f"Outputs saved to: {out_root}")
    print("- Plot: HR_average_aligned_with_convergence.png")
    print("- Data: HR_average_aligned_with_convergence.csv")
def main() -> None:
    """Run the combined HR analysis on the recordings next to this script.

    Recordings are read from ``SingleRecordings`` and results are written to
    ``Plots/Aggregate``, both relative to the directory containing this file.
    The analysis is skipped (with a message) when the recordings directory is
    missing or no recording has exactly four timestamps.
    """
    root = Path(__file__).resolve().parent
    recordings_root = root / 'SingleRecordings'
    out_root = root / 'Plots' / 'Aggregate'
    ensure_dir(out_root)

    print("Combined HR Analysis: Alignment + Convergence Detection")
    print("="*60)
    print(f"Processing recordings from: {recordings_root}")
    print(f"Output directory: {out_root}")

    # iterdir() raises FileNotFoundError on a missing directory; report it
    # in the same style as the other "nothing to do" cases instead.
    if not recordings_root.is_dir():
        print(f"Recordings directory not found: {recordings_root}")
        return

    # Count valid recordings
    valid_count = 0
    for rec_dir in sorted(p for p in recordings_root.iterdir() if p.is_dir()):
        hr_csv = rec_dir / 'hr.csv'
        ts_csv = rec_dir / 'timestamps.csv'
        if hr_csv.exists() and ts_csv.exists():
            marks = read_marks(ts_csv)
            if marks is not None and len(marks) == 4:
                valid_count += 1
    print(f"Found {valid_count} recordings with exactly 4 timestamps")

    if valid_count == 0:
        print("No valid recordings found. Please check that recordings have exactly 4 timestamps.")
        return

    # Run the combined analysis
    compute_and_plot_aligned_hr_with_convergence(recordings_root, out_root)
# Run the analysis only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()