Files
Master-Arbeit-Tom-Hempel/Unity-Master/Assets/vr_experiment_supervisor.py
2025-09-21 22:42:26 +02:00

658 lines
30 KiB
Python

#!/usr/bin/env python3
"""
VR Experiment Supervisor Control
A GUI application for controlling Unity VR experiments via UDP broadcast messages.
"""
import tkinter as tk
from tkinter import ttk, messagebox, scrolledtext
import socket
import json
import threading
import time
from datetime import datetime
class VRExperimentSupervisor:
def __init__(self, root):
self.root = root
self.root.title("VR Experiment Supervisor")
self.root.geometry("750x800") # Reduced height since we now have scrollbar
self.root.resizable(True, True)
# Experiment configuration
self.condition_types = ["Helpful", "Demotivating", "Control", "Non-Interactive"]
self.object_types = ["Brick", "Paperclip", "Rope", "Book"]
# Experiment state
self.experiment_sequence = []
self.current_condition_index = 0
self.experiment_configured = False
self.condition_start_time = None
self.countdown_timer = None
self.countdown_active = False
self.practice_round_completed = False
self.practice_round_active = False
# UDP settings
self.udp_ip = "10.194.161.255"
self.udp_port = 1221
# Create GUI
self.create_gui()
# Log initial status
self.log_message("Application started. Ready to configure experiment.")
# Start the countdown update timer
self.update_countdown_display()
def create_gui(self):
"""Create and layout all GUI elements"""
# Create main canvas with scrollbar
canvas = tk.Canvas(self.root)
scrollbar = ttk.Scrollbar(self.root, orient="vertical", command=canvas.yview)
scrollable_frame = ttk.Frame(canvas)
scrollable_frame.bind(
"<Configure>",
lambda e: canvas.configure(scrollregion=canvas.bbox("all"))
)
canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
canvas.configure(yscrollcommand=scrollbar.set)
# Configure grid weights for responsive design
self.root.columnconfigure(0, weight=1)
self.root.rowconfigure(0, weight=1)
canvas.columnconfigure(0, weight=1)
scrollable_frame.columnconfigure(0, weight=1)
# Pack canvas and scrollbar
canvas.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
scrollbar.grid(row=0, column=1, sticky=(tk.N, tk.S))
# Configure canvas to expand
self.root.columnconfigure(0, weight=1)
self.root.rowconfigure(0, weight=1)
# Main frame with padding (now inside scrollable frame)
main_frame = ttk.Frame(scrollable_frame, padding="10")
main_frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
main_frame.columnconfigure(0, weight=1)
row = 0
# Instructions section
instructions_frame = ttk.LabelFrame(main_frame, text="Instructions", padding="10")
instructions_frame.grid(row=row, column=0, sticky=(tk.W, tk.E), pady=(0, 10))
instructions_frame.columnconfigure(0, weight=1)
instructions_text = (
f"Broadcasting to: {self.udp_ip} on port {self.udp_port}\n"
"Note: Shares port 1221 with avatar sync system\n"
"1. Start with Practice Round (optional but recommended)\n"
"2. Configure experiment parameters below\n"
"3. Click 'Set Experiment Parameters' to confirm setup\n"
"4. Use 'Start Current Condition' and 'Next Condition' to control experiment"
)
instructions_label = ttk.Label(instructions_frame, text=instructions_text, justify=tk.LEFT)
instructions_label.grid(row=0, column=0, sticky=(tk.W, tk.E))
row += 1
# Practice Round section
practice_frame = ttk.LabelFrame(main_frame, text="Practice Round", padding="10")
practice_frame.grid(row=row, column=0, sticky=(tk.W, tk.E), pady=(0, 10))
practice_frame.columnconfigure(0, weight=1)
practice_frame.columnconfigure(1, weight=1)
# Practice round description
practice_desc = ttk.Label(practice_frame, text="5-minute practice round with practice agent only (no objects)",
font=("TkDefaultFont", 9, "italic"))
practice_desc.grid(row=0, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=(0, 10))
# Practice round buttons
self.start_practice_button = ttk.Button(practice_frame, text="Start Practice Round",
command=self.start_practice_round)
self.start_practice_button.grid(row=1, column=0, padx=(0, 5), pady=5, sticky=(tk.W, tk.E))
self.force_practice_end_button = ttk.Button(practice_frame, text="End Practice Early",
command=self.force_end_practice, state=tk.DISABLED)
self.force_practice_end_button.grid(row=1, column=1, padx=(5, 0), pady=5, sticky=(tk.W, tk.E))
# Practice status
self.practice_status_label = ttk.Label(practice_frame, text="Practice round not started",
font=("TkDefaultFont", 9, "bold"))
self.practice_status_label.grid(row=2, column=0, columnspan=2, sticky=(tk.W, tk.E), pady=(5, 0))
row += 1
# Session Information section
session_frame = ttk.LabelFrame(main_frame, text="Session Information", padding="10")
session_frame.grid(row=row, column=0, sticky=(tk.W, tk.E), pady=(0, 10))
session_frame.columnconfigure(1, weight=1)
# Group ID
ttk.Label(session_frame, text="Group ID:").grid(row=0, column=0, sticky=tk.W, padx=(0, 10), pady=2)
self.group_id_var = tk.StringVar()
self.group_id_entry = ttk.Entry(session_frame, textvariable=self.group_id_var, width=20)
self.group_id_entry.grid(row=0, column=1, sticky=(tk.W, tk.E), pady=2)
# Notes
ttk.Label(session_frame, text="Notes:").grid(row=1, column=0, sticky=(tk.W, tk.N), padx=(0, 10), pady=(5, 2))
self.notes_text = scrolledtext.ScrolledText(session_frame, height=4, wrap=tk.WORD, width=50)
self.notes_text.grid(row=1, column=1, sticky=(tk.W, tk.E), pady=(5, 2))
# Save button
self.save_button = ttk.Button(session_frame, text="Save Session Data", command=self.save_session_data)
self.save_button.grid(row=2, column=0, columnspan=2, pady=(10, 0))
row += 1
# Experiment Setup section
setup_frame = ttk.LabelFrame(main_frame, text="Experiment Setup", padding="10")
setup_frame.grid(row=row, column=0, sticky=(tk.W, tk.E), pady=(0, 10))
setup_frame.columnconfigure(1, weight=1)
setup_frame.columnconfigure(3, weight=1)
# Headers
ttk.Label(setup_frame, text="Condition", font=("TkDefaultFont", 9, "bold")).grid(row=0, column=0, padx=(0, 10), pady=(0, 5))
ttk.Label(setup_frame, text="Condition Type", font=("TkDefaultFont", 9, "bold")).grid(row=0, column=1, padx=(0, 10), pady=(0, 5))
ttk.Label(setup_frame, text="Object", font=("TkDefaultFont", 9, "bold")).grid(row=0, column=2, padx=(0, 10), pady=(0, 5))
ttk.Label(setup_frame, text="Object Type", font=("TkDefaultFont", 9, "bold")).grid(row=0, column=3, pady=(0, 5))
# Create dropdowns for each condition
self.condition_vars = []
self.object_vars = []
for i in range(4):
# Condition label
ttk.Label(setup_frame, text=f"Condition {i+1}:").grid(row=i+1, column=0, sticky=tk.W, padx=(0, 10), pady=2)
# Condition type dropdown
condition_var = tk.StringVar()
condition_combo = ttk.Combobox(setup_frame, textvariable=condition_var, values=self.condition_types, state="readonly", width=15)
condition_combo.grid(row=i+1, column=1, sticky=(tk.W, tk.E), padx=(0, 10), pady=2)
self.condition_vars.append(condition_var)
# Object label
ttk.Label(setup_frame, text="Object:").grid(row=i+1, column=2, sticky=tk.W, padx=(0, 10), pady=2)
# Object type dropdown
object_var = tk.StringVar()
object_combo = ttk.Combobox(setup_frame, textvariable=object_var, values=self.object_types, state="readonly", width=15)
object_combo.grid(row=i+1, column=3, sticky=(tk.W, tk.E), pady=2)
self.object_vars.append(object_var)
# Set experiment parameters button
self.set_params_button = ttk.Button(setup_frame, text="Set Experiment Parameters", command=self.set_experiment_parameters)
self.set_params_button.grid(row=5, column=0, columnspan=4, pady=(10, 0))
row += 1
# Current Experiment Display section
display_frame = ttk.LabelFrame(main_frame, text="Current Experiment Sequence", padding="10")
display_frame.grid(row=row, column=0, sticky=(tk.W, tk.E), pady=(0, 10))
display_frame.columnconfigure(0, weight=1)
self.sequence_display = scrolledtext.ScrolledText(display_frame, height=6, state=tk.DISABLED, wrap=tk.WORD)
self.sequence_display.grid(row=0, column=0, sticky=(tk.W, tk.E))
row += 1
# Control Buttons section
control_frame = ttk.LabelFrame(main_frame, text="Experiment Control", padding="10")
control_frame.grid(row=row, column=0, sticky=(tk.W, tk.E), pady=(0, 10))
control_frame.columnconfigure(0, weight=1)
control_frame.columnconfigure(1, weight=1)
control_frame.columnconfigure(2, weight=1)
control_frame.columnconfigure(3, weight=1)
self.start_button = ttk.Button(control_frame, text="Start Current Condition", command=self.start_current_condition, state=tk.DISABLED)
self.start_button.grid(row=0, column=0, padx=(0, 5), pady=5, sticky=(tk.W, tk.E))
self.next_button = ttk.Button(control_frame, text="Next Condition", command=self.next_condition, state=tk.DISABLED)
self.next_button.grid(row=0, column=1, padx=5, pady=5, sticky=(tk.W, tk.E))
self.force_next_button = ttk.Button(control_frame, text="Force Next (Override Timer)", command=self.force_next_condition, state=tk.DISABLED)
self.force_next_button.grid(row=0, column=2, padx=5, pady=5, sticky=(tk.W, tk.E))
self.reset_button = ttk.Button(control_frame, text="Reset Experiment", command=self.reset_experiment)
self.reset_button.grid(row=0, column=3, padx=(5, 0), pady=5, sticky=(tk.W, tk.E))
row += 1
# Status Display section
status_frame = ttk.LabelFrame(main_frame, text="Experiment Status", padding="10")
status_frame.grid(row=row, column=0, sticky=(tk.W, tk.E), pady=(0, 10))
status_frame.columnconfigure(0, weight=1)
self.status_label = ttk.Label(status_frame, text="Ready to configure experiment", font=("TkDefaultFont", 10, "bold"))
self.status_label.grid(row=0, column=0, sticky=(tk.W, tk.E))
# Countdown timer display
self.countdown_label = ttk.Label(status_frame, text="", font=("TkDefaultFont", 12, "bold"), foreground="red")
self.countdown_label.grid(row=1, column=0, sticky=(tk.W, tk.E), pady=(5, 0))
row += 1
# Error/Log Display section
log_frame = ttk.LabelFrame(main_frame, text="Log Messages", padding="10")
log_frame.grid(row=row, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
log_frame.columnconfigure(0, weight=1)
log_frame.rowconfigure(0, weight=1)
main_frame.rowconfigure(row, weight=1)
self.log_display = scrolledtext.ScrolledText(log_frame, height=8, state=tk.DISABLED, wrap=tk.WORD)
self.log_display.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
def start_practice_round(self):
"""Start the practice round with practice agent only"""
if self.practice_round_active:
messagebox.showwarning("Warning", "Practice round is already active!")
return
if messagebox.askyesno("Start Practice Round", "Start 5-minute practice round with practice agent only?"):
self.practice_round_active = True
self.practice_round_completed = False
# Send practice round start command
message_data = {
"command": "start_practice_round",
"duration": 300 # 5 minutes in seconds
}
if self.send_udp_message(message_data):
# Update UI
self.start_practice_button.config(state=tk.DISABLED)
self.force_practice_end_button.config(state=tk.NORMAL)
self.practice_status_label.config(text="Practice round active - 5 minutes remaining")
# Start practice countdown timer
self.condition_start_time = time.time()
self.countdown_active = True
self.log_message("Practice round started - 5-minute timer active")
self.update_status("Practice round active")
def force_end_practice(self):
"""Force end the practice round early"""
if not self.practice_round_active:
return
if messagebox.askyesno("End Practice Early", "Are you sure you want to end the practice round early?"):
self.end_practice_round("manually_ended")
def end_practice_round(self, reason="timer_expired"):
"""End the practice round"""
self.practice_round_active = False
self.countdown_active = False
# Send practice round end command
message_data = {
"command": "end_practice_round",
"reason": reason
}
if self.send_udp_message(message_data):
# Update UI
self.start_practice_button.config(state=tk.NORMAL)
self.force_practice_end_button.config(state=tk.DISABLED)
self.practice_status_label.config(text="Practice round completed")
if reason == "timer_expired":
self.log_message("Practice round completed - 5-minute timer expired")
self.update_status("Practice round completed")
else:
self.log_message("Practice round ended early by supervisor")
self.update_status("Practice round ended early")
self.practice_round_completed = True
def log_message(self, message):
"""Add a timestamped message to the log display"""
timestamp = datetime.now().strftime("%H:%M:%S")
formatted_message = f"[{timestamp}] {message}\n"
self.log_display.config(state=tk.NORMAL)
self.log_display.insert(tk.END, formatted_message)
self.log_display.see(tk.END)
self.log_display.config(state=tk.DISABLED)
def update_status(self, status):
"""Update the status label"""
self.status_label.config(text=status)
self.log_message(f"Status: {status}")
def start_countdown_timer(self):
"""Start the 5-minute countdown timer for the current condition"""
self.condition_start_time = time.time()
self.countdown_active = True
self.log_message("5-minute countdown timer started for current condition")
def update_countdown_display(self):
"""Update the countdown display every second"""
if self.countdown_active and self.condition_start_time is not None:
elapsed_time = time.time() - self.condition_start_time
remaining_time = max(0, 300 - elapsed_time) # 300 seconds = 5 minutes
if remaining_time > 0:
minutes = int(remaining_time // 60)
seconds = int(remaining_time % 60)
self.countdown_label.config(text=f"Time Remaining: {minutes:02d}:{seconds:02d}")
else:
# Timer expired
self.countdown_label.config(text="TIME EXPIRED - Block Finished")
self.countdown_active = False
if self.practice_round_active:
self.end_practice_round("timer_expired")
else:
self.condition_finished()
else:
self.countdown_label.config(text="")
# Schedule next update
self.root.after(1000, self.update_countdown_display)
def condition_finished(self):
"""Called when the 5-minute timer expires"""
self.log_message("5-minute timer expired - sending disable_all command")
# Send command to Unity to disable all objects and avatars
message_data = {
"command": "disable_all",
"reason": "timer_expired"
}
if self.send_udp_message(message_data):
self.update_status("Block finished - All objects disabled. Ready for next condition.")
# Enable next condition button if not the last condition
if self.current_condition_index < len(self.experiment_sequence) - 1:
self.next_button.config(state=tk.NORMAL)
self.force_next_button.config(state=tk.DISABLED) # Disable force button when timer expires
def force_next_condition(self):
"""Force move to next condition (override timer)"""
if messagebox.askyesno("Override Timer", "Are you sure you want to skip the remaining time and move to the next condition?"):
# Stop the current timer
self.countdown_active = False
self.log_message("Timer manually overridden by supervisor")
# Call the condition finished function to disable current objects
self.condition_finished()
def save_session_data(self):
"""Save session data to a text file"""
try:
group_id = self.group_id_var.get().strip()
if not group_id:
messagebox.showerror("Error", "Please enter a Group ID before saving.")
return
if not self.experiment_configured:
messagebox.showerror("Error", "Please configure the experiment before saving.")
return
# Create filename with group ID and timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"VR_Experiment_{group_id}_{timestamp}.txt"
# Prepare data to save
notes = self.notes_text.get(1.0, tk.END).strip()
with open(filename, 'w', encoding='utf-8') as f:
f.write(f"VR Experiment Session Data\n")
f.write(f"{'='*50}\n\n")
f.write(f"Group ID: {group_id}\n")
f.write(f"Date/Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"Practice Round Completed: {'Yes' if self.practice_round_completed else 'No'}\n")
f.write(f"Current Condition Index: {self.current_condition_index}\n\n")
f.write(f"Experiment Sequence:\n")
f.write(f"-" * 20 + "\n")
for i, condition in enumerate(self.experiment_sequence):
status = ""
if i < self.current_condition_index:
status = " [COMPLETED]"
elif i == self.current_condition_index:
status = " [CURRENT]"
else:
status = " [PENDING]"
f.write(f"Condition {i+1}: {condition['condition_type']} ({condition['object_type']}){status}\n")
f.write(f"\nSupervisor Notes:\n")
f.write(f"-" * 20 + "\n")
if notes:
f.write(f"{notes}\n")
else:
f.write("No notes provided.\n")
self.log_message(f"Session data saved to: {filename}")
messagebox.showinfo("Success", f"Session data saved to:\n{filename}")
except Exception as e:
error_msg = f"Failed to save session data: {str(e)}"
self.log_message(f"ERROR: {error_msg}")
messagebox.showerror("Error", error_msg)
def set_experiment_parameters(self):
"""Validate and set the experiment parameters"""
try:
# Get selected values
selected_conditions = [var.get() for var in self.condition_vars]
selected_objects = [var.get() for var in self.object_vars]
# Validate that all dropdowns are filled
if any(not cond for cond in selected_conditions) or any(not obj for obj in selected_objects):
messagebox.showerror("Error", "Please select condition types and objects for all four conditions.")
return
# Validate that each condition type is used exactly once
if len(set(selected_conditions)) != 4 or set(selected_conditions) != set(self.condition_types):
messagebox.showerror("Error", "Each condition type (Helpful, Demotivating, Control, Non-Interactive) must be used exactly once.")
return
# Validate that each object type is used exactly once
if len(set(selected_objects)) != 4 or set(selected_objects) != set(self.object_types):
messagebox.showerror("Error", "Each object type (Brick, Paperclip, Rope, Book) must be used exactly once.")
return
# Create experiment sequence
self.experiment_sequence = []
for i in range(4):
self.experiment_sequence.append({
"condition_index": i,
"condition_type": selected_conditions[i],
"object_type": selected_objects[i]
})
self.current_condition_index = 0
self.experiment_configured = True
# Update sequence display
self.update_sequence_display()
# Enable control buttons
self.start_button.config(state=tk.NORMAL)
self.next_button.config(state=tk.DISABLED) # Will be enabled after timer
self.force_next_button.config(state=tk.DISABLED) # Will be enabled after start
# Update status
self.update_status("Experiment configured. Ready to start first condition.")
messagebox.showinfo("Success", "Experiment parameters set successfully!")
except Exception as e:
self.log_message(f"Error setting experiment parameters: {str(e)}")
messagebox.showerror("Error", f"Failed to set experiment parameters: {str(e)}")
def update_sequence_display(self):
"""Update the experiment sequence display"""
if not self.experiment_configured:
display_text = "No experiment configured."
else:
display_text = "Experiment Sequence:\n\n"
for i, condition in enumerate(self.experiment_sequence):
status = ""
if i < self.current_condition_index:
status = " [COMPLETED]"
elif i == self.current_condition_index:
status = " [CURRENT]"
else:
status = " [PENDING]"
display_text += f"Condition {i+1}: {condition['condition_type']} ({condition['object_type']}){status}\n"
self.sequence_display.config(state=tk.NORMAL)
self.sequence_display.delete(1.0, tk.END)
self.sequence_display.insert(1.0, display_text)
self.sequence_display.config(state=tk.DISABLED)
def send_udp_message(self, message_data):
"""Send UDP broadcast message"""
try:
# Create UDP socket
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
# Convert message to JSON
json_message = json.dumps(message_data)
# Send message
sock.sendto(json_message.encode('utf-8'), (self.udp_ip, self.udp_port))
sock.close()
self.log_message(f"Sent UDP message: {json_message}")
return True
except Exception as e:
error_msg = f"Failed to send UDP message: {str(e)}"
self.log_message(f"ERROR: {error_msg}")
messagebox.showerror("Network Error", error_msg)
return False
def start_current_condition(self):
"""Start the current condition"""
if not self.experiment_configured or self.current_condition_index >= len(self.experiment_sequence):
return
current_condition = self.experiment_sequence[self.current_condition_index]
message_data = {
"command": "start_condition",
"condition_type": current_condition["condition_type"],
"object_type": current_condition["object_type"],
"condition_index": self.current_condition_index
}
if self.send_udp_message(message_data):
# Update UI
self.start_button.config(state=tk.DISABLED)
self.next_button.config(state=tk.DISABLED) # Disabled until timer expires
# Enable force next button if there are more conditions
if self.current_condition_index < len(self.experiment_sequence) - 1:
self.force_next_button.config(state=tk.NORMAL)
condition_name = f"{current_condition['condition_type']} ({current_condition['object_type']})"
self.update_status(f"Current Condition: {condition_name}")
self.update_sequence_display()
# Start the 5-minute countdown timer
self.start_countdown_timer()
def next_condition(self):
"""Move to the next condition"""
if not self.experiment_configured:
return
self.current_condition_index += 1
if self.current_condition_index >= len(self.experiment_sequence):
# Experiment completed
self.update_status("Experiment Completed")
self.start_button.config(state=tk.DISABLED)
self.next_button.config(state=tk.DISABLED)
self.update_sequence_display()
messagebox.showinfo("Experiment Complete", "All conditions have been completed!")
return
current_condition = self.experiment_sequence[self.current_condition_index]
message_data = {
"command": "next_condition",
"condition_type": current_condition["condition_type"],
"object_type": current_condition["object_type"],
"condition_index": self.current_condition_index
}
if self.send_udp_message(message_data):
# Update UI
self.next_button.config(state=tk.DISABLED) # Disabled until timer expires
# Enable force next button if there are more conditions after this one
if self.current_condition_index < len(self.experiment_sequence) - 1:
self.force_next_button.config(state=tk.NORMAL)
else:
self.force_next_button.config(state=tk.DISABLED)
condition_name = f"{current_condition['condition_type']} ({current_condition['object_type']})"
self.update_status(f"Current Condition: {condition_name}")
self.update_sequence_display()
# Start the 5-minute countdown timer for the new condition
self.start_countdown_timer()
def reset_experiment(self):
"""Reset the experiment to allow new configuration"""
if messagebox.askyesno("Reset Experiment", "Are you sure you want to reset the experiment? This will clear the current configuration."):
# Reset state
self.experiment_sequence = []
self.current_condition_index = 0
self.experiment_configured = False
self.countdown_active = False
self.condition_start_time = None
self.practice_round_completed = False
self.practice_round_active = False
# Clear dropdowns
for var in self.condition_vars:
var.set("")
for var in self.object_vars:
var.set("")
# Reset UI
self.start_button.config(state=tk.DISABLED)
self.next_button.config(state=tk.DISABLED)
self.force_next_button.config(state=tk.DISABLED)
self.start_practice_button.config(state=tk.NORMAL)
self.force_practice_end_button.config(state=tk.DISABLED)
self.practice_status_label.config(text="Practice round not started")
self.update_status("Ready to configure experiment")
self.update_sequence_display()
self.log_message("Experiment reset. Ready for new configuration.")
def main():
"""Main function to run the application"""
root = tk.Tk()
app = VRExperimentSupervisor(root)
# Center the window on screen
root.update_idletasks()
x = (root.winfo_screenwidth() // 2) - (root.winfo_width() // 2)
y = (root.winfo_screenheight() // 2) - (root.winfo_height() // 2)
root.geometry(f"+{x}+{y}")
# Start the GUI event loop
try:
root.mainloop()
except KeyboardInterrupt:
print("\nApplication closed by user.")
if __name__ == "__main__":
main()