#!/usr/bin/env python3 """ break-event - Log idle state transitions and auto-trim old events Called by swayidle to record when the user goes idle or resumes activity. Automatically trims log entries before qualifying break periods to keep log small. """ import os import sys import time import fcntl import json from pathlib import Path from typing import TypedDict Event = tuple[int, str] Events = list[Event] class BreakConfig(TypedDict): interval: str duration: str class Config(TypedDict): short_break: BreakConfig long_break: BreakConfig def parse_duration(duration: str) -> int: """Parse duration string like '5m' or '30s' to seconds""" if isinstance(duration, (int, float)): return int(duration) duration = duration.strip() if duration.endswith('m'): return int(duration[:-1]) * 60 elif duration.endswith('s'): return int(duration[:-1]) elif duration.endswith('h'): return int(duration[:-1]) * 3600 else: # Assume seconds if no unit return int(duration) def load_config() -> Config: """Load configuration from ~/.config/break-timer/config.json""" config_path = Path.home() / ".config" / "break-timer" / "config.json" # Default configuration default_config = { "short_break": {"interval": "20m", "duration": "20s"}, "long_break": {"interval": "45m", "duration": "5m"} } if not config_path.exists(): return default_config try: with open(config_path) as f: return json.load(f) except (json.JSONDecodeError, IOError): return default_config def get_log_path() -> Path: """Get the path to the activity log file""" runtime_dir = os.environ.get('XDG_RUNTIME_DIR') if runtime_dir: # Probably /run/user/$UID/break-timer log_dir = Path(runtime_dir) / "break-timer" else: log_dir = Path.home() / ".cache" / "break-timer" log_dir.mkdir(parents=True, exist_ok=True) return log_dir / "activity.log" def parse_log(log_path: Path) -> Events: """Parse the log file and return list of (timestamp, event_type) tuples""" events = [] if not log_path.exists(): return events try: with open(log_path, 'r') as f: for line in f: line = line.strip() if not line: continue parts = line.split(maxsplit=1) if len(parts) == 2: try: timestamp = int(parts[0]) event_type = parts[1] events.append((timestamp, event_type)) except ValueError: continue # Skip malformed lines except IOError: pass return events def trim_log(events: Events, long_break_duration: int) -> Events: """ Trim events before qualifying long breaks. Returns filtered list of events. """ if len(events) < 2: return events # Find the most recent idle period that qualifies as a long break last_qualifying_idle_index = None for i in range(len(events) - 1): timestamp, event_type = events[i] if event_type == "idle_start": # Find the corresponding idle_end for j in range(i + 1, len(events)): next_timestamp, next_event_type = events[j] if next_event_type == "idle_end": idle_duration = next_timestamp - timestamp if idle_duration >= long_break_duration: last_qualifying_idle_index = j break elif next_event_type == "idle_start": # No idle_end found, check if still idle break # Also check if currently idle and duration qualifies if events and events[-1][1] == "idle_start": idle_start_time = events[-1][0] current_time = int(time.time()) if current_time - idle_start_time >= long_break_duration: last_qualifying_idle_index = len(events) - 1 # Trim everything before the qualifying idle_start if last_qualifying_idle_index is not None: return events[last_qualifying_idle_index:] return events def write_log(log_path: Path, events: Events) -> None: """Write events to log file atomically""" temp_path = log_path.with_suffix('.tmp') try: with open(temp_path, 'w') as f: for timestamp, event_type in events: f.write(f"{timestamp} {event_type}\n") # Atomic rename temp_path.rename(log_path) except IOError as e: print(f"Error writing log: {e}", file=sys.stderr) if temp_path.exists(): temp_path.unlink() def is_paused(events: Events) -> bool: """Check if timer is currently paused (last event is 'paused')""" return bool(events and events[-1][1] == "paused") def main(): if len(sys.argv) != 2 or sys.argv[1] not in ("idle_start", "idle_end", "skip_long", "toggle-pause"): print("Usage: break-event ", file=sys.stderr) sys.exit(1) event_type = sys.argv[1] config = load_config() log_path = get_log_path() lock_path = log_path.parent / "break-timer.lock" # Convert config to seconds long_break_duration = parse_duration(config["long_break"]["duration"]) # Acquire lock for atomic operation lock_path.touch() with open(lock_path, 'w') as lock_file: fcntl.flock(lock_file, fcntl.LOCK_EX) # Read current events events = parse_log(log_path) try: if event_type == "skip_long": # Erase all events from the log, effectively resetting all timers. write_log(log_path, []) elif event_type == "toggle-pause": current_time = int(time.time()) event_key = "paused" if not is_paused(events) else "unpaused" events.append((current_time, event_key)) write_log(log_path, events) else: # Don't log idle events when paused if not is_paused(events): current_time = int(time.time()) events.append((current_time, event_type)) events = trim_log(events, long_break_duration) write_log(log_path, events) finally: fcntl.flock(lock_file, fcntl.LOCK_UN) if __name__ == "__main__": main()