Add WebSocket support and content command to faf script

- Add WebSocket connection functions for Firefox Remote Debugging Protocol
- Implement CDP Runtime.evaluate protocol communication
- Add 'content' command to fetch HTML from tabs via WebSocket
- Add get_tab_html() function to orchestrate HTML retrieval
- Increase connection timeout for RDP requests
This commit is contained in:
DannyDannyDanny 2026-02-01 15:03:42 +01:00
parent b2526ee952
commit 2253bdde5d

View file

@ -5,11 +5,13 @@ Works with Firefox installed via Nix/Home Manager.
Usage:
faf [method] [rdp_port]
faf content <tab-id> [rdp_port]
Methods:
session, s - Read from Firefox session files (default)
rdp, r - Use Remote Debugging Protocol
both, b - Try both methods
content, c - Get HTML content from a tab via WebSocket
"""
import json
@ -17,10 +19,17 @@ import sys
import os
import struct
import subprocess
import asyncio
from pathlib import Path
from urllib.request import urlopen
from urllib.error import URLError
try:
import websockets
WEBSOCKETS_AVAILABLE = True
except ImportError:
WEBSOCKETS_AVAILABLE = False
# Try to find Nix Python with lz4 if available
def find_nix_python():
"""Try to find a Nix Python with lz4 library."""
@ -355,7 +364,7 @@ def get_tabs_via_rdp(port=6000):
url = f"http://localhost:{port}/json/list"
try:
with urlopen(url, timeout=2) as response:
with urlopen(url, timeout=10) as response:
data = json.loads(response.read())
return data
except URLError:
@ -399,6 +408,139 @@ def get_tabs_via_rdp(port=6000):
return None
def get_tab_websocket_url(tab_id, port=6000):
"""Get the WebSocket URL for a specific tab ID."""
tabs = get_tabs_via_rdp(port)
if not tabs:
return None
# Tab ID can be either string or number, so we need to handle both
tab_id_str = str(tab_id)
for tab in tabs:
# Compare as strings to handle both numeric and string IDs
if str(tab.get('id', '')) == tab_id_str:
websocket_url = tab.get('webSocketDebuggerUrl')
if websocket_url:
return websocket_url
else:
# Tab exists but is not debuggable (e.g., about: pages)
return None
return None
async def execute_javascript(websocket, code, timeout=10):
"""Execute JavaScript code via CDP Runtime.evaluate and return the result."""
request_id = 1
message = {
"id": request_id,
"method": "Runtime.evaluate",
"params": {
"expression": code,
"returnByValue": True
}
}
try:
# Send the request
await websocket.send(json.dumps(message))
# Wait for response with timeout
response_str = await asyncio.wait_for(websocket.recv(), timeout=timeout)
response = json.loads(response_str)
# Check if this is the response to our request
if response.get('id') == request_id:
if 'error' in response:
error = response['error']
raise Exception(f"JavaScript execution error: {error.get('message', 'Unknown error')}")
if 'result' in response:
result = response['result']
if 'value' in result:
return result['value']
elif 'result' in result:
# Sometimes the value is nested
return result.get('result')
# If we got a different message, it might be an event or other response
# Try to wait for the actual response
while True:
try:
response_str = await asyncio.wait_for(websocket.recv(), timeout=2)
response = json.loads(response_str)
if response.get('id') == request_id:
if 'error' in response:
error = response['error']
raise Exception(f"JavaScript execution error: {error.get('message', 'Unknown error')}")
if 'result' in response:
result = response['result']
if 'value' in result:
return result['value']
except asyncio.TimeoutError:
break
raise Exception("No valid response received")
except asyncio.TimeoutError:
raise Exception(f"Timeout waiting for JavaScript execution response (>{timeout}s)")
except json.JSONDecodeError as e:
raise Exception(f"Invalid JSON response: {e}")
async def get_tab_html_async(tab_id, port=6000):
"""Connect to a tab via WebSocket and retrieve its HTML content."""
if not WEBSOCKETS_AVAILABLE:
raise Exception("websockets library not available. Install it with: pip install websockets")
# Get the WebSocket URL for this tab
websocket_url = get_tab_websocket_url(tab_id, port)
if not websocket_url:
# Check if tab exists but just isn't debuggable
tabs = get_tabs_via_rdp(port)
if tabs:
tab_id_str = str(tab_id)
for tab in tabs:
if str(tab.get('id', '')) == tab_id_str:
# Tab exists but has no WebSocket URL (not debuggable)
raise Exception(f"Tab {tab_id} exists but is not debuggable (e.g., about: pages)")
raise Exception(f"Tab {tab_id} not found. Use 'faf rdp' to list available tabs.")
try:
# Connect to the WebSocket
async with websockets.connect(websocket_url, timeout=5) as websocket:
# Execute JavaScript to get the HTML
html = await execute_javascript(websocket, "document.documentElement.outerHTML")
return html
except websockets.exceptions.InvalidURI:
raise Exception(f"Invalid WebSocket URL: {websocket_url}")
except websockets.exceptions.ConnectionClosed:
raise Exception(f"WebSocket connection closed unexpectedly")
except asyncio.TimeoutError:
raise Exception(f"Timeout connecting to tab {tab_id}")
except Exception as e:
raise Exception(f"Error retrieving HTML from tab {tab_id}: {e}")
def get_tab_html(tab_id, port=6000):
"""Synchronous wrapper for get_tab_html_async."""
try:
# Try to get the existing event loop
loop = asyncio.get_event_loop()
if loop.is_running():
# If loop is running, we need to use a different approach
# Create a new event loop in a thread
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(asyncio.run, get_tab_html_async(tab_id, port))
return future.result()
else:
return loop.run_until_complete(get_tab_html_async(tab_id, port))
except RuntimeError:
# No event loop, create a new one
return asyncio.run(get_tab_html_async(tab_id, port))
def print_tabs(tabs_info, method="session"):
"""Print tab information in a readable format."""
if not tabs_info:
@ -422,8 +564,34 @@ def print_tabs(tabs_info, method="session"):
def main():
method = sys.argv[1] if len(sys.argv) > 1 else "session"
rdp_port = int(sys.argv[2]) if len(sys.argv) > 2 and sys.argv[1] in ["rdp", "r", "both", "b"] else 6000
if len(sys.argv) < 2:
method = "session"
rdp_port = 6000
else:
method = sys.argv[1]
# Handle content command
if method in ["content", "c"]:
if len(sys.argv) < 3:
print(f"{RED}Error: tab-id required for content command{NC}", file=sys.stderr)
print("Usage: faf content <tab-id> [rdp_port]", file=sys.stderr)
print("Example: faf content 123", file=sys.stderr)
print("Example: faf content 123 9222", file=sys.stderr)
sys.exit(1)
tab_id = sys.argv[2]
rdp_port = int(sys.argv[3]) if len(sys.argv) > 3 else 6000
try:
html = get_tab_html(tab_id, rdp_port)
# Output raw HTML to stdout (for piping to other tools)
print(html)
except Exception as e:
print(f"{RED}Error: {e}{NC}", file=sys.stderr)
sys.exit(1)
return
# Handle other commands
rdp_port = int(sys.argv[2]) if len(sys.argv) > 2 and sys.argv[1] in ["rdp", "r", "both", "b"] else 6000
if method in ["session", "s"]:
print(f"{GREEN}Getting tabs from session files...{NC}")
@ -456,11 +624,13 @@ def main():
else:
print("Usage: faf [method] [rdp_port]")
print(" faf content <tab-id> [rdp_port]")
print("")
print("Methods:")
print(" session, s - Read from Firefox session files (default)")
print(" rdp, r - Use Remote Debugging Protocol")
print(" both, b - Try both methods")
print(" content, c - Get HTML content from a tab")
print("")
print("Examples:")
print(" faf # Use session files (default)")
@ -468,6 +638,8 @@ def main():
print(" faf rdp # Use RDP on default port 6000")
print(" faf rdp 9222 # Use RDP on port 9222")
print(" faf both # Try both methods")
print(" faf content 123 # Get HTML from tab 123 (port 6000)")
print(" faf content 123 9222 # Get HTML from tab 123 (port 9222)")
sys.exit(1)