dotfiles/local-bin/.local/bin/niri-dev-launcher

238 lines
8.6 KiB
Python
Executable file

#!/usr/bin/env python3
"""Niri Dev Launcher - Launch development environments from git repositories"""
import argparse
import hashlib
import json
import os
import re
import shutil
import subprocess
import sys
import time
from pathlib import Path
# Maximum age, in seconds, of the repository-list cache file before
# find_git_repos() rescans the development directory.
CACHE_MAX_AGE = 300
# Maximum number of entries update_mru() keeps in the MRU file.
MRU_SIZE = 5
def debug(msg):
    """Write *msg* to stderr with a ``[DEBUG]`` prefix when ``DEBUG=1``.

    Does nothing unless the ``DEBUG`` environment variable is exactly ``"1"``.
    """
    if os.getenv("DEBUG") != "1":
        return
    print(f"[DEBUG] {msg}", file=sys.stderr)
def get_cache_files(dev_dir):
    """Return the ``(cache_file, mru_file)`` paths belonging to *dev_dir*.

    Both files live in ``~/.cache`` and are suffixed with the first 16 hex
    digits of the SHA-256 of *dev_dir*, so each development directory gets
    its own repository cache and MRU list.
    """
    digest = hashlib.sha256(dev_dir.encode()).hexdigest()[:16]
    base = Path.home() / ".cache"
    repo_cache = base / f"niri-dev-launcher-cache-{digest}"
    mru_list = base / f"niri-dev-launcher-mru-{digest}"
    return (repo_cache, mru_list)
# Memoised answer to "is `fd` on PATH?"; None means it has not been probed yet.
_HAS_FD = None


def _has_fd():
    """Return True if an ``fd`` executable is on PATH (probed at most once)."""
    global _HAS_FD
    if _HAS_FD is not None:
        return _HAS_FD
    _HAS_FD = shutil.which("fd") is not None
    return _HAS_FD
def find_git_repos(dev_dir, no_cache=False):
    """Return the sorted list of git repositories found under *dev_dir*.

    A repository is the parent of any ``.git`` directory located at most
    three levels below *dev_dir*.  Results are stored one path per line in
    the per-directory cache file and reused while that file is younger than
    ``CACHE_MAX_AGE`` seconds.

    Args:
        dev_dir: Root directory to scan, as a string.
        no_cache: When true, skip the cache lookup and always rescan.

    Returns:
        Sorted, de-duplicated list of repository paths as strings.  Empty
        when *dev_dir* does not exist or nothing was found.
    """
    cache_file, _ = get_cache_files(dev_dir)
    debug("find_git_repos: checking cache...")
    if not no_cache and cache_file.exists():
        cache_age = time.time() - os.path.getmtime(cache_file)
        debug(f"Cache age: {cache_age:.0f}s (max: {CACHE_MAX_AGE}s)")
        if cache_age < CACHE_MAX_AGE:
            debug("Using cached repos")
            return [r for r in cache_file.read_text().strip().split("\n") if r]
        debug("Cache expired, rescanning...")

    debug(f"Scanning for git repositories in {dev_dir}...")
    if not Path(dev_dir).exists():
        return []

    if _has_fd():
        debug("Using fd for scanning")
        # Raw string: "\." is not a valid Python escape sequence.
        # --no-ignore: fd otherwise applies .gitignore-style rules, which the
        # find fallback never does; newer fd releases reportedly also skip
        # .git itself under -H unless ignores are disabled (verify against the
        # installed version).
        command = ["fd", "-H", "--no-ignore", "-t", "d", r"^\.git$", dev_dir, "-d", "3", "-0"]
    else:
        debug("Using find for scanning")
        command = ["find", dev_dir, "-maxdepth", "3", "-type", "d", "-name", ".git", "-print0"]
    result = subprocess.run(command, capture_output=True, text=True)

    # Keep whatever the scanner printed even on a non-zero exit: find exits
    # non-zero when a single subdirectory is unreadable, yet the paths it did
    # print are still valid.
    git_dirs = [g for g in result.stdout.split("\0") if g]
    repos = sorted({str(Path(g).parent) for g in git_dirs})
    debug(f"Found {len(repos)} repositories")

    if result.returncode != 0:
        # Do not persist a possibly incomplete (or empty) result; otherwise a
        # transient failure would hide repositories until the cache expires.
        debug(f"Scanner exited with status {result.returncode}; not caching")
        return repos

    cache_file.parent.mkdir(parents=True, exist_ok=True)
    cache_file.write_text("\n".join(repos))
    return repos
def get_project_name(repo_path):
    """Derive a lowercase ``[a-z0-9-]`` slug from the last component of *repo_path*.

    Every character outside ``a-z0-9`` becomes a dash, runs of dashes are
    collapsed to one, and leading/trailing dashes are removed.
    """
    basename = Path(repo_path).name
    dashed = re.sub(r"[^a-z0-9]", "-", basename.lower())
    collapsed = re.sub(r"-+", "-", dashed)
    return collapsed.strip("-")
