306 lines
15 KiB
Python
306 lines
15 KiB
Python
import sys
|
|
import os
|
|
import tempfile
|
|
import subprocess
|
|
import threading
|
|
import queue
|
|
import logging
|
|
import time
|
|
|
|
from pynput.keyboard import Key, Listener # Directly imported as it's used here
|
|
from model.constants import STOP_KEY, AHK_LOG_MONITOR_INTERVAL, AHK_TEMPLATE_REL_PATH_FROM_MODEL_DIR, AHK_EXECUTABLE_FALLBACK_PATHS, AUTOHOTKEY_KEY_MAP
|
|
|
|
|
|
# Module-level logger, named after this module; AHKProcessManager writes all
# of its log output through it.
logger = logging.getLogger(__name__)
|
|
|
|
class AHKProcessManager:
    """
    Manages the AutoHotkey script lifecycle: generation, launching,
    monitoring, stopping, and cleanup of temporary files.
    Also handles the pynput keyboard listener for global stop key detection.

    Two daemon threads may be spawned by an instance: one tails the AHK log
    file (started by ``start_script``) and one runs the pynput listener
    (started by ``start_pynput_listener``). Both communicate through the
    ``_keep_running`` flag and the registered "AHK stopped" callback.
    """

    def __init__(
        self,
        ahk_template_rel_path,
        ahk_executable_fallback_paths,
        autohotkey_key_map,
        stop_key,
        message_queue,
    ):
        """
        Store configuration and reset all runtime state.

        Args:
            ahk_template_rel_path: Path to the AHK template, joined onto this
                module's directory when not running from a frozen bundle.
            ahk_executable_fallback_paths: Iterable of candidate paths to
                AutoHotkey.exe, tried in order.
            autohotkey_key_map: Mapping from lower-cased user key names to
                AutoHotkey key names.
            stop_key: pynput key object compared against pressed keys.
            message_queue: Queue-like object (needs ``put``) that receives
                the AHK log lines.
        """
        self._ahk_template_rel_path = ahk_template_rel_path
        self._ahk_executable_fallback_paths = ahk_executable_fallback_paths
        self._autohotkey_key_map = autohotkey_key_map
        self._stop_key = stop_key
        self._message_queue = message_queue  # Central message queue for GUI communication

        self._ahk_process = None
        self._ahk_script_temp_file = None
        self._ahk_log_temp_file = None
        self._ahk_log_file_last_read_pos = 0
        self._keep_running = False  # Flag to control monitoring and listener threads

        self._current_target_window_title = ""
        self._current_key_to_press_str = ''
        self._ahk_stopped_callback = None  # Callback to notify KeyPressModel (and then Controller)

        logger.info("AHKProcessManager: Initialized.")

    def register_ahk_stopped_callback(self, callback_func):
        """Registers a callback function to be called when the AHK script stops externally."""
        self._ahk_stopped_callback = callback_func
        logger.info("AHKProcessManager: AHK stopped callback registered.")

    def _trigger_ahk_stopped_callback(self):
        """Internal method to trigger the registered callback, if any."""
        if self._ahk_stopped_callback:
            # The callback runs on whichever thread called this method
            # (monitor thread, listener thread or the caller of stop_script).
            self._ahk_stopped_callback()
            logger.info("AHKProcessManager: AHK stopped callback triggered.")
        else:
            logger.warning("AHKProcessManager: AHK stopped, but no callback was registered.")

    def set_target_window(self, title):
        """Sets the target window title for AHK operations."""
        self._current_target_window_title = title

    def set_key_to_press(self, key_str):
        """Sets the key string to be pressed by AHK."""
        self._current_key_to_press_str = key_str

    def _get_autohotkey_key_name(self, user_input_key):
        """
        Converts a user-friendly key input to its AutoHotkey v2 equivalent.

        The lookup is done on the lower-cased, stripped input. When the key
        map has no entry, the input is returned exactly as given (not
        normalized), so single characters and unknown names pass through.
        """
        normalized_input = user_input_key.lower().strip()
        mapped_key = self._autohotkey_key_map.get(normalized_input)
        return mapped_key if mapped_key else user_input_key

    def _stop_key_label(self):
        """
        Return a textual name for the configured stop key.

        Uses ``.name`` when the key object has one, otherwise ``.char``,
        otherwise ``str(key)``, so a stop key without a ``name`` attribute
        no longer aborts script generation with AttributeError.
        """
        label = getattr(self._stop_key, "name", None) or getattr(self._stop_key, "char", None)
        return str(label) if label else str(self._stop_key)

    def _resolve_template_path(self):
        """Return the template.ahk path for frozen (PyInstaller) or plain-script execution."""
        if getattr(sys, 'frozen', False):  # Running as a PyInstaller executable
            template_path = os.path.join(sys._MEIPASS, "resources", "template.ahk")
            logger.info(f"AHKProcessManager: Running as EXE. Template path: {template_path}")
        else:  # Running as a Python script
            script_dir = os.path.dirname(os.path.abspath(__file__))
            template_path = os.path.join(script_dir, self._ahk_template_rel_path)
            logger.info(f"AHKProcessManager: Running as script. Template path: {template_path}")
        return template_path

    def _write_ahk_script(self, ahk_template_content):
        """Create the temporary script/log files and write the rendered template to the script file."""
        logger.debug("AHKProcessManager: Creating temporary files for AHK script and log.")
        # Close each descriptor right away: only the paths are needed, and
        # this avoids leaking the first descriptor if the second mkstemp fails.
        fd_script, self._ahk_script_temp_file = tempfile.mkstemp(suffix=".ahk", prefix="jarvis_ahk_")
        os.close(fd_script)
        fd_log, self._ahk_log_temp_file = tempfile.mkstemp(suffix=".log", prefix="jarvis_ahk_log_")
        os.close(fd_log)

        self._ahk_log_file_last_read_pos = 0

        target_win_ahk = self._current_target_window_title
        key_to_press_ahk = self._get_autohotkey_key_name(self._current_key_to_press_str)
        stop_key_ahk_name = self._get_autohotkey_key_name(self._stop_key_label())

        logger.info(f"AHKProcessManager: Target window for AHK: '{target_win_ahk}'")
        logger.info(f"AHKProcessManager: Key to press for AHK: '{key_to_press_ahk}'")
        logger.info(f"AHKProcessManager: Stop key for AHK: '{stop_key_ahk_name}'")

        ahk_script_content = ahk_template_content.format(
            TARGET_WINDOW_TITLE_PLACEHOLDER=target_win_ahk,
            KEY_TO_PRESS_AHK_PLACEHOLDER=key_to_press_ahk,
            STOP_KEY_AHK_PLACEHOLDER=stop_key_ahk_name,
            # Backslashes are swapped for forward slashes before the path is
            # embedded in the generated script.
            AHK_LOG_FILE_PLACEHOLDER=self._ahk_log_temp_file.replace('\\', '/'),
        )

        logger.debug(f"AHKProcessManager: Writing AHK script content to: {self._ahk_script_temp_file}")
        with open(self._ahk_script_temp_file, 'w') as script_file:
            script_file.write(ahk_script_content)
        logger.debug("AHKProcessManager: AutoHotkey script successfully written.")
        logger.debug(f"AHKProcessManager: AutoHotkey logging will occur in: {self._ahk_log_temp_file}")

    def _resolve_autohotkey_executable(self):
        """
        Locate AutoHotkey.exe.

        Search order: PyInstaller bundle (only when frozen), the configured
        fallback paths, then the system PATH.

        Raises:
            FileNotFoundError: if no candidate exists.
        """
        import shutil  # local import: only needed for the PATH lookup below

        if getattr(sys, 'frozen', False):
            bundle_dir = sys._MEIPASS
            bundled_candidates = (
                os.path.join(bundle_dir, "AutoHotkey.exe"),
                os.path.join(bundle_dir, "AutoHotkey", "AutoHotkey.exe"),
                os.path.join(bundle_dir, "v2", "AutoHotkey.exe"),
            )
            for candidate in bundled_candidates:
                if os.path.exists(candidate):
                    logger.debug(f"AHKProcessManager: Found bundled AutoHotkey.exe at: {candidate}")
                    return candidate
            logger.warning("AHKProcessManager: Could not find AutoHotkey.exe within the PyInstaller bundle. Falling back to common install paths.")

        for candidate in self._ahk_executable_fallback_paths:
            if os.path.exists(candidate):
                logger.debug(f"AHKProcessManager: Found AutoHotkey.exe at configured path: {candidate}")
                return candidate

        # The error below tells the user PATH was searched, so actually search it.
        path_hit = shutil.which("AutoHotkey")
        if path_hit:
            logger.debug(f"AHKProcessManager: Found AutoHotkey.exe on system PATH: {path_hit}")
            return path_hit

        logger.error("AHKProcessManager: AutoHotkey.exe not found in any known location or system PATH. Cannot launch AHK script.")
        raise FileNotFoundError("AutoHotkey.exe not found. Please ensure it's installed or specify its path.")

    def _abort_failed_start(self):
        """Undo a failed start: remove any temp files already created, then notify the listener."""
        self._clean_ahk_temp_files()
        self._trigger_ahk_stopped_callback()

    def start_script(self):
        """
        Generates and launches the AutoHotkey script as a subprocess and
        starts the log-monitoring thread.

        Returns:
            True if the script was launched (or is already running),
            False if the template is missing or launching failed. On a
            launch failure the temporary files are removed and the
            "AHK stopped" callback is triggered; a missing template returns
            False without triggering the callback.
        """
        logger.info("AHKProcessManager: start_script: Function entry.")

        if self._ahk_process and self._ahk_process.poll() is None:
            logger.warning("AHKProcessManager: AutoHotkey script is already running. Exiting function.")
            return True

        try:
            template_path = self._resolve_template_path()
            logger.info(f"AHKProcessManager: Attempting to read AHK template from: {template_path}")
            if not os.path.exists(template_path):
                logger.error(f"AHKProcessManager: AHK template file not found at '{template_path}'.")
                return False

            with open(template_path, 'r') as template_file:
                ahk_template_content = template_file.read()
            logger.info(f"AHKProcessManager: AHK template loaded successfully from {template_path}.")

            self._write_ahk_script(ahk_template_content)
            resolved_autohotkey_exe_path = self._resolve_autohotkey_executable()

            command = [resolved_autohotkey_exe_path, self._ahk_script_temp_file]
            logger.info(f"AHKProcessManager: Launching AutoHotkey command: {' '.join(command)}")

            # DETACHED_PROCESS only exists on Windows; the conditional keeps
            # the attribute from being evaluated elsewhere.
            creationflags = subprocess.DETACHED_PROCESS if os.name == 'nt' else 0

            self._ahk_process = subprocess.Popen(command, creationflags=creationflags)
            logger.info(f"AHKProcessManager: AutoHotkey script launched successfully with PID: {self._ahk_process.pid}")

            self._keep_running = True
            logger.debug("AHKProcessManager: Waiting 0.5 seconds for AHK to initialize.")
            time.sleep(0.5)

            logger.debug("AHKProcessManager: Launching AHK log monitoring thread.")
            threading.Thread(target=self._monitor_ahk_log, daemon=True).start()
            logger.debug("AHKProcessManager: AHK log monitoring thread launched.")

            return True
        except FileNotFoundError as e:
            logger.error(f"AHKProcessManager: FileNotFoundError: {e}", exc_info=True)
            self._abort_failed_start()
            return False
        except Exception as e:
            logger.error(f"AHKProcessManager: An unexpected error occurred while launching AutoHotkey script: {e}", exc_info=True)
            self._abort_failed_start()
            return False
        finally:
            logger.info("AHKProcessManager: start_script: Function exit.")

    def stop_script(self):
        """
        Terminates the running AutoHotkey script process and cleans up temporary files.
        Always attempts to stop the process and clean up, regardless of previous state,
        and always triggers the "AHK stopped" callback at the end.
        """
        logger.info("AHKProcessManager: stop_script: Function entry.")
        self._keep_running = False

        process = self._ahk_process
        if process and process.poll() is None:
            logger.info("AHKProcessManager: Terminating AutoHotkey script process.")
            try:
                process.terminate()
                try:
                    process.wait(timeout=2)
                    logger.info("AHKProcessManager: AutoHotkey process terminated gracefully.")
                except subprocess.TimeoutExpired:
                    # wait() signals a timeout by raising, so the force-kill
                    # has to live in this handler to be reachable.
                    process.kill()
                    process.wait(timeout=2)
                    logger.info("AHKProcessManager: AutoHotkey process forcefully killed.")
            except Exception as e:
                logger.error(f"AHKProcessManager: Error terminating AutoHotkey process: {e}", exc_info=True)
        else:
            logger.info("AHKProcessManager: AutoHotkey script was not running or already terminated.")
        self._ahk_process = None

        self._clean_ahk_temp_files()
        logger.info("AHKProcessManager: stop_script: Function exit.")
        self._trigger_ahk_stopped_callback()

    def _forward_new_log_lines(self):
        """Push log lines appended since the last read onto the message queue."""
        log_path = self._ahk_log_temp_file
        if not log_path:
            return  # cleanup already ran on another thread
        with open(log_path, 'r') as log_file:
            log_file.seek(self._ahk_log_file_last_read_pos)
            # readlines() rather than iterating the file object: tell() is
            # disabled while a text file is being iterated.
            for line in log_file.readlines():
                self._message_queue.put(f"[AHK] {line.strip()}")
            self._ahk_log_file_last_read_pos = log_file.tell()

    def _monitor_ahk_log(self):
        """
        Continuously reads the AutoHotkey script's log file and puts new lines into the message queue.

        Runs until ``_keep_running`` is cleared, the log file disappears, or
        the AHK process is found to have exited. If the flag was cleared and
        there is still a process or temp file to clean up, ``stop_script`` is
        called, which also fires the "AHK stopped" callback once.
        """
        logger.info("AHKProcessManager: _monitor_ahk_log: Started monitoring AHK log file.")
        time.sleep(1)
        while self._keep_running and self._ahk_log_temp_file and os.path.exists(self._ahk_log_temp_file):
            try:
                process = self._ahk_process  # snapshot: stop_script may reset the attribute
                if process and process.poll() is not None:
                    logger.warning("AHKProcessManager: AHK process unexpectedly terminated. Signaling stop.")
                    self._keep_running = False
                    # Pick up whatever the script logged right before exiting.
                    self._forward_new_log_lines()
                    break

                self._forward_new_log_lines()
            except FileNotFoundError:
                logger.warning("AHKProcessManager: _monitor_ahk_log: AHK log file not found, it might have been deleted or not created yet. Retrying...")
            except Exception as e:
                logger.error(f"AHKProcessManager: _monitor_ahk_log: Error reading AHK log file: {e}", exc_info=True)
            time.sleep(AHK_LOG_MONITOR_INTERVAL)
        logger.info("AHKProcessManager: _monitor_ahk_log: Stopped monitoring AHK log file.")

        cleanup_pending = (
            self._ahk_process is not None
            or self._ahk_script_temp_file is not None
            or self._ahk_log_temp_file is not None
        )
        # Skip the call when stop_script already ran elsewhere; otherwise the
        # process would be "stopped" and the callback fired a second time.
        if not self._keep_running and cleanup_pending:
            self.stop_script()

    @staticmethod
    def _remove_temp_file(path, removed_label, failure_label):
        """Delete *path* if it exists, logging success or failure; never raises."""
        if not (path and os.path.exists(path)):
            return
        try:
            os.remove(path)
            logger.info(f"AHKProcessManager: Removed temporary {removed_label}: {path}")
        except Exception as e:
            logger.error(f"AHKProcessManager: Failed to remove temporary {failure_label} file: {e}", exc_info=True)

    def _clean_ahk_temp_files(self):
        """Removes temporary AutoHotkey script and log files and forgets their paths."""
        logger.info("AHKProcessManager: _clean_ahk_temp_files: Attempting to clean up temporary AHK files.")
        self._remove_temp_file(self._ahk_script_temp_file, "AutoHotkey script", "AHK script")
        self._ahk_script_temp_file = None

        self._remove_temp_file(self._ahk_log_temp_file, "AutoHotkey log", "AHK log")
        self._ahk_log_temp_file = None

    def _on_pynput_key_listener(self, key):
        """
        Callback for pynput keyboard listener to detect STOP_KEY.

        Returns False (which stops the listener) when the stop key is
        pressed, after clearing ``_keep_running`` and firing the
        "AHK stopped" callback; returns None for every other key.
        """
        if key != self._stop_key:
            return None
        logger.info("AHKProcessManager: Stop key pressed. Signaling script to stop.")
        self._keep_running = False
        self._trigger_ahk_stopped_callback()
        return False

    def start_pynput_listener(self):
        """
        Starts the pynput keyboard listener thread.
        This method is a public interface to initiate the internal listener thread.
        """
        logger.info("AHKProcessManager: Public start_pynput_listener called. Initiating internal listener thread.")
        threading.Thread(target=self._start_pynput_listener_thread_internal, daemon=True).start()

    def _start_pynput_listener_thread_internal(self):
        """
        Internal method to start the pynput keyboard listener.
        This method is meant to be run in its own daemon thread; it blocks
        until the listener stops.
        """
        logger.info("AHKProcessManager: pynput listener thread started. Waiting for stop key press.")
        try:
            with Listener(on_press=self._on_pynput_key_listener) as listener:
                listener.join()
        except Exception as e:
            logger.error(f"AHKProcessManager: Error in pynput listener thread: {e}", exc_info=True)
        logger.info("AHKProcessManager: pynput listener thread stopped.")
|
|
|