#!/usr/bin/env python3
"""
Headless console player runner using Playwright and the existing JS player page.

What it does:
- Starts a lightweight HTTP server to serve the repo (so the player page works with CORS).
- Launches Chromium (headless) and opens src/player/index.html.
- Injects the HLS master URL into the page and triggers playback.
- Attaches listeners to hls.js and <video> to print progress, metrics, and errors to stdout.

This runner proves real playback by validating:
- 'playing' event observed, time updates occur, video.currentTime increases, and segments are actually fetched.

Usage:
  python3 src/console_player/runner.py --manifest https://cdn.example.com/asset/hls/master.m3u8 --duration 120 --region eu-vps-1

Dependencies:
- pip install playwright
- python -m playwright install chromium
"""

import argparse
import contextlib
import json
import glob
import os
import socket
import sys
import threading
import time
from http.server import ThreadingHTTPServer, SimpleHTTPRequestHandler

from playwright.sync_api import sync_playwright
import hashlib
import urllib.request
import urllib.error
import urllib.parse
from typing import Any, Dict, List, Optional, Tuple, Callable


def find_free_port() -> int:
    """Return a TCP port number that was free on 127.0.0.1 at call time.

    Binding to port 0 lets the OS pick an unused port; the probing socket is
    closed before returning, so the port is only *likely* to still be free
    when the caller binds it.
    """
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        probe.bind(("127.0.0.1", 0))
        _host, chosen_port = probe.getsockname()
        return chosen_port
    finally:
        probe.close()


class QuietHandler(SimpleHTTPRequestHandler):
    """Static file handler that never writes per-request log lines."""

    def log_message(self, format, *args):
        """Discard the log record so the console only shows player output."""
        return None


def start_static_server(root_dir: str, port: int, log: Optional[Callable[[str], None]] = None) -> ThreadingHTTPServer:
    """Serve ``root_dir`` over HTTP on 127.0.0.1:``port`` from a daemon thread.

    The process working directory is changed to ``root_dir`` (the handler
    serves files relative to the current directory), so relative paths
    resolved after this call are relative to ``root_dir``.

    Args:
        root_dir: Directory to serve and to chdir into.
        port: Loopback TCP port to listen on.
        log: Optional callback that receives a one-line "serving" message.

    Returns:
        The running server; call ``shutdown()`` on it to stop serving.
    """
    os.chdir(root_dir)
    server = ThreadingHTTPServer(("127.0.0.1", port), QuietHandler)
    # Daemon thread: the server must not keep the process alive on exit.
    threading.Thread(target=server.serve_forever, daemon=True).start()
    if log:
        log(f"[console_player] Serving {root_dir} at http://127.0.0.1:{port}")
    return server


def parse_args():
    """Parse the runner's command-line options from ``sys.argv``.

    Returns:
        An ``argparse.Namespace`` with ``manifest``, ``duration``, ``region``,
        ``root``, ``headful``, ``browser``, ``monitor_base`` and ``log_level``.
    """
    cli = argparse.ArgumentParser(description="Headless console HLS player (Playwright)")
    # (flag, add_argument keyword arguments) in the order shown by --help.
    option_specs = (
        ("--manifest", dict(required=True, help="HLS master.m3u8 URL")),
        ("--duration", dict(type=int, default=120, help="Playback duration in seconds")),
        ("--region", dict(type=str, default="", help="Region tag for logs")),
        ("--root", dict(type=str, default=".", help="Root dir to serve (project root)")),
        ("--headful", dict(action="store_true", help="Run with visible browser window (if available)")),
        ("--browser", dict(choices=["chromium", "chrome"], default="chromium", help="Browser engine/channel to use")),
        ("--monitor-base", dict(type=str, default="", help="Metrics monitor API base URL (e.g. https://monitor.example/)")),
        ("--log-level", dict(choices=["debug", "info"], default="debug", help="Logging verbosity: debug (default) or info")),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)
    return cli.parse_args()


# JavaScript installed into the player page as an init script.
#
# On window load it copies the `src` query parameter into the page's #src input,
# clicks #load, mutes and plays the <video>, then hooks <video> and hls.js
# (window._hls) events. Every observation is written with console.log as one
# JSON object per line ({ts, tsMs, type, ...}); the Python side parses those
# lines. Event types emitted: start_initiated, video, progress, manifest,
# level_switch, seg_fetch, seg_loaded, hls_error, status, done, warn, error.
INJECT_SCRIPT = r"""
(() => {
  // Attach lightweight instrumentation without modifying the original file.
  const wait = (ms) => new Promise(r => setTimeout(r, ms));

  const emit = (type, payload = {}) => {
    const msg = { ts: new Date().toISOString(), tsMs: Date.now(), type, ...payload };
    // Print as a single JSON line for easy parsing.
    // eslint-disable-next-line no-console
    console.log(JSON.stringify(msg));
  };

  const setup = async () => {
    const urlParams = new URLSearchParams(window.location.search);
    const src = urlParams.get('src');
    const durationSec = parseInt(urlParams.get('duration') || '0', 10) || 0;
    const region = urlParams.get('region') || '';

    const video = document.getElementById('video');
    const input = document.getElementById('src');
    const btn = document.getElementById('load');
    if (!video || !input || !btn) {
      emit('error', { message: 'Player controls not found' });
      return;
    }

    if (src) {
      input.value = src;
      // Trigger the page's own load() logic (hls.js attach and loadSource)
      emit('start_initiated', {});
      btn.click();
    }

    // Ensure autoplay without user gesture
    try {
      video.muted = true;
      const p = video.play();
      if (p && p.catch) { p.catch(() => {}); }
    } catch (_) {}

    // Hook video events
    const vq = {
      playingOnce: false,
      lastTime: 0,
      timeUpdates: 0,
      segmentsLoaded: 0,
    };
    ['playing','pause','waiting','stalled','ended'].forEach(ev => {
      video.addEventListener(ev, () => emit('video', { event: ev, currentTime: video.currentTime, readyState: video.readyState }));
    });
    video.addEventListener('timeupdate', () => {
      vq.timeUpdates++;
      emit('progress', { currentTime: video.currentTime, readyState: video.readyState });
    });

    // Wait a bit for window._hls to be created by the page's script
    for (let i=0; i<50; i++) {
      if (window._hls) break;
      await wait(100);
    }
    if (!window._hls) {
      emit('warn', { message: 'HLS instance not found; did the page load the source?' });
      return;
    }
    const hls = window._hls;

    // Attach hls.js event listeners
    const hlsEvents = (window.Hls && window.Hls.Events) ? window.Hls.Events : null;
    if (!hlsEvents) {
      emit('warn', { message: 'Hls.Events not available' });
    }
    // Fall back to an empty map so the lookups below yield undefined instead of throwing.
    const H = hlsEvents || {};
    // Skip unknown event names rather than registering a listener under "undefined".
    const on = (evt, handler) => { if (!evt) return; try { hls.on(evt, handler); } catch(_){} };

    on(H.MANIFEST_PARSED, (_e, d) => emit('manifest', { levels: (d && d.levels || []).map(l => ({height: l.height, bitrate: l.bitrate})) }));
    on(H.LEVEL_SWITCHING, (_e, d) => emit('level_switch', { id: d && d.level, bitrate: d && d.bitrate }));
    on(H.FRAG_LOADING, (_e, d) => emit('seg_fetch', { uri: d && d.frag && d.frag.url }));
    on(H.FRAG_LOADED, (_e, d) => {
      vq.segmentsLoaded++;
      // trequest -> time request started; tfirst -> first byte; tload -> full load
      const trequest = d && d.stats && d.stats.trequest;
      const tfirst = d && d.stats && d.stats.tfirst;
      const tload = d && d.stats && d.stats.tload;
      const ttfbMs = (typeof trequest === 'number' && typeof tfirst === 'number') ? (tfirst - trequest) : undefined;
      emit('seg_loaded', {
        uri: d && d.frag && d.frag.url,
        size: d && d.stats && d.stats.total,
        trequest,
        tfirst,
        tload,
        ttfbMs
      });
    });
    on(H.ERROR, (_e, d) => {
      let httpStatus = undefined;
      try {
        if (d && d.response && typeof d.response.code === 'number') httpStatus = d.response.code;
        if (d && d.response && typeof d.response.status === 'number') httpStatus = d.response.status;
      } catch (_){ }
      emit('hls_error', { fatal: d && d.fatal, type: d && d.type, details: d && d.details, httpStatus });
    });

    // Periodic status
    const start = Date.now();
    const targetMs = (durationSec > 0 ? durationSec : 0) * 1000;
    const timer = setInterval(() => {
      const elapsed = Date.now() - start;
      const proof = (vq.timeUpdates >= 3 && vq.segmentsLoaded >= 1 && video.currentTime > 0);
      emit('status', {
        elapsedMs: elapsed,
        currentTime: video.currentTime,
        buffered: (video.buffered && video.buffered.length ? video.buffered.end(video.buffered.length-1) : 0),
        segmentsLoaded: vq.segmentsLoaded,
        timeUpdates: vq.timeUpdates,
        playbackProof: proof
      });
      if (targetMs && elapsed >= targetMs) {
        clearInterval(timer);
        emit('done', { reason: 'duration_reached' });
      }
    }, 1000);
  };

  if (document.readyState === 'complete') {
    setup();
  } else {
    window.addEventListener('load', setup);
  }
})();
"""


def _build_video_id(manifest: str) -> str:
    """Derive a stable, human-light video id from manifest path or URL."""
    try:
        if manifest.startswith("http://") or manifest.startswith("https://"):
            return hashlib.sha1(manifest.encode("utf-8")).hexdigest()[:12]
        parts = manifest.split("/")
        if "out" in parts:
            idx = parts.index("out")
            if idx + 1 < len(parts):
                return parts[idx + 1]
        return hashlib.sha1(os.path.abspath(manifest).encode("utf-8")).hexdigest()[:12]
    except Exception:
        return hashlib.sha1(manifest.encode("utf-8")).hexdigest()[:12]


def _post_json(base_url: str, path: str, payload: Dict[str, Any], timeout: float = 5.0) -> Tuple[int, str]:
    """POST JSON to monitor API and return (status_code, body)."""
    url = base_url.rstrip("/") + path
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(
        url,
        data=data,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            body = resp.read().decode("utf-8", errors="replace")
            return (resp.getcode() or 0, body)
    except urllib.error.HTTPError as e:
        try:
            body = e.read().decode("utf-8", errors="replace")
        except Exception:
            body = str(e)
        return (e.code or 0, body)
    except Exception as e:
        return (0, str(e))


def run():
    """Play an HLS manifest in headless Chromium and report what happened.

    Steps, in order:
      1. Parse CLI arguments and validate a local ``--manifest`` path early.
      2. Serve ``--root`` over HTTP on a free loopback port.
      3. Launch Chromium through Playwright, install ``INJECT_SCRIPT`` and open
         ``src/player/index.html`` with the manifest passed as query parameters.
      4. Poll ``video.currentTime`` until it is positive ("playback proof") or
         the startup deadline expires.
      5. Keep the page alive for ``--duration`` seconds while JSON console
         events from the injected script feed the metric aggregation.
      6. When ``--monitor-base`` is set, POST start / start_error, QoE and TTFB
         summaries to the monitor API.

    Exits with status 2 (via ``sys.exit``) when a local manifest path is a
    directory or does not exist. The static server is always shut down.
    """
    args = parse_args()
    debug = args.log_level == "debug"
    player_version = "console-player/1.0"

    # Ensure unbuffered output for immediate feedback
    try:
        sys.stdout.reconfigure(line_buffering=True)
        sys.stderr.reconfigure(line_buffering=True)
    except Exception:
        # Streams may have been replaced by objects without reconfigure().
        pass

    # Logging helpers
    def log_debug(msg: str):
        if debug:
            print(msg, flush=True)

    def log_info(msg: str):
        print(msg, flush=True)

    log_info(f"[console_player] Start: manifest={args.manifest} duration={args.duration} region={args.region} browser={args.browser}")
    if args.monitor_base:
        log_info(f"[console_player] Monitor API: {args.monitor_base}")

    # Validate manifest argument early to avoid silent hangs
    def die(msg, hints=None, code=2):
        print(f"[console_player][error] {msg}", file=sys.stderr)
        if hints:
            for h in hints:
                print(f"[console_player][hint] {h}", file=sys.stderr)
        sys.exit(code)

    def discovered_examples() -> List[str]:
        """Up to five master playlists found under <root>/out/*/hls/, as hint lines."""
        candidates = sorted(glob.glob(os.path.join(args.root, "out", "*", "hls", "master.m3u8")))
        return ["  " + os.path.relpath(c, args.root) for c in candidates[:5]]

    is_http = args.manifest.startswith(("http://", "https://"))
    if not is_http:
        rel = args.manifest.lstrip("/")
        fs_path = os.path.normpath(os.path.join(args.root, rel))
        if os.path.isdir(fs_path):
            hints = ["Pass a path to a master .m3u8 file under --root."] + discovered_examples()
            die(f"'--manifest' points to a directory: {args.manifest}", hints)
        if not os.path.exists(fs_path):
            hints = ["Path does not exist. Examples discovered:"] + discovered_examples()
            die(f"Manifest not found: {args.manifest}", hints)
        if not args.manifest.endswith(".m3u8"):
            print(f"[console_player][warn] '--manifest' does not look like an HLS playlist (.m3u8): {args.manifest}", file=sys.stderr)
    elif debug:
        print("[console_player][warn] Using absolute URL for manifest; cross-origin requests may require CORS headers.", file=sys.stderr)

    def base_payload() -> Dict[str, Any]:
        """Fields shared by every monitor API payload."""
        return {
            "region": args.region or "",
            "playerVersion": player_version,
            "videoId": _build_video_id(args.manifest),
        }

    def post_metric(path: str, payload: Dict[str, Any]) -> None:
        """POST one payload to the monitor API and log the outcome."""
        code, body = _post_json(args.monitor_base, path, payload)
        if debug:
            print(f"[console_player] POST {path} url={args.monitor_base.rstrip('/') + path} payload={json.dumps(payload)} status={code} body={body[:200]}", flush=True)
        else:
            print(f"[console_player] POST {path} status={code}", flush=True)

    # Start local server to host player and static files
    port = find_free_port()
    httpd = start_static_server(args.root, port, log=(log_debug if debug else None))

    try:
        with sync_playwright() as p:
            launch_kwargs = dict(
                headless=(not args.headful),
                args=[
                    "--no-sandbox",
                    "--disable-gpu",
                    "--autoplay-policy=no-user-gesture-required",
                    "--mute-audio",
                    "--disable-dev-shm-usage",
                ],
            )
            if args.browser == "chrome":
                # Use Google Chrome channel (needs: `playwright install chrome` and Chrome present)
                launch_kwargs["channel"] = "chrome"
            browser = p.chromium.launch(**launch_kwargs)
            context = browser.new_context(
                user_agent=player_version,
                viewport={"width": 1280, "height": 720},
                java_script_enabled=True,
            )
            page = context.new_page()

            # Aggregation state for metrics (all timestamps are the page's Date.now() ms)
            agg_state: Dict[str, Any] = {
                "start_ts_ms": None,
                "first_playing_ts_ms": None,
                "rebuffering": False,
                "rebuffer_start_ts_ms": None,
                "rebuffer_events": 0,
                "rebuffer_time_ms": 0.0,
                "bitrate_samples": [],  # List[Tuple[float, int]] (ts_ms, bitrate)
                "ttfb_ms_samples": [],
                "first_segment_ttfb_ms": None,
                "last_event_ts_ms": None,
            }

            def _on_event(obj: Dict[str, Any]):
                """Fold one JSON event from the injected script into agg_state."""
                ts_ms = obj.get("tsMs")
                has_ts = isinstance(ts_ms, (int, float))
                if has_ts:
                    agg_state["last_event_ts_ms"] = float(ts_ms)

                typ = obj.get("type")
                if typ == "start_initiated":
                    agg_state["start_ts_ms"] = float(ts_ms or 0.0)
                elif typ == "video":
                    ev = obj.get("event")
                    if ev in ("waiting", "stalled") and has_ts:
                        if not agg_state["rebuffering"]:
                            agg_state["rebuffering"] = True
                            agg_state["rebuffer_start_ts_ms"] = float(ts_ms)
                    elif ev == "playing" and has_ts:
                        first_play = agg_state["first_playing_ts_ms"] is None
                        if first_play:
                            agg_state["first_playing_ts_ms"] = float(ts_ms)
                        if agg_state["rebuffering"] and agg_state["rebuffer_start_ts_ms"] is not None:
                            # A stall that ends with the very first 'playing' adds to
                            # rebuffer time but is not counted as a rebuffer event.
                            # NOTE(review): confirm startup stalls should count as rebuffer time.
                            if not first_play:
                                agg_state["rebuffer_events"] += 1
                            agg_state["rebuffer_time_ms"] += max(0.0, float(ts_ms) - float(agg_state["rebuffer_start_ts_ms"]))
                            agg_state["rebuffering"] = False
                            agg_state["rebuffer_start_ts_ms"] = None
                elif typ == "level_switch":
                    br = obj.get("bitrate")
                    if isinstance(br, (int, float)) and has_ts:
                        agg_state["bitrate_samples"].append((float(ts_ms), int(br)))
                elif typ == "seg_loaded":
                    ttfb_ms = obj.get("ttfbMs")
                    uri = obj.get("uri") or ""
                    # URIs containing "init" are excluded from the TTFB samples.
                    if isinstance(ttfb_ms, (int, float)) and "init" not in uri:
                        agg_state["ttfb_ms_samples"].append(float(ttfb_ms))
                        if agg_state["first_segment_ttfb_ms"] is None:
                            agg_state["first_segment_ttfb_ms"] = float(ttfb_ms)
                elif typ == "hls_error" and args.monitor_base:
                    http_status = obj.get("httpStatus")
                    if isinstance(http_status, int):
                        payload = {**base_payload(), "httpStatus": int(http_status)}
                        code, body = _post_json(args.monitor_base, "/ingest/segment_error", payload)
                        print(f"[console_player] segment_error posted: {code} {body[:200]}", flush=True)

            # Forward console logs
            def on_console(msg):
                try:
                    # Try to pretty print JSON lines from the injected script
                    text_attr = getattr(msg, "text", None)
                    text = text_attr if isinstance(text_attr, str) else (text_attr() if callable(text_attr) else str(msg))
                    if text.startswith("{") and text.endswith("}"):
                        obj = json.loads(text)
                        # feed aggregation
                        if isinstance(obj, dict) and obj.get("type"):
                            _on_event(obj)
                        if debug:
                            print(json.dumps(obj, ensure_ascii=False), flush=True)
                    elif debug:
                        print(text, flush=True)
                except Exception as e:
                    # Fallback: dump raw representation to avoid recursion errors
                    if debug:
                        try:
                            print(str(msg), flush=True)
                        except Exception:
                            print(f"[console_player] console message parse error: {e}", flush=True)

            # Extra diagnostics: log page errors and failed requests
            def on_page_error(err):
                try:
                    print(f"[console_player][pageerror] {err}", flush=True)
                except Exception:
                    pass

            def on_request_failed(req):
                try:
                    failure = req.failure()
                    print(f"[console_player][requestfailed] {req.url} reason={failure and failure.get('errorText')}", flush=True)
                except Exception:
                    pass

            # All listeners are attached before navigation so that failures that
            # happen while the page is loading are reported too.
            page.on("console", on_console)
            if debug:
                page.on("pageerror", on_page_error)
                page.on("requestfailed", on_request_failed)

            # Add our instrumentation before any page script runs
            page.add_init_script(INJECT_SCRIPT)

            # Build player URL with parameters. Values are percent-encoded so a
            # manifest URL carrying its own query string ('?', '&', '#') reaches
            # the page intact; '/' and ':' stay literal to keep the URL readable.
            query: Dict[str, Any] = {"src": args.manifest, "duration": args.duration}
            if args.region:
                query["region"] = args.region
            player_url = (
                f"http://127.0.0.1:{port}/src/player/index.html?"
                + urllib.parse.urlencode(query, safe="/:", quote_via=urllib.parse.quote)
            )

            # For visibility print the resolved manifest URL the page will use
            if is_http:
                resolved_manifest = args.manifest
            else:
                resolved_manifest = f"http://127.0.0.1:{port}/" + args.manifest.lstrip("/")
            log_debug(f"[console_player] Opening {player_url}")
            log_debug(f"[console_player] Resolved manifest URL: {resolved_manifest}")
            page.goto(player_url, wait_until="load", timeout=60_000)

            # Ensure playback starts (mute + play)
            page.evaluate("""
                (async () => {
                  const v = document.getElementById('video');
                  if (v) { v.muted = true; try { const p = v.play(); if (p) await p.catch(()=>{}); } catch(_){} }
                })();
            """)

            # Wait until we get a proof of playback or timeout
            start_time = time.time()
            proof = False
            startup_deadline = max(15, min(60, args.duration))
            while time.time() - start_time < startup_deadline:
                # Proof of playback: the <video> element's currentTime has advanced.
                state = page.evaluate("""
                    (() => {
                      const v = document.getElementById('video');
                      return v ? { ct: v.currentTime, rs: v.readyState } : null;
                    })();
                """)
                if state and (state.get('ct', 0) or 0) > 0:
                    proof = True
                    break
                # wait_for_timeout (not time.sleep) keeps Playwright dispatching
                # console/network events between polls.
                page.wait_for_timeout(500)

            if not proof:
                err_hints = []
                if not is_http and not args.manifest.endswith('.m3u8'):
                    err_hints.append("Use a .m3u8 master playlist path, e.g. out/<asset>/hls/master.m3u8")
                if is_http:
                    err_hints.append("If the CDN origin differs, ensure CORS allows XHR from http://127.0.0.1:PORT")
                err_hints.append("Verify the path exists and is reachable in a browser.")
                print("[console_player][error] Failed to start playback within startup deadline.", file=sys.stderr, flush=True)
                if debug:
                    for h in err_hints:
                        print(f"[console_player][hint] {h}", file=sys.stderr, flush=True)
                # post start_error if configured
                if args.monitor_base:
                    post_metric("/ingest/start_error", {**base_payload(), "error": "startup_timeout"})
            if debug:
                print(f"[console_player] Playback proof: {'YES' if proof else 'NO'} (currentTime>0 observed)", flush=True)

            # Keep session running for requested duration; use page.wait_for_timeout so Playwright
            # continues to dispatch console/network events (time.sleep would block the dispatcher).
            # The extra 2 s give the page's final status/done events time to arrive.
            remaining_ms = (max(0, args.duration - int(time.time() - start_time)) + 2) * 1000
            page.wait_for_timeout(remaining_ms)

            # Post QoE/TTFB if configured
            if args.monitor_base:
                # The start event is only reported when playback actually started;
                # a failed start was already reported as start_error above.
                if proof:
                    join_ms = 0.0
                    if agg_state.get("start_ts_ms") is not None and agg_state.get("first_playing_ts_ms") is not None:
                        join_ms = max(0.0, float(agg_state["first_playing_ts_ms"]) - float(agg_state["start_ts_ms"]))
                    post_metric("/ingest/start", {**base_payload(), "joinTimeSeconds": float(join_ms) / 1000.0})

                # finalize session timestamps
                end_ts_ms = agg_state.get("last_event_ts_ms") or (time.time() * 1000.0)
                start_play_ms = agg_state.get("first_playing_ts_ms") or end_ts_ms
                session_seconds = max(0.0, (float(end_ts_ms) - float(start_play_ms)) / 1000.0)

                # average bitrate (time-weighted): each sample holds until the
                # next level switch, the last one until the final event.
                avg_bitrate = 0
                samples = agg_state.get("bitrate_samples") or []
                if samples:
                    total_w = 0.0
                    total = 0.0
                    for i, (t0, br) in enumerate(samples):
                        t1 = float(end_ts_ms) if i == len(samples) - 1 else float(samples[i + 1][0])
                        if t1 > t0:
                            total_w += (t1 - t0)
                            total += (t1 - t0) * float(br)
                    if total_w > 0.0:
                        avg_bitrate = int(total / total_w)

                post_metric("/ingest/qoe", {
                    **base_payload(),
                    "rebufferEvents": int(agg_state.get("rebuffer_events") or 0),
                    "rebufferTimeSeconds": float(agg_state.get("rebuffer_time_ms") or 0.0) / 1000.0,
                    "averageBitrateBps": int(avg_bitrate),
                    "sessionDurationSeconds": float(session_seconds),
                })

                # TTFB summary
                ttfb_samples = [float(x) for x in (agg_state.get("ttfb_ms_samples") or [])]

                def _pct(values: List[float], p: float) -> float:
                    """Percentile ``p`` (0..1) with linear interpolation; 0.0 if empty."""
                    if not values:
                        return 0.0
                    s = sorted(values)
                    k = (len(s) - 1) * p
                    f = int(k)
                    c = min(f + 1, len(s) - 1)
                    if f == c:
                        return s[f]
                    return s[f] * (c - k) + s[c] * (k - f)

                first_ttfb = float(agg_state.get("first_segment_ttfb_ms") or (ttfb_samples[0] if ttfb_samples else 0.0))
                post_metric("/ingest/ttfb_summary", {
                    **base_payload(),
                    "firstSegmentTtfbMs": float(first_ttfb),
                    "segmentTtfbMsP50": float(_pct(ttfb_samples, 0.5)),
                    "segmentTtfbMsP95": float(_pct(ttfb_samples, 0.95)),
                    "samplesCount": int(len(ttfb_samples)),
                })

            log_info("[console_player] Finished")
            context.close()
            browser.close()
    finally:
        with contextlib.suppress(Exception):
            httpd.shutdown()
            # Release the listening socket as well; shutdown() only stops the loop.
            httpd.server_close()
            print("[console_player] Server stopped", flush=True)


# Script entry point: run the player only when executed directly, not on import.
if __name__ == "__main__":
    run()