def update_mru(selected_display, dev_dir):
    """Move *selected_display* to the front of the MRU file for *dev_dir*.

    The file holds one display name per line, most recent first, and is
    truncated to ``MRU_SIZE`` entries on every write.
    """
    _, mru_file = get_cache_files(dev_dir)

    previous = []
    if mru_file.exists():
        for raw_line in mru_file.read_text().split("\n"):
            entry = raw_line.strip()
            if entry:
                previous.append(entry)

    # New head first, followed by the older entries minus any duplicate of it.
    reordered = [selected_display]
    reordered.extend(entry for entry in previous if entry != selected_display)

    mru_file.parent.mkdir(parents=True, exist_ok=True)
    mru_file.write_text("\n".join(reordered[:MRU_SIZE]))
def sort_by_mru(display_list, dev_dir):
    """Order *display_list* with recently used entries first.

    Entries recorded in the MRU file for *dev_dir* come first, in the file's
    own order (most recent first, as written by ``update_mru``).  All other
    entries follow in alphabetical order.  MRU entries that are not in
    *display_list* are ignored.

    Args:
        display_list: Display names to order.
        dev_dir: Development directory whose MRU file is consulted.

    Returns:
        A new list; *display_list* itself is not modified.
    """
    _, mru_file = get_cache_files(dev_dir)

    mru_order = []
    if mru_file.exists():
        mru_order = [line.strip() for line in mru_file.read_text().split("\n") if line.strip()]
    if not mru_order:
        return sorted(display_list)

    available = set(display_list)
    # dict.fromkeys drops duplicate lines while keeping first-seen (recency)
    # order; re-sorting this list alphabetically would discard that order.
    mru_repos = [name for name in dict.fromkeys(mru_order) if name in available]
    recent = set(mru_repos)
    non_mru_repos = sorted(name for name in display_list if name not in recent)
    return mru_repos + non_mru_repos
def find_window_by_app_id(app_id):
    """Return the id of the first niri window whose app-id equals *app_id*.

    Windows are listed with ``niri msg --json windows`` and compared
    case-insensitively.

    Args:
        app_id: Application id to look for.

    Returns:
        The window id as a string, or ``None`` when no window matches, niri
        cannot be run, or its output is not the expected JSON list.
    """
    debug(f"Searching for window with app-id: {app_id}")
    try:
        result = subprocess.run(
            ["niri", "msg", "--json", "windows"],
            capture_output=True, text=True, check=False,
        )
        windows = json.loads(result.stdout.strip() or "[]")
    except (OSError, subprocess.SubprocessError, json.JSONDecodeError):
        # OSError covers a missing or non-executable `niri` binary.
        return None

    if not isinstance(windows, list):
        return None

    wanted = app_id.lower()
    for window in windows:
        if not isinstance(window, dict):
            continue
        # A JSON null app_id makes .get() return None even with a default, so
        # fall back to "" before calling .lower().
        if (window.get("app_id") or "").lower() != wanted:
            continue
        window_id = window.get("id")
        if window_id is not None:
            return str(window_id)
    return None
def focus_or_launch_alacritty(class_name, title, working_dir, session_name):
    """Focus the project's existing window, or open a new Alacritty for it.

    If a niri window with app-id *class_name* exists it is focused.
    Otherwise Alacritty is started detached, running a bash script that
    attaches to the tmux session *session_name*.  When the session does not
    exist yet it is created in *working_dir* with ``$EDITOR`` (default
    ``nvim``) in the first pane and a second pane split beside it.

    Args:
        class_name: Alacritty ``--class`` value, also used as the niri app-id.
        title: Window title passed to Alacritty.
        working_dir: Directory the terminal and tmux panes start in.
        session_name: tmux session name.
    """
    # Function-scope import so the block does not depend on the file's import list.
    import shlex

    debug(f"focus_or_launch_alacritty: class={class_name}, title={title}, dir={working_dir}")
    window_id = find_window_by_app_id(class_name)
    if window_id:
        debug("Found existing window, focusing...")
        subprocess.run(
            ["niri", "msg", "action", "focus-window", "--id", window_id],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
        )
        return

    debug("No existing window found, launching alacritty...")
    # `or` also covers EDITOR being set but empty, which would otherwise give
    # tmux an empty command.
    editor = os.getenv("EDITOR") or "nvim"

    # Quote every interpolated value: a directory or editor string containing
    # a single quote would otherwise end the hand-written '...' early and
    # break (or inject into) the script.
    session = shlex.quote(session_name)
    first_pane = shlex.quote(f"{session_name}:0.0")
    workdir = shlex.quote(working_dir)
    # Still one argument, so tmux runs it as a shell command and an EDITOR
    # value with flags keeps working.
    editor_cmd = shlex.quote(editor)

    tmux_cmd = f"""if tmux has-session -t {session} 2>/dev/null; then
    tmux attach -t {session};
else
    tmux new-session -c {workdir} -s {session} -d {editor_cmd};
    tmux split-window -h -c {workdir} -t {session};
    tmux select-pane -t {first_pane};
    tmux attach -t {session};
fi"""
    subprocess.Popen(
        ["alacritty", f"--class={class_name}", f"--title={title}",
         f"--working-directory={working_dir}", "-e", "bash", "-c", tmux_cmd],
        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, start_new_session=True,
    )
def main():
    """Command-line entry point.

    Parses the arguments, optionally clears the cache and MRU files, lists
    the git repositories under the development directory, lets the user pick
    one in fuzzel, records the choice in the MRU file, and focuses or
    launches the terminal for that project.

    Exits with status 1 when no repositories are found, and with status 0
    after ``--clear-cache``, when nothing is selected, or when the selection
    does not map to an existing directory.
    """
    parser = argparse.ArgumentParser(description="Niri Dev Launcher")
    parser.add_argument("dev_dir", nargs="?", default=os.path.expanduser("~/Dev"), help="Development directory")
    parser.add_argument("--no-cache", action="store_true", help="Bypass cache and rescan repositories")
    parser.add_argument("--clear-cache", action="store_true", help="Clear cache and MRU files")
    args = parser.parse_args()

    debug(f"Starting niri-dev-launcher with args: {sys.argv[1:]}")
    debug(f"DEV_DIR: {args.dev_dir}")
    cache_file, mru_file = get_cache_files(args.dev_dir)
    debug(f"CACHE_FILE: {cache_file}")
    debug(f"MRU_FILE: {mru_file}")

    if args.clear_cache:
        if cache_file.exists():
            cache_file.unlink()
        if mru_file.exists():
            mru_file.unlink()
        subprocess.run(["notify-send", "Niri Dev Launcher", "Cache cleared"], check=False)
        sys.exit(0)

    repos = find_git_repos(args.dev_dir, args.no_cache)
    if not repos:
        debug("No repositories found!")
        subprocess.run(["notify-send", "Niri Dev Launcher", f"No git repositories found in {args.dev_dir}"], check=False)
        sys.exit(1)
    debug("Found repositories, proceeding...")

    # Map the name shown in the menu (path relative to dev_dir) back to the
    # repository path.
    dev_root = Path(args.dev_dir)
    display_to_path = {}
    display_list = []
    for repo in repos:
        # Compare path components rather than string prefixes: a prefix test
        # misses "./Dev" against "Dev/foo" and lets relative_to() raise on a
        # sibling such as "/a/bc" under "/a/b".
        try:
            display = str(Path(repo).relative_to(dev_root))
        except ValueError:
            display = repo
        display_list.append(display)
        display_to_path[display] = repo
    debug(f"Mapped {len(display_to_path)} projects")

    debug("Sorting by MRU...")
    sorted_list = sort_by_mru(display_list, args.dev_dir)

    debug("Presenting in fuzzel...")
    # No --font option is passed, so fuzzel uses its own configured font.
    fuzzel_process = subprocess.Popen(
        ["fuzzel", "--prompt", "💀 Poison: ", "--dmenu", "--width", "30", "--lines", "20",
         "--border-width", "2", "--background-color", "#191724ff",
         "--text-color", "#e0def4ff", "--match-color", "#31748fff", "--selection-color", "#1f1d2eff",
         "--selection-text-color", "#31748fff", "--selection-match-color", "#31748fff", "--prompt-color", "#f6c177ff"],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True,
    )
    stdout, _ = fuzzel_process.communicate(input="\n".join(sorted_list))
    selected_display = stdout.strip()
    debug(f"Selected display: '{selected_display}'")
    if not selected_display:
        debug("No selection, exiting")
        sys.exit(0)

    # The menu entries carry no decoration, so the selection is the lookup key.
    selected = display_to_path.get(selected_display, "")
    debug(f"Selected path: '{selected}'")
    if not selected or not Path(selected).is_dir():
        debug("Invalid selection, exiting")
        sys.exit(0)

    debug("Updating MRU...")
    update_mru(selected_display, args.dev_dir)

    project_name = get_project_name(selected)
    session_name = f"dev-{project_name}"
    class_name = f"com.mzunino.dev.{project_name}"
    debug(f"Project name: {project_name}")
    debug(f"Session name: {session_name}")
    debug(f"Class name: {class_name}")

    debug("Focusing or launching alacritty...")
    focus_or_launch_alacritty(class_name, project_name, selected, session_name)
# Run the launcher only when executed as a script, not when imported.
if __name__ == "__main__":
    sys.exit(main())