Files
wlr-break-timer/break-event

212 lines
6.4 KiB
Python
Executable File

#!/usr/bin/env python3
"""
break-event - Log idle state transitions and auto-trim old events
Called by swayidle to record when the user goes idle or resumes activity.
Automatically trims log entries that precede a qualifying long break to keep the log small.
"""
import os
import sys
import time
import fcntl
import json
from pathlib import Path
from typing import TypedDict
# One log entry: (integer timestamp in seconds as returned by time.time(), event name).
Event = tuple[int, str]
# The activity log as a list of entries, in the order they appear in the log file.
Events = list[Event]
class BreakConfig(TypedDict):
    """Settings for one kind of break, as stored in the configuration file."""
    # Duration string such as "20m"; presumably the working time between
    # breaks - not read by the code in this file, confirm against the consumer.
    interval: str
    # Duration string such as "20s" or "5m", in the format parse_duration accepts.
    duration: str
class Config(TypedDict):
    """Top-level shape of the break-timer configuration."""
    # Settings for short breaks.
    short_break: BreakConfig
    # Settings for long breaks.
    long_break: BreakConfig
def parse_duration(duration: str) -> int:
    """Convert a duration such as ``'5m'``, ``'30s'`` or ``'2h'`` to whole seconds.

    Args:
        duration: A number followed by an optional unit suffix: ``s``
            (seconds), ``m`` (minutes) or ``h`` (hours). The suffix is
            case-insensitive, surrounding whitespace is ignored, and a value
            without a suffix is taken as seconds. Fractional values such as
            ``'1.5m'`` are accepted. A bare ``int`` or ``float`` is also
            accepted and treated as seconds.

    Returns:
        The duration in seconds, truncated toward zero.

    Raises:
        ValueError: If the numeric part cannot be parsed or is not finite.
    """
    # A config file may hold a plain JSON number instead of a string.
    if isinstance(duration, (int, float)):
        return int(duration)

    unit_seconds = {"s": 1, "m": 60, "h": 3600}
    text = duration.strip().lower()
    scale = unit_seconds.get(text[-1:])
    if scale is None:
        # No recognised unit: the whole string is a number of seconds.
        number, scale = text, 1
    else:
        number = text[:-1]

    try:
        # Integer path first so whole values never lose precision through float.
        return int(number) * scale
    except ValueError:
        pass

    try:
        return int(float(number) * scale)
    except OverflowError:
        # int(inf) raises OverflowError; report it like any other bad value.
        raise ValueError(f"invalid duration: {duration!r}") from None
def load_config() -> Config:
    """Load configuration from ~/.config/break-timer/config.json.

    Values found in the file override the built-in defaults key by key, so a
    file that sets only some options still yields a complete configuration.
    The defaults alone are returned when the file is missing, unreadable, not
    valid JSON, or does not contain a JSON object at the top level.

    Returns:
        A mapping with complete ``short_break`` and ``long_break`` sections.
        Any additional top-level keys from the file are passed through.
    """
    config_path = Path.home() / ".config" / "break-timer" / "config.json"
    # Default configuration
    default_config = {
        "short_break": {"interval": "20m", "duration": "20s"},
        "long_break": {"interval": "45m", "duration": "5m"},
    }

    try:
        with open(config_path, encoding="utf-8") as f:
            loaded = json.load(f)
    except (OSError, ValueError):
        # OSError: missing or unreadable file. ValueError: malformed JSON
        # (JSONDecodeError) or bytes that are not valid UTF-8.
        return default_config

    if not isinstance(loaded, dict):
        return default_config

    # Start from the file's content so unknown keys survive, then make sure
    # every known section is present and complete.
    merged = dict(loaded)
    for section, defaults in default_config.items():
        overrides = loaded.get(section)
        if isinstance(overrides, dict):
            merged[section] = {**defaults, **overrides}
        else:
            merged[section] = defaults
    return merged
def get_log_path() -> Path:
    """Return the activity log location, creating its directory if needed.

    The log lives in a ``break-timer`` directory under ``$XDG_RUNTIME_DIR``
    when that variable is set and non-empty, otherwise under ``~/.cache``.
    """
    xdg_runtime = os.environ.get('XDG_RUNTIME_DIR')
    # With XDG_RUNTIME_DIR set this is typically /run/user/$UID.
    base = Path(xdg_runtime) if xdg_runtime else Path.home() / ".cache"
    directory = base / "break-timer"
    directory.mkdir(parents=True, exist_ok=True)
    return directory / "activity.log"
def parse_log(log_path: Path) -> Events:
    """Read the log file into a list of (timestamp, event_type) tuples.

    Each useful line is ``<integer timestamp> <event name>``. Blank lines,
    lines without two fields and lines whose timestamp is not an integer are
    skipped. A missing file yields an empty list; an I/O error while reading
    yields whatever was parsed before the error.
    """
    parsed = []
    if not log_path.exists():
        return parsed

    try:
        with open(log_path, 'r') as handle:
            for raw_line in handle:
                # Strip first so trailing whitespace never ends up in the event name.
                fields = raw_line.strip().split(maxsplit=1)
                if len(fields) != 2:
                    continue
                stamp_text, kind = fields
                try:
                    parsed.append((int(stamp_text), kind))
                except ValueError:
                    # Malformed timestamp: ignore this line.
                    continue
    except IOError:
        pass
    return parsed
def trim_log(events: Events, long_break_duration: int) -> Events:
    """
    Drop the history that precedes the most recent qualifying long break.

    An idle period runs from an ``idle_start`` to the next ``idle_end``, unless
    another ``idle_start`` comes first. It qualifies when it lasted at least
    ``long_break_duration`` seconds. For a completed qualifying period the
    returned list begins at its ``idle_end``. If the log ends with an
    ``idle_start`` that is already old enough, only that entry is kept.
    With no qualifying period, or fewer than two events, the input list is
    returned as-is.
    """
    if len(events) < 2:
        return events

    cut_index = None
    open_idle_since = None  # timestamp of an idle_start still awaiting its idle_end
    for position, (stamp, kind) in enumerate(events):
        if kind == "idle_start":
            # A newer idle_start supersedes any unfinished one.
            open_idle_since = stamp
        elif kind == "idle_end":
            if open_idle_since is not None and stamp - open_idle_since >= long_break_duration:
                cut_index = position
            open_idle_since = None

    # The log may end in the middle of an idle period; measure it against the clock.
    if events[-1][1] == "idle_start":
        if int(time.time()) - events[-1][0] >= long_break_duration:
            cut_index = len(events) - 1

    return events if cut_index is None else events[cut_index:]
def write_log(log_path: Path, events: Events) -> None:
    """Replace the log file with the given events, one ``<timestamp> <event>`` per line.

    The content is written to a sibling file with a ``.tmp`` suffix and then
    renamed over the log. On an I/O error a message is printed to stderr and
    the temporary file is removed.
    """
    staging_path = log_path.with_suffix('.tmp')
    try:
        with open(staging_path, 'w') as out:
            out.writelines(f"{stamp} {kind}\n" for stamp, kind in events)
        # Swap the finished file into place with a rename.
        staging_path.rename(log_path)
    except IOError as err:
        print(f"Error writing log: {err}", file=sys.stderr)
        if staging_path.exists():
            staging_path.unlink()
def is_paused(events: Events) -> bool:
    """Report whether the most recent logged event is 'paused'.

    An empty log counts as not paused.
    """
    if not events:
        return False
    return events[-1][1] == "paused"
def main():
    """Record the event named on the command line in the activity log.

    Accepts exactly one argument: ``idle_start``, ``idle_end``, ``skip_long``
    or ``toggle-pause``. Any other invocation prints a usage line to stderr
    and exits with status 1. The log is read and rewritten while holding an
    exclusive ``flock`` on ``break-timer.lock`` in the log directory.
    """
    # Reject anything other than exactly one known event name.
    if len(sys.argv) != 2 or sys.argv[1] not in ("idle_start", "idle_end", "skip_long", "toggle-pause"):
        print("Usage: break-event <idle_start|idle_end|skip_long|toggle-pause>", file=sys.stderr)
        sys.exit(1)
    event_type = sys.argv[1]
    config = load_config()
    log_path = get_log_path()
    # The lock file sits next to the log file.
    lock_path = log_path.parent / "break-timer.lock"
    # Convert the configured long-break duration to seconds
    long_break_duration = parse_duration(config["long_break"]["duration"])
    # Acquire an exclusive lock so the read-modify-write below is not interleaved
    # with another invocation
    lock_path.touch()
    with open(lock_path, 'w') as lock_file:
        fcntl.flock(lock_file, fcntl.LOCK_EX)
        # Read current events
        events = parse_log(log_path)
        try:
            if event_type == "skip_long":
                # Erase all events from the log, effectively resetting all timers.
                write_log(log_path, [])
            elif event_type == "toggle-pause":
                # Append whichever marker flips the current state; is_paused
                # looks only at the last event.
                current_time = int(time.time())
                event_key = "paused" if not is_paused(events) else "unpaused"
                events.append((current_time, event_key))
                write_log(log_path, events)
            else:
                # Don't log idle events when paused
                if not is_paused(events):
                    current_time = int(time.time())
                    events.append((current_time, event_type))
                    # NOTE(review): trimming and writing are placed inside the
                    # not-paused branch; confirm this nesting against the
                    # upstream file.
                    events = trim_log(events, long_break_duration)
                    write_log(log_path, events)
        finally:
            # Release the lock explicitly before the lock file is closed.
            fcntl.flock(lock_file, fcntl.LOCK_UN)
# Run main() only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()