#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-3.0-or-later
# Copyright (C) 2026 LINBIT HA-Solutions GmbH
# Author: Lars Ellenberg
#
# tunl -- LINBIT support CLI for managing linbit-tunl sessions
#
# Usage: tunl.py <list|show|connect|proxy|expect> [args]
# Requires: Python 3.6+, SSH access to the relay as the support user.
#
# This script runs on support-controlled systems where python3 is known
# to be available at a standard path.

import contextlib
import json
import os
import re
import select
import shlex
import shutil
import socket
import subprocess
import sys
import tempfile
import termios
import tty
import threading
import time
import urllib.error
import urllib.request
from pathlib import Path

# --- BEGIN INLINE tunl_trace --------------------------------------------------
# Everything between the BEGIN/END markers is the inlineable tracer body.
# It is copied verbatim into linbit-tunl.py and tunl.py so those single-file
# deliverables stay single-file.  tests/test_inline_tracer.py asserts the
# copies match.  If you need to change tracer behaviour, edit this file and
# re-run `make sync-inline-tracer`.
import collections
import hashlib
import json
import os
import signal
import struct
import threading
import time

# Monotonic clock in integer nanoseconds.  Use the native implementation
# when the running interpreter has one (Python 3.7+); otherwise (3.6, as
# found on customer hosts) derive it from the float-seconds clock.
_mono_ns = getattr(time, 'monotonic_ns', None)
if _mono_ns is None:
    def _mono_ns():
        return int(time.monotonic() * 1_000_000_000)


# ---------------------------------------------------------------------------
# Advanced-tracing aspect toggles (Options C, D, E in docs/debuggability-review)
#
# Semantics: each aspect has its own env var.  If set to '1', enabled; to '0',
# explicitly disabled.  If unset, falls back to TUNL_TRACE_ALL.  This lets
# a reproducer do `TUNL_TRACE_ALL=1 ...` to switch everything on, then add
# e.g. `TUNL_SSH_TRACE=v` to dial back one aspect.  Intended for short
# test runs where no redaction or log rotation is wanted.
# ---------------------------------------------------------------------------

def tracing_enabled(aspect):
    """Tell whether one advanced-tracing aspect is switched on.

    aspect must be one of 'b', 'wire' or 'tmux'; anything else raises
    KeyError.  'b' is the Phase 2 JSON tracer (controlled by TUNL_TRACE).
    SSH verbosity is not an aspect here; see ssh_trace_level().

    The aspect's own environment variable wins when it is exactly '1'
    (on) or '0' (off).  Any other value, or no value, defers to
    TUNL_TRACE_ALL, which enables the aspect only when set to '1'.
    """
    env_names = {
        'b':    'TUNL_TRACE',
        'wire': 'TUNL_TRACE_WIRE',
        'tmux': 'TUNL_TRACE_TMUX',
    }
    setting = os.environ.get(env_names[aspect], '')
    if setting in ('0', '1'):
        return setting == '1'
    return os.environ.get('TUNL_TRACE_ALL') == '1'


def ssh_trace_level():
    """Return the ssh verbosity for the verbose-SSH aspect (Option E).

    The result is 'v', 'vv', 'vvv' or '' (disabled).  TUNL_SSH_TRACE is
    read case-insensitively and takes precedence: 'v' / 'vv' / 'vvv' are
    returned as-is, '1' means 'vvv' and '0' means ''.  Any other value
    (including unset) defers to TUNL_TRACE_ALL, which yields 'vvv' when it
    is '1' and '' otherwise.
    """
    explicit = {'v': 'v', 'vv': 'vv', 'vvv': 'vvv', '1': 'vvv', '0': ''}
    requested = os.environ.get('TUNL_SSH_TRACE', '').lower()
    if requested in explicit:
        return explicit[requested]
    return 'vvv' if os.environ.get('TUNL_TRACE_ALL') == '1' else ''


def ssh_debug_args(purpose, session_dir):
    """Build the extra ssh arguments for verbose tracing.

    Returns [] when ssh_trace_level() reports tracing as disabled.
    Otherwise returns ['-<level>', '-E', <log path>], where the log path
    is 'ssh.<purpose>.<level>.log' inside session_dir.

    purpose names the call site ('main', 'upgrade', 'master', 'consume',
    ...) and is part of the file name so that concurrent ssh invocations
    write to separate logs.
    """
    level = ssh_trace_level()
    if level:
        log_path = os.path.join(str(session_dir), f'ssh.{purpose}.{level}.log')
        return ['-' + level, '-E', log_path]
    return []


class WireTap:
    """Capture every byte crossing a socket / pipe boundary (Option C).

    Records are binary-framed for fast append and exact replay:
        1 byte  direction    '>' (local sends) or '<' (local receives)
        8 bytes monotonic ns since tap open (big-endian uint64)
        4 bytes payload length (big-endian uint32)
        N bytes payload

    One WireTap per communication channel (per-consumer UDS, customer SSH
    stdio, etc.).  Not redacted.  No size cap: the caller is expected to
    use these only for short reproducer runs.

    If the capture file cannot be opened the tap silently degrades to a
    no-op (self._fh is None).
    """

    _HEADER = struct.Struct('>BQI')

    def __init__(self, path):
        """Open (truncate) the capture file at path, creating parent dirs."""
        self._lock = threading.Lock()
        self._t0   = _mono_ns()
        try:
            os.makedirs(os.path.dirname(path) or '.', exist_ok=True)
            # Unbuffered so every record reaches the file immediately.
            self._fh = open(path, 'wb', buffering=0)
        except OSError:
            self._fh = None

    def write(self, direction, data):
        """Record one chunk.  direction is '>' (egress) or '<' (ingress).

        data may be bytes-like or str (UTF-8 encoded).  Empty data is
        skipped.  Never raises: a bad direction, an unsupported data type,
        an I/O error or a tap closed concurrently all drop the record.
        """
        if self._fh is None or not data:
            return
        try:
            if isinstance(data, str):
                data = data.encode('utf-8', errors='replace')
            with self._lock:
                # Re-read the handle under the lock: close() may have run
                # since the cheap check above.
                fh = self._fh
                if fh is None:
                    return
                # Timestamp under the lock so file order matches time order.
                hdr = self._HEADER.pack(
                    ord(direction), _mono_ns() - self._t0, len(data))
                # One write per record keeps header and payload together.
                fh.write(b''.join((hdr, data)))
        except (OSError, ValueError, TypeError, struct.error):
            # ValueError: handle already closed; TypeError: direction is
            # not a single character or data is not bytes-like;
            # struct.error: a field does not fit the header format.
            pass

    def close(self):
        """Close the capture file.  Idempotent; never raises."""
        with self._lock:
            fh = self._fh
            self._fh = None
        if fh is not None:
            try:
                fh.close()
            except OSError:
                pass


def _disabled_tap():
    """Build a WireTap that records nothing (used when wire tracing is off).

    The instance is created without running WireTap.__init__, so no file
    is opened; with _fh left as None, write() and close() do nothing.
    """
    tap = WireTap.__new__(WireTap)
    tap._fh = None
    tap._t0 = 0
    tap._lock = threading.Lock()
    return tap


def open_wire_tap(path):
    """Open a wire tap for path, honouring the 'wire' tracing aspect.

    Returns a recording WireTap when tracing_enabled('wire') is true
    (TUNL_TRACE_WIRE / TUNL_TRACE_ALL), otherwise a no-op tap.
    """
    if tracing_enabled('wire'):
        return WireTap(path)
    return _disabled_tap()


def frame_digest(payload):
    """Fingerprint a payload so it can be logged without its body.

    payload may be str (hashed as UTF-8, unencodable characters replaced)
    or bytes.  Returns the first 16 hex characters of its SHA-256 digest
    (a 64-bit prefix), or '' when payload is None.
    """
    if payload is None:
        return ''
    raw = (payload.encode('utf-8', errors='replace')
           if isinstance(payload, str) else payload)
    hasher = hashlib.sha256()
    hasher.update(raw)
    return hasher.hexdigest()[:16]


def _iso_ts():
    """Return the current UTC time as 'YYYY-MM-DDTHH:MM:SS.mmmZ'."""
    now = time.time()
    # Milliseconds come from the fractional part, truncated (not rounded).
    millis = int((now - int(now)) * 1000)
    stamp = time.strftime('%Y-%m-%dT%H:%M:%S', time.gmtime(now))
    return '%s.%03dZ' % (stamp, millis)


class Tracer:
    """Structured tracer for one process.

    Events are always appended to the in-memory ring buffer (cheap).
    They are additionally written to disk only while enabled and while
    the byte cap has not been reached.

    Attributes:
        comp, sid  -- component name and session id stamped on every record
        path       -- trace file path (may be set later via set_path)
        enabled    -- disk writing on/off; starts as TUNL_TRACE == '1'
        ring       -- deque of the most recent serialised records
        max_bytes  -- cap on the bytes this instance writes to disk
    """

    _DEFAULT_RING      = 500
    _DEFAULT_MAX_BYTES = 50 * 1024 * 1024

    def __init__(self, comp, sid=None, path=None,
                 ring_size=None, max_bytes=None):
        """Create a tracer; open the trace file if enabled and path is set.

        ring_size / max_bytes default to TUNL_TRACE_RING / TUNL_TRACE_MAX
        from the environment.  Unparseable values there (and a negative
        ring size) fall back to the built-in defaults instead of raising,
        so a typo in a debugging variable cannot stop the tool starting.
        """
        self.comp        = comp
        self.sid         = sid
        self.path        = path
        self.enabled     = os.environ.get('TUNL_TRACE') == '1'
        if ring_size is None:
            ring_size = self._env_int('TUNL_TRACE_RING',
                                      self._DEFAULT_RING, minimum=0)
        if max_bytes is None:
            max_bytes = self._env_int('TUNL_TRACE_MAX',
                                      self._DEFAULT_MAX_BYTES)
        self.ring        = collections.deque(maxlen=ring_size)
        self.max_bytes   = max_bytes
        self._bytes      = 0
        self._fh         = None
        # Re-entrant: the SIGUSR1 handler calls toggle() -> trace() in the
        # main thread, possibly while that same thread is already inside
        # trace()'s locked section.  A plain Lock would deadlock there.
        self._lock       = threading.RLock()
        self._capped     = False
        if self.enabled and self.path:
            self._open()

    @staticmethod
    def _env_int(name, default, minimum=None):
        """Read an int from the environment, or default if unset/invalid."""
        raw = os.environ.get(name)
        if raw is None:
            return default
        try:
            value = int(raw)
        except ValueError:
            return default
        if minimum is not None and value < minimum:
            return default
        return value

    def _open(self):
        """Open self.path for appending; on failure leave _fh as None."""
        try:
            # Line-buffered so every record is on disk immediately.
            os.makedirs(os.path.dirname(self.path) or '.', exist_ok=True)
            self._fh = open(self.path, 'a', buffering=1)
        except OSError:
            self._fh = None

    def set_sid(self, sid):
        """Set the session id stamped on subsequent records."""
        self.sid = sid

    def set_path(self, path):
        """Set the trace file path; open it if enabled and not yet open."""
        self.path = path
        if self.enabled and self._fh is None:
            self._open()

    def trace(self, event, **fields):
        """Emit one event.  Always appended to the ring; written to disk
        only if enabled and under the byte cap.  Never raises.

        The record carries ts, sid, comp and event, plus the given fields
        (which may override those keys).  A record that cannot be
        serialised is dropped entirely.  When the byte cap is first hit, a
        single 'trace_capped' marker is written instead of the record and
        nothing further goes to disk.
        """
        try:
            rec = {
                'ts':    _iso_ts(),
                'sid':   self.sid,
                'comp':  self.comp,
                'event': event,
            }
            rec.update(fields)
            line = json.dumps(rec, separators=(',', ':'),
                              default=_json_fallback) + '\n'
        except Exception:
            return
        self.ring.append(line)
        # Cheap unlocked pre-check; re-validated under the lock below.
        if not self.enabled or self._fh is None or self._capped:
            return
        try:
            with self._lock:
                fh = self._fh
                if fh is None or self._capped:
                    return
                if self._bytes >= self.max_bytes:
                    # Flag first so a failing marker write is not retried.
                    self._capped = True
                    marker = json.dumps(
                        {'ts': _iso_ts(), 'comp': self.comp,
                         'event': 'trace_capped'},
                        separators=(',', ':'), default=_json_fallback)
                    fh.write(marker + '\n')
                    return
                fh.write(line)
                # json.dumps output is ASCII here, so chars == bytes.
                self._bytes += len(line)
        except (OSError, ValueError, TypeError):
            # ValueError covers a handle closed by a concurrent close().
            pass

    def toggle(self, enabled=None):
        """Flip or set the enabled flag.  SIGUSR1 calls this with None.

        Opens the trace file when switching on with a known path, then
        records a 'trace_toggle' event.
        """
        if enabled is None:
            enabled = not self.enabled
        self.enabled = enabled
        if enabled and self._fh is None and self.path:
            self._open()
        try:
            self.trace('trace_toggle', enabled=enabled)
        except Exception:
            pass

    def dump_ring(self, path):
        """Write the current ring buffer to path (overwrite).  Never raises."""
        try:
            os.makedirs(os.path.dirname(path) or '.', exist_ok=True)
            with open(path, 'w') as f:
                f.writelines(list(self.ring))
        except OSError:
            pass

    def setup_signals(self, lastgasp_path=None):
        """Install SIGUSR1 (toggle) and SIGUSR2 (dump ring) handlers.

        Must be called from the main thread.  Safe to call multiple times.
        SIGUSR2 does nothing unless lastgasp_path was given.
        """
        def _sigusr1(signum, frame):
            self.toggle()

        def _sigusr2(signum, frame):
            if lastgasp_path:
                self.dump_ring(lastgasp_path)

        try:
            signal.signal(signal.SIGUSR1, _sigusr1)
            signal.signal(signal.SIGUSR2, _sigusr2)
        except (OSError, ValueError):
            # Not the main thread or signals unavailable.  Skip silently.
            pass

    def flush_on_exit(self, lastgasp_path):
        """Return a callable suitable for atexit: dumps the ring if nonempty."""
        def _dump():
            if self.ring:
                self.dump_ring(lastgasp_path)
        return _dump

    def close(self):
        """Close the trace file if open.  Idempotent; never raises."""
        with self._lock:
            fh = self._fh
            self._fh = None
        if fh is not None:
            try:
                fh.close()
            except OSError:
                pass


def _json_fallback(obj):
    """Coerce non-JSON-native objects to something serialisable.

    Used as json.dumps(default=...):
      * bytes / bytearray -> UTF-8 text, or a hex string if not valid UTF-8
      * set / frozenset   -> list in sorted order; when the members cannot
                             be compared with each other (e.g. ints mixed
                             with strings) they are ordered by repr()
                             instead, so the record is still emitted
      * anything else     -> repr(obj)
    """
    if isinstance(obj, (bytes, bytearray)):
        try:
            return obj.decode('utf-8')
        except UnicodeDecodeError:
            return obj.hex()
    if isinstance(obj, (set, frozenset)):
        try:
            return sorted(obj)
        except TypeError:
            # Unorderable members: still produce a deterministic list.
            return sorted(obj, key=repr)
    return repr(obj)


# ---------------------------------------------------------------------------
# Module-level convenience: a shared singleton for simple cases.
# ---------------------------------------------------------------------------

# The process-wide tracer.  None until init() creates it, or until get()
# creates a placeholder on first use.
_default = None
# Held by init() and get() while they create or update _default.
_default_lock = threading.Lock()


def init(comp, sid=None, path=None, ring_size=None, max_bytes=None,
         lastgasp_path=None, install_signals=True):
    """Create the module-level default tracer.  Idempotent per process.

    The first call builds the Tracer from the given arguments and, unless
    install_signals is false, installs its signal handlers (lastgasp_path
    is where the ring is dumped on request).

    Later calls do not build a new tracer: they only apply a non-None sid
    and/or path to the existing one; every other argument is ignored.

    Returns the default tracer.
    """
    global _default
    with _default_lock:
        if _default is None:
            _default = Tracer(comp, sid=sid, path=path,
                              ring_size=ring_size, max_bytes=max_bytes)
            if install_signals:
                _default.setup_signals(lastgasp_path=lastgasp_path)
        else:
            if sid is not None:
                _default.set_sid(sid)
            if path is not None:
                _default.set_path(path)
        return _default


def get():
    """Return the default tracer.

    If init() was never called, a placeholder Tracer with component name
    'unknown' and no path is created (under the module lock) and becomes
    the default.
    """
    global _default
    if _default is not None:
        return _default
    with _default_lock:
        # Re-check: another thread may have created it while we waited.
        if _default is None:
            _default = Tracer('unknown')
        return _default


def trace(event, **fields):
    """Emit one event on the default tracer (see get() and Tracer.trace)."""
    tracer = get()
    tracer.trace(event, **fields)
# --- END INLINE tunl_trace ----------------------------------------------------

# --- BEGIN INLINE chatui ------------------------------------------------------
# Everything between BEGIN/END markers is the inlineable body, copied
# verbatim into linbit-tunl.py and tunl.py.  tests/test_inline_chatui.py
# asserts the copies match byte-for-byte.

import hashlib as _cu_hashlib
import queue as _cu_queue
import re as _cu_re
import textwrap as _cu_textwrap
import time as _cu_time


# ---------------------------------------------------------------------------
# Wire protocol helpers.  Consumer side of tunl share-mode protocol v4.
#
# wire_parse(payload) decodes the body of a single CTRL frame; wire_encode
# (kind, text) returns the bytes of a complete v4 frame ready for the wire.
# Symbols decode_ctrl, encode_ctrl, encode_message, TYPE_CTRL,
# WireProtocolError are provided by the wire codec at module scope.
# ---------------------------------------------------------------------------

def wire_parse(payload):
    """Parse one inbound CTRL frame payload.

    payload is the CTRL frame body: bytes of '<verb-line>\\n[<binary-args>...]'.
    Accepts bytes; str is coerced to UTF-8 for backward-compatible callers.

    Returns an event dict:
        {'kind': 'chat',          'who': str, 'ts': str, 'body': str}
        {'kind': 'system',        'ts': str, 'body': str}        (SCHAT from '[tunl]')
        {'kind': 'note',          'who': str, 'ts': str, 'body': str}  (M7)
        {'kind': 'session_info',  'payload': dict}
        {'kind': 'ephemeral',     'lifecycle': str, 'body': str}
        {'kind': 'session_ended'}
        {'kind': 'readonly'}
        {'kind': 'readwrite'}
        {'kind': 'brb'}
        {'kind': 'brb_end'}
        {'kind': 'heartbeat'}
        {'kind': 'unknown',       'raw': str}
    Never raises on malformed input; returns kind='unknown'.

    A known verb with the wrong number of tokens or binary args is
    reported as 'unknown' with the verb line in 'raw'.  A SESSION_INFO
    blob that is not valid JSON, is nested too deeply to decode, or does
    not decode to a JSON object is likewise 'unknown'.
    """
    if isinstance(payload, str):
        payload = payload.encode('utf-8', errors='replace')
    if not payload:
        return {'kind': 'unknown', 'raw': ''}
    try:
        line, args = decode_ctrl(payload)
    except WireProtocolError:
        return {'kind': 'unknown', 'raw': repr(payload)}
    parts = line.split()
    if not parts:
        return {'kind': 'unknown', 'raw': line}
    verb = parts[0]

    # Verbs that carry no data; extra tokens / args are ignored.
    bare_verbs = {
        'SESSION_ENDED': 'session_ended',
        'READONLY':      'readonly',
        'READWRITE':     'readwrite',
        'BRB':           'brb',
        'BRB_END':       'brb_end',
        'HEARTBEAT':     'heartbeat',
    }
    if verb in bare_verbs:
        return {'kind': bare_verbs[verb]}

    if verb == 'SESSION_INFO':
        # 1 binary arg = JSON blob (UTF-8).
        if args:
            import json as _cu_json
            try:
                payload_obj = _cu_json.loads(args[0].decode('utf-8', 'replace'))
            except (ValueError, RecursionError):
                # ValueError includes JSONDecodeError.  RecursionError is
                # what the decoder raises for pathologically nested input;
                # the blob comes from the wire, so it must not escape.
                return {'kind': 'unknown', 'raw': line}
            if not isinstance(payload_obj, dict):
                # Valid JSON but not an object: not a session-info record.
                return {'kind': 'unknown', 'raw': line}
            return {'kind': 'session_info', 'payload': payload_obj}

    if verb == 'EPHEMERAL':
        # 'EPHEMERAL <lifecycle>'; 0 or 1 binary arg (body).
        # 'clear' lifecycle has no body.
        if len(parts) == 2:
            lifecycle = parts[1]
            body = args[0].decode('utf-8', errors='replace') if args else ''
            return {'kind': 'ephemeral', 'lifecycle': lifecycle, 'body': body}

    if verb == 'CCHAT':
        # 'CCHAT <ts>' + 1 binary arg = msg
        if len(parts) == 2 and args:
            ts = parts[1]
            body = args[0].decode('utf-8', errors='replace')
            return {'kind': 'chat', 'who': 'customer', 'ts': ts, 'body': body}

    if verb == 'SCHAT':
        # 'SCHAT <ts>' + 2 binary args = (name, msg)
        if len(parts) == 2 and len(args) >= 2:
            ts = parts[1]
            name = args[0].decode('utf-8', errors='replace')
            body = args[1].decode('utf-8', errors='replace')
            # The broker uses the literal sender name '[tunl]' for
            # system messages (joins, resizes, etc.).
            if name == '[tunl]':
                return {'kind': 'system', 'ts': ts, 'body': body}
            return {'kind': 'chat', 'who': name, 'ts': ts, 'body': body}

    if verb == 'SNOTE':
        # 'SNOTE <ts>' + 2 binary args = (author, body)
        if len(parts) == 2 and len(args) >= 2:
            ts = parts[1]
            author = args[0].decode('utf-8', errors='replace')
            body   = args[1].decode('utf-8', errors='replace')
            return {'kind': 'note', 'who': author, 'ts': ts, 'body': body}

    # Unrecognised verb, or a recognised one with an unexpected shape.
    return {'kind': 'unknown', 'raw': line}


def wire_encode(kind, text):
    """Build one outbound v4 CTRL frame.

    kind selects the verb: 'chat' -> CHAT, 'customer_chat' ->
    CUSTOMER_CHAT, 'name' -> NAME, 'customer_nick' -> CUSTOMER_NICK,
    'note' -> NOTE.  Any other kind raises ValueError.

    text is sent as the single binary argument; bytes are used as-is and
    str is encoded as UTF-8.  Returns the complete frame bytes (header +
    payload), ready to write to the transport.
    """
    data = text if isinstance(text, bytes) else text.encode('utf-8', errors='replace')
    verb_table = (
        ('chat',          'CHAT'),
        ('customer_chat', 'CUSTOMER_CHAT'),
        ('name',          'NAME'),
        ('customer_nick', 'CUSTOMER_NICK'),
        ('note',          'NOTE'),
    )
    verb = next((wire_verb for name, wire_verb in verb_table if kind == name),
                None)
    if verb is None:
        raise ValueError(f'unknown wire kind: {kind!r}')
    ctrl_payload = encode_ctrl(verb, binary_args=(data,))
    return encode_message(TYPE_CTRL, ctrl_payload)


# ---------------------------------------------------------------------------
# Nick abbreviation and column-width helpers.
# ---------------------------------------------------------------------------

def _compute_nick_abbrevs(nicks):
    """Map each raw nick to the shortest leading-word prefix unique to it.

    For every nick, the prefix grows one whitespace-separated word at a
    time until no other entry in nicks has the same prefix of that
    length.  A nick that never becomes unique (an exact duplicate, or one
    that is itself a prefix of a longer nick) keeps its full raw form.

    Returns {raw_nick: display_nick}.
    """
    names = list(nicks)
    tokens = [name.split() for name in names]
    shortened = {}
    for idx, (name, own) in enumerate(zip(names, tokens)):
        rivals = tokens[:idx] + tokens[idx + 1:]
        display = name  # used when no prefix length is unique
        for depth in range(1, len(own) + 1):
            prefix = own[:depth]
            # Words contain no spaces, so comparing word lists is the same
            # as comparing the space-joined prefixes.
            if any(rival[:depth] == prefix for rival in rivals):
                continue
            display = ' '.join(prefix)
            break
        shortened[name] = display
    return shortened


def _nick_col_width(display_nicks):
    """Column width for the nick field, stepped in multiples of 4.

    The width is the next multiple of 4 strictly greater than the longest
    display nick, clamped to the range 8..20.  Stepping in fours means the
    column only changes width occasionally as participants join or leave.
    An empty collection gives the minimum width of 8.
    """
    if not display_nicks:
        return 8
    longest = max(map(len, display_nicks))
    stepped = (longest // 4 + 1) * 4
    if stepped < 8:
        return 8
    if stepped > 20:
        return 20
    return stepped


def _first_significant_word(company):
    """Pick the first meaningful word out of a company/org name.

    The name is split on runs of whitespace, '_', '@' and '-'.  The first
    piece longer than two characters whose lower-cased form is not in
    _MENTION_STOPLIST wins.  If no piece qualifies, the first
    whitespace-separated token of the original name is returned instead.
    Returns None for an empty / missing name or one that is only
    whitespace.
    """
    if not company:
        return None
    for piece in _cu_re.split(r'[\s_@\-]+', company):
        candidate = piece.strip()
        if len(candidate) <= 2:
            continue
        if candidate.lower() in _MENTION_STOPLIST:
            continue
        return candidate
    tokens = company.split()
    if tokens:
        return tokens[0]
    return None


# ---------------------------------------------------------------------------
# HistoryBuffer -- bounded deque of structured row dicts plus a scroll index.
# ---------------------------------------------------------------------------

class HistoryBuffer:
    """Store chat events and render them as structured row dicts for the viewport.

    The viewport shows at most `height` rows of `width` columns.  Messages
    are appended as event dicts (from wire_parse) or local notes; call
    lines_for_viewport(height, width) to get the renderable row dicts.

    Each row dict has: {ts, nick_display, nick_color_key, body, kind,
    is_continuation}.  Callers flatten to strings only at display time.

    Scrolling: scroll_offset is the number of rows scrolled up from the
    bottom.  Zero == stuck to live tail; any positive number == paused in
    scrollback (new messages do not auto-scroll).

    Display settings that are cheap to change; each takes effect at the
    next re-wrap:
      show_ts       -- if False, render without the leading 'HH:MM '
      filter_re     -- if set, only events whose nick+body matches are shown
      filter_kinds  -- if set, only these event kinds (plus 'local') are shown
      customer_nick -- if set, substitutes who='customer' with this string
    """

    # Wrap width used by scroll() when nothing has been rendered yet.
    _FALLBACK_WIDTH = 80

    def __init__(self, max_events=2000):
        self._events        = []       # list of event dicts
        self._max_events    = max_events
        self.scroll_offset  = 0        # 0 = at bottom; N = N rows scrolled up
        self.show_ts        = True
        self.filter_re      = None     # compiled regex or None
        self.filter_kinds   = None     # frozenset of allowed kinds or None
        self.customer_nick  = None     # display name for who='customer'
        self.my_nick        = None     # local-echo override; /nick on the
                                       # consumer side updates this so future
                                       # self-tagged messages use the new name
        # Cached wrap: (width, show_ts, filter_id, kinds_id, customer_nick)
        self._wrap_key      = None
        self._wrapped       = []       # flat list of row dicts
        self._nick_col_w    = 8        # computed by _rewrap; used by renderer
        # Width of the most recent _rewrap() request.  Kept separately from
        # _wrap_key because append() and the setters reset _wrap_key.
        self._last_width    = None

    def append(self, ev):
        """Append an event.  ev is a dict with at minimum {'ts','who','body'}
        for chat events, or {'ts','body','kind':'system'|'note'} for others.
        Trims to max_events.  Invalidates the wrap cache.
        """
        self._events.append(ev)
        if len(self._events) > self._max_events:
            drop = len(self._events) - self._max_events
            self._events = self._events[drop:]
        self._wrap_key = None  # force re-wrap

    def __len__(self):
        """Number of stored events (not rendered rows)."""
        return len(self._events)

    def set_show_ts(self, on: bool):
        """Show or hide the timestamp column; invalidates the wrap cache."""
        self.show_ts = bool(on)
        self._wrap_key = None

    def set_filter(self, pattern):
        """Set a case-insensitive filter regex; pass None / '' to clear.

        A pattern that does not compile also clears the filter.
        """
        if not pattern:
            self.filter_re = None
        else:
            try:
                self.filter_re = _cu_re.compile(pattern, _cu_re.IGNORECASE)
            except _cu_re.error:
                self.filter_re = None
        self._wrap_key = None

    def set_kind_filter(self, kinds):
        """Restrict scrollback to the given event kinds.  'local' events
        (client-side feedback like '/search set' acknowledgements) are
        always preserved so users can see their commands take effect.
        Pass None or an empty collection to clear the filter."""
        if not kinds:
            self.filter_kinds = None
        else:
            self.filter_kinds = frozenset(kinds)
        self._wrap_key = None

    def is_filtered(self):
        """True when a regex filter or a kind filter is active."""
        return (self.filter_re is not None) or (self.filter_kinds is not None)

    def _rewrap(self, width):
        """Rebuild self._wrapped for the given width unless the cache is valid.

        Also refreshes self._nick_col_w and remembers width for scroll().
        """
        # Remember the requested width even on a cache hit, so a later
        # scroll() after append() can re-wrap at the width actually in use.
        self._last_width = width
        key = (width, self.show_ts, id(self.filter_re), id(self.filter_kinds),
               self.customer_nick)
        if key == self._wrap_key:
            return

        w = max(width, 10)

        # Pass 1: collect distinct non-customer nicks; compute abbreviations.
        raw_nicks = set()
        for ev in self._events:
            who = ev.get('who', '')
            if who and who != 'customer' and ev.get('kind') not in ('system', 'local'):
                raw_nicks.add(who)
        abbrevs = _compute_nick_abbrevs(raw_nicks)

        # Customer display nick (substituted for who='customer').
        cust_display = self.customer_nick or 'customer'

        # Nick column width: sized from all display nicks in this history.
        # Include fixed tokens ([tunl], --, *note*) in the sizing.
        all_display = set(abbrevs.values()) | {cust_display, '[tunl]', '--'}
        # Note authors (wrapped in *) count as their abbrev + 2 chars.
        for ev in self._events:
            if ev.get('kind') == 'note':
                who = ev.get('who', '')
                if who:
                    all_display.add(f'*{abbrevs.get(who, who)}*')
        self._nick_col_w = _nick_col_width(all_display)
        nick_col_w = self._nick_col_w

        # Prefix width: ts (6) + nick column.  Body wraps within remainder.
        ts_w    = 6 if self.show_ts else 0
        prefix_w = ts_w + nick_col_w
        body_w  = max(10, w - prefix_w)
        indent  = ' ' * min(prefix_w, w // 2)

        self._wrapped = []

        for ev in self._events:
            kind = ev.get('kind', 'chat')
            if (self.filter_kinds is not None and kind != 'local'
                    and kind not in self.filter_kinds):
                continue

            ts = ev.get('ts', '') or ''

            if kind == 'system':
                nick_display  = '[tunl]'
                nick_color_key = 'tunl'
                body = ev.get('body', '')
            elif kind == 'note':
                who = ev.get('who', 'note')
                abbr = abbrevs.get(who, who)
                nick_display  = f'*{abbr}*'
                nick_color_key = 'note'
                body = ev.get('body', '')
            elif kind == 'local':
                nick_display  = '--'
                nick_color_key = '--'
                body = ev.get('body', '')
            else:
                who = ev.get('who', '?')
                if who == 'customer':
                    nick_display  = cust_display
                    nick_color_key = 'customer'
                else:
                    nick_display  = abbrevs.get(who, who)
                    nick_color_key = who
                body = ev.get('body', '')

            # Filter on nick + body (no timestamp -- timestamp matching is
            # rarely useful and confuses searches for numbers).
            if self.filter_re is not None:
                if not self.filter_re.search(f'{nick_display} {body}'):
                    continue

            body_lines = _cu_textwrap.wrap(body, width=body_w,
                                           break_long_words=True,
                                           break_on_hyphens=False)
            if not body_lines:
                # Empty body still occupies one row.
                body_lines = ['']

            self._wrapped.append({
                'ts':            ts if self.show_ts else '',
                'nick_display':  nick_display,
                'nick_color_key': nick_color_key,
                'body':          body_lines[0],
                'kind':          kind,
                'is_continuation': False,
            })
            for bl in body_lines[1:]:
                # Continuation rows carry their indent inside 'body'.
                self._wrapped.append({
                    'ts':            '',
                    'nick_display':  '',
                    'nick_color_key': '',
                    'body':          indent + bl,
                    'kind':          kind,
                    'is_continuation': True,
                })

        self._wrap_key = key

    def lines_for_viewport(self, height, width):
        """Return (rows, at_bottom_flag).

        rows is a list of up to `height` row dicts, topmost first.  Each dict
        has: {ts, nick_display, nick_color_key, body, kind, is_continuation}.
        at_bottom is True iff the view is not scrolled up.
        """
        self._rewrap(width)
        total = len(self._wrapped)
        h = max(height, 1)
        if total <= h:
            return list(self._wrapped), True
        # scroll_offset is how many rows scrolled up from the bottom.
        # Clamp it for this view only; the stored value is left untouched.
        offset = max(0, min(self.scroll_offset, total - h))
        end   = total - offset
        start = max(0, end - h)
        return self._wrapped[start:end], offset == 0

    def scroll(self, delta, height=1):
        """Positive delta -> scroll up (older).  Negative -> scroll down.

        The resulting offset is clamped to [0, rows - height].  If the wrap
        cache is stale (e.g. after append()), rows are re-wrapped first at
        the width of the last render, or at 80 columns if nothing has been
        rendered yet, so the clamp matches what the viewport will show.
        """
        if self._wrap_key is None:
            w = getattr(self, '_last_width', None) or self._FALLBACK_WIDTH
            self._rewrap(w)
        max_off = max(0, len(self._wrapped) - max(height, 1))
        self.scroll_offset = max(0, min(self.scroll_offset + delta, max_off))

    def scroll_to_bottom(self):
        """Return to the live tail."""
        self.scroll_offset = 0


# ---------------------------------------------------------------------------
# InputBuffer -- editable single-line input with readline-ish keybindings.
# ---------------------------------------------------------------------------

class InputBuffer:
    """Single-line edit buffer with a cursor and submission history.

    Knows nothing about curses.  Every editing / movement method takes no
    arguments (insert() excepted) and changes the buffer in place.  Any
    edit to the text leaves history-browsing mode; plain cursor movement
    does not.
    """

    def __init__(self):
        self.text = ''
        self.cursor = 0
        self.history = []      # submitted lines, oldest first
        self._hidx = None      # index into history while browsing, else None

    def insert(self, ch):
        """Insert a single character (str) or short string at the cursor."""
        if ch:
            self._hidx = None
            before, after = self.text[:self.cursor], self.text[self.cursor:]
            self.text = before + ch + after
            self.cursor += len(ch)

    def backspace(self):
        """Delete the character left of the cursor, if any."""
        if self.cursor <= 0:
            return
        self._hidx = None
        pos = self.cursor
        self.text = self.text[:pos - 1] + self.text[pos:]
        self.cursor = pos - 1

    def delete(self):
        """Delete the character under the cursor, if any."""
        pos = self.cursor
        if pos >= len(self.text):
            return
        self._hidx = None
        self.text = self.text[:pos] + self.text[pos + 1:]

    def cursor_left(self):
        if self.cursor <= 0:
            return
        self.cursor -= 1

    def cursor_right(self):
        if self.cursor >= len(self.text):
            return
        self.cursor += 1

    def cursor_home(self):
        self.cursor = 0

    def cursor_end(self):
        self.cursor = len(self.text)

    def kill_to_start(self):       # ctrl-u
        """Delete everything left of the cursor."""
        self._hidx = None
        self.text = self.text[self.cursor:]
        self.cursor = 0

    def kill_to_end(self):         # ctrl-k
        """Delete everything from the cursor to the end of the line."""
        self._hidx = None
        self.text = self.text[:self.cursor]

    def kill_word_left(self):      # ctrl-w
        """Delete the word left of the cursor, plus the spaces after it."""
        if self.cursor == 0:
            return
        self._hidx = None
        buf = self.text
        start = self.cursor - 1
        # First step back over whitespace next to the cursor ...
        while start > 0 and buf[start].isspace():
            start -= 1
        # ... then back to the beginning of the word before it.
        while start > 0 and not buf[start - 1].isspace():
            start -= 1
        self.text = buf[:start] + buf[self.cursor:]
        self.cursor = start

    def submit(self):
        """Return the current text and clear the buffer.  Empty -> ''.

        Non-empty text is added to the history, which keeps only the most
        recent 200 entries.
        """
        entered = self.text
        if entered:
            self.history.append(entered)
            surplus = len(self.history) - 200
            if surplus > 0:
                self.history = self.history[surplus:]
        self.text = ''
        self.cursor = 0
        self._hidx = None
        return entered

    def history_prev(self):
        """Recall the previous (older) history entry; stops at the oldest."""
        if not self.history:
            return
        if self._hidx is None:
            idx = len(self.history) - 1
        else:
            idx = max(0, self._hidx - 1)
        self._hidx = idx
        self.text = self.history[idx]
        self.cursor = len(self.text)

    def history_next(self):
        """Recall the next (newer) entry; past the newest, clear the line."""
        if self._hidx is None:
            return
        following = self._hidx + 1
        if following < len(self.history):
            self._hidx = following
            self.text = self.history[following]
        else:
            self._hidx = None
            self.text = ''
        self.cursor = len(self.text)


# ---------------------------------------------------------------------------
# NickColors -- stable nick -> color-pair index (1..8).
# ---------------------------------------------------------------------------

class NickColors:
    """Deterministic nick -> color-pair index mapping.

    Reserved nicks get fixed indices (1 or 2); every other nick is hashed
    into the range 3..palette, so the same nick always gets the same color.
    Index 0 (the default terminal pair) is never handed out.
    """

    # Fixed slots: 1 for 'customer', 2 for 'tunl' system messages and notes.
    # Lookup happens on the nick as passed in; per the original note the
    # caller strips '*[]' first, so '[tunl]' arrives as 'tunl' and '*note*'
    # as 'note'.
    _RESERVED = {'customer': 1, 'tunl': 2, 'note': 2}

    def __init__(self, palette_size=8):
        # Never go below 3: indices 1 and 2 are taken by the reserved nicks
        # and at least one hashed slot must remain.
        self._palette = palette_size if palette_size > 3 else 3

    def index_for(self, nick):
        """Return the color-pair index for `nick` (reserved or hashed)."""
        fixed = self._RESERVED.get(nick)
        if fixed is not None:
            return fixed
        digest = _cu_hashlib.sha1(nick.encode('utf-8', 'replace')).digest()
        hashed_slots = self._palette - 2
        return 3 + digest[0] % hashed_slots


# ---------------------------------------------------------------------------
# Pinned row rendering -- status info that sits above the scrollback.
# Customer uses 1 row; support uses 2.  Content is free-form; the owner
# (caller) is responsible for keeping slots populated.  This class is
# protocol-agnostic: it knows how to lay out strings in `width` columns.
# ---------------------------------------------------------------------------

def fit_lcr(left, center, right, width):
    """Place left / center / right into exactly `width` columns.

    Priority when space is tight: keep left, then right, then center.
    Left is truncated (never dropped) so the user always sees the most
    important slot.  A single-space minimum gap is required only between
    slots that are both non-empty, so a lone slot may use the full width.

    left, center, right -- slot contents; falsy values count as empty,
                           everything else is converted with str()
    width               -- target column count

    Returns a string of exactly `width` characters, or '' when width <= 0.
    """
    if width <= 0:
        return ''
    L, C, R = str(left or ''), str(center or ''), str(right or '')
    if C:
        # Only charge a separator for neighbours that actually exist.
        gaps = (1 if L else 0) + (1 if R else 0)
        if len(L) + len(C) + len(R) + gaps <= width:
            slack = width - len(L) - len(C) - len(R)
            # Center C in the free space, but never let it touch L.
            lgap = max(slack // 2, 1 if L else 0)
            rgap = slack - lgap
            return L + (' ' * lgap) + C + (' ' * rgap) + R
        if not L and not R:
            # Center is the only content and does not fit: truncate it
            # rather than rendering a blank row.
            return C[:width]
    if len(L) + len(R) + (1 if L and R else 0) <= width:
        return L + ' ' * (width - len(L) - len(R)) + R
    # L and R do not fit together: keep the highest-priority non-empty
    # slot, truncated / padded to the full width.
    keep = L or R
    return keep[:width].ljust(width)


class PinnedRows:
    """State holder for the 1 or 2 status rows pinned above the scrollback.

    Row 1 is a left / center / right triple (connection state, session
    meta, viewer count).  Row 2 exists only in the two-row (support)
    layout and carries free-form text for transient context (latest note
    -> ticket URL -> viewer names).

    scroll_status is written by the curses layer while the user is
    scrolled back or filtering; when set it overrides row 1's right slot.
    """

    def __init__(self, rows=1):
        # Anything other than 1 or 2 silently falls back to a single row.
        self.rows = rows if rows in (1, 2) else 1
        self.row1_left = ''
        self.row1_center = ''
        self.row1_right = ''
        self.row2_body = ''
        self.scroll_status = None   # 'scrolled' | 'filtered' | None

    def set_row1(self, left=None, center=None, right=None):
        """Update row-1 slots; a None argument leaves that slot unchanged."""
        updates = (('row1_left', left),
                   ('row1_center', center),
                   ('row1_right', right))
        for attr, value in updates:
            if value is not None:
                setattr(self, attr, str(value))

    def set_row2(self, body=''):
        """Replace the row-2 text; falsy input clears it."""
        self.row2_body = str(body or '')

    def render(self, width):
        """Lay the pinned rows out for a terminal `width` columns wide.

        Returns one string per configured row, each exactly `width`
        characters (empty strings when width <= 0).
        """
        if width <= 0:
            return ['', ''][:self.rows]
        status = self.scroll_status
        if status == 'filtered':
            right = '[filtered -- /filter to clear]'
        elif status:
            right = f'[{status}]'
        else:
            right = self.row1_right
        lines = [fit_lcr(self.row1_left, self.row1_center, right, width)]
        if self.rows == 2:
            lines.append(self.row2_body[:width].ljust(width))
        return lines


# ---------------------------------------------------------------------------
# Ephemeral slot -- a one-shot overlay that eats the top scrollback row
# until cleared.  Two lifecycles:
#   state     -- caller explicitly clears when the underlying condition
#                resolves (e.g. 'Shell exited' cleared when the shell
#                respawns).
#   keystroke -- any subsequent input keystroke dismisses the ephemeral
#                (cheap user acknowledgement).
# ---------------------------------------------------------------------------

class Ephemeral:
    """One-shot overlay message with a dismissal policy.

    lifecycle 'state'     -- stays until the owner calls clear()
    lifecycle 'keystroke' -- dismissed by the next on_keystroke() call
    """

    def __init__(self):
        self.text = ''
        self.lifecycle = None

    def set(self, text, lifecycle='state'):
        """Show `text`; an unrecognised lifecycle is treated as 'state'."""
        if lifecycle not in ('state', 'keystroke'):
            lifecycle = 'state'
        self.text = str(text or '')
        self.lifecycle = lifecycle

    def clear(self):
        """Remove the overlay and forget its lifecycle."""
        self.text, self.lifecycle = '', None

    def on_keystroke(self):
        """Dismiss the overlay if (and only if) it is keystroke-scoped."""
        if self.lifecycle != 'keystroke':
            return
        self.clear()

    def active(self):
        """Return True while there is overlay text to display."""
        return True if self.text else False


# ---------------------------------------------------------------------------
# SlashCommands -- simple dispatch table for leading-'/' input.
# ---------------------------------------------------------------------------

class SlashCommands:
    """Registry of '/name' -> (callable, help text) with a dispatcher.

    A registered callable receives the argument string and may return a
    local event dict to append to history, or None for silent commands.
    dispatch() reports the outcome as a (handled, event_or_None) tuple.
    """

    def __init__(self):
        self._cmds = {}

    @staticmethod
    def _local_event(body):
        # Local (never sent on the wire) notice stamped with UTC HH:MM.
        return {'kind': 'local',
                'ts': _cu_time.strftime('%H:%M', _cu_time.gmtime()),
                'body': body}

    def register(self, name, fn, help_text=''):
        """Add or replace a command; the leading '/' is optional."""
        key = name if name.startswith('/') else '/' + name
        self._cmds[key] = (fn, help_text)

    def names(self):
        """Return all registered command names, sorted, with leading '/'."""
        return sorted(self._cmds.keys())

    def help_text(self):
        """Return one 'name  help' line per command, sorted by name."""
        rows = []
        for name in sorted(self._cmds):
            rows.append(f'{name}  {self._cmds[name][1]}')
        return '\n'.join(rows)

    def dispatch(self, line):
        """Parse '/cmd args...'.  Return (handled, event_or_None).

        When handled is False the caller should send the line as chat.
        Unknown commands and commands that raise are still reported as
        handled, with a local event describing the problem.
        """
        if not line.startswith('/'):
            return (False, None)
        cmd, *rest = line.split(None, 1)
        args = rest[0] if rest else ''
        entry = self._cmds.get(cmd)
        if entry is None:
            return (True,
                    self._local_event(f'unknown command: {cmd} (try /help)'))
        handler = entry[0]
        try:
            result = handler(args)
        except Exception as e:
            return (True, self._local_event(f'{cmd} failed: {e!r}'))
        return (True, result)


# ---------------------------------------------------------------------------
# Tiny utilities re-used by the curses layer (kept pure so they're tested).
# ---------------------------------------------------------------------------

# Generic legal-form / filler words that must never become mention tokens
# when they appear in a company name (otherwise '@group' or '@gmbh' would
# ping every customer of such a company).
_MENTION_STOPLIST = frozenset({
    'corp', 'ltd', 'llc', 'inc', 'gmbh', 'ag', 'sa', 'co', 'bv', 'nv',
    'the', 'and', 'for', 'of', 'de', 'group', 'solutions', 'services', 'systems',
})


def build_mention_tokens(role, my_name='', extra_company=''):
    """Return a frozenset of lowercase tokens that count as 'addressed at me'.

    role          -- 'support' or 'customer' (always included when non-empty)
    my_name       -- display name; each word > 2 chars is included
    extra_company -- company / org name; significant words (> 2 chars, not on
                     the generic-word stoplist) are included

    Words are split on whitespace, '_', '@' and '-'.  Punctuation around a
    word is stripped before the length / stoplist checks, so 'Inc.' is
    recognised as the stoplisted 'inc' and '(Acme)' yields 'acme'.
    """
    def _words(text):
        # Lowercased words of `text` with surrounding punctuation removed,
        # limited to those longer than 2 characters.
        for raw in _cu_re.split(r'[\s_@-]+', text or ''):
            word = _cu_re.sub(r'^\W+|\W+$', '', raw).lower()
            if len(word) > 2:
                yield word

    tokens = set()
    if role:
        tokens.add(role.lower())
    tokens.update(_words(my_name))
    tokens.update(w for w in _words(extra_company)
                  if w not in _MENTION_STOPLIST)
    return frozenset(tokens)


def highlight_mentions(body, tokens):
    """Return (body, mentions_you:bool).

    body   -- message text; returned unchanged as the first tuple element
    tokens -- frozenset of lowercase strings returned by build_mention_tokens().

    Matches '@<token>' case-insensitively where the token is not followed
    by another word character, so '@support' does not fire on
    '@supporter'.  A negative lookahead is used instead of '\\b' because
    '\\b' can never match after a token that itself ends in punctuation
    (e.g. 'jr.').  Empty token set -> no highlight; empty-string tokens
    are ignored.
    """
    if not tokens:
        return body, False
    hit = any(
        _cu_re.search(r'(?i)@' + _cu_re.escape(token) + r'(?!\w)', body)
        for token in tokens if token)
    return body, hit


def now_hhmm():
    """Return the current UTC time as 'HH:MM' (same shape as wire ts)."""
    utc_now = _cu_time.gmtime()
    return _cu_time.strftime('%H:%M', utc_now)


# ---------------------------------------------------------------------------
# Thin curses wrapper.  Included in the INLINE block so the single-file
# deliverables carry their own copy and don't need the relay/ directory.
# ---------------------------------------------------------------------------
import curses
import locale
import threading


def run_chat_ui(read_line, send_bytes, my_name, stop_event=None,
                readonly=False, wire_out_kind='chat',
                send_name=True, local_echo_on_send=False,
                local_echo_who=None,
                pinned_rows=1, session_info_handler=None,
                mention_tokens=frozenset(),
                customer_nick=None,
                extra_commands=None,
                session_id=''):
    """Render the chat UI until the transport closes or /quit is typed.

    Starts a daemon reader thread that feeds parsed inbound lines into a
    queue, optionally announces `my_name` to the broker, then runs the
    curses loop under curses.wrapper().  stop_event is always set on exit.

    Parameters:
        read_line           -- callable() -> bytes or None  (blocking read
                               of one inbound line; return None on EOF).
                               An exception raised by read_line is treated
                               like EOF.
        send_bytes          -- callable(bytes) -> None      (write outbound
                               frame; bytes already framed by run_chat_ui)
        my_name             -- str  (this client's display name; used for
                               bold-marking own messages)
        stop_event          -- optional threading.Event; set to request exit
        readonly            -- if True, outbound CHAT is suppressed
        wire_out_kind       -- 'chat' for consumer, or 'customer_chat' for
                               the customer producer
        send_name           -- True for consumers; customer side sets False
        local_echo_on_send  -- True when the broker does not echo back
                               (customer side with CUSTOMER_CHAT)
        local_echo_who      -- nick used in the local-echo event's 'who'
                               field; defaults to my_name.  Customer side
                               passes 'customer' so own messages get the
                               reserved yellow color rather than the hash
                               color for their display name.
        pinned_rows         -- 1 for customer layout, 2 for support
        session_info_handler-- optional callable(payload, pinned, ephemeral,
                               history=...) invoked on each inbound
                               session_info frame.  Mutates the pinned rows /
                               ephemeral slot to reflect the new state.
                               Caller-site logic lives here; this module
                               stays generic.
        mention_tokens      -- frozenset from build_mention_tokens(); messages
                               containing '@<token>' ring the bell.
        customer_nick       -- display name substituted for who='customer';
                               None falls back to 'customer'.  Support side
                               also updates this dynamically via
                               session_info_handler receiving history.
        extra_commands      -- optional callable(cmds, history) invoked
                               after the default slash commands are
                               registered.  Lets callers add side-specific
                               commands (e.g. customer /brb, /back) without
                               leaking them into the canonical default set.
        session_id          -- optional session id shown in the connect
                               banner; '' omits it.
    """
    try:
        locale.setlocale(locale.LC_ALL, '')
    except locale.Error:
        # Broken LANG / LC_* on the host: keep the current locale rather
        # than refusing to start the chat UI at all.
        pass
    stop_event = stop_event or threading.Event()
    in_q = _cu_queue.Queue()

    def _reader():
        while not stop_event.is_set():
            try:
                line = read_line()
            except Exception:
                # A failing transport is indistinguishable from EOF for the
                # UI; without this the thread would die silently and the
                # user would never be told the session is gone.
                line = None
            if line is None:
                in_q.put({'kind': 'session_ended', '_eof': True})
                return
            in_q.put(wire_parse(line))

    t = threading.Thread(target=_reader, daemon=True)
    t.start()

    # Consumer-side announces itself to the broker (idempotent).
    # Customer side skips this: it is the producer, not a consumer.
    if send_name and my_name:
        try:
            send_bytes(wire_encode('name', my_name))
        except Exception:
            # Best effort: a failed announce must not prevent the UI from
            # coming up.
            pass

    def _inner(stdscr):
        _curses_main(stdscr, in_q, send_bytes, my_name, stop_event, readonly,
                     wire_out_kind, local_echo_on_send, local_echo_who,
                     pinned_rows, session_info_handler, mention_tokens,
                     customer_nick, extra_commands, session_id)

    try:
        curses.wrapper(_inner)
    finally:
        # Lets the reader loop exit after its current read_line() returns.
        stop_event.set()


def _curses_main(stdscr, in_q, send_bytes, my_name, stop_event, readonly,
                 wire_out_kind='chat', local_echo_on_send=False,
                 local_echo_who=None,
                 pinned_rows=1, session_info_handler=None,
                 mention_tokens=frozenset(),
                 customer_nick=None,
                 extra_commands=None,
                 session_id=''):
    """Curses event loop behind run_chat_ui().

    Each iteration: update the pinned scroll/filter indicator, redraw if
    needed, drain the inbound event queue `in_q`, then wait up to 80 ms
    for one keystroke and hand it to _handle_key().  Returns when
    stop_event is set, when the reader reports EOF (after a short linger
    so the final notice can be read), or on KeyboardInterrupt.

    stdscr is the curses window supplied by curses.wrapper(); in_q carries
    parsed event dicts (keyed by 'kind').  The remaining parameters have
    the same meaning as the identically named run_chat_ui() parameters.
    """
    curses.curs_set(1)
    stdscr.nodelay(False)
    stdscr.timeout(80)   # ms; controls inbound-queue drain cadence

    # Color setup.
    try:
        curses.start_color()
        curses.use_default_colors()
        palette = [curses.COLOR_YELLOW,    # 1 -- customer
                   curses.COLOR_CYAN,      # 2 -- [tunl] system
                   curses.COLOR_GREEN,     # 3..
                   curses.COLOR_MAGENTA,
                   curses.COLOR_BLUE,
                   curses.COLOR_RED,
                   curses.COLOR_WHITE,
                   curses.COLOR_YELLOW]
        for i, c in enumerate(palette, start=1):
            try:
                curses.init_pair(i, c, -1)
            except curses.error:
                pass
        has_colors = curses.has_colors()
    except curses.error:
        has_colors = False

    history = HistoryBuffer()
    history.customer_nick = customer_nick
    inp     = InputBuffer()
    nicks   = NickColors()
    cmds    = SlashCommands()
    pinned  = PinnedRows(rows=pinned_rows)
    ephem   = Ephemeral()
    _register_default_commands(cmds, history, stop_event,
                               send_bytes=send_bytes,
                               wire_out_kind=wire_out_kind)
    if extra_commands is not None:
        try:
            extra_commands(cmds, history)
        except Exception:
            # Side-specific commands are optional; a broken hook must not
            # take the chat down.
            pass

    # Seed the screen with a tiny banner.  Including the session id
    # makes it copy-pastable from either pane on the support side, so
    # whoever needs to read it to a third party (a customer on the
    # phone, a colleague over chat) can do so without flipping windows.
    sid_part = f' | session {session_id}' if session_id else ''
    history.append({'kind': 'local', 'ts': now_hhmm(),
                    'body': f'chat connected as {my_name or "anonymous"}'
                            f'{sid_part}'
                            f'{" (read-only)" if readonly else ""}'})

    redraw_needed = True
    mentions_bell = False

    while not stop_event.is_set():
        # Reflect scroll / filter state into the pinned row right slot.
        _, at_bottom = history.lines_for_viewport(1, 80)  # cheap check
        if not at_bottom:
            pinned.scroll_status = 'scrolled'
        elif history.is_filtered():
            pinned.scroll_status = 'filtered'
        else:
            pinned.scroll_status = None

        if redraw_needed:
            _render(stdscr, history, inp, nicks, has_colors,
                    mentions_bell, my_name, pinned, ephem)
            mentions_bell = False
            redraw_needed = False

        # Drain inbound queue.
        try:
            while True:
                ev = in_q.get_nowait()
                if ev.get('kind') == 'session_info':
                    payload = ev.get('payload', {}) or {}
                    history.session_info = payload  # /who reads this
                    if session_info_handler is not None:
                        try:
                            session_info_handler(payload,
                                                 pinned, ephem,
                                                 history=history)
                        except Exception:
                            # Caller-supplied hook; failures are ignored so
                            # the UI keeps running.
                            pass
                    redraw_needed = True
                    continue
                if ev.get('kind') == 'ephemeral':
                    lifecycle = ev.get('lifecycle', 'state')
                    if lifecycle == 'clear':
                        ephem.clear()
                    else:
                        ephem.set(ev.get('body', ''), lifecycle=lifecycle)
                    redraw_needed = True
                    continue
                if ev.get('kind') == 'session_ended':
                    history.append({'kind': 'local', 'ts': now_hhmm(),
                                    'body': '-- session ended --'})
                    ephem.set('session ended', lifecycle='keystroke')
                    redraw_needed = True
                    if ev.get('_eof'):
                        # Transport closed; linger briefly so the user sees
                        # the message, then exit.  The notice was appended
                        # after this iteration's redraw, so paint it now --
                        # otherwise the linger shows the stale screen.
                        try:
                            _render(stdscr, history, inp, nicks, has_colors,
                                    mentions_bell, my_name, pinned, ephem)
                        except curses.error:
                            pass
                        stdscr.timeout(2000)
                        try:
                            stdscr.getch()
                        except curses.error:
                            pass
                        return
                    continue
                if ev.get('kind') == 'readonly':
                    history.append({'kind': 'local', 'ts': now_hhmm(),
                                    'body': '-- read-only mode --'})
                    redraw_needed = True
                    continue
                if ev.get('kind') == 'readwrite':
                    history.append({'kind': 'local', 'ts': now_hhmm(),
                                    'body': '-- interactive mode restored --'})
                    redraw_needed = True
                    continue
                if ev.get('kind') == 'brb':
                    history.append({'kind': 'local', 'ts': now_hhmm(),
                                    'body': '-- customer is away; input frozen --'})
                    redraw_needed = True
                    continue
                if ev.get('kind') == 'brb_end':
                    history.append({'kind': 'local', 'ts': now_hhmm(),
                                    'body': '-- customer is back; input restored --'})
                    redraw_needed = True
                    continue
                if ev.get('kind') == 'heartbeat':
                    continue
                if ev.get('kind') in ('chat', 'system', 'note'):
                    body = ev.get('body', '')
                    _, hit = highlight_mentions(body, mention_tokens)
                    # Never ring the bell for our own messages.
                    if hit and ev.get('who') != my_name:
                        mentions_bell = True
                    history.append(ev)
                    redraw_needed = True
                    continue
                # 'unknown' and anything else we just tuck away.
                history.append({'kind': 'local', 'ts': now_hhmm(),
                                'body': f'? {ev.get("raw", ev)}'})
                redraw_needed = True
        except _cu_queue.Empty:
            pass

        # Wait for keystroke (timeout already set above).
        # get_wch returns a str (UTF-8-decoded printable, possibly multibyte)
        # or int (control codes / curses KEY_*).  curses.error means "no
        # input within the timeout" -- the getch() equivalent of -1.
        # Using getch + chr(k) here would treat each UTF-8 byte of an
        # umlaut as a separate latin-1 char, double-encoding on submit.
        try:
            k = stdscr.get_wch()
        except curses.error:
            continue
        except KeyboardInterrupt:
            return
        redraw_needed = True

        # Any keystroke dismisses a keystroke-lifecycle ephemeral.
        ephem.on_keystroke()

        # Handle terminal resize first.
        if k == curses.KEY_RESIZE:
            try:
                h, w = stdscr.getmaxyx()
                curses.resize_term(h, w)
            except curses.error:
                pass
            continue

        _handle_key(k, inp, history, cmds, send_bytes, stdscr, readonly,
                    my_name, wire_out_kind, local_echo_on_send, local_echo_who,
                    mention_tokens)


def _register_default_commands(cmds, history, stop_event, send_bytes=None,
                               wire_out_kind='chat'):
    def _help(args):
        return {'kind': 'local', 'ts': now_hhmm(),
                'body': 'commands: ' + ' '.join(cmds.names())}

    def _quit(args):
        stop_event.set()
        return None

    def _clear(args):
        history._events = []
        history._wrap_key = None
        return None

    def _notime(args):
        history.set_show_ts(not history.show_ts)
        return {'kind': 'local', 'ts': now_hhmm(),
                'body': f'timestamps: '
                        f'{"on" if history.show_ts else "off"}'}

    def _search(args):
        pat = args.strip()
        history.set_filter(pat or None)
        history.scroll_to_bottom()
        if not pat:
            return {'kind': 'local', 'ts': now_hhmm(),
                    'body': 'scrollback filter cleared'}
        try:
            _cu_re.compile(pat)
        except _cu_re.error as e:
            return {'kind': 'local', 'ts': now_hhmm(),
                    'body': f'invalid regex: {e}'}
        return {'kind': 'local', 'ts': now_hhmm(),
                'body': f'scrollback filtered to /{pat}/ (case-insensitive)'}

    def _events(args):
        # Toggle the pre-canned "system notices only" view.
        if history.filter_kinds and 'system' in history.filter_kinds:
            history.set_kind_filter(None)
            body = 'events filter off (full scrollback)'
        else:
            history.set_kind_filter({'system', 'note', 'session_ended'})
            body = 'events filter on (system notices + notes only)'
        history.scroll_to_bottom()
        return {'kind': 'local', 'ts': now_hhmm(), 'body': body}

    def _note(args):
        body = args.strip()
        if not body:
            return {'kind': 'local', 'ts': now_hhmm(),
                    'body': 'usage: /note <text>'}
        if send_bytes is None:
            return {'kind': 'local', 'ts': now_hhmm(),
                    'body': 'no transport; /note cannot be submitted'}
        try:
            send_bytes(wire_encode('note', body))
        except Exception as e:
            return {'kind': 'local', 'ts': now_hhmm(),
                    'body': f'/note failed: {e!r}'}
        # The broker echoes an SNOTE back; no local echo needed.
        return None

    def _who(args):
        info = getattr(history, 'session_info', None) or {}
        cust = info.get('customer_nick') or info.get('customer') or 'customer'
        viewer_names = info.get('viewer_names') or []
        n_viewers = info.get('viewers')
        parts = [f'customer: {cust}']
        if viewer_names:
            parts.append('viewers: ' + ', '.join(viewer_names))
        elif n_viewers:
            parts.append(f'viewers: {n_viewers}')
        else:
            parts.append('no viewers connected yet')
        return {'kind': 'local', 'ts': now_hhmm(),
                'body': ' | '.join(parts)}

    def _me(args):
        body = args.strip()
        if not body:
            return {'kind': 'local', 'ts': now_hhmm(),
                    'body': 'usage: /me <action>'}
        if send_bytes is None:
            return {'kind': 'local', 'ts': now_hhmm(),
                    'body': 'no transport; /me cannot be sent'}
        try:
            send_bytes(wire_encode(wire_out_kind, '* ' + body))
        except Exception as e:
            return {'kind': 'local', 'ts': now_hhmm(),
                    'body': f'/me failed: {e!r}'}
        # Local echo is handled normally (by local_echo_on_send when
        # set, or via broker echo).
        return None

    def _nick(args):
        new_nick = args.strip()
        if not new_nick:
            return {'kind': 'local', 'ts': now_hhmm(),
                    'body': 'usage: /nick <name>'}
        # Sanity: keep it printable, no whitespace, reasonable length.
        if any(c.isspace() for c in new_nick) or len(new_nick) > 32:
            return {'kind': 'local', 'ts': now_hhmm(),
                    'body': 'nick must be one word, <=32 chars'}
        if wire_out_kind == 'chat':
            # Consumer side: tell the broker our new name; the
            # existing 'name' wire kind already triggers re-broadcast
            # (NAME ctrl frame) so peers and the customer see it.
            # Also update history.my_nick so the next local echo on
            # this seat uses the new name -- otherwise our own pane
            # keeps tagging our messages with the original my_name
            # until the wrapper restarts, while the customer and
            # other consumers already see the new one.
            if send_bytes is not None:
                try:
                    send_bytes(wire_encode('name', new_nick))
                except Exception as e:
                    return {'kind': 'local', 'ts': now_hhmm(),
                            'body': f'/nick failed: {e!r}'}
            history.my_nick = new_nick
            return {'kind': 'local', 'ts': now_hhmm(),
                    'body': f'now known as {new_nick}'}
        # Customer side: send CUSTOMER_NICK so the broker can fan it
        # out via SESSION_INFO; consumers substitute the chosen nick
        # for the company-derived 'customer' label.  Update the local
        # history so the customer's own pane labels their messages
        # the same way as well.
        history.customer_nick = new_nick
        if send_bytes is not None:
            try:
                send_bytes(wire_encode('customer_nick', new_nick))
            except Exception as e:
                return {'kind': 'local', 'ts': now_hhmm(),
                        'body': f'/nick failed: {e!r}'}
        return {'kind': 'local', 'ts': now_hhmm(),
                'body': f'now known as {new_nick}'}

    cmds.register('help',   _help,   'show this help')
    cmds.register('quit',   _quit,   'disconnect')
    cmds.register('clear',  _clear,  'clear scrollback (not history on relay)')
    cmds.register('notime', _notime, 'toggle HH:MM timestamps')
    cmds.register('search', _search, 'filter scrollback by regex; /search to clear')
    cmds.register('filter', _search, 'alias for /search')
    cmds.register('events', _events, 'toggle system-notices-only view')
    cmds.register('note',   _note,   'append a session-scoped note (live relay)')
    cmds.register('who',    _who,    'show customer + connected viewers')
    cmds.register('me',     _me,     'send an emote-style chat line')
    cmds.register('nick',   _nick,   'set your display name')


def _longest_common_prefix(strings):
    if not strings:
        return ''
    out = ''
    for chars in zip(*strings):
        if all(c == chars[0] for c in chars):
            out += chars[0]
        else:
            break
    return out


def _gather_nicks(history, my_name, mention_tokens):
    """Collect known display names from history, my_name, and mention tokens."""
    nicks = set()
    for ev in getattr(history, '_events', []):
        who = ev.get('who')
        if isinstance(who, str) and who.strip():
            nicks.add(who.strip())
    if my_name:
        nicks.add(my_name)
    cn = getattr(history, 'customer_nick', None)
    if cn:
        nicks.add(cn)
    for t in (mention_tokens or frozenset()):
        if t:
            nicks.add(str(t))
    return sorted(nicks)


def _complete_token(text, cursor, cmds, history, my_name, mention_tokens):
    """Tab completion.  Return (new_text, new_cursor, matches_or_None).

    Two modes:
      - slash command: when the current line starts with '/' and the cursor
        is within the first token, complete against cmds.names().
      - @-mention: find the nearest preceding '@' (at BOL or after space);
        complete the word against known nicks.

    When there is exactly one match, complete fully (append space if the
    char after the cursor is not already a space).  When multiple matches
    share a longer common prefix, extend to that prefix.  When prefix
    cannot be extended, return (unchanged, matches) so the caller can
    display the list.
    """
    if cursor > len(text):
        cursor = len(text)
    prefix = text[:cursor]
    suffix = text[cursor:]

    # Slash command completion only for the first token.
    if prefix.startswith('/') and ' ' not in prefix:
        token_start = 0
        candidates = list(cmds.names())
    else:
        at_idx = -1
        for i in range(len(prefix) - 1, -1, -1):
            if prefix[i] == '@':
                if i == 0 or prefix[i - 1].isspace():
                    at_idx = i
                break
            if prefix[i].isspace():
                break
        if at_idx < 0:
            return (text, cursor, None)
        token_start = at_idx
        candidates = ['@' + n for n in _gather_nicks(history, my_name, mention_tokens)]

    token = prefix[token_start:]
    matches = [c for c in candidates if c.startswith(token)]
    if not matches:
        return (text, cursor, None)
    if len(matches) == 1:
        completion = matches[0]
        if not suffix.startswith(' '):
            completion += ' '
        new_text = prefix[:token_start] + completion + suffix
        new_cursor = token_start + len(completion)
        return (new_text, new_cursor, None)
    lcp = _longest_common_prefix(matches)
    if lcp == token:
        return (text, cursor, matches)
    new_text = prefix[:token_start] + lcp + suffix
    new_cursor = token_start + len(lcp)
    return (new_text, new_cursor, matches)


def _handle_key(k, inp, history, cmds, send_bytes, stdscr, readonly,
                my_name='', wire_out_kind='chat', local_echo_on_send=False,
                local_echo_who=None, mention_tokens=frozenset()):
    """Apply a single key event to the input line, scrollback, or the wire.

    `k` is whatever the caller read from the terminal: either an int key
    code or a str (see the normalization comment below).  Every path
    returns None; effects are applied to `inp` and `history`, or go out
    through `send_bytes`.

    Parameters:
      k                  -- key code (int) or character(s) (str).
      inp                -- input-line object; provides the cursor_*/
                            history_*/kill_*/backspace/delete/insert/submit
                            methods and the `text` / `cursor` attributes
                            used below.
      history            -- scrollback object; provides scroll(),
                            scroll_to_bottom(), append() and `my_nick`.
      cmds               -- slash-command registry; dispatch(line) must
                            return a (handled, event_or_None) pair.
      send_bytes         -- callable taking the encoded outgoing bytes.
      stdscr             -- curses window; only used for its height, which
                            sets the page size for PgUp / PgDn.
      readonly           -- when true, non-command lines are not sent and a
                            local notice is appended instead.
      my_name            -- passed to _complete_token; also a fallback
                            nick for the local echo.
      wire_out_kind      -- kind passed to wire_encode() for outgoing lines.
      local_echo_on_send -- when true, append the sent line to `history`
                            as a 'chat' entry right after sending.
      local_echo_who     -- preferred fallback nick for the local echo
                            (used when history.my_nick is falsy).
      mention_tokens     -- passed through to _complete_token.
    """
    # get_wch returns control characters (Enter, Tab, Backspace, Ctrl-*) as
    # a one-char str; the special-key handlers below compare with int codes.
    # Normalize a single-char control str back to its ord so those handlers
    # keep working.  Printable input stays a str and falls through to the
    # str branch at the bottom.
    if isinstance(k, str) and len(k) == 1:
        c = ord(k)
        if c < 32 or c == 127:
            k = c
    # Scrollback keys -- operate on the history viewport.
    # One "page" is the window height minus 3 rows, but never less than 1.
    h, _ = stdscr.getmaxyx()
    page = max(1, h - 3)
    if k == curses.KEY_PPAGE:
        history.scroll(+page, page);   return
    if k == curses.KEY_NPAGE:
        history.scroll(-page, page);   return
    # Cursor movement and input-history recall act on the input line only.
    if k == curses.KEY_HOME:
        inp.cursor_home();             return
    if k == curses.KEY_END:
        inp.cursor_end();              return
    if k == curses.KEY_LEFT:
        inp.cursor_left();             return
    if k == curses.KEY_RIGHT:
        inp.cursor_right();            return
    if k == curses.KEY_UP:
        inp.history_prev();            return
    if k == curses.KEY_DOWN:
        inp.history_next();            return
    # Backspace arrives as KEY_BACKSPACE, DEL (127) or ctrl-h (8).
    if k in (curses.KEY_BACKSPACE, 127, 8):
        inp.backspace();               return
    if k == curses.KEY_DC:
        inp.delete();                  return
    if k == 21:                # ctrl-u
        inp.kill_to_start();           return
    if k == 11:                # ctrl-k
        inp.kill_to_end();             return
    if k == 23:                # ctrl-w
        inp.kill_word_left();          return
    if k == 3:                 # ctrl-c: ignore (use /quit to exit)
        return
    if k == 9:                 # TAB: complete slash command or @mention
        new_text, new_cursor, matches = _complete_token(
            inp.text, inp.cursor, cmds, history, my_name, mention_tokens,
        )
        # Either the completion changed the line, or (when it could not be
        # extended and several candidates remain) list up to 8 of them in
        # the scrollback as a local entry.
        if (new_text, new_cursor) != (inp.text, inp.cursor):
            inp.text, inp.cursor = new_text, new_cursor
        elif matches and len(matches) > 1:
            shown = ' '.join(matches[:8])
            suffix = '' if len(matches) <= 8 else f' ... ({len(matches)} total)'
            history.append({'kind': 'local', 'ts': now_hhmm(),
                            'body': f'completions: {shown}{suffix}'})
        return
    if k in (10, 13, curses.KEY_ENTER):
        line = inp.submit()
        # Whitespace-only lines are dropped without any feedback.
        if not line.strip():
            return
        # Slash command?
        handled, ev = cmds.dispatch(line)
        if handled:
            if ev is not None:
                history.append(ev)
            history.scroll_to_bottom()
            return
        # Commands are still dispatched above in read-only mode; only
        # plain chat lines are refused here.
        if readonly:
            history.append({'kind': 'local', 'ts': now_hhmm(),
                            'body': '-- read-only; message not sent --'})
            return
        # Send as chat.
        try:
            send_bytes(wire_encode(wire_out_kind, line))
            if local_echo_on_send:
                # Nick precedence: history.my_nick, then local_echo_who,
                # then my_name, then the literal 'you'.
                history.append({'kind': 'chat',
                                'who':  history.my_nick or local_echo_who
                                        or my_name or 'you',
                                'ts':   now_hhmm(),
                                'body': line})
        except Exception as e:
            # Any failure while encoding/sending/echoing is reported in the
            # scrollback instead of propagating to the caller.
            history.append({'kind': 'local', 'ts': now_hhmm(),
                            'body': f'send failed: {e!r}'})
        history.scroll_to_bottom()
        return
    # Printable char (or multibyte sequence).
    # get_wch returns a str directly for printable input (UTF-8 decoded
    # using the active locale); int returns are control codes that
    # didn't match any of the cases above.
    if isinstance(k, str):
        if k.isprintable():
            inp.insert(k)
        return
    # Remaining ints: insert only if they map to a printable character;
    # values chr() cannot represent are ignored.
    try:
        ch = chr(k)
    except (ValueError, OverflowError):
        return
    if ch.isprintable():
        inp.insert(ch)


def _render(stdscr, history, inp, nicks, has_colors, bell, my_name,
            pinned=None, ephem=None):
    """Redraw the whole chat screen: scrollback, input row and pinned rows.

    The window is erased and repainted on every call.  Any curses.error
    raised while drawing is swallowed, so a failed paint leaves whatever
    was drawn so far rather than raising to the caller.

    Parameters:
      stdscr     -- curses window to draw on.
      history    -- scrollback object; lines_for_viewport(height, width)
                    supplies the rows and `_nick_col_w` the nick column
                    width.
      inp        -- input-line object; `text` and `cursor` are read.
      nicks      -- nick color registry, passed to _addstr_colored.
      has_colors -- selects colored vs. plain attributes.
      bell       -- when true, curses.beep() is called once per render.
      my_name    -- passed to _addstr_colored.
      pinned     -- optional object with `rows` (row count) and
                    render(width) returning the pinned row strings.
      ephem      -- optional object with active() and `text`; when active
                    its text overlays the last scrollback row.

    If the window is smaller than 4 rows or 20 columns only the message
    'terminal too small' is shown.
    """
    try:
        stdscr.erase()
        h, w = stdscr.getmaxyx()
        if h < 4 or w < 20:
            stdscr.addstr(0, 0, 'terminal too small')
            stdscr.refresh()
            return

        # Layout (top to bottom):
        #   [scrollback region]   rows 0 .. hist_h-1
        #   [ephemeral, if set]   row  hist_h-1  (eats last scrollback row)
        #   [input row]           row  h - pinned_h - 1
        #   [pinned rows ...]     rows h - pinned_h .. h-1  (primary at h-1)
        pinned_h = pinned.rows if pinned is not None else 0
        hist_h   = max(1, h - pinned_h - 1)   # 1 for input
        input_row = h - pinned_h - 1

        zone_attr = _zone_attr(has_colors)

        # Scrollback region: rows 0..hist_h-1.
        # Ephemeral occupies the bottom-most scrollback row (above input).
        eph_active = ephem is not None and ephem.active()
        avail_h = hist_h - (1 if eph_active else 0)
        # The second return value of lines_for_viewport is not used here.
        rows, _at_bottom = history.lines_for_viewport(avail_h, w - 1)
        nick_col_w = history._nick_col_w
        # Bottom-align: push messages down so the latest row is directly
        # above the ephemeral (or input when no ephemeral is active).
        row_y = max(0, avail_h - len(rows))
        for row in rows:
            _addstr_colored(stdscr, row_y, 0, row, nicks, has_colors,
                            my_name, nick_col_w)
            row_y += 1
        if eph_active:
            _paint_tinted_row(stdscr, hist_h - 1, w,
                              f'[!] {ephem.text}',
                              _ephemeral_attr(has_colors))

        # Input row (tinted zone).  Horizontal-scroll the buffer so cursor
        # stays visible.  Cursor placement is deferred to the end of the
        # render so subsequent writes do not move it off the input line.
        prompt = '> '
        max_in = max(1, w - 1 - len(prompt))
        # `start` is the index of the first visible character: 0 until the
        # cursor would leave the visible window, then it tracks the cursor.
        start  = max(0, inp.cursor - max_in + 1)
        visible = inp.text[start:start + max_in]
        input_cursor_col = len(prompt) + (inp.cursor - start)
        _paint_tinted_row(stdscr, input_row, w, prompt + visible, zone_attr)

        # Pinned rows at bottom: rendered in reverse so the primary row
        # (row 1) lands at h-1 (most prominent) and supplementary rows
        # sit above it.
        if pinned is not None:
            rendered = pinned.render(w - 1)
            for i, row in enumerate(reversed(rendered)):
                _paint_tinted_row(stdscr, h - 1 - i, w, row, zone_attr)

        if bell:
            try:
                curses.beep()
            except curses.error:
                pass

        # Place the cursor last so no intervening write can move it.
        try:
            stdscr.move(input_row, min(input_cursor_col, w - 1))
        except curses.error:
            pass

        stdscr.refresh()
    except curses.error:
        pass


def _paint_tinted_row(stdscr, y, w, content, attr):
    """Fill row `y` across the full width `w` with attribute `attr`.

    The text (`content` cut and space-padded to exactly w-1 characters) is
    written to columns 0..w-2 with addstr.  The attribute is then applied
    to all `w` columns with chgat, which changes attributes only: writing
    a character into column w-1 of the bottom row would scroll the window.

    curses.error from either call is ignored independently, so a failed
    text write does not prevent the attribute pass.
    """
    def _attempt(draw, *args):
        # Best-effort curses call: drawing errors are not fatal here.
        try:
            draw(*args)
        except curses.error:
            pass

    text_w = w - 1
    padded = content[:text_w].ljust(text_w)
    _attempt(stdscr.addstr, y, 0, padded, attr)
    _attempt(stdscr.chgat, y, 0, w, attr)


def _zone_attr(has_colors):
    """Return the curses attribute used for the input and pinned rows.

    With colors: plain reverse video (A_NORMAL if the curses module has
    no A_REVERSE).  Without colors: bold combined with reverse video.
    """
    if not has_colors:
        return curses.A_BOLD | curses.A_REVERSE
    return getattr(curses, 'A_REVERSE', curses.A_NORMAL)


def _ephemeral_attr(has_colors):
    """Return the attribute for the ephemeral overlay row.

    Bold reverse video when colors are available, A_STANDOUT otherwise.
    """
    colored = curses.A_BOLD | curses.A_REVERSE
    return colored if has_colors else curses.A_STANDOUT


def _row_to_str(row, nick_col_w):
    """Render a structured row dict as one plain string.

    Continuation rows are just their body.  Other rows are the timestamp
    plus a space (omitted when the timestamp is empty), the nick
    left-justified to `nick_col_w` columns, then the body.
    """
    if row['is_continuation']:
        return row['body']
    pieces = []
    stamp = row['ts']
    if stamp:
        pieces.append(stamp + ' ')
    pieces.append(row['nick_display'].ljust(nick_col_w))
    pieces.append(row['body'])
    return ''.join(pieces)


def _addstr_colored(stdscr, y, x, row, nicks, has_colors, my_name, nick_col_w=8):
    """Write a structured row dict at (y, x).

    `row` must provide the keys 'is_continuation', 'body', 'ts',
    'nick_display' and 'nick_color_key'.  `nicks.index_for(key)` supplies
    the curses color-pair index for a color key.  Output is clipped so
    nothing is written into the window's last column, and any curses.error
    raised while drawing is swallowed.

    Coloring rules:
    - [tunl] system rows: whole row in cyan.
    - '--' local rows: whole row dim.
    - Chat / note rows: ts plain, nick colored, body terminal-default.
    - Continuation rows: indented body, no coloring.

    NOTE(review): the "cyan" above is whatever pair nicks.index_for('tunl')
    maps to; the actual color is configured elsewhere -- confirm.
    """
    try:
        _, w = stdscr.getmaxyx()
        # Number of cells available from x up to (not including) the last
        # column of the window.
        budget = max(0, w - x - 1)

        if row['is_continuation']:
            stdscr.addnstr(y, x, row['body'], budget)
            return

        if not has_colors:
            stdscr.addnstr(y, x, _row_to_str(row, nick_col_w), budget)
            return

        ck   = row['nick_color_key']
        ts   = (row['ts'] + ' ') if row['ts'] else ''
        nick = row['nick_display']
        body = row['body']

        # [tunl]: whole row in system color.
        if ck == 'tunl':
            line = _row_to_str(row, nick_col_w)
            stdscr.addnstr(y, x, line, budget,
                           curses.color_pair(nicks.index_for('tunl')))
            return
        # '--' local: whole row dim.
        if ck == '--':
            stdscr.addnstr(y, x, _row_to_str(row, nick_col_w), budget,
                           curses.A_DIM)
            return

        # Chat / note: ts uncolored, nick colored, body terminal-default.
        idx  = nicks.index_for(ck)
        attr = curses.color_pair(idx)
        # Rows whose color key equals my_name get a bold nick.
        if my_name and ck == my_name:
            attr |= curses.A_BOLD

        # Write the three segments left to right; `col` tracks the next
        # free column and each segment is skipped once col reaches w-1.
        col = x
        if ts and col < w - 1:
            stdscr.addnstr(y, col, ts, min(len(ts), w - col - 1))
            col += len(ts)
        nick_padded = nick.ljust(nick_col_w)
        if col < w - 1:
            stdscr.addnstr(y, col, nick_padded,
                           min(len(nick_padded), w - col - 1), attr)
            col += len(nick_padded)
        if body and col < w - 1:
            stdscr.addnstr(y, col, body, w - col - 1)
    except curses.error:
        pass
# --- END INLINE chatui --------------------------------------------------------


# --- BEGIN INLINE wire --------------------------------------------------------
# Everything between BEGIN/END markers is the inlineable body, copied
# verbatim into linbit-tunl.py and tunl.py.  tests/test_inline_blocks.py
# asserts the copies match byte-for-byte.

import struct as _wi_struct


# Protocol identifier; sent verbatim as the HELLO payload (see _v4_hello).
WIRE_VERSION = b"linbit-tunl-share/4"

# Size limits, enforced on both the encode and the decode side.
MAX_FRAGMENT = 256 * 1024        # 256 KiB per fragment payload
MAX_MESSAGE = 4 * 1024 * 1024    # 4 MiB per reassembled logical message

# Frame header layout: 1 type byte followed by a 4-byte big-endian payload
# length.  The type byte is (frag << 6) | type_code.

# Frame types (low 6 bits of type byte).
TYPE_DATA = 0x00
TYPE_CTRL = 0x01
TYPE_HELLO = 0x02
TYPE_ERROR = 0x03

# Fragmentation (high 2 bits of type byte).
FRAG_SINGLE = 0
FRAG_START = 1
FRAG_CONT = 2
FRAG_END = 3

# Human-readable names, used only when formatting error messages.
_TYPE_NAMES = {
    TYPE_DATA: "DATA",
    TYPE_CTRL: "CTRL",
    TYPE_HELLO: "HELLO",
    TYPE_ERROR: "ERROR",
}
_FRAG_NAMES = {
    FRAG_SINGLE: "SINGLE",
    FRAG_START: "START",
    FRAG_CONT: "CONT",
    FRAG_END: "END",
}

# Types that must always travel as one SINGLE frame: encode_message()
# refuses to fragment them and FrameStream.read_message() rejects a START
# frame carrying one of them.
_NO_FRAGMENT_TYPES = (TYPE_HELLO, TYPE_ERROR)


class WireProtocolError(Exception):
    """Signals a violation of the wire protocol by the peer.

    The expected reaction is to close the connection cleanly.  When the
    peer is customer-facing, show a friendly message and leave the detail
    to the relay-side log.
    """


def encode_frame(type_code, frag, payload):
    """Serialize one frame: type byte, 4-byte big-endian length, payload.

    No fragmentation happens here; the caller picks `frag` itself.  Use
    encode_message() to have oversize payloads split automatically.

    Raises ValueError when `type_code` does not fit in 6 bits, `frag` is
    not one of the four FRAG_* values, or the payload is longer than
    MAX_FRAGMENT.
    """
    if type_code < 0 or type_code > 0x3F:
        raise ValueError("type code must fit in 6 bits")
    if frag not in (FRAG_SINGLE, FRAG_START, FRAG_CONT, FRAG_END):
        raise ValueError("frag must be SINGLE/START/CONT/END")
    size = len(payload)
    if size > MAX_FRAGMENT:
        raise ValueError(
            "fragment payload {0} exceeds MAX_FRAGMENT {1}".format(
                size, MAX_FRAGMENT))
    # High 2 bits carry the fragment marker, low 6 bits the frame type.
    type_byte = (frag << 6) | type_code
    return _wi_struct.pack(">BI", type_byte, size) + bytes(payload)


def encode_message(type_code, payload):
    """Encode one logical message, splitting it into fragments as needed.

    A payload of at most MAX_FRAGMENT bytes becomes one SINGLE frame.
    Anything larger is cut into MAX_FRAGMENT-sized pieces sent as
    START, zero or more CONT, then END.  Returns all frame bytes
    concatenated, ready for the wire.

    Raises ValueError when a larger payload has a type that must not be
    fragmented, or when it exceeds MAX_MESSAGE.
    """
    total = len(payload)
    if total <= MAX_FRAGMENT:
        return encode_frame(type_code, FRAG_SINGLE, payload)
    if type_code in _NO_FRAGMENT_TYPES:
        raise ValueError(
            "type {0} cannot be fragmented".format(_TYPE_NAMES.get(type_code, type_code)))
    if total > MAX_MESSAGE:
        raise ValueError(
            "message size {0} exceeds MAX_MESSAGE {1}".format(total, MAX_MESSAGE))

    frames = bytearray()
    # total > MAX_FRAGMENT here, so there are always at least two pieces
    # and the first piece is never also the last one.
    for begin in range(0, total, MAX_FRAGMENT):
        stop = min(begin + MAX_FRAGMENT, total)
        if begin == 0:
            marker = FRAG_START
        elif stop == total:
            marker = FRAG_END
        else:
            marker = FRAG_CONT
        frames += encode_frame(type_code, marker, payload[begin:stop])
    return bytes(frames)


def encode_ctrl(line, binary_args=()):
    """Assemble the payload of a CTRL frame.

    The payload is `line` (the verb and its space-separated text args)
    encoded as UTF-8 and terminated by a newline, followed by each entry
    of `binary_args` as a 4-byte big-endian length plus the raw bytes.

    Raises ValueError if `line` contains a NUL or a newline.
    """
    if any(forbidden in line for forbidden in ("\n", "\0")):
        raise ValueError("ctrl line must not contain NUL or newline")
    pieces = [line.encode("utf-8"), b"\n"]
    for arg in binary_args:
        blob = bytes(arg)
        pieces.append(_wi_struct.pack(">I", len(blob)))
        pieces.append(blob)
    return b"".join(pieces)


def decode_ctrl(payload):
    """Parse a CTRL payload into (line: str, binary_args: list of bytes).

    The payload is a UTF-8 verb-line terminated by the first newline,
    followed by zero or more binary args, each a 4-byte big-endian length
    and that many raw bytes.

    Raises WireProtocolError on malformed input: missing newline, NUL or
    invalid UTF-8 in the verb-line, a truncated length header, or an arg
    length that runs past the end of the payload.
    """
    nl = payload.find(b"\n")
    if nl < 0:
        raise WireProtocolError("ctrl payload missing verb-line newline")
    verb_line_bytes = payload[:nl]
    # B2.9 (2026-05-06 audit): mirror encode_ctrl's NUL rejection on
    # the decode side.  encode rejects NUL in the verb-line; decode
    # used to silently accept it, so a malicious peer could embed
    # NUL into the verb token and the per-verb dispatcher would
    # silently drop the frame instead of treating it as a protocol
    # error (per the v4 spec).  Treat NUL in verb-line as a hard
    # protocol error -- close the connection cleanly.
    if b"\x00" in verb_line_bytes:
        raise WireProtocolError("ctrl verb-line contains NUL byte")
    try:
        line = verb_line_bytes.decode("utf-8")
    except UnicodeDecodeError as e:
        raise WireProtocolError(
            "ctrl verb-line not valid UTF-8: {0}".format(e)) from e

    # Walk the trailer by offset.  Re-slicing the remainder after every
    # arg would copy the rest of the payload each time, which is quadratic
    # for a peer-supplied message made of many tiny args.
    args = []
    pos = nl + 1
    end = len(payload)
    while pos < end:
        remaining = end - pos
        if remaining < 4:
            raise WireProtocolError("ctrl arg trailer truncated (length header)")
        n = _wi_struct.unpack_from(">I", payload, pos)[0]
        if n > remaining - 4:
            raise WireProtocolError(
                "ctrl arg length {0} exceeds remaining payload {1}".format(
                    n, remaining - 4))
        pos += 4
        args.append(bytes(payload[pos:pos + n]))
        pos += n
    return line, args


class FrameStream:
    """Frame reader on top of a byte stream.

    `read_some(n)` is any callable that returns at most n bytes and b''
    at EOF (socket.recv, a pipe's read, ...).  The stream loops over it to
    satisfy exact-length reads and keeps the state needed to reassemble
    fragmented messages.

    read_frame() hands out raw frames one at a time, e.g. to forward DATA
    payloads without waiting for every fragment.  read_message() is the
    usual entry point: it returns only complete logical messages.
    """

    def __init__(self, read_some):
        self._read_some = read_some
        self._eof = False
        # Reassembly state: type of the message being assembled (None when
        # idle) and the bytes collected for it so far.
        self._reasm_type = None
        self._reasm_buf = bytearray()

    def _read_exact(self, n):
        """Return exactly `n` bytes, or raise EOFError if the stream ends
        first (the EOF flag is set in that case)."""
        collected = bytearray()
        missing = n
        while missing > 0:
            piece = self._read_some(missing)
            if not piece:
                self._eof = True
                raise EOFError("connection closed before {0} bytes read".format(n))
            collected += piece
            missing = n - len(collected)
        return bytes(collected)

    def read_frame(self):
        """Read one frame and return (type_code, frag, payload).

        Raises EOFError when the stream ends before the frame is complete,
        and WireProtocolError when the announced payload length exceeds
        MAX_FRAGMENT."""
        header = self._read_exact(5)
        type_byte = header[0]
        length = _wi_struct.unpack_from(">I", header, 1)[0]
        if length > MAX_FRAGMENT:
            raise WireProtocolError(
                "fragment length {0} exceeds MAX_FRAGMENT {1}".format(
                    length, MAX_FRAGMENT))
        # High 2 bits: fragment marker; low 6 bits: frame type.
        frag = (type_byte >> 6) & 0x3
        type_code = type_byte & 0x3F
        if length:
            payload = self._read_exact(length)
        else:
            payload = b""
        return type_code, frag, payload

    def read_message(self):
        """Return the next complete logical message as (type_code, payload).

        Consumes as many frames as needed.  Raises WireProtocolError on a
        sequencing violation: SINGLE or START while a message is being
        assembled, CONT or END without a START, a type change inside a
        sequence, a fragmented HELLO/ERROR, or a reassembled size above
        MAX_MESSAGE."""
        while True:
            type_code, frag, payload = self.read_frame()
            assembling = self._reasm_type is not None

            if frag == FRAG_SINGLE:
                if assembling:
                    raise WireProtocolError(
                        "SINGLE frame mid-sequence (was assembling type {0})".format(
                            self._reasm_type))
                return type_code, payload

            if frag == FRAG_START:
                if assembling:
                    raise WireProtocolError(
                        "START frame mid-sequence (was assembling type {0})".format(
                            self._reasm_type))
                if type_code in _NO_FRAGMENT_TYPES:
                    raise WireProtocolError(
                        "type {0} must not be fragmented".format(
                            _TYPE_NAMES.get(type_code, type_code)))
                if len(payload) > MAX_MESSAGE:
                    raise WireProtocolError(
                        "reassembled message exceeds MAX_MESSAGE")
                self._reasm_type = type_code
                self._reasm_buf = bytearray(payload)
                continue

            # Only CONT and END are left; both need an open sequence.
            if not assembling:
                raise WireProtocolError(
                    "{0} frame without preceding START".format(_FRAG_NAMES[frag]))
            if type_code != self._reasm_type:
                raise WireProtocolError(
                    "type-code mismatch within fragmented sequence "
                    "(START was {0}, this is {1})".format(
                        self._reasm_type, type_code))
            if len(self._reasm_buf) + len(payload) > MAX_MESSAGE:
                raise WireProtocolError(
                    "reassembled message exceeds MAX_MESSAGE")
            self._reasm_buf.extend(payload)
            if frag != FRAG_END:
                continue

            # END: hand the message out and reset for the next sequence.
            finished_type = self._reasm_type
            finished = bytes(self._reasm_buf)
            self._reasm_type = None
            self._reasm_buf = bytearray()
            return finished_type, finished


def read_some_from_socket(sock):
    """Wrap a socket.socket as a read_some(n) callable.

    The callable returns sock.recv(n).  An OSError from recv is reported
    as b'', i.e. the same way as end of stream.
    """
    def _recv_some(n):
        try:
            received = sock.recv(n)
        except OSError:
            received = b""
        return received
    return _recv_some


def read_some_from_fileobj(f):
    """Wrap a binary file-like object as a read_some(n) callable.

    The callable returns f.read(n).  Any falsy result (b'' at EOF, or
    None) becomes b'', and so does an OSError raised by the read.
    """
    def _read_some(n):
        try:
            got = f.read(n)
        except OSError:
            return b""
        return got or b""
    return _read_some


# --- END INLINE wire ----------------------------------------------------------


def _v4_ctrl(line: str, *bin_args: bytes) -> bytes:
    """Build the wire bytes of a v4 CTRL message.

    `line` is the verb-line; every extra positional argument becomes one
    length-prefixed binary arg after it.
    """
    ctrl_payload = encode_ctrl(line, bin_args)
    return encode_message(TYPE_CTRL, ctrl_payload)


def _v4_hello() -> bytes:
    """Build the wire bytes of the HELLO message carrying WIRE_VERSION."""
    hello_bytes = encode_message(TYPE_HELLO, WIRE_VERSION)
    return hello_bytes


# Release identification of this script.
VERSION          = "0.3.0"
GIT_COMMIT       = "8723da1"

# Replaced at package build time with the relay's actual host CA public key.
# If the placeholder is not substituted, _ensure_relay_host_key() falls back
# to TOFU keyscan (acceptable for development; rejected in release builds).
# "Placeholder" is detected by the substring 'PLACEHOLDER' in the value.
# Setting TUNL_RELAY_HOST_CA_PUBKEY in the environment overrides the
# embedded key.
RELAY_HOST_CA_PUBKEY = os.environ.get(
    "TUNL_RELAY_HOST_CA_PUBKEY",
    "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAILHlzJ/Pzvs1xTetEr+VkGurdCbJfvTjSu3BrRQ/eSaF linbit-tunl-host-ca",
)

def _env_int(name: str, default: int) -> int:
    """Read environment variable `name` as an integer.

    Returns `default` when the variable is unset or empty.  A value that
    int() cannot parse terminates the program with an error message.
    """
    raw = os.environ.get(name)
    if not raw:
        # Unset (None) and empty string both mean "use the default".
        return default
    try:
        parsed = int(raw)
    except ValueError:
        sys.exit(f"ERROR: {name}={raw!r} is not an integer")
    return parsed

# Runtime configuration.  Every value below is read once at import time
# and can be overridden through the environment variable named in the call.
RELAY_HOST       = os.environ.get("TUNL_RELAY", "tunl.linbit.com")
# Agent forwarding is off unless TUNL_FORWARD_AGENT is 1/yes/true
# (case-insensitive); see _agent_opt().
FORWARD_AGENT    = os.environ.get("TUNL_FORWARD_AGENT", "").lower() in ("1", "yes", "true")
RELAY_PORT       = _env_int("TUNL_RELAY_PORT", 443)
# Port on the relay's loopback that _api_tunnel() forwards to.
RELAY_API_PORT   = _env_int("TUNL_API_PORT",   8080)
SUPPORT_USER     = os.environ.get("TUNL_SUPPORT_USER",  "support")
SUPPORT_COMPANY  = os.environ.get("TUNL_SUPPORT_COMPANY", "linbit")
SOCKS_PORT       = _env_int("TUNL_SOCKS_PORT", 1080)

# Default file that _SshStderrLogger appends timestamped ssh stderr to.
SSH_LOG_PATH     = os.environ.get("TUNL_SSH_LOG", "/tmp/tunl-ssh.log")

# Idle keepalive cadence on the support<->broker SSH stdio.  Mirrors
# relay/share_common.HEARTBEAT_INTERVAL.  See docs/share.md "Liveness".
HEARTBEAT_INTERVAL = _env_int("TUNL_HEARTBEAT_INTERVAL", 3)

PREFIX = "[tunl]"

# Status symbols.  TUNL_ASCII=1/yes/true selects the plain-ASCII variants
# instead of the Unicode glyphs.
_ASCII = os.environ.get('TUNL_ASCII', '').lower() in ('1', 'yes', 'true')
SYM_ON   = '*' if _ASCII else '\u25cf'   # ● connected / active
SYM_OFF  = 'o' if _ASCII else '\u25cb'   # ○ disconnected / pending
SYM_TUN  = '^' if _ASCII else '\u25c6\u2191'  # ◆↑ tunnel active
SYM_NTUN = '-' if _ASCII else '\u25c7'   # ◇ no tunnel


def _agent_opt():
    """Return the ssh options that control agent forwarding.

    An empty list when FORWARD_AGENT is set; otherwise
    ['-o', 'ForwardAgent=no'].
    """
    if FORWARD_AGENT:
        return []
    return ['-o', 'ForwardAgent=no']


def _ensure_relay_host_key():
    """Add relay host trust to ~/.ssh/known_hosts if not already present.

    When RELAY_HOST_CA_PUBKEY is the real embedded CA (not the placeholder),
    writes a @cert-authority line so the relay's host certificate is verified
    without TOFU.  Falls back to ssh-keyscan TOFU when running from source
    (placeholder still present), so development/testing still works.

    The keyscan fallback is best-effort: if ssh-keyscan is missing, times
    out, fails, or prints nothing, known_hosts is left untouched.

    The new entry always starts on a line of its own, even when the
    existing file does not end with a newline.
    """
    known_hosts = Path.home() / '.ssh' / 'known_hosts'
    # known_hosts uses the bracketed "[host]:port" form for any port but 22.
    tag = f'[{RELAY_HOST}]:{RELAY_PORT}' if RELAY_PORT != 22 else RELAY_HOST
    ca_key = RELAY_HOST_CA_PUBKEY
    is_placeholder = 'PLACEHOLDER' in ca_key
    existing = known_hosts.read_text() if known_hosts.exists() else ''

    if is_placeholder:
        # Development: fall back to TOFU keyscan.
        if tag in existing:
            return
        try:
            r = subprocess.run(
                ['ssh-keyscan', '-p', str(RELAY_PORT), RELAY_HOST],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                universal_newlines=True, timeout=10,
            )
        except (OSError, subprocess.TimeoutExpired):
            # No ssh-keyscan binary, or the relay did not answer in time.
            return
        if r.returncode != 0 or not r.stdout.strip():
            return
        new_entry = r.stdout
    else:
        # Release build: use @cert-authority line -- no TOFU.
        new_entry = f'@cert-authority {tag} {ca_key}\n'
        # Compare whole lines so an entry sitting on the last line without
        # a trailing newline is still recognised and not appended again.
        wanted = new_entry.strip()
        if any(line.strip() == wanted for line in existing.splitlines()):
            return

    if not new_entry.endswith('\n'):
        new_entry += '\n'
    known_hosts.parent.mkdir(mode=0o700, exist_ok=True)
    with open(known_hosts, 'a') as f:
        # Without this, the new entry would be glued onto an unterminated
        # last line and both entries would become unusable.
        if existing and not existing.endswith('\n'):
            f.write('\n')
        f.write(new_entry)


# ---------------------------------------------------------------------------
# Quoting helpers for nested command string construction
# ---------------------------------------------------------------------------

def _sq(s):
    """Return str(s) quoted so it is a single safe word in a shell command."""
    text = str(s)
    return shlex.quote(text)


def _tmux_sq(s):
    """Shell-quote str(s) for a tmux run-shell argument.

    tmux expands #{...} in run-shell arguments before handing them to
    sh -c, so after shell-quoting every '#' is doubled to keep tmux from
    treating it as the start of a format.
    """
    quoted = shlex.quote(str(s))
    return '##'.join(quoted.split('#'))


def _tmux_fmt(s):
    """Make str(s) safe to embed in a tmux format/style string.

    Every '#' is doubled so tmux does not interpret #{...} or #[...].
    """
    return '##'.join(str(s).split('#'))


# ---------------------------------------------------------------------------
# SSH stderr debug logger
# ---------------------------------------------------------------------------

class _SshStderrLogger:
    """Background thread: reads an SSH process stderr pipe, writes timestamped
    lines to an append-only log file, and buffers the last N lines so callers
    can retrieve the most recent error without blocking.

    Logging to the file is best-effort.  If the log file cannot be opened
    or written, the pipe is still drained and buffered, so ssh never
    blocks on a full stderr pipe and last_error() keeps working.
    """

    # Number of most recent stderr lines kept in memory.
    _MAX_BUFFER = 30

    def __init__(self, stderr_pipe, log_path=None):
        """Start draining `stderr_pipe` (a binary, line-iterable stream).

        `log_path` defaults to SSH_LOG_PATH.
        """
        # Bounded deque: the oldest line is dropped automatically.
        self._buf  = collections.deque(maxlen=self._MAX_BUFFER)
        self._lock = threading.Lock()
        self._thread = threading.Thread(
            target=self._run, args=(stderr_pipe, log_path or SSH_LOG_PATH),
            daemon=True,
        )
        self._thread.start()

    @staticmethod
    def _close_quietly(fh):
        """Close `fh`, ignoring errors (the log is best-effort)."""
        try:
            fh.close()
        except (OSError, ValueError):
            pass

    def _run(self, pipe, log_path):
        """Thread body: buffer every stderr line and mirror it to the log."""
        try:
            log = open(log_path, 'a')
        except OSError:
            log = None          # keep draining the pipe without a log file
        try:
            for raw in pipe:
                line = raw.decode('utf-8', errors='replace').rstrip('\n')
                with self._lock:
                    self._buf.append(line)
                if log is None:
                    continue
                ts = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
                try:
                    log.write(f'{ts} {line}\n')
                    log.flush()
                except (OSError, ValueError):
                    # Disk full, file closed, ...: stop logging, keep reading.
                    self._close_quietly(log)
                    log = None
        except Exception:
            # The pipe went away under us (e.g. closed by the owner);
            # nothing useful can be done from this helper thread.
            pass
        finally:
            if log is not None:
                self._close_quietly(log)

    def last_error(self):
        """Return the last non-blank stderr line, or ''."""
        with self._lock:
            return next((l for l in reversed(self._buf) if l.strip()), '')

    def recent_lines(self, n=10):
        """Return the last n non-debug stderr lines."""
        with self._lock:
            return [l for l in self._buf if not l.startswith('debug')][-n:]

    def join(self, timeout=1.0):
        """Wait up to `timeout` seconds for the reader thread to finish."""
        self._thread.join(timeout=timeout)


# ---------------------------------------------------------------------------
# SSH tunnel to relay API
# ---------------------------------------------------------------------------

def _free_port():
    """Return a TCP port number that was free on 127.0.0.1 a moment ago.

    A socket is bound to port 0 so the OS picks the port; the socket is
    closed again before returning, so the port is released (and may in
    principle be taken by someone else before the caller uses it).
    """
    probe = socket.socket()
    try:
        probe.bind(('127.0.0.1', 0))
        _host, port = probe.getsockname()
        return port
    finally:
        probe.close()


def _wait_for_port(port, timeout=10, ssh_logger=None):
    """Block until TCP port on 127.0.0.1 accepts connections.

    Tries to connect every 0.2 s until `timeout` seconds have passed.
    The deadline is measured on the monotonic clock, so wall-clock
    adjustments cannot shorten or stretch the wait.

    Returns None once a connection succeeds.  Raises RuntimeError when the
    deadline passes; if `ssh_logger` is given, its last error and recent
    stderr lines are appended to the message.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection(('127.0.0.1', port), timeout=0.5):
                return
        except OSError:
            # Covers connection refused and connect timeouts alike.
            # Do not sleep past the deadline.
            remaining = deadline - time.monotonic()
            time.sleep(max(0.0, min(0.2, remaining)))
    reason_str = ''
    if ssh_logger is not None:
        # Give the stderr reader a moment to pick up ssh's final output.
        ssh_logger.join(timeout=0.5)
        err = ssh_logger.last_error()
        if err:
            reason_str = f': {err}'
        recent = ssh_logger.recent_lines()
        if recent:
            reason_str += '\n  ' + '\n  '.join(recent)
    raise RuntimeError(
        f"Could not reach relay at {RELAY_HOST}:{RELAY_PORT}{reason_str}"
    )


@contextlib.contextmanager
def _api_tunnel():
    """Open an SSH local port-forward to the relay API; yield the local port.

    Starts `ssh -N -L` from a free local port to RELAY_API_PORT on the
    relay's loopback and waits until the local end accepts connections.
    Raises RuntimeError (from _wait_for_port) if the forward does not come
    up.  On exit the ssh process is terminated, killed if it ignores that
    for 3 seconds, always reaped, and its stderr pipe is closed.
    """
    local_port = _free_port()
    cmd = [
        'ssh',
        '-p', str(RELAY_PORT),
        '-o', 'ControlPath=none',
        '-o', 'BatchMode=yes',
        '-o', 'ConnectTimeout=15',
        '-o', 'ExitOnForwardFailure=yes',
        *_agent_opt(),
        '-L', f'127.0.0.1:{local_port}:127.0.0.1:{RELAY_API_PORT}',
        '-N',
        f'{SUPPORT_USER}@{RELAY_HOST}',
    ]
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.PIPE,
    )
    logger = _SshStderrLogger(proc.stderr)
    try:
        _wait_for_port(local_port, ssh_logger=logger)
        yield local_port
    finally:
        proc.terminate()
        try:
            proc.wait(timeout=3)
        except subprocess.TimeoutExpired:
            proc.kill()
            # Reap the killed child so it does not linger as a zombie.
            proc.wait()
        logger.join()
        # Release the pipe's file descriptor now instead of at GC time.
        try:
            proc.stderr.close()
        except OSError:
            pass


def _api_request(method, path, payload=None):
    """Perform one HTTP request against the relay API through an SSH tunnel.

    A fresh tunnel is opened for the request.  `payload`, when not None,
    is sent as a JSON body.

    Return value depends on the method:
      GET               -- the parsed JSON body only; HTTP and URL errors
                           raise RuntimeError (legacy callers expect that).
      POST/DELETE/PATCH -- (status_code, parsed_json), also for HTTP error
                           responses; a URL-level failure is reported as
                           (0, {"error": ...}).

    A successful response whose body is not valid JSON raises RuntimeError
    for every method.
    """
    is_get = method == 'GET'
    with _api_tunnel() as port:
        if payload is None:
            data = None
        else:
            data = json.dumps(payload).encode()
        headers = {'Content-Type': 'application/json'} if data else {}
        url = f'http://127.0.0.1:{port}{path}'
        req = urllib.request.Request(
            url, data=data, headers=headers, method=method,
        )
        try:
            with urllib.request.urlopen(req, timeout=10) as resp:
                try:
                    body = json.loads(resp.read().decode())
                except (json.JSONDecodeError, ValueError):
                    raise RuntimeError(f"API {path}: invalid JSON in response")
                return body if is_get else (resp.status, body)
        except urllib.error.HTTPError as e:
            # Error responses normally carry a JSON body with an 'error'
            # field; fall back to the status line when they do not.
            try:
                body = json.loads(e.read().decode())
            except (json.JSONDecodeError, ValueError):
                body = {"error": f"HTTP {e.code}: {e.reason}"}
            if not is_get:
                return e.code, body
            raise RuntimeError(
                f"API {path} returned {e.code}: {body.get('error', e.reason)}"
            ) from e
        except urllib.error.URLError as e:
            msg = f"API {path}: {e.reason}"
            if not is_get:
                return 0, {"error": msg}
            raise RuntimeError(msg) from e


def _api_get(path):
    """GET `path` from the relay API and return the parsed JSON body.

    Raises RuntimeError on HTTP or URL errors (see _api_request).
    """
    return _api_request(method='GET', path=path)


def _api_post(path, payload=None):
    """POST `payload` (JSON, optional) to `path`; return (status, body)."""
    return _api_request(method='POST', path=path, payload=payload)


def _api_delete(path):
    """Send DELETE for `path`; return (status, body)."""
    return _api_request(method='DELETE', path=path)


def _api_patch(path, payload=None):
    """PATCH `path` with `payload` (JSON, optional); return (status, body)."""
    return _api_request(method='PATCH', path=path, payload=payload)


def _api_download(path, out_fh, chunk=64 * 1024):
    """Stream the body of GET <path> from the relay API into `out_fh`.

    Meant for endpoints that return raw bytes (cast files, archives)
    instead of JSON.  `out_fh` must be a binary file-like object with
    write(); the body is copied in pieces of at most `chunk` bytes.

    Returns (status_code, content_type, error):
      success    -- (status, Content-Type header or '', None)
      HTTP error -- (code, '', message); the error body is not written
                    to `out_fh`
      URL error  -- (0, '', reason)
    """
    with _api_tunnel() as port:
        url = f'http://127.0.0.1:{port}{path}'
        req = urllib.request.Request(url, method='GET')
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                ctype = resp.headers.get('Content-Type', '')
                piece = resp.read(chunk)
                while piece:
                    out_fh.write(piece)
                    piece = resp.read(chunk)
                return resp.status, ctype, None
        except urllib.error.HTTPError as e:
            # Error responses are small JSON blobs: read them whole and
            # hand the message to the caller for formatting.
            raw = e.read()
            try:
                msg = json.loads(raw).get('error', str(e))
            except (json.JSONDecodeError, ValueError):
                msg = raw.decode(errors='replace') or e.reason
            return e.code, '', msg
        except urllib.error.URLError as e:
            return 0, '', f"{e.reason}"


def _fetch_all_sessions():
    """Fetch the active and the pending session lists from the relay API.

    Returns a 2-tuple ``(active_sessions, pending_sessions)`` for commands
    that treat the two lists separately (pick, join, cancel); cmd_list talks
    to the unified /api/v1/sessions endpoint on its own.

    A failure to fetch the active list propagates as RuntimeError, while a
    failure to fetch the pending list is tolerated and yields ``[]``.
    """
    active = _api_get('/api/v1/sessions')
    waiting = []
    try:
        waiting = _api_get('/api/v1/pending')
    except RuntimeError:
        # Pending records are optional here; carry on with none.
        pass
    return active, waiting


# ---------------------------------------------------------------------------
# Formatting helpers
# ---------------------------------------------------------------------------

def _fmt_age(seconds):
    """Render a duration in seconds as a short age string.

    The value is truncated to an int first.  Below one minute the result is
    ``"<s>s"``, below one hour ``"<m>m<ss>s"``, otherwise ``"<h>h<mm>m"``
    (seconds are dropped once hours are shown).
    """
    total = int(seconds)
    if total >= 3600:
        hours, remainder = divmod(total, 3600)
        return f"{hours}h{remainder // 60:02d}m"
    if total >= 60:
        minutes, secs = divmod(total, 60)
        return f"{minutes}m{secs:02d}s"
    return f"{total}s"


def _fmt_time(ts):
    """Format epoch timestamp ``ts`` as ``YYYY-MM-DD HH:MM UTC``."""
    from datetime import datetime, timezone
    moment = datetime.fromtimestamp(ts, tz=timezone.utc)
    return moment.strftime('%Y-%m-%d %H:%M UTC')


def _parse_tunl_datetime(s, round_up=False):
    """Convert a user-supplied date/datetime string into an epoch float.

    Recognised layouts (a ``T`` separator is treated like a space):
    ``YYYY-MM-DD``, ``YYYY-MM-DD HH``, ``YYYY-MM-DD HH:MM`` and
    ``YYYY-MM-DD HH:MM:SS``, all interpreted as UTC.

    With ``round_up=True`` the result is advanced by one unit of the layout
    that matched, e.g. the date-only "2026-02-01" becomes
    2026-02-02T00:00:00 -- an exclusive end-of-day, which is what --before
    needs.  Input matching none of the layouts is tried as a raw epoch float
    (no rounding applied); if that fails too, an error is printed to stderr
    and the process exits with status 1.
    """
    from datetime import datetime, timedelta, timezone

    text = s.strip().replace('T', ' ')
    # Each layout is paired with the size of its smallest unit.
    layouts = (
        ('%Y-%m-%d %H:%M:%S', timedelta(seconds=1)),
        ('%Y-%m-%d %H:%M', timedelta(minutes=1)),
        ('%Y-%m-%d %H', timedelta(hours=1)),
        ('%Y-%m-%d', timedelta(days=1)),
    )
    for layout, unit in layouts:
        try:
            moment = datetime.strptime(text, layout).replace(tzinfo=timezone.utc)
            return (moment + unit if round_up else moment).timestamp()
        except ValueError:
            pass
    try:
        return float(text)
    except ValueError:
        print(f"error: cannot parse date/time: {text!r}", file=sys.stderr)
        sys.exit(1)


# ---------------------------------------------------------------------------
# tunl list
# ---------------------------------------------------------------------------

# Sort group used by cmd_list: closed/archived first (index 0), active next
# (1), pending last (2).  A status missing from this table is sorted as
# group 1 by cmd_list.  Within each group the oldest created_at comes first,
# so the most recent entry is visible at the bottom of the listing.
_LIST_GROUP = {'closed': 0, 'archived': 0, 'active': 1, 'pending': 2}


def _list_host_col(s):
    """Return the HOST column text ("host (ip, ip)") for a session, max 30 chars."""
    ips = s.get('ips')
    if isinstance(ips, list):
        ips = ', '.join(str(ip) for ip in ips)
    elif ips is None:
        # A JSON null must not be rendered as the literal text "None".
        ips = ''
    else:
        ips = str(ips)
    host = s.get('host')
    host = '?' if host is None else str(host)
    if ips:
        host = f"{host} ({ips})"
    return host[:30]


def cmd_list(status=None, customer=None, case=None,
             before=None, after=None, limit=None, verbose=0):
    """Print a table of sessions fetched from /api/v1/sessions.

    Args:
        status:   status filter sent to the API; defaults to 'active,pending'.
        customer: optional customer filter.
        case:     optional case filter.
        before:   optional upper bound, forwarded to the API as given.
        after:    optional lower bound, forwarded to the API as given.
        limit:    maximum number of rows; when omitted and any other filter
                  is set, a default of 20 is requested.
        verbose:  1 adds a detail line per session, 2 additionally dumps all
                  remaining fields.

    Rows are grouped via _LIST_GROUP and ordered by created_at within each
    group.  Fields that are missing or null in the API response are rendered
    as empty/default values instead of raising.
    """
    from urllib.parse import urlencode
    has_filter = any(x is not None for x in (status, customer, case, before, after))

    params = {'status': status if status is not None else 'active,pending'}
    if customer:
        params['customer'] = customer
    if case:
        params['case'] = case
    if before is not None:
        params['before'] = before
    if after is not None:
        params['after'] = after
    if limit is not None:
        params['limit'] = limit
    elif has_filter:
        params['limit'] = 20

    sessions = _api_get('/api/v1/sessions?' + urlencode(params))

    if not sessions:
        print("(no sessions)")
        return

    sessions.sort(key=lambda s: (
        _LIST_GROUP.get(s.get('status', 'active'), 1),
        # A null created_at must not break the comparison.
        s.get('created_at') or 0,
    ))

    now = int(time.time())
    col = "{}  {:<25}  {:<12}  {:<30}  {:<12}  {}"
    print(col.format(' ', "SESSION ID", "STATUS/MODE", "HOST/CUSTOMER", "CASE", "AGE"))
    print("-" * 95)
    for s in sessions:
        st = s.get('status', 'active')
        created_ts = s.get('created_at') or now
        if st == 'pending':
            # Pending sessions have no host yet; show the customer instead.
            sym = SYM_OFF
            mode_col = 'pending'
            host_col = str(s.get('customer') or '')[:30]
            age_ts = created_ts
        elif st in ('closed', 'archived'):
            sym = ' '
            mode_col = st
            host_col = _list_host_col(s)
            age_ts = s.get('closed_at') or created_ts
        else:
            sym = SYM_ON
            mode = str(s.get('mode') or 'share')
            if s.get('tunnel_port') and mode != 'tunnel':
                mode = f"{mode}+tunnel"
            mode_col = mode[:12]
            host_col = _list_host_col(s)
            age_ts = s.get('connected_at') or created_ts

        print(col.format(
            sym,
            s.get('id', '?'),
            mode_col,
            host_col,
            str(s.get('case') or '')[:12],
            _fmt_age(now - age_ts),
        ))

        if verbose >= 1:
            _list_print_verbose(s, now)
        if verbose >= 2:
            _list_print_verbose_all(s)


def _list_print_verbose(s, now):
    """Emit the single indented detail line that -v adds below a session row.

    Shows, when present and truthy: creation time, first support connect
    time, origin and user, joined by " | ".  Nothing is printed when none of
    them is set.  ``now`` is accepted for signature compatibility and is not
    used.
    """
    details = []
    for label, key in (('created', 'created_at'),
                       ('first-support', 'first_support_connect')):
        stamp = s.get(key)
        if stamp:
            details.append(f"{label} {_fmt_time(stamp)}")
    for key in ('origin', 'user'):
        value = s.get(key)
        if value:
            details.append(f"{key}:{value}")
    if details:
        print("    " + ' | '.join(details))


def _list_print_verbose_all(s):
    """Dump, for -vv, every session field not already shown elsewhere.

    Fields are printed one per line in key order; keys covered by the table
    row or by the -v detail line are skipped, as are values equal to None,
    '', [] or {}.
    """
    already_shown = {
        'id', 'status', 'mode', 'host', 'ips', 'case', 'customer',
        'created_at', 'connected_at', 'closed_at', 'first_support_connect',
        'origin', 'user',
    }
    blanks = (None, '', [], {})
    for key in sorted(s):
        if key in already_shown:
            continue
        value = s[key]
        if value in blanks:
            continue
        print(f"    {key}: {value}")


# ---------------------------------------------------------------------------
# tunl show <id>
# ---------------------------------------------------------------------------

def cmd_show(sid):
    """Print the details of session ``sid``.

    The active-session record is looked up first.  If that lookup fails with
    an error mentioning 404, the pending record is tried and, when found, a
    short pending summary is printed instead.  Any other failure is reported
    on stderr and the process exits with status 1.
    """
    try:
        s = _api_get(f'/api/v1/sessions/{sid}')
    except RuntimeError as e:
        if '404' in str(e):
            # Session not yet active -- check if it's pending.
            try:
                pending = _api_get(f'/api/v1/pending/{sid}')
            except RuntimeError:
                pass
            else:
                current = int(time.time())
                pending_age = _fmt_age(current - pending.get('created_at', current))
                print(f"id:       {pending.get('id', sid)}")
                print("status:   pending (customer not yet connected)")
                print(f"case:     {pending.get('case', '(none)')}")
                print(f"customer: {pending.get('customer', '(none)')}")
                print(f"age:      {pending_age}")
                return
        print(f"{PREFIX} ERROR: {e}", file=sys.stderr)
        sys.exit(1)

    now = int(time.time())
    connected_at = s.get('connected_at')
    if connected_at:
        age = now - connected_at
        registered = _fmt_time(connected_at)
    else:
        age = 0
        registered = '?'

    if isinstance(s.get('ips'), list):
        ips = ', '.join(s.get('ips', []))
    else:
        ips = str(s.get('ips', ''))

    tunnel_port = s.get('tunnel_port')
    mode = s.get('mode', 'share')
    if tunnel_port and mode != 'tunnel':
        # A non-tunnel session that has since been upgraded with a tunnel.
        mode = f"{mode}+tunnel"

    # Case and customer: show the raw value, plus the slug when it differs.
    for label_line, raw_key, slug_key in (
            ("case:         ", 'case', 'case_slug'),
            ("customer:     ", 'customer', 'customer_slug')):
        raw_value = s.get(raw_key, '(none)')
        slug = s.get(slug_key, '')
        print(f"{label_line}{raw_value}")
        if slug and slug != raw_value:
            print(f"  (slug:      {slug})")

    print(f"id:           {s.get('id', '?')}")
    print(f"mode:         {mode}")
    print(f"host:         {s.get('host', '?')}")
    print(f"ips:          {ips}")
    print(f"user:         {s.get('user', '?')}")
    created_at = s.get('created_at')
    if created_at:
        print(f"created:      {_fmt_time(created_at)}")
    print(f"registered:   {registered} ({_fmt_age(age)} ago)")
    print(f"port:         {s.get('port', '?')}")
    if tunnel_port:
        print(f"tunnel_port:  {tunnel_port}")

    remote_ip = s.get('remote_ip', '?')
    via_proxy = s.get('proxy_ip', '')
    if via_proxy:
        print(f"remote_ip:    {remote_ip} (via proxy {via_proxy})")
    else:
        print(f"remote_ip:    {remote_ip}")


# ---------------------------------------------------------------------------
# SOCKS port selection
# ---------------------------------------------------------------------------

def _find_free_socks_port(start, max_tries=20):
    """Pick the first bindable TCP port in [start, start+max_tries).

    Each candidate is probed by briefly binding a socket on 127.0.0.1; the
    probe socket is closed again before the port number is returned.  When
    no candidate can be bound, ``start`` is returned anyway (SSH will warn
    if it can't bind).
    """
    for candidate in range(start, start + max_tries):
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
                probe.bind(('127.0.0.1', candidate))
        except OSError:
            # Port is taken (or otherwise unusable); try the next one.
            continue
        return candidate
    return start


def _wait_for_socks(port, timeout=15):
    """Block until the local SOCKS port accepts connections or timeout.

    Args:
        port:    TCP port on 127.0.0.1 to probe.
        timeout: overall budget in seconds.

    Returns:
        True as soon as a TCP connection to the port succeeds, False if the
        budget runs out first (including a zero or negative budget).
    """
    # Monotonic clock: a wall-clock step (NTP, suspend/resume) must neither
    # cut the wait short nor extend it.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection(('127.0.0.1', port), timeout=0.5):
                return True
        except OSError:
            # ConnectionRefusedError and timeouts are OSError subclasses.
            time.sleep(0.3)
    return False


# ---------------------------------------------------------------------------
# Proxy shell window
# ---------------------------------------------------------------------------

def _write_proxy_env_script(sid, tunnel_port, socks_port, cust_user, nets, dns_servers):
    """Write a self-deleting bash rcfile that exports proxy env vars and prints a banner.

    The generated script, in order:
      * removes itself (``rm -f`` on its own path) as its first command;
      * exports HTTPS_PROXY / HTTP_PROXY / ALL_PROXY pointing at
        ``socks5h://127.0.0.1:<socks_port>`` and a NO_PROXY for localhost;
      * creates a small ssh wrapper script via mktemp (used by crsync);
      * defines the shell functions cssh, cscp, crsync and cproxy;
      * prints a usage banner including ``sid``, ``nets`` and ``dns_servers``.

    Values interpolated into the script are passed through _sq()
    (presumably shell single-quoting -- defined elsewhere in this file).

    The file is created with tempfile.mkstemp and chmod'ed to 0o700.
    Returns the script path.  Caller is responsible for launching bash --rcfile.
    """
    proxy_url  = f'socks5h://127.0.0.1:{socks_port}'
    no_proxy   = 'localhost,127.0.0.1'
    nets_str   = ', '.join(nets) if nets else '(unknown)'
    dns_str    = ', '.join(dns_servers) if dns_servers else '(unknown)'

    # NOTE(review): relay, jump and nc_proxy are assigned but not referenced
    # anywhere below; the template spells the equivalent strings out inline.
    relay      = f'{SUPPORT_USER}@{RELAY_HOST}'
    jump       = f'{relay}:{RELAY_PORT}'
    nc_proxy   = f'nc -X 5 -x 127.0.0.1:{socks_port}'

    fd, script = tempfile.mkstemp(prefix='tunl-proxy-env-', suffix='.sh')
    try:
        # Template notes:
        #  * this is an f-string, so {{ and }} produce literal shell braces;
        #  * a single backslash at end of line (inside the SSHWRAP heredoc) is
        #    a Python string line continuation and is NOT emitted, so the
        #    wrapper's exec command ends up on one physical line;
        #  * a doubled backslash (\\) emits one literal backslash, i.e. a real
        #    shell continuation or a printf escape such as \033 and \n.
        os.write(fd, f"""\
rm -f {_sq(script)}
export HTTPS_PROXY={_sq(proxy_url)}
export HTTP_PROXY={_sq(proxy_url)}
export ALL_PROXY={_sq(proxy_url)}
export NO_PROXY={_sq(no_proxy)}
_TUNL_CUST_USER={_sq(cust_user)}
_TUNL_SOCKS_PORT={_sq(str(socks_port))}
_TUNL_PROXY_CMD={_sq(f'ssh -o ControlPath=none -o BatchMode=yes -p {RELAY_PORT} {SUPPORT_USER}@{RELAY_HOST} nc 127.0.0.1 {tunnel_port}')}
_TUNL_SSH_OPTS=(-o ControlPath=none -o "ProxyCommand=$_TUNL_PROXY_CMD" -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null)
_TUNL_SSH_WRAPPER=$(mktemp /tmp/tunl-ssh-wrapper.XXXXXX)
cat > "$_TUNL_SSH_WRAPPER" <<'SSHWRAP'
#!/bin/sh
exec ssh -o ControlPath=none \
  -o "ProxyCommand={f'ssh -o ControlPath=none -o BatchMode=yes -p {RELAY_PORT} {SUPPORT_USER}@{RELAY_HOST} nc 127.0.0.1 {tunnel_port}'}" \
  -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null "$@"
SSHWRAP
chmod +x "$_TUNL_SSH_WRAPPER"
cssh() {{
  local user="${{1:-$_TUNL_CUST_USER}}"
  shift 2>/dev/null
  ssh -t "${{_TUNL_SSH_OPTS[@]}}" "$user@127.0.0.1" "$@"
}}
cscp() {{
  scp "${{_TUNL_SSH_OPTS[@]}}" "$@"
}}
crsync() {{
  rsync -e "$_TUNL_SSH_WRAPPER" "$@"
}}
cproxy() {{
  local user_host="${{1:?usage: cproxy USER@HOST}}"
  shift
  ssh -t -o ControlPath=none -o "ProxyCommand=nc -X 5 -x 127.0.0.1:$_TUNL_SOCKS_PORT %h %p" \\
      "$user_host" "$@"
}}
printf '\\033[1;36m[tunl] access info for %s\\033[m\\n\\n' {_sq(sid)}
printf '  Customer networks : %s\\n' {_sq(nets_str)}
printf '  Customer DNS      : %s\\n\\n' {_sq(dns_str)}
printf '\\033[1mShell functions\\033[m (c = customer; tab-complete, pass extra args):\\n'
printf '    cssh                    -- SSH to customer as %s\\n' {_sq(cust_user)}
printf '    cssh OTHER_USER         -- SSH to customer as OTHER_USER\\n'
printf '    cscp FILE... %s@127.0.0.1:  -- scp to customer\\n' {_sq(cust_user)}
printf '    crsync SRC... DEST      -- rsync via tunnel (pass full user@host:path args)\\n'
printf '    cproxy USER@HOST        -- SSH to HOST reachable from customer via SOCKS5\\n\\n'
printf '\\033[1mEnvironment\\033[m (curl, wget, etc. auto-use these):\\n'
printf '    HTTPS_PROXY=%s\\n' "$HTTPS_PROXY"
printf '    ALL_PROXY=%s  NO_PROXY=%s\\n\\n' "$ALL_PROXY" "$NO_PROXY"
""".encode())
    finally:
        # Close the descriptor whether or not the write succeeded.
        os.close(fd)
    os.chmod(script, 0o700)
    return script


def _open_proxy_shell(sid, tunnel_port, socks_port, cust_user, nets, dns_servers):
    """Spawn a tmux window named 'proxy-env' running bash with the proxy rcfile.

    The rcfile comes from _write_proxy_env_script, so the new shell starts
    with the proxy environment set and the usage banner printed.  The tmux
    command is started without waiting for it.

    Only called when running inside a tmux session (TMUX env var set).
    """
    rcfile = _write_proxy_env_script(
        sid, tunnel_port, socks_port, cust_user, nets, dns_servers)
    shell_command = 'bash --rcfile ' + shlex.quote(rcfile)
    subprocess.Popen(['tmux', 'new-window', '-n', 'proxy-env', shell_command])


# ---------------------------------------------------------------------------
# Browser launch
# ---------------------------------------------------------------------------

# Ordered candidate binary names per browser family.  _detect_browser probes
# each list front to back with shutil.which() and returns the first hit, so
# earlier entries take precedence.
_CHROMIUM_BINS = [
    'chromium', 'chromium-browser',
    'google-chrome', 'google-chrome-stable',
    'brave-browser', 'microsoft-edge',
]
_FIREFOX_BINS = ['firefox', 'firefox-esr']


def _read_single_key(prompt: str) -> str:
    """Show ``prompt`` and return the next single character typed on stdin.

    The terminal is switched to raw mode for the read so no Enter is needed;
    the previous terminal settings are restored afterwards even if the read
    fails.  The character is echoed back followed by a newline.
    """
    print(prompt, end='', flush=True)
    stdin_fd = sys.stdin.fileno()
    saved_attrs = termios.tcgetattr(stdin_fd)
    try:
        tty.setraw(stdin_fd)
        key = sys.stdin.read(1)
    finally:
        termios.tcsetattr(stdin_fd, termios.TCSADRAIN, saved_attrs)
    print(key, flush=True)
    return key


def _has_display() -> bool:
    """Tell whether $DISPLAY or $WAYLAND_DISPLAY is set to a non-empty value."""
    return any(
        os.environ.get(name) for name in ('DISPLAY', 'WAYLAND_DISPLAY')
    )


def _display_maybe_inaccessible() -> bool:
    """Heuristic for "the display we inherited is probably not ours to use".

    True when we are inside a terminal multiplexer ($TMUX or $STY set) *and*
    the login came in over SSH ($SSH_CONNECTION or $SSH_TTY set): the
    road-warrior case where $DISPLAY may stem from the original desktop
    session and cannot be reached from this connection.
    """
    env = os.environ
    in_multiplexer = bool(env.get('TMUX') or env.get('STY'))
    over_ssh = bool(env.get('SSH_CONNECTION') or env.get('SSH_TTY'))
    return in_multiplexer and over_ssh


def _session_dir(sid: str) -> Path:
    """Return ``/tmp/tunl-<sid>``, creating it (mode 0o700) if necessary.

    Safe to call repeatedly: an already existing directory is reused as is.
    """
    session_path = Path('/tmp') / f'tunl-{sid}'
    session_path.mkdir(mode=0o700, exist_ok=True)
    return session_path


def _browser_pid_path(sid: str) -> Path:
    """Path of the file holding the browser PID for session ``sid``.

    Goes through _session_dir, so the per-session directory is created as a
    side effect.
    """
    return _session_dir(sid).joinpath('browser.pid')


def _save_browser_pid(sid: str, pid: int):
    """Record ``pid`` in the session's browser.pid file (best effort).

    Filesystem errors are deliberately ignored; the PID file is only a
    convenience for a later kill of the browser.
    """
    with contextlib.suppress(OSError):
        _browser_pid_path(sid).write_text(str(pid))


def _kill_browser_by_pid(sid: str) -> bool:
    """Send SIGTERM to the browser recorded by an earlier tunl browser run.

    Returns True only if the PID file could be read, the signal was
    delivered and the PID file was removed.  In every other case the PID
    file is removed on a best-effort basis and False is returned.
    """
    import signal as _sig
    pid_file = _browser_pid_path(sid)
    signalled = False
    try:
        os.kill(int(pid_file.read_text().strip()), _sig.SIGTERM)
        pid_file.unlink()
        signalled = True
    except (OSError, ValueError):
        # Missing/garbled PID file or the process is gone: drop the stale
        # record (ProcessLookupError is an OSError).
        with contextlib.suppress(OSError):
            pid_file.unlink()
    return signalled


# ---------------------------------------------------------------------------
# Per-session temp file paths (all inside _session_dir)
# ---------------------------------------------------------------------------

def _master_sock(sid: str) -> Path:
    """Path of the SSH control socket inside the session directory."""
    return _session_dir(sid).joinpath('master.sock')

def _master_pid_file(sid: str) -> Path:
    """Path of the file recording the ControlMaster PID for the session."""
    return _session_dir(sid).joinpath('master.pid')

def _socks_port_file(sid: str) -> Path:
    """Path of the file recording the local SOCKS port for the session."""
    return _session_dir(sid).joinpath('socks.port')



# Name of the tmux window that hosts the SSH ControlMaster pane.
# _start_master_in_tmux looks for a window with this name to respawn before
# it creates a new one.
_MASTER_WIN_NAME = 'master'


def _master_ssh_args(sid, sock, socks_port, tunnel_port, cust_user):
    """Assemble the ssh arguments (without the leading 'ssh') for the ControlMaster.

    Side effect: writes an executable ``proxy-cmd.sh`` into the session
    directory.  That script is what the master uses as ProxyCommand; it runs
    an inner ssh to the relay and pipes to ``nc 127.0.0.1 <tunnel_port>``.

    The returned list requests a master (-M) on control socket ``sock`` with
    a dynamic SOCKS forward (-D) on ``socks_port`` and logs in as
    ``cust_user`` at 127.0.0.1.
    """
    session_dir = _session_dir(sid)

    # Inner (proxy) ssh: tracing flags when ssh_debug_args supplies any,
    # plain -v otherwise.  The flags are quoted for use in the sh script.
    inner_debug = ssh_debug_args('master.proxy', session_dir)
    inner_flags = (' '.join(map(shlex.quote, inner_debug))
                   if inner_debug else '-v')

    wrapper = session_dir / 'proxy-cmd.sh'
    ssh_line = ' '.join([
        'exec ssh', inner_flags,
        '-o ControlPath=none -o BatchMode=yes',
        f'-p {RELAY_PORT}',
        f'{SUPPORT_USER}@{RELAY_HOST}',
        f'nc 127.0.0.1 {tunnel_port}',
    ])
    wrapper.write_text(f'#!/bin/sh\n{ssh_line}\n')
    wrapper.chmod(0o755)

    # Outer (master) ssh: tracing flags when available, else -v into the
    # global ssh log.
    outer_debug = ssh_debug_args('master', session_dir) or ['-v', '-E', SSH_LOG_PATH]

    args = ['-N']
    args.extend(outer_debug)
    args += ['-M', '-S', str(sock), '-D', str(socks_port)]
    for setting in (
            'ControlPersist=no',
            f'ProxyCommand={wrapper}',
            'StrictHostKeyChecking=no',
            'UserKnownHostsFile=/dev/null',
            'ExitOnForwardFailure=yes'):
        args += ['-o', setting]
    args.extend(_agent_opt())
    args.append(f'{cust_user}@127.0.0.1')
    return args


def _start_master_in_tmux(sid, sock, ssh_args, tunnel_port, cust_user):
    """Start or respawn the SSH ControlMaster in a visible tmux window.

    Args:
        sid:         session id (currently unused here).
        sock:        control socket path; removed by the pane command right
                     before ssh is exec'ed.
        ssh_args:    full ssh command line including the leading 'ssh'.
        tunnel_port: relay-side tunnel port, shown in the pane border label.
        cust_user:   customer login name, shown in the pane border label.

    If a window named _MASTER_WIN_NAME already exists its pane is respawned
    (killing whatever ran there); otherwise a new detached window is
    created.  The pane is set to remain on exit and gets a descriptive top
    border.  tmux failures are ignored.
    """
    def _tmux_quiet(*args):
        # Run a tmux sub-command, discarding its output and exit status.
        subprocess.run(
            ['tmux', *args],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
        )

    sock_q = shlex.quote(str(sock))
    ssh_str = ' '.join(shlex.quote(a) for a in ssh_args)
    pane_cmd = f'rm -f {sock_q} 2>/dev/null; exec {ssh_str}'

    # Reuse existing master window if present.  The index is listed first so
    # that the remainder of the line is the complete window name: names may
    # contain spaces, and only an exact match must count.
    r = subprocess.run(
        ['tmux', 'list-windows', '-F', '#{window_index} #{window_name}'],
        stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, universal_newlines=True,
    )
    existing_idx = None
    for line in (r.stdout or '').splitlines():
        idx, _, name = line.partition(' ')
        if idx and name == _MASTER_WIN_NAME:
            existing_idx = idx
            break

    if existing_idx is not None:
        _tmux_quiet('respawn-pane', '-k', '-t', f':{existing_idx}', pane_cmd)
    else:
        _tmux_quiet('new-window', '-d', '-n', _MASTER_WIN_NAME, pane_cmd)

    target = f':{_MASTER_WIN_NAME}'
    # Keep the pane (and ssh's last output) visible after ssh exits.
    _tmux_quiet('set-option', '-p', '-t', f'{target}.0', 'remain-on-exit', 'on')

    border_label = (
        f'#[bold][SSH MASTER]#[nobold]'
        f'  {_tmux_fmt(cust_user)}@customer via tunnel:{tunnel_port}'
        f'  ~C=add-forward  ~.=disconnect  Enter=reconnect'
    )
    _tmux_quiet('set-option', '-w', '-t', target, 'pane-border-status', 'top')
    _tmux_quiet('set-option', '-p', '-t', f'{target}.0',
                'pane-border-format', border_label)


def _ensure_customer_master(sid: str, tunnel_port: int, cust_user: str) -> int:
    """Ensure an SSH ControlMaster to the customer is running.

    When running inside a tmux session (e.g. a join session), the master
    opens in a visible window with SSH -v output so the support engineer
    can monitor the connection and use ~C escapes to add forwards.
    Otherwise ssh is started detached in its own session and its PID is
    recorded in the session's master.pid file.

    The master carries a SOCKS5 proxy (-D) so slaves and the browser can
    share the same connection.  Idempotent: if a live master exists and its
    port file is readable, that port is returned without spawning anything.

    Returns:
        The local SOCKS5 port number.

    Raises:
        RuntimeError: the control socket did not appear within 15 seconds,
            or the detached ssh process exited before creating it.
    """
    sock = _master_sock(sid)
    port_file = _socks_port_file(sid)

    # Check if an existing master is still alive.
    check = subprocess.run(
        ['ssh', '-O', 'check', '-S', str(sock), '127.0.0.1'],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
    )
    if check.returncode == 0:
        try:
            return int(port_file.read_text().strip())
        except (OSError, ValueError):
            pass  # port file missing -- fall through to respawn

    sock.unlink(missing_ok=True)

    socks_port = _find_free_socks_port(SOCKS_PORT)
    port_file.write_text(str(socks_port))

    ssh_args = _master_ssh_args(sid, sock, socks_port, tunnel_port, cust_user)

    in_tmux = bool(os.environ.get('TMUX'))
    trace('master_spawn', sid=sid, tunnel_port=tunnel_port,
          socks_port=socks_port, in_tmux=in_tmux)
    master = None
    if in_tmux:
        _start_master_in_tmux(sid, sock, ['ssh'] + ssh_args,
                              tunnel_port, cust_user)
    else:
        # NOTE(review): ssh_args already contains ControlPersist=no and
        # OpenSSH documents that the first value obtained for an option
        # wins, so the trailing ControlPersist=yes is likely ineffective --
        # confirm before relying on either behaviour.
        master = subprocess.Popen(
            ['ssh', '-q'] + ssh_args + ['-o', 'ControlPersist=yes'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            start_new_session=True,
        )
        _master_pid_file(sid).write_text(str(master.pid))

    # Wait for the control socket so slaves can connect immediately.  The
    # monotonic clock keeps the budget immune to wall-clock steps.
    deadline = time.monotonic() + 15
    while not sock.exists() and time.monotonic() < deadline:
        if master is not None and master.poll() is not None:
            # ssh already exited; no point in sitting out the full timeout.
            # The socket is re-checked below before declaring failure.
            break
        time.sleep(0.2)

    if not sock.exists():
        trace('master_failed', sid=sid, reason='no_ctl_socket')
        raise RuntimeError(
            f'SSH ControlMaster failed to start (check {SSH_LOG_PATH})')

    trace('master_ready', sid=sid, socks_port=socks_port)
    return socks_port


def _cleanup_session_local(sid: str):
    """Kill the SSH ControlMaster and remove the per-session temp directory.

    Every step is best effort, so one failing step never prevents the ones
    after it: ask the master to exit via its control socket, SIGTERM the
    PIDs saved in master.pid and browser.pid, kill the session's private
    tmux server, and finally delete ``/tmp/tunl-<sid>`` -- unless a
    ``.debug`` marker is present, in which case the directory is kept and
    its location is reported on stderr.

    The directory path is built directly rather than via _session_dir so
    that cleaning up does not create the directory it is about to remove.
    """
    import signal as _sig

    def _run_quiet(cmd):
        # Run a helper command, discarding output.  A missing binary (ssh or
        # tmux not installed) must not abort the rest of the cleanup.
        try:
            subprocess.run(
                cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except OSError:
            pass

    def _sigterm_pid_from(pid_file):
        # SIGTERM the PID stored in pid_file; ignore a missing/garbled file
        # and processes that are gone or not ours (all OSError subclasses).
        try:
            os.kill(int(pid_file.read_text().strip()), _sig.SIGTERM)
        except (OSError, ValueError):
            pass

    sdir = Path(f'/tmp/tunl-{sid}')

    # Graceful shutdown via control socket.
    _run_quiet(['ssh', '-O', 'exit', '-S', str(sdir / 'master.sock'),
                '127.0.0.1'])
    # Fallback: SIGTERM the saved PID.
    _sigterm_pid_from(sdir / 'master.pid')

    # Kill browser if still running.
    browser_pid = sdir / 'browser.pid'
    _sigterm_pid_from(browser_pid)
    try:
        browser_pid.unlink()
    except OSError:
        pass

    # Kill the join tmux server to prevent orphaned processes.
    _run_quiet(['tmux', '-S', str(sdir / 'tmux.sock'), 'kill-server'])

    # Remove the per-session directory (unless --debug marker is present).
    if (sdir / '.debug').exists():
        print(f'{PREFIX} --debug: session logs preserved in {sdir}',
              file=sys.stderr)
    else:
        shutil.rmtree(sdir, ignore_errors=True)


def _detect_browser(preference='auto'):
    """Locate a usable browser binary on PATH.

    preference: 'auto' | 'chrome' | 'chromium' | 'firefox', or any other
    string, which is then taken as a literal binary name and assumed to be
    chromium-compatible.

    'auto' tries the Chromium family first and Firefox second; 'chrome' and
    'chromium' are synonyms.  Returns ``(family, binary)`` for the first
    candidate found, or None when nothing matches.
    """
    families = []
    if preference in ('auto', 'chrome', 'chromium'):
        families.append(('chromium', _CHROMIUM_BINS))
    if preference in ('auto', 'firefox'):
        families.append(('firefox', _FIREFOX_BINS))

    for family, candidates in families:
        for name in candidates:
            if shutil.which(name):
                return (family, name)

    # No known family selected: the preference itself names the binary.
    if not families and shutil.which(preference):
        return ('chromium', preference)   # assume chromium-compatible
    return None


def _launch_browser(family, binary, socks_port):
    """Spawn ``binary`` set up to send its traffic through the local SOCKS5 proxy.

    For family 'chromium' the proxy is passed on the command line together
    with --temp-profile.  Any other family is treated as Firefox: a fresh
    profile directory is created with a user.js that configures the SOCKS
    proxy (including remote DNS).

    Returns the Popen object (caller must eventually terminate it).  The
    Firefox profile directory, or None for Chromium, is attached to it as
    ``_tunl_profile_dir`` so that _cleanup_browser can remove it.
    """
    common_tail = ['--new-window', 'about:blank']

    if family == 'chromium':
        ff_profile = None
        argv = [
            binary,
            f'--proxy-server=socks5://127.0.0.1:{socks_port}',
            '--temp-profile',
            '--no-first-run',
            '--no-default-browser-check',
        ] + common_tail
    else:  # firefox
        ff_profile = tempfile.mkdtemp(prefix='tunl-ff-')
        prefs = [
            'user_pref("network.proxy.type", 1);\n',
            'user_pref("network.proxy.socks", "127.0.0.1");\n',
            f'user_pref("network.proxy.socks_port", {socks_port});\n',
            'user_pref("network.proxy.socks_version", 5);\n',
            'user_pref("network.proxy.socks_remote_dns", true);\n',
        ]
        with open(os.path.join(ff_profile, 'user.js'), 'w') as prefs_file:
            prefs_file.writelines(prefs)
        argv = [binary, '--no-remote', '--profile', ff_profile] + common_tail

    browser = subprocess.Popen(
        argv, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    browser._tunl_profile_dir = ff_profile
    return browser


def _cleanup_browser(proc):
    """Stop a browser started by this tool and remove its throw-away profile.

    Args:
        proc: the Popen object of the browser.  If it carries a
              ``_tunl_profile_dir`` attribute (set by _launch_browser for
              Firefox), that directory is deleted afterwards.

    The process gets SIGTERM and up to 5 seconds to exit; if it is still
    alive after that it is killed, so the profile is never deleted from
    under a running browser.  All steps are best effort and never raise
    for process or filesystem errors.
    """
    try:
        proc.terminate()
        try:
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # SIGTERM was ignored; escalate rather than leak the process.
            proc.kill()
            proc.wait(timeout=5)
    except (OSError, subprocess.SubprocessError):
        pass
    profile_dir = getattr(proc, '_tunl_profile_dir', None)
    if profile_dir:
        shutil.rmtree(profile_dir, ignore_errors=True)


# ---------------------------------------------------------------------------
# Browser profile management
# ---------------------------------------------------------------------------

# Root directory for persistent, named browser profiles.  Resolved from the
# current user's home directory when the module is loaded; _list_profiles
# creates it on demand and only reports sub-directories named 'tunl-*'.
PROFILES_DIR = Path.home() / '.local' / 'share' / 'linbit-tunl' / 'profiles'


def _list_profiles():
    """List the names of all 'tunl-*' profile directories, sorted.

    PROFILES_DIR is created first if it does not exist yet, so the result
    is simply empty on a fresh installation.
    """
    PROFILES_DIR.mkdir(parents=True, exist_ok=True)
    names = []
    for entry in PROFILES_DIR.iterdir():
        if entry.is_dir() and entry.name.startswith('tunl-'):
            names.append(entry.name)
    names.sort()
    return names


def _select_profile(session_info=None):
    """Ask the user which browser profile to use.

    Prints the numbered list of existing profiles plus the choices 'n'
    (create a new named profile) and 't' (throw-away profile), then reads
    the selection from stdin.

    session_info is the session dict from the API; when it carries a
    'host', that hostname is shown as a suggested profile name.

    Returns ``(profile_dir, is_temporary)``: ``(None, True)`` for the
    throw-away choice, otherwise ``(Path, False)``.  A new profile is
    stored as 'tunl-<name>' and its directory is created.  Exits with
    status 0 on EOF/Ctrl-C and with status 1 on an empty name or an invalid
    selection.
    """
    def ask(question):
        # Read one trimmed answer; leaving the prompt counts as a clean exit.
        try:
            return input(question).strip()
        except (EOFError, KeyboardInterrupt):
            print()
            sys.exit(0)

    profiles = _list_profiles()
    print()
    for number, existing in enumerate(profiles, 1):
        print(f"  {number:3d}.  {existing}")
    if profiles:
        print()
    print("    n.  Create new named profile")
    print("    t.  Use throw-away profile (no persistence)")
    suggested = session_info.get('host', '') if session_info else ''
    if suggested:
        print(f"         (suggested name for this system: {suggested})")
    print()

    raw = ask("Profile [number / n / t]: ").lower()

    if raw == 't':
        return None, True

    if raw == 'n':
        name_input = ask("Profile name (stored as tunl-<name>): ")
        if not name_input:
            print("Name cannot be empty.", file=sys.stderr)
            sys.exit(1)
        if name_input.startswith('tunl-'):
            profile_name = name_input
        else:
            profile_name = f'tunl-{name_input}'
        new_dir = PROFILES_DIR / profile_name
        new_dir.mkdir(parents=True, exist_ok=True)
        return new_dir, False

    try:
        index = int(raw) - 1
    except ValueError:
        index = -1
    if 0 <= index < len(profiles):
        return PROFILES_DIR / profiles[index], False

    print(f"Invalid selection '{raw}'.", file=sys.stderr)
    sys.exit(1)


def _launch_browser_with_profile(binary, profile_dir, socks_port):
    """Start Chrome/Chromium behind the SOCKS5 proxy with the given profile.

    ``profile_dir`` selects the user data directory; None requests
    --temp-profile instead (Chrome manages and cleans up).  Browser output
    is discarded.  Returns the Popen object.
    """
    if profile_dir is None:
        profile_arg = '--temp-profile'
    else:
        profile_arg = f'--user-data-dir={profile_dir}'
    argv = [
        binary,
        profile_arg,
        f'--proxy-server=socks5://127.0.0.1:{socks_port}',
        '--no-first-run',
        '--no-default-browser-check',
        '--disable-sync',
        '--new-window',
        'about:blank',
    ]
    return subprocess.Popen(
        argv, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)


# ---------------------------------------------------------------------------
# tunl browser <session-id> [--profile NAME]
# ---------------------------------------------------------------------------

def cmd_browser(sid, profile=None, kill=False):
    """Open a browser pre-configured with a SOCKS5 proxy through the relay.

    The SOCKS proxy reaches the customer's internal network via the TCP
    reverse tunnel (requires customer sshd and support keys in authorized_keys).

    Args:
        sid:     session id.
        profile: optional profile name; see below.
        kill:    when true, only terminate the browser recorded for ``sid``
                 and return.

    Profile management:
      - Named profiles (tunl-<name>) persist across sessions.  Import CA certs
        or configure per-cluster settings once; they are reused automatically.
      - --profile NAME uses that profile directly (created if it does not exist).
      - Without --profile an interactive menu lets you pick an existing profile,
        create a new named one, or use a throw-away profile (cleaned up on exit).

    Only Chrome/Chromium-family browsers are used, because the browser is
    launched with Chromium command-line flags.  Blocks until the browser
    exits.  Exits with status 1 when no display or browser is available or
    when the tunnel cannot be set up.
    """
    if kill:
        if _kill_browser_by_pid(sid):
            print(f"Browser for {sid} terminated.")
        else:
            print(f"No browser PID on record for {sid}.", file=sys.stderr)
        return

    if not _has_display():
        print(f"{PREFIX} ERROR: no graphical display available "
              f"($DISPLAY and $WAYLAND_DISPLAY are unset).", file=sys.stderr)
        sys.exit(1)

    if _display_maybe_inaccessible():
        print(f"{PREFIX} WARNING: you appear to be connected via SSH inside a "
              f"multiplexer (tmux/screen).", file=sys.stderr)
        print(f"{PREFIX}          The display ($DISPLAY={os.environ.get('DISPLAY','')}) "
              f"may belong to a desktop you cannot reach from here.", file=sys.stderr)
        try:
            ans = input(f"{PREFIX} Launch browser anyway? [y/N] ").strip().lower()
        except (EOFError, KeyboardInterrupt):
            print()
            sys.exit(0)
        if ans not in ('y', 'yes'):
            sys.exit(0)

    # Restrict detection to the Chromium family: with 'auto' a Firefox-only
    # host would get Firefox started with Chromium flags, i.e. without the
    # proxy being applied.
    found = _detect_browser('chromium')
    if found is None:
        print(f"{PREFIX} ERROR: no Chrome/Chromium browser found in PATH.",
              file=sys.stderr)
        sys.exit(1)
    _family, binary = found

    s = _get_session(sid)

    if profile:
        pname = (f'tunl-{profile}'
                 if not profile.startswith('tunl-') else profile)
        profile_dir = PROFILES_DIR / pname
        profile_dir.mkdir(parents=True, exist_ok=True)
    else:
        profile_dir, is_tmp = _select_profile(session_info=s)
        if is_tmp:
            profile_dir = None   # --temp-profile; Chrome cleans up itself

    try:
        # Only the SOCKS port is needed here.
        _port, socks_port, _cust_user = _ensure_tunnel_and_master(sid)
    except RuntimeError as e:
        print(f"{PREFIX} ERROR: tunnel upgrade failed: {e}", file=sys.stderr)
        sys.exit(1)

    browser_proc = None
    try:
        if _wait_for_socks(socks_port):
            print(f"SOCKS5 proxy ready on 127.0.0.1:{socks_port}; "
                  f"launching {binary}.")
        else:
            print(f"{PREFIX} WARNING: SOCKS5 proxy not ready; launching browser anyway.",
                  file=sys.stderr)

        browser_proc = _launch_browser_with_profile(binary, profile_dir, socks_port)
        _save_browser_pid(sid, browser_proc.pid)
        browser_proc.wait()
    finally:
        # Runs on normal exit and on Ctrl-C alike: stop the browser and drop
        # the PID record.
        if browser_proc is not None:
            _cleanup_browser(browser_proc)
        _browser_pid_path(sid).unlink(missing_ok=True)


# ---------------------------------------------------------------------------
# Shared tunnel + ControlMaster helpers
# ---------------------------------------------------------------------------

def _get_session(sid):
    """Return the relay API's session details for *sid*.

    A RuntimeError from the API call is reported on stderr and ends the
    process with exit status 1, so callers only ever see a result.
    """
    endpoint = f'/api/v1/sessions/{sid}'
    try:
        details = _api_get(endpoint)
    except RuntimeError as err:
        print(f"{PREFIX} ERROR: {err}", file=sys.stderr)
        sys.exit(1)
    else:
        return details


def _wait_for_active(sid: str) -> dict:
    """Poll until a pending session becomes active; return session dict.

    Prints a waiting message, then one dot per 3 s poll for as long as the
    session is still pending.  Exits zero on Ctrl-C, or with status 1 if
    the sessions endpoint fails with anything other than a 404, or if the
    pending record disappears before the customer connects (expired or
    cancelled).
    """
    session_url = f'/api/v1/sessions/{sid}'
    pending_url = f'/api/v1/pending/{sid}'

    print(f"Session {sid} -- waiting for the customer to connect...",
          flush=True)
    print("(Ctrl-C to cancel)", flush=True)
    try:
        while True:
            time.sleep(3)
            try:
                s = _api_get(session_url)
            except RuntimeError as e:
                # A 404 only means "not active yet"; anything else is fatal.
                if '404' not in str(e):
                    print(f"\n{PREFIX} ERROR: {e}", file=sys.stderr)
                    sys.exit(1)
            else:
                print("\nCustomer connected.", flush=True)
                return s
            # Still pending?
            try:
                _api_get(pending_url)
            except RuntimeError:
                # The two lookups are separate requests: the customer may
                # have connected after the sessions lookup above but before
                # the pending lookup.  Check the active record once more
                # before reporting the session as gone.
                try:
                    s = _api_get(session_url)
                except RuntimeError:
                    print(f"\n{PREFIX} ERROR: session {sid} expired or was cancelled.",
                          file=sys.stderr)
                    sys.exit(1)
                print("\nCustomer connected.", flush=True)
                return s
            print('.', end='', flush=True)
    except KeyboardInterrupt:
        print()
        sys.exit(0)


def _request_tunnel_upgrade(sid: str) -> int:
    """SSH to relay, request tunnel upgrade, return the tunnel port.

    The broker picks a free port on the relay, tells the customer to open the
    reverse tunnel, and reports the port back via TUNNEL_PORT N on stdout.

    Raises RuntimeError when the relay answers TUNNEL_REJECTED, when a
    TUNNEL_PORT line carries no usable integer, when ssh does not finish
    within the 120 s timeout, or when the output contains no TUNNEL_PORT
    line at all (the message is then ssh's stderr, if there was any).
    """
    trace('upgrade_request', sid=sid)
    # ssh hands this string to the remote side as one command line, so the
    # session id is quoted -- same treatment _ssh_consume_cmd gives it.
    remote_cmd = (f'python3 {SHARE_RELAY_SCRIPT} consume '
                  f'{shlex.quote(sid)} --upgrade')
    dbg = ssh_debug_args('upgrade', _session_dir(sid)) or ['-E', SSH_LOG_PATH]
    try:
        result = subprocess.run(
            ['ssh',
             *dbg,
             '-p', str(RELAY_PORT),
             '-o', 'ControlPath=none',
             '-o', 'BatchMode=yes',
             '-o', 'ConnectTimeout=15',
             *_agent_opt(),
             f'{SUPPORT_USER}@{RELAY_HOST}',
             remote_cmd],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            universal_newlines=True, timeout=120,
        )
    except subprocess.TimeoutExpired:
        # Callers handle RuntimeError only; do not leak a traceback.
        trace('upgrade_timeout', sid=sid)
        raise RuntimeError(
            'timed out waiting for the relay to confirm the tunnel') from None

    for line in result.stdout.splitlines():
        if line.startswith('TUNNEL_PORT '):
            try:
                port = int(line.split()[1])
            except (IndexError, ValueError):
                trace('upgrade_bad_port', sid=sid, line=line[:200])
                raise RuntimeError(
                    f'malformed TUNNEL_PORT line from relay: {line!r}'
                ) from None
            trace('upgrade_accepted', sid=sid, port=port)
            return port
        if line.startswith('TUNNEL_REJECTED '):
            reason = line[len('TUNNEL_REJECTED '):]
            trace('upgrade_rejected', sid=sid, reason=reason)
            raise RuntimeError(f'rejected by session policy: {reason}')
    trace('upgrade_no_response', sid=sid, rc=result.returncode,
          stderr=result.stderr.strip()[:200])
    raise RuntimeError(result.stderr.strip() or 'no TUNNEL_PORT in response')


def _ensure_tunnel_and_master(sid):
    """Make sure the reverse tunnel and the customer master exist for *sid*.

    The tunnel upgrade is requested only when the session record carries no
    'tunnel_port' yet.  A missing or empty 'user' field is replaced by
    'unknown'.  Returns the tuple (port, socks_port, cust_user), where
    socks_port is whatever _ensure_customer_master() hands back.
    """
    info = _get_session(sid)

    customer_login = info.get('user')
    if not customer_login:
        customer_login = 'unknown'

    tunnel_port = info.get('tunnel_port')
    if not tunnel_port:
        # No tunnel yet: ask the relay for one and use the port it reports.
        print(f"Enabling reverse tunnel for {sid}...", flush=True)
        tunnel_port = _request_tunnel_upgrade(sid)
        print(f"Tunnel ready on relay port {tunnel_port}.", flush=True)

    return (
        tunnel_port,
        _ensure_customer_master(sid, tunnel_port, customer_login),
        customer_login,
    )



# ---------------------------------------------------------------------------
# tunl expect [--ttl MINUTES]
# ---------------------------------------------------------------------------

def cmd_expect(args):
    """Implement `tunl expect [--ttl MINUTES]`.

    Parses *args* (only --ttl with an integer value is accepted), POSTs the
    request to /api/v1/expected and prints the returned token, its expiry
    and the command line the customer should run.  Any usage error, or a
    response status other than 201, is reported on stderr and ends the
    process with exit status 1.
    """
    def _die(message):
        # Shared failure path: message to stderr, exit status 1.
        print(message, file=sys.stderr)
        sys.exit(1)

    ttl = None
    absent = object()            # marks "--ttl was the last argument"
    pending_args = iter(args)
    for arg in pending_args:
        if arg != '--ttl':
            _die(f"Unknown argument: {arg}")
        raw_ttl = next(pending_args, absent)
        if raw_ttl is absent:
            _die("Usage: tunl expect [--ttl MINUTES]")
        try:
            ttl = int(raw_ttl)
        except ValueError:
            _die(f"ERROR: --ttl requires an integer, got '{raw_ttl}'")

    # Only send ttl_minutes when the caller asked for a specific value.
    payload = {} if ttl is None else {'ttl_minutes': ttl}

    status, data = _api_post('/api/v1/expected', payload)
    if status != 201:
        _die(f"{PREFIX} ERROR: could not create token (HTTP {status})")

    token = data['token']
    expires_at = data['expires_at']
    ttl_min = data['ttl_minutes']

    print("Pre-registration token created.")
    print(f"Token:   {token}")
    print(f"Expires: {_fmt_time(expires_at)} ({ttl_min} minutes)")
    print()
    print("Share this token with the customer (verbally or via case system).")
    print(f"Customer runs:  sudo linbit-tunl.py --token {token}")


# ---------------------------------------------------------------------------
# tunl join  (share-mode consumer)
# ---------------------------------------------------------------------------

# Script path used in the `python3 <script> consume <sid> ...` command lines
# that are sent to the relay over ssh; TUNL_SHARE_SCRIPT overrides the default.
SHARE_RELAY_SCRIPT = os.getenv('TUNL_SHARE_SCRIPT',
                               '/usr/local/lib/linbit-tunl/relay-share.py')

# Name stem of the local tmux join session; cmd_join appends '-<sid>' to it.
JOIN_TMUX_NAME = 'tunl-join'


def _tmux_key_display(key):
    """Render a tmux key spec (e.g. 'C-a', 'M-Space', 'F12') as a label.

    'C-' becomes 'Ctrl-', 'M-' becomes 'Alt-', 'S-' becomes 'Shift-'.
    Other modifiers and the base key are passed through verbatim.
    """
    parts = key.split('-')
    out = []
    for m in parts[:-1]:
        out.append({'C': 'Ctrl', 'M': 'Alt', 'S': 'Shift'}.get(m, m))
    out.append(parts[-1])
    return '-'.join(out)


def _resolve_tmux_prefix():
    """Return (tmux_key, display_label) for the requested tmux prefix.

    The key comes from the TUNL_PREFIX environment variable with
    surrounding whitespace removed; when that is unset or empty the key is
    'C-b'.  The value is not validated here -- that happens when it is
    applied to tmux.
    """
    key = os.environ.get('TUNL_PREFIX', '').strip()
    if not key:
        key = 'C-b'
    label = _tmux_key_display(key)
    return key, label


def _apply_tmux_prefix(tmux_argv, target, prefix_key):
    """Set -g prefix on the given tmux session, plus a send-prefix binding.

    tmux_argv is the command list including socket flags ('tmux', '-S',
    sock, ...).  target is the session name passed via -t.

    Returns the prefix_key that was actually applied (the requested one
    if tmux accepted it; 'C-b' on rejection).  Logs a warning on
    rejection -- we do not abort the session.
    """
    if prefix_key == 'C-b':
        return 'C-b'
    r = subprocess.run(
        [*tmux_argv, 'set-option', '-t', target, '-g', 'prefix', prefix_key],
        stderr=subprocess.PIPE, universal_newlines=True,
    )
    if r.returncode != 0:
        msg = (r.stderr or '').strip() or f'rejected by tmux'
        print(f"{PREFIX} TUNL_PREFIX={prefix_key!r} {msg}; using Ctrl-b.",
              file=sys.stderr)
        return 'C-b'
    subprocess.run(
        [*tmux_argv, 'bind-key', '-T', 'prefix', prefix_key, 'send-prefix'],
        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
    )
    return prefix_key


def _render_cheat_sheet(template, prefix_label):
    """Render a cheat-sheet template with the active prefix label.

    Cheat sheets are written with 'Ctrl-b' as the placeholder for the
    prefix.  When TUNL_PREFIX selects a different key we substitute
    here.  No-op when the prefix is the default.
    """
    if prefix_label == 'Ctrl-b':
        return template
    return template.replace('Ctrl-b', prefix_label)


def _write_set_prefix_helper(session_dir, template_path, cheat_path):
    """Generate set-prefix.py in session_dir and return its path.

    Invoked from the "Change tmux prefix..." menu entry's
    command-prompt: applies a new prefix to the running tmux server
    (whose socket is reachable via $TMUX, exported by run-shell) and
    re-renders the cheat-sheet file from its template.  Other one-shot
    labels (status bar, pane borders) stay set to the old key until
    reconnect; the helper says so via tmux display-message.
    """
    helper = session_dir / 'set-prefix.py'
    body = (
        f"#!{sys.executable}\n"
        "import subprocess, sys\n"
        f"TEMPLATE = {str(template_path)!r}\n"
        f"CHEAT    = {str(cheat_path)!r}\n"
        "key = sys.argv[1].strip() if len(sys.argv) > 1 else ''\n"
        "if not key:\n"
        "    subprocess.run(['tmux', 'display-message',\n"
        "                    'No prefix given; nothing changed.'])\n"
        "    sys.exit(0)\n"
        "r = subprocess.run(['tmux', 'set-option', '-g', 'prefix', key],\n"
        "                   stderr=subprocess.PIPE, universal_newlines=True)\n"
        "if r.returncode != 0:\n"
        "    err = (r.stderr or '').strip() or 'rejected by tmux'\n"
        "    subprocess.run(['tmux', 'display-message', f'Prefix {key!r} {err}'])\n"
        "    sys.exit(0)\n"
        "subprocess.run(['tmux', 'bind-key', '-T', 'prefix', key, 'send-prefix'])\n"
        "parts = key.split('-')\n"
        "label = '-'.join([{'C': 'Ctrl', 'M': 'Alt', 'S': 'Shift'}.get(m, m)\n"
        "                  for m in parts[:-1]] + [parts[-1]])\n"
        "with open(TEMPLATE) as fh:\n"
        "    tpl = fh.read()\n"
        "with open(CHEAT, 'w') as fh:\n"
        "    fh.write(tpl if label == 'Ctrl-b' else tpl.replace('Ctrl-b', label))\n"
        "subprocess.run(['tmux', 'display-message',\n"
        "    f'Prefix is now {label}.  Status/pane labels show the old key '\n"
        "    f'until reconnect.'])\n"
    )
    helper.write_text(body)
    helper.chmod(0o755)
    return helper


# Cheat-sheet template for the support side of a join session.  'Ctrl-b'
# is the placeholder for the tmux prefix: _render_cheat_sheet() swaps in
# the active prefix label, and cmd_join writes both this template and the
# rendered copy into the session directory.
_SUPPORT_CHEAT_SHEET = """\
LINBIT tunl -- support cheat sheet
====================================

Ctrl-b is the tmux prefix.

Hotkeys:
  Ctrl-b 0  /  Ctrl-b 1
                    switch to main (0) or chat (1) window
  Ctrl-b C-c        toggle main / chat window
  Ctrl-b z          toggle customer-side zoom
  Ctrl-b V          toggle viewport (strict / relaxed)
  Ctrl-b t          open SSH window to customer (not in restricted mode)
  Ctrl-b d          detach (resume with `tmux attach`)
  Ctrl-b Q          close my window (customer continues)
  F1  /  Ctrl-b Enter
                    open the action menu
  Ctrl-b ?          show this cheat sheet

Dead consumer pane (e.g. after SESSION_ENDED):
  Enter             re-launch consumer (status 42 stays dead, popup shows)

Chat window (curses input on window 1):
  /help             list available slash commands
  /who              show customer + connected viewers
  /me <action>      send an emote-style chat line
  /nick <name>      set your display name (propagates to peers)
  /note <text>      record a session-scoped note (relay-side)
  /search <regex>   filter scrollback;  /search alone clears
  /quit             disconnect chat (Ctrl-b Q closes the window)
  Tab               complete @nick or /command
  PgUp  /  PgDn     scroll chat history

Menu (F1 / Ctrl-b Enter): full action list with mnemonics for SSH,
proxy shell, browser profiles, and resize controls.
"""


def _join_sock_path(sid: str) -> str:
    """Return, as a string, the tmux socket path used for joining *sid*.

    The socket is the file 'tmux.sock' inside the session directory, so
    each join session talks to its own tmux server.
    """
    sock_file = _session_dir(sid) / 'tmux.sock'
    return str(sock_file)


def _resolve_join_display_name() -> str:
    """Return a display name for the support engineer joining a session."""
    explicit = os.environ.get('TUNL_DISPLAY_NAME', '').strip()
    if explicit:
        return explicit
    try:
        import pwd
        gecos = pwd.getpwuid(os.getuid()).pw_gecos
        name = gecos.split(',')[0].strip()
        if name:
            return name
    except Exception:
        pass
    try:
        r = subprocess.run(
            ['git', 'config', 'user.name'],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, timeout=3,
        )
        name = r.stdout.strip()
        if name:
            return name
    except Exception:
        pass
    return f'{os.environ.get("USER", "support")}@{socket.gethostname()}'


def _ssh_consume_cmd(sid: str, read_only: bool, chat: bool = False,
                     status: bool = False, name: str = '') -> list:
    """Return the ssh argv that runs `relay-share.py consume` on the relay.

    read_only, chat and status each add the matching --read-only / --chat /
    --status switch to the remote command; a non-empty *name* is passed
    shell-quoted after --name.  A tty (-t) is requested only when neither
    chat nor status is set.  The ssh debug arguments come from
    ssh_debug_args() for the purpose 'consume.main', 'consume.chat' or
    'consume.status' (status takes precedence over chat), falling back to
    `-E SSH_LOG_PATH` when that returns nothing.
    """
    switches = [
        switch
        for enabled, switch in ((read_only, '--read-only'),
                                (chat, '--chat'),
                                (status, '--status'))
        if enabled
    ]
    if name:
        switches += ['--name', shlex.quote(name)]
    flags = ''.join(f' {part}' for part in switches)
    remote = f'python3 {SHARE_RELAY_SCRIPT} consume {_sq(sid)}{flags}'

    tty_flags = [] if (status or chat) else ['-t']

    if status:
        purpose = 'consume.status'
    elif chat:
        purpose = 'consume.chat'
    else:
        purpose = 'consume.main'
    dbg = ssh_debug_args(purpose, _session_dir(sid)) or ['-E', SSH_LOG_PATH]

    argv = ['ssh']
    argv += tty_flags
    argv += dbg
    argv += [
        '-p', str(RELAY_PORT),
        '-o', 'ControlPath=none',
        '-o', 'BatchMode=yes',
        '-o', 'ConnectTimeout=15',
        '-o', 'ServerAliveInterval=15',
        '-o', 'ServerAliveCountMax=3',
    ]
    argv += _agent_opt()
    argv += [f'{SUPPORT_USER}@{RELAY_HOST}', remote]
    return argv


def _pick_session(sid: str, mode_filter=None) -> str:
    """Return sid unchanged if given; otherwise query the relay and pick one.

    Includes both active sessions and pending (not-yet-connected) sessions.
    mode_filter: if set, only active sessions whose mode field contains this
    string are offered ('share' excludes tunnel-only).  Pending sessions are
    always included regardless of mode_filter (they have no mode yet).
    When exactly one session matches it is selected automatically; more than
    one triggers an interactive numbered prompt; zero triggers an error exit.
    """
    if sid:
        return sid

    try:
        sessions, pending = _fetch_all_sessions()
    except RuntimeError as e:
        print(f"{PREFIX} ERROR: could not fetch sessions: {e}", file=sys.stderr)
        sys.exit(1)

    if mode_filter:
        sessions = [s for s in sessions if mode_filter in s.get('mode', 'share')]

    # Annotate entries so the prompt can distinguish them.
    entries = [dict(s, _state='pending') for s in pending] + \
              [dict(s, _state='active')  for s in sessions]

    if not entries:
        what = f"(mode={mode_filter}) " if mode_filter else ""
        print(f"{PREFIX} No active or pending sessions {what}found on relay.",
              file=sys.stderr)
        sys.exit(1)

    if len(entries) == 1:
        chosen = entries[0]['id']
        label = entries[0].get('host') or entries[0].get('customer') or '?'
        state = entries[0]['_state']
        print(f"Connecting to {state} session {chosen}  ({label})",
              flush=True)
        return chosen

    # Multiple sessions -- prompt.
    now = int(time.time())
    print(f"Multiple sessions available:")
    for i, s in enumerate(entries, 1):
        state = s['_state']
        if state == 'pending':
            age = _fmt_age(now - s.get('created_at', now))
            label = s.get('customer') or '(customer not yet connected)'
        else:
            age = _fmt_age(now - s.get('connected_at', now))
            label = s.get('host', '?')
        case = s.get('case', '')
        case_str = f"  case={case}" if case else ''
        print(f"  [{i}]  {s['id']:<30}  [{state}]  {label}  {age}{case_str}")
    try:
        choice = input(f"Select session [1-{len(entries)}]: ").strip()
        idx = int(choice) - 1
        if not 0 <= idx < len(entries):
            raise ValueError
    except (ValueError, EOFError):
        print("Aborted.", file=sys.stderr)
        sys.exit(1)
    return entries[idx]['id']


def cmd_join(sid: str = '', read_only: bool = False, debug: bool = False):
    """Watch a customer's share session in a 3-window local tmux session.

    Window layout:
      0  main    -- full VT stream, full height; keystrokes -> customer main pane
      1  status  -- relay session info, auto-refreshed every 5s
      2  chat    -- customer chat; keystrokes -> customer chat pane

    Two separate SSH consumers connect to the broker (one for main, one for
    chat).  The chat window has monitor-activity on so its tab blinks when
    the customer sends a message.

    Key bindings (scoped to this join session):
      Ctrl-b 0/1/2  switch windows
      Ctrl-b Q      kill the join session (disconnect)
      Enter         on a dead pane: reconnect; on a live pane: pass through

    The session is named '{JOIN_TMUX_NAME}-SID' so multiple engineers can
    join the same session concurrently from the same machine.

    With --read-only neither consumer forwards keystrokes or RESIZE.
    If sid is omitted, the relay is queried and the session is picked
    automatically.
    """
    sid = _pick_session(sid, mode_filter='share')
    init('support', sid=sid,
         path=str(_session_dir(sid) / 'trace.jsonl'),
         lastgasp_path=str(_session_dir(sid) / 'lastgasp.jsonl'))
    trace('join_begin', sid=sid, read_only=read_only, debug=debug)

    # Resolve session details; if the session is still pending, wait for the
    # customer to connect before setting up the local tmux environment.
    try:
        s = _api_get(f'/api/v1/sessions/{sid}')
    except RuntimeError as e:
        if '404' not in str(e):
            trace('join_api_error', sid=sid, err=str(e))
            print(f"{PREFIX} ERROR: {e}", file=sys.stderr)
            sys.exit(1)
        try:
            _api_get(f'/api/v1/pending/{sid}')
        except RuntimeError:
            trace('join_not_found', sid=sid)
            print(f"{PREFIX} ERROR: session {sid} not found.", file=sys.stderr)
            sys.exit(1)
        s = _wait_for_active(sid)

    is_restricted = s.get('mode') == 'restricted'

    print(f"SSH debug log: {SSH_LOG_PATH}", flush=True)
    session_name = f'{JOIN_TMUX_NAME}-{sid}'
    sock = _join_sock_path(sid)
    _tx = ['tmux', '-f', '/dev/null', '-S', sock]

    # Re-attach if a session for this SID is already running.
    r = subprocess.run(
        [*_tx, 'has-session', '-t', session_name],
        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
    )
    if r.returncode == 0:
        print(f"Attaching to existing join session {session_name}",
              flush=True)
        os.execvp(_tx[0], [*_tx, 'attach-session', '-t', session_name])

    display_name = _resolve_join_display_name()

    # Two-window layout: status is folded into the chat window's
    # pinned header (driven off SESSION_INFO, which the broker fans to
    # chat consumers with dedup).
    #
    # Main window runs tunl.py --_viewport_filter: a Python wrapper
    # that spawns the SSH subprocess, demuxes v4 frames from the
    # broker, and drives the support-side viewport mode (relaxed /
    # strict) state machine via tmux resize-window.  The SSH subprocess
    # runs `relay-share.py consume SID` on the relay as a transparent
    # byte-copy, so v4 frames reach this filter intact.
    # --tmux-target is omitted: the viewport filter runs as the pane
    # process and reads $TMUX_PANE from the env, so it gets the live
    # pane id for free.  Tests still pass --tmux-target explicitly.
    _viewport_args = [
        sys.executable, os.path.abspath(sys.argv[0]),
        '--_viewport_filter', sid,
        '--tmux-sock', sock,
        '--name', display_name,
    ]
    if read_only:
        _viewport_args.append('--read-only')
    if debug:
        _viewport_args.append('--debug')
    _viewport_args.append('--')
    _viewport_args.extend(_ssh_consume_cmd(sid, read_only, name=display_name))
    main_ssh = ' '.join(shlex.quote(a) for a in _viewport_args)

    # Chat window runs tunl.py --_chat_window: a curses UI that spawns
    # the SSH consumer as a subprocess and renders pinned status rows
    # above the scrollback.
    _chat_args = [sys.executable, os.path.abspath(sys.argv[0]),
                  '--_chat_window', sid, '--name', display_name]
    if read_only:
        _chat_args.append('--read-only')
    if debug:
        _chat_args.append('--debug')
    chat_cmd = ' '.join(shlex.quote(a) for a in _chat_args)

    # Reconnect banner (shown only on respawn, not first launch) then
    # exec the wrapper / chat UI.  Using exec makes the wrapper the
    # pane process so SIGWINCH from tmux reaches it directly.  The
    # wrapper exits 42 on SESSION_ENDED; remain-on-exit + respawn-pane
    # handles the reconnect loop.
    _reconnect_prefix = r'[ -n "$TUNL_RECONNECT" ] && printf "\033[33m[tunl] Reconnecting...\033[m\n"; exec '
    main_ssh = _reconnect_prefix + main_ssh
    chat_cmd = _reconnect_prefix + chat_cmd

    if debug:
        (_session_dir(sid) / '.debug').touch()

    # Resolve the support-side tmux prefix from the env, then apply it
    # to the new tmux server below before we write any artifact that
    # references the prefix label.  Default 'C-b' is a no-op; an
    # invalid TUNL_PREFIX is rejected by tmux, _apply_tmux_prefix logs
    # a warning and falls back to 'C-b' so the rest of the function
    # renders the correct label.
    _prefix_key, _prefix_label = _resolve_tmux_prefix()

    print(f"Starting join session '{session_name}'...", flush=True)

    # Use actual terminal dimensions so both windows start at the right size.
    _term_cols, _term_rows = shutil.get_terminal_size(fallback=(220, 50))

    # Window 0: main SSH consumer (full height, full width).
    subprocess.run(
        [*_tx, 'new-session', '-d', '-s', session_name,
         '-x', str(_term_cols), '-y', str(_term_rows), '-n', 'main',
         main_ssh],
        check=True,
    )

    _applied = _apply_tmux_prefix(_tx, session_name, _prefix_key)
    if _applied != _prefix_key:
        _prefix_key, _prefix_label = _applied, _tmux_key_display(_applied)

    # Shell script run by the display-popup on session close (exit 42).
    # Writing to a file avoids nested shell+tmux quoting for the popup body.
    _closed_popup = _session_dir(sid) / 'session-closed-popup.sh'
    _closed_popup.write_text(
        '#!/bin/sh\n'
        'printf "\\n"\n'
        'printf "  \\033[1;31mSESSION CLOSED\\033[m\\n\\n"\n'
        'printf "  This session has ended permanently.\\n"\n'
        'printf "  Reconnecting will not restore it.\\n\\n"\n'
        'printf "  Press any key to close this popup.\\n"\n'
        f'printf "  {_prefix_label} Q closes the join window.\\n"\n'
        'stty -icanon -echo 2>/dev/null\n'
        'dd bs=1 count=1 >/dev/null 2>&1\n'
    )
    _closed_popup.chmod(0o700)

    print(f"Windows: {_prefix_label} 0=main  {_prefix_label} 1=chat"
          " (status pinned above scrollback)",
          flush=True)
    print(f"Detach: {_prefix_label} d   Disconnect: {_prefix_label} Q",
          flush=True)

    # Window 1: chat (curses chat UI wrapping an SSH consumer; carries
    # two pinned status rows since M3).
    subprocess.run(
        [*_tx, 'new-window', '-t', f'{session_name}:1', '-n', 'chat',
         chat_cmd],
        check=True,
    )

    # Capture the live pane ids so every binding can address main/chat
    # by id rather than by window:index.  Pane ids are stable across
    # join-pane / break-pane / window-renumber, so the user can move
    # the chat into the main window or split it back out without
    # breaking the toggle, the menu's "Go to chat", or any send-keys
    # plumbing.  Also tag both panes with a role so a future
    # reconnect / inspector can find them by `list-panes -F`.
    def _pane_id(target: str) -> str:
        return subprocess.run(
            [*_tx, 'display-message', '-p', '-t', target, '#{pane_id}'],
            check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        ).stdout.decode('ascii', 'replace').strip()
    main_pane_id = _pane_id(f'{session_name}:0.0')
    chat_pane_id = _pane_id(f'{session_name}:1.0')
    for _pid, _role in ((main_pane_id, 'main'), (chat_pane_id, 'chat')):
        subprocess.run(
            [*_tx, 'set-option', '-p', '-t', _pid, '@tunl-role', _role],
            check=True,
        )

    # Keep consumer panes alive after SSH exits so the last output is readable.
    for _pid in (main_pane_id, chat_pane_id):
        subprocess.run(
            [*_tx, 'set-option', '-p', '-t', _pid, 'remain-on-exit', 'on'],
            check=True,
        )

    # Enter on a dead pane re-launches the SSH consumer; pass-through on live.
    # Exit 42 means SESSION_ENDED (relay closed the session permanently) -- in that
    # case do not respawn; the "SESSION CLOSED" banner is already on screen.
    # Set TUNL_RECONNECT=1 on respawn so the shell command can print feedback.
    # The inner if-shell is delivered as a single string argument; when tmux
    # re-parses it via cmd_parse_from_string, an unquoted `#` is a comment.
    # The format `#{==:...}` must therefore be wrapped in double quotes.
    subprocess.run(
        [*_tx, 'bind-key', '-T', 'root', 'Enter',
         'if', '-F', '#{pane_dead}',
         'if -F "#{==:#{pane_dead_status},42}" "" "respawn-pane -e TUNL_RECONNECT=1"',
         'send-keys Enter'],
        check=True,
    )
    # Ctrl-b Q kills the join session.
    subprocess.run(
        [*_tx, 'bind-key', '-T', 'prefix', 'Q',
         'kill-session', '-t', session_name],
        check=True,
    )
    _cleanup_cmd = (
        f'{sys.executable} {shlex.quote(sys.argv[0])}'
        f' --_cleanup {shlex.quote(sid)}'
    )
    subprocess.run(
        [*_tx, 'set-hook', '-t', session_name, 'session-closed',
         'unbind-key -T root Enter; unbind-key -T root F1;'
         ' unbind-key -T prefix Q; unbind-key -T prefix Enter;'
         f' run-shell {shlex.quote(_cleanup_cmd)}'],
        check=True,
    )

    # Pane-died hook: when the main consumer exits with status 42
    # (SESSION_ENDED from relay), show a prominent popup.  Any other exit
    # status is a transient disconnect -- the Enter binding handles respawn.
    _popup_cmd = f'display-popup -E -w 50 -h 9 {shlex.quote(str(_closed_popup))}'
    subprocess.run(
        [*_tx, 'set-hook', '-t', session_name, 'pane-died',
         f'if -F "#{{==:#{{pane_dead_status}},42}}" "{_popup_cmd}" ""'],
        check=True,
    )

    # Chat window: alert support when a message arrives.  The window
    # option is set on whichever window the chat pane currently lives
    # in; if the user later joins or breaks panes, the option stays
    # attached to the surviving window.
    subprocess.run(
        [*_tx, 'set-option', '-w', '-t', chat_pane_id,
         'monitor-activity', 'on'],
        check=True,
    )
    subprocess.run(
        [*_tx, 'set-option', '-t', session_name, 'visual-activity', 'on'],
        check=True,
    )

    # Per-window pane border labels.
    #
    # The main window (0) renders two state badges from window-level
    # tmux user options the viewport filter keeps up to date:
    #   @tunl-viewport   "relaxed" | "strict"
    #   @tunl-override   "" (no override) | "<cols>x<rows>"
    # The viewport filter runs `tmux set-option -w` whenever those
    # change, so the badges flip without the wrapper having to drive
    # the redraw itself.
    ro_note  = ' (read-only)'  if read_only    else ''
    rst_note = ' (restricted)' if is_restricted else ''

    # Initial values; the wrapper overwrites these when it starts.
    # Targeted by main_pane_id so the badge follows the pane if it is
    # joined into a different window.
    #   @tunl-link "" (no badge) | "<peer> lost <HH:MM:SS>"
    # Updated by the viewport wrapper on PEER_LOST / PEER_BACK frames.
    for opt, val in (('@tunl-viewport', 'relaxed'),
                     ('@tunl-override', ''),
                     ('@tunl-link', '')):
        subprocess.run(
            [*_tx, 'set-option', '-w', '-t', main_pane_id, opt, val],
            check=True,
        )

    # Format expansion:
    #   #{?#{==:#{@tunl-viewport},strict},#[reverse]VIEWPORT:strict#[noreverse],viewport:relaxed}
    #   #{?#{@tunl-override},#[fg=yellow,bold] OVERRIDE #{@tunl-override}#[default],}
    #   #{?#{@tunl-link},#[fg=red,bold] <link state> #[default],}
    _viewport_badge = (
        '#{?#{==:#{@tunl-viewport},strict},'
        '#[reverse]VIEWPORT strict#[noreverse],'
        '#[dim]viewport relaxed#[nodim]}'
    )
    _override_badge = (
        '#{?#{@tunl-override},'
        '  #[fg=yellow#,bold]OVERRIDE #{@tunl-override}#[default],}'
    )
    # Peer-state badge: empty when all peers are healthy; rendered
    # in red when the broker has reported a peer lost
    # (PEER_LOST customer / PEER_LOST support NAME).  Cleared on
    # PEER_BACK.  See docs/share.md "Liveness".  Glyph picked with
    # the same _ASCII fallback as SYM_OFF / SYM_ON elsewhere.
    _LINK_GLYPH = '!' if _ASCII else '⚠'  # warning sign
    _link_badge = (
        '#{?#{@tunl-link},'
        f'  #[fg=red#,bold]{_LINK_GLYPH} '
        '#{@tunl-link}#[default],}'
    )
    main_label = (
        f'#[bold][SHELL{ro_note}{rst_note}]#[nobold]'
        f'  {_viewport_badge}{_override_badge}{_link_badge}'
        f'  {_prefix_label}: Enter=menu 1=chat Q=disconnect'
    )
    chat_label = (
        f'#[bold][CHAT{ro_note}{rst_note}]#[nobold]'
        f'  type message + Enter  {_prefix_label}: 0=main'
    )
    for _pid, win_label in [(main_pane_id, main_label),
                            (chat_pane_id, chat_label)]:
        subprocess.run(
            [*_tx, 'set-option', '-w', '-t', _pid,
             'pane-border-status', 'top'],
            check=True,
        )
        subprocess.run(
            [*_tx, 'set-option', '-p', '-t', _pid,
             'pane-border-format', win_label],
            check=True,
        )

    # Ctrl-b t: open a new window running 'tunl ssh <sid>' (tunnel + root shell).
    # Only bound when the session is not restricted (restricted sessions block upgrade).
    _tunl_ssh_cmd = f'{sys.executable} {shlex.quote(sys.argv[0])} ssh {shlex.quote(sid)}'
    if not is_restricted:
        subprocess.run([
            *_tx, 'bind-key', '-T', 'prefix', 't',
            'new-window', '-t', session_name, '-n', 'ssh', _tunl_ssh_cmd,
        ], check=True)

    # Ctrl-b C-c: toggle between main and chat panes.  Compares the
    # active pane id to chat_pane_id and select the other.
    # Pane-id based instead of window-index so the toggle survives
    # join-pane (chat merged into main's window) and break-pane (chat
    # split out to a different window or pane index).
    # Chains select-window + select-pane: select-pane alone only sets
    # the pane active within ITS window and does not switch the
    # client's current window when the target is elsewhere.  The
    # select-window leg switches windows when needed (no-op when both
    # panes already share a window after join-pane); select-pane then
    # picks the right pane within.
    # Customer side has Ctrl-b c bound to the same toggle; here we use
    # the chord so Ctrl-b c stays free for tmux's default new-window
    # (support staff get the full tmux).
    _toggle_main_chat = (
        f'if -F "#{{==:#{{pane_id}},{chat_pane_id}}}"'
        f' "select-window -t {shlex.quote(main_pane_id)} ; '
        f'select-pane -t {shlex.quote(main_pane_id)}"'
        f' "select-window -t {shlex.quote(chat_pane_id)} ; '
        f'select-pane -t {shlex.quote(chat_pane_id)}"'
    )
    subprocess.run([
        *_tx, 'bind-key', '-T', 'prefix', 'C-c', _toggle_main_chat,
    ], check=True)

    # Ctrl-b Enter: main menu (display-menu).
    # ESC Z (\x1b\x5a) is intercepted by the viewport filter's stdin
    # loop and translated to a ZOOM CTRL frame on the broker socket.
    # Using send-keys -H because signals don't traverse the SSH
    # connection.
    _toggle_zoom  = f"send-keys -t {shlex.quote(main_pane_id)} -H 1b 5a"
    # ESC R (\x1b\x52) triggers RESIZE_OVERRIDE -- uses support's full
    # terminal dimensions, bypassing smallest-wins.  Next regular
    # resize reverts.
    _force_my_dims = f"send-keys -t {shlex.quote(main_pane_id)} -H 1b 52"
    _tunl_proxy_cmd = f'{sys.executable} {shlex.quote(sys.argv[0])} --_proxy_shell {shlex.quote(sid)}'
    _tunl_browser_temp = f'{sys.executable} {shlex.quote(sys.argv[0])} browser {shlex.quote(sid)} --profile throwaway'
    _tunl_browser_pick = f'{sys.executable} {shlex.quote(sys.argv[0])} browser {shlex.quote(sid)}'
    # Cheat-sheet template + rendered file in the session dir.  The
    # template is referenced by both the Ctrl-b ? popup binding (below)
    # and the "Change tmux prefix..." menu entry, so set both up before
    # the menu is built.
    _cheat_path     = _session_dir(sid) / 'cheat-sheet.txt'
    _template_path  = _session_dir(sid) / 'cheat-sheet.template.txt'
    _template_path.write_text(_SUPPORT_CHEAT_SHEET)
    _cheat_path.write_text(_render_cheat_sheet(_SUPPORT_CHEAT_SHEET, _prefix_label))
    _set_prefix_helper = _write_set_prefix_helper(
        _session_dir(sid), _template_path, _cheat_path)
    _menu_items = []
    if not is_restricted:
        _menu_items += [
            'SSH to customer', 's',
            f'new-window -t {shlex.quote(session_name)} -n ssh {shlex.quote(_tunl_ssh_cmd)}',
            'Proxy shell', 'x',
            f'new-window -t {shlex.quote(session_name)} -n proxy-env {shlex.quote(_tunl_proxy_cmd)}',
            'Browser (temp profile)', 'b',
            f'new-window -t {shlex.quote(session_name)} -n browser {shlex.quote(_tunl_browser_temp)}',
            'Browser (choose profile...)', 'B',
            f'new-window -t {shlex.quote(session_name)} -n browser {shlex.quote(_tunl_browser_pick)}',
        ]
        _menu_items += ['', '', '']
    # Viewport mode toggle: write 'TOGGLE' to <session_dir>/viewport.fifo.
    # The wrapper opens this FIFO O_RDWR so writes do not block on a
    # missing reader.  See _viewport_filter_main in tunl.py.
    _viewport_fifo = _session_dir(sid) / 'viewport.fifo'
    _viewport_toggle_cmd = (
        f'run-shell {_tmux_sq(f"echo TOGGLE >> {_viewport_fifo}")}')
    _menu_items += ['Go to chat', 'c',
                    f'select-window -t {shlex.quote(chat_pane_id)} ; '
                    f'select-pane -t {shlex.quote(chat_pane_id)}']
    _menu_items += ['', '', '']
    _menu_items += ['Toggle customer zoom',          'z', _toggle_zoom]
    _menu_items += ['Force my dimensions',           'R', _force_my_dims]
    _menu_items += ['Toggle viewport (strict/relaxed)', 'V',
                    _viewport_toggle_cmd]
    _menu_items += ['', '', '']
    _menu_items += ['Detach (resume with tmux attach)',          'd',
                    'detach-client']
    _menu_items += ['', '', '']
    # "Change tmux prefix..." opens a command-prompt; %1 (the user's
    # input) is substituted by tmux into the run-shell template.  The
    # set-prefix.py helper does the actual work and reports the result
    # via tmux display-message.
    _set_prefix_action = (
        f"command-prompt -p 'New tmux prefix (e.g. C-a, M-Space):'"
        f" {shlex.quote(f'run-shell {shlex.quote(str(_set_prefix_helper))} %1')}"
    )
    _menu_items += ['Change tmux prefix...', 'p', _set_prefix_action]
    _menu_items += ['', '', '']
    _menu_items += ['Close my window (customer continues)',      'Q',
                    f'kill-session -t {shlex.quote(session_name)}']
    # F1 (root) and Ctrl-b Enter (prefix) both open the menu; see the
    # matching comment in linbit-tunl.py for rationale.
    for _table, _key in (('root', 'F1'), ('prefix', 'Enter')):
        subprocess.run(
            [*_tx, 'bind-key', '-T', _table, _key, 'display-menu',
             '-T', f'#[bold]{_tmux_fmt(sid)}#[nobold]'] + _menu_items,
            check=True,
        )
    # Ctrl-b z: toggle customer-side zoom (sends ESC Z to the consumer's stdin
    # which forwards ZOOM to the broker, then to the customer's tmux).
    subprocess.run(
        [*_tx, 'bind-key', '-T', 'prefix', 'z',
         'send-keys', '-t', main_pane_id, '-H', '1b', '5a'],
        check=True,
    )
    # Ctrl-b V: direct keybinding for viewport toggle (matches the
    # menu entry above; Ctrl-b v is reserved by tmux for visual copy
    # mode, so use uppercase).
    subprocess.run(
        [*_tx, 'bind-key', '-T', 'prefix', 'V',
         'run-shell', f'echo TOGGLE >> {_viewport_fifo}'],
        check=True,
    )

    # Ctrl-b ?: project-specific cheat-sheet popup.  Shadows tmux's
    # default list-keys -- the raw tmux key list is rarely what
    # support staff want; a focused, side-specific reference is.
    # _cheat_path / _template_path were prepared above alongside the
    # menu setup so the "Change tmux prefix..." action can re-render
    # the popup contents at runtime.
    _cheat_cmd = (
        f'cat {shlex.quote(str(_cheat_path))};'
        f' printf "\\n[press Enter to close]";'
        f' read _'
    )
    subprocess.run(
        [*_tx, 'bind-key', '-T', 'prefix', '?',
         'display-popup', '-E', '-T', ' Support cheat sheet ',
         '-h', '85%', '-w', '80%', _cheat_cmd],
        check=True,
    )

    # Start on the main pane and attach.
    subprocess.run(
        [*_tx, 'select-pane', '-t', main_pane_id],
        check=True,
    )
    os.execvp(_tx[0], [*_tx, 'attach-session', '-t', session_name])


# ---------------------------------------------------------------------------
# tunl cancel / tunl close / tunl detach
# ---------------------------------------------------------------------------

def _ssh_relay_run(remote_cmd: str):
    """Execute a single non-interactive command on the relay over SSH.

    No PTY is requested.  Connection sharing is switched off
    (ControlPath=none), BatchMode keeps ssh from prompting, and the
    connect timeout is 15 seconds.

    Returns the subprocess.CompletedProcess with stdout and stderr
    captured as text.  A non-zero exit status does not raise here;
    callers inspect .returncode / .stdout / .stderr themselves.
    """
    # Whatever ssh_debug_args() yields wins when it is non-empty;
    # otherwise ssh's own diagnostics go to the shared log file.
    debug_opts = ssh_debug_args('relay_run', Path(tempfile.gettempdir()))
    if not debug_opts:
        debug_opts = ['-E', SSH_LOG_PATH]

    argv = ['ssh']
    argv.extend(debug_opts)
    argv.extend(['-p', str(RELAY_PORT)])
    for option in ('ControlPath=none', 'BatchMode=yes', 'ConnectTimeout=15'):
        argv.extend(['-o', option])
    argv.extend(_agent_opt())
    argv.append(f'{SUPPORT_USER}@{RELAY_HOST}')
    argv.append(remote_cmd)

    # universal_newlines (not text=) keeps this usable on Python 3.6.
    return subprocess.run(
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )


def cmd_cancel(sid: str = ''):
    """Remove a session from the relay registry.

    For pending sessions: cancels the invitation before the customer connects.
    For active sessions: removes the registry entry without terminating the
    customer's process (the customer's SSH connection and tmux survive until
    they disconnect normally).

    Use 'tunl close' to actively terminate the customer's tmux session.

    sid: session id; when empty the user is asked to pick one.

    Exits with status 1 when the relay reports an error or when the
    session is found in neither the active nor the pending registry.
    """
    if not sid:
        sid = _pick_session(sid)  # no mode_filter -- includes pending

    # Try active sessions first, then pending.  Both registries are
    # handled identically: 200 is success, 404 means "not in this one,
    # keep looking", and anything else is a real error that must be
    # reported as such rather than being disguised as "not found".
    attempts = (
        (f'/api/v1/sessions/{sid}', f"Session {sid} removed from registry."),
        (f'/api/v1/pending/{sid}', f"Pending session {sid} cancelled."),
    )
    for endpoint, success_msg in attempts:
        status, data = _api_delete(endpoint)
        if status == 200:
            print(success_msg)
            _cleanup_session_local(sid)
            return
        if status != 404:
            # The error body is not guaranteed to be a JSON object.
            if isinstance(data, dict):
                detail = data.get('error', status)
            else:
                detail = data or status
            print(f"{PREFIX} ERROR: {detail}", file=sys.stderr)
            sys.exit(1)

    print(f"{PREFIX} ERROR: session {sid} not found.", file=sys.stderr)
    sys.exit(1)


def cmd_close(sid: str = ''):
    """End the customer's share session, initiated by support.

    Runs the relay script's `control <sid> TERMINATE` command over SSH.
    Per the design, the broker relays TERMINATE to the customer daemon,
    which kills the customer's tmux session; deregistration then happens
    on its own when that daemon exits.

    sid: session id; resolved through _pick_session (share sessions
    only).  On SSH failure the process exits with ssh's return code.
    """
    sid = _pick_session(sid, mode_filter='share')
    print(f"Sending TERMINATE to session {sid}...", flush=True)

    control_cmd = 'python3 {} control {} TERMINATE'.format(
        SHARE_RELAY_SCRIPT, shlex.quote(sid))
    outcome = _ssh_relay_run(control_cmd)

    if outcome.returncode == 0:
        print(f"Session {sid} terminated.", flush=True)
        _cleanup_session_local(sid)
        return

    reason = outcome.stderr.strip() or 'ssh failed'
    print(f"{PREFIX} ERROR: {reason}", file=sys.stderr)
    sys.exit(outcome.returncode)


def cmd_detach(sid: str = ''):
    """Disconnect every support client currently attached to a share session.

    Intended for 'changing seats': all watching support clients are
    dropped so a fresh join can be started from another machine.  The
    customer's own share session is meant to keep running.

    Runs the relay script's `control <sid> EVICT_CONSUMERS` command over
    SSH; on failure the process exits with ssh's return code.
    """
    sid = _pick_session(sid, mode_filter='share')
    print(f"Evicting support clients of session {sid}...", flush=True)

    control_cmd = 'python3 {} control {} EVICT_CONSUMERS'.format(
        SHARE_RELAY_SCRIPT, shlex.quote(sid))
    outcome = _ssh_relay_run(control_cmd)

    if outcome.returncode == 0:
        print(f"Support clients evicted.  Run 'tunl join {sid}' to rejoin.",
              flush=True)
        return

    reason = outcome.stderr.strip() or 'ssh failed'
    print(f"{PREFIX} ERROR: {reason}", file=sys.stderr)
    sys.exit(outcome.returncode)


# ---------------------------------------------------------------------------
# tunl pick  (interactive session chooser)
# ---------------------------------------------------------------------------

def cmd_pick():
    """Interactively choose a session and then an action to run on it.

    Flow:
      1. Fetch active and pending sessions.  With none, print a notice
         and return; with exactly one, skip the table and the selection
         prompt.
      2. Otherwise print a numbered table and read a choice: a row
         number or a unique session-id prefix.  Empty input, 'q',
         'quit', EOF and Ctrl-C exit with status 0; an ambiguous or
         unknown choice exits with status 1.
      3. Offer single-key actions.  Pending sessions only support
         [i]nfo.  Active sessions dispatch to cmd_join / cmd_ssh /
         cmd_browser / cmd_show; Enter selects the default action (join
         for share sessions, ssh when the mode contains 'tunnel').

    Fields that are present but null (for example the case of an ad-hoc
    session) are shown as empty / placeholder text rather than crashing
    the table.
    """
    sessions, pending = _fetch_all_sessions()

    entries = [dict(s, _state='pending') for s in pending] + \
              [dict(s, _state='active')  for s in sessions]

    if not entries:
        print("(no active or pending sessions)")
        return

    now = int(time.time())

    if len(entries) == 1:
        # Skip table and selection prompt when there is only one session.
        s     = entries[0]
        state = s['_state']
        if state == 'pending':
            label = s.get('customer') or '(customer not yet connected)'
        else:
            label = s.get('host') or '?'
        case_str = f"  case={s['case']}" if s.get('case') else ''
        print(f"1 session found: {s['id']}  [{state}]  {label}{case_str}")
        sid = s['id']
    else:
        col = "  {:<3}  {:<25}  {:<10}  {:<28}  {:<12}  {}"
        print(col.format("#", "SESSION ID", "STATE", "HOST/CUSTOMER", "CASE", "AGE"))
        print("  " + "-" * 88)
        for i, s in enumerate(entries, 1):
            state = s['_state']
            if state == 'pending':
                started = s.get('created_at')
                host = s.get('customer') or '(not connected)'
            else:
                started = s.get('connected_at')
                host = s.get('host') or '?'
                ips  = ', '.join(s.get('ips', [])) if isinstance(s.get('ips'), list) \
                       else str(s.get('ips', ''))
                if ips:
                    host = f"{host} ({ips})"
            # A missing or null timestamp counts as "just now".
            age = 0 if started is None else now - started
            print(col.format(
                str(i),
                s.get('id', '?'),
                state,
                host[:28],
                # 'case' may be present but null; slicing None would raise.
                str(s.get('case') or '')[:12],
                _fmt_age(age),
            ))

        print()
        try:
            raw = input("Select session [1-{n}] or session ID: ".format(n=len(entries))).strip()
        except (EOFError, KeyboardInterrupt):
            print()
            sys.exit(0)

        if not raw or raw in ('q', 'quit'):
            sys.exit(0)

        # Resolve by number or prefix match.
        sid = None
        # isdecimal(), not isdigit(): the latter is also true for
        # characters such as superscript digits, which int() rejects.
        if raw.isdecimal():
            idx = int(raw) - 1
            if 0 <= idx < len(entries):
                sid = entries[idx]['id']
        if sid is None:
            matches = [s['id'] for s in entries if s['id'].startswith(raw)]
            if len(matches) == 1:
                sid = matches[0]
            elif len(matches) > 1:
                print(f"Ambiguous prefix '{raw}': {', '.join(matches)}", file=sys.stderr)
                sys.exit(1)
        if sid is None:
            print(f"No session matching '{raw}'.", file=sys.stderr)
            sys.exit(1)

    # Determine the selected entry's state and mode to offer the right actions.
    selected = next((s for s in entries if s['id'] == sid), None)
    is_pending = selected and selected.get('_state') == 'pending'
    sel_mode   = (selected or {}).get('mode', 'share') if not is_pending else ''
    is_share   = not is_pending and 'tunnel' not in sel_mode

    print(f"\nSession: {sid}")
    if is_pending:
        print("  (session is pending -- waiting for customer to connect)")
        while True:
            try:
                action = _read_single_key("[i]nfo  [q]uit (default q): ")
            except (EOFError, KeyboardInterrupt):
                print()
                sys.exit(0)
            # Any key other than 'i' is treated as quit.
            if action.lower() == 'i':
                cmd_show(sid)
            else:
                sys.exit(0)
        return  # unreachable; loop exits via sys.exit

    # The capitalised entry is the default chosen by Enter.
    if is_share:
        prompt = "[J]oin  [s]sh  [b]rowser  [i]nfo  [q]uit: "
    else:
        prompt = "[S]sh  [b]rowser  [i]nfo  [q]uit: "

    while True:
        try:
            action = _read_single_key(prompt)
        except (EOFError, KeyboardInterrupt):
            print()
            sys.exit(0)
        action_l = action.lower()
        if is_share and action_l in ('j', 'w', '\r', '\n', ''):
            cmd_join(sid)
            return
        elif not is_share and action_l in ('s', '\r', '\n', ''):
            cmd_ssh(sid)
            return
        elif action_l == 's':
            cmd_ssh(sid)
            return
        elif action_l == 'b':
            cmd_browser(sid)
            return
        elif action_l == 'i':
            # Show details, then prompt again.
            cmd_show(sid)
        elif action_l in ('q', '\x03', '\x04'):  # q, Ctrl-C, Ctrl-D
            sys.exit(0)
        # Unrecognised keys fall through and re-display the prompt.


# ---------------------------------------------------------------------------
# CLI dispatch
# ---------------------------------------------------------------------------

# ---------------------------------------------------------------------------
# tunl create-session
# ---------------------------------------------------------------------------

def cmd_create_session(session_id='', case_num='', customer='', expires_in=None,
                       join=False, ad_hoc=False):
    """Pre-register a session via relay-share.py create-session over SSH.

    Three ways to satisfy the "what is this session for?" check:
      - pass --case (preferred; customer derivable from the case number),
      - pass --customer (case often deducible from context),
      - pass both, or
      - pass --ad-hoc (no case yet; fill in later via `tunl set-meta`).

    --ad-hoc is mutually exclusive with --case/--customer; passing none
    of them is rejected so that "lazy" sessions can't slip through as
    ad-hoc when they really had a case all along.

    If session_id is empty the relay generates one automatically.
    If join is True, start tunl join immediately after registering.
    expires_in, when not None, is forwarded as --expires-in.

    Running via SSH (not the HTTP API) gives the relay verifiable identity:
    SSH_CONNECTION (source IP) and SSH_USER_AUTH (key fingerprint).

    Exits with status 1 on invalid argument combinations, on a failed
    remote command, or when the reply is not a JSON object.
    """
    if ad_hoc:
        if case_num or customer:
            print(f"{PREFIX} ERROR: --ad-hoc is mutually exclusive with "
                  f"--case/--customer.", file=sys.stderr)
            sys.exit(1)
    else:
        if not case_num and not customer:
            print(f"{PREFIX} ERROR: at least one of --case / --customer "
                  f"is required (or pass --ad-hoc to create a session not "
                  f"tied to a case).", file=sys.stderr)
            sys.exit(1)

    # The remote command is a single shell string, so every value that
    # did not originate in this file is passed through shlex.quote.
    remote_args = (
        f'python3 {SHARE_RELAY_SCRIPT} create-session'
    )
    if ad_hoc:
        remote_args += ' --ad-hoc'
    else:
        if case_num:
            remote_args += f' --case {shlex.quote(case_num)}'
        if customer:
            remote_args += f' --customer {shlex.quote(customer)}'
    if session_id:
        remote_args += f' --id {shlex.quote(session_id)}'
    if expires_in is not None:
        # Quoted like the other arguments; an int such as 3600 is
        # unchanged by quoting, a stray string can no longer inject
        # shell syntax.
        remote_args += f' --expires-in {shlex.quote(str(expires_in))}'
    user = os.environ.get('USER', '')
    host = socket.gethostname()
    created_by = f'{user}@{host}' if user else host
    remote_args += f' --created-by {shlex.quote(created_by)}'

    r = _ssh_relay_run(remote_args)
    if r.returncode != 0:
        # Prefer a structured {"error": ...} reply; fall back to the raw
        # streams when stdout is not a JSON object.
        raw_err = r.stderr.strip() or r.stdout.strip() or 'ssh failed'
        try:
            data = json.loads(r.stdout)
        except (json.JSONDecodeError, ValueError):
            err = raw_err
        else:
            if isinstance(data, dict):
                err = data.get('error', r.stderr.strip() or 'unknown error')
            else:
                err = raw_err
        print(f"{PREFIX} ERROR: {err}", file=sys.stderr)
        sys.exit(1)

    try:
        data = json.loads(r.stdout)
    except (json.JSONDecodeError, ValueError):
        data = None
    if not isinstance(data, dict):
        # Valid JSON that is not an object is just as unusable as
        # unparsable output.
        print(f"{PREFIX} ERROR: unexpected response: {r.stdout.strip()}", file=sys.stderr)
        sys.exit(1)

    sid = data.get('id', '?')
    exp = data.get('expires_at')
    exp_str = _fmt_time(exp) if exp else '?'
    cmd = data.get('customer_cmd', f'sudo linbit-tunl {sid}')
    case_disp     = data.get('case') or ('(ad-hoc)' if ad_hoc else '(none)')
    customer_disp = data.get('customer') or ('(ad-hoc)' if ad_hoc else '(none)')
    print(f"Session registered:  {sid}")
    print(f"Case:                {case_disp}")
    print(f"Customer:            {customer_disp}")
    # Slugs are only shown when they differ from the value as entered.
    cust_slug = data.get('customer_slug', '')
    case_slug = data.get('case_slug', '')
    if cust_slug and cust_slug != data.get('customer'):
        print(f"  (slug:             {cust_slug})")
    if case_slug and case_slug != data.get('case'):
        print(f"  (slug:             {case_slug})")
    # created_by may be a dict (name / ip / key_fingerprint) or a plain
    # value; both shapes are handled.
    cb = data.get('created_by') or {}
    cb_name = cb.get('name') if isinstance(cb, dict) else cb
    print(f"Created by:          {cb_name or created_by}")
    if isinstance(cb, dict):
        if cb.get('ip'):
            print(f"Source IP:           {cb['ip']}")
        if cb.get('key_fingerprint'):
            print(f"Key:                 {cb['key_fingerprint']}")
    print(f"Expires:             {exp_str}")
    print()
    print("Tell the customer to run:")
    print(f"  {cmd}")
    print()
    if join:
        print("Starting join session (waiting for customer)...")
        cmd_join(sid)
    else:
        print("Then join the session with:")
        print(f"  tunl join {sid}")


# ---------------------------------------------------------------------------
# tunl upgrade
# ---------------------------------------------------------------------------

def cmd_upgrade(session_id):
    """Ask the relay for a tunnel upgrade on an existing session.

    If the session record already carries a tunnel_port, that port is
    reported and nothing else happens.  Otherwise the upgrade is
    requested via _request_tunnel_upgrade() and the resulting relay
    port is printed.  A RuntimeError during the request is reported on
    stderr and the process exits with status 1.
    """
    session = _get_session(session_id)
    active_port = session.get('tunnel_port')
    if active_port:
        print(f"Tunnel already active on relay port {active_port}.")
        return

    print(f"Requesting tunnel upgrade for {session_id}...", flush=True)
    try:
        relay_port = _request_tunnel_upgrade(session_id)
        print(f"Tunnel ready on relay port {relay_port}.")
    except RuntimeError as exc:
        print(f"{PREFIX} ERROR: {exc}", file=sys.stderr)
        sys.exit(1)


def cmd_ssh(sid: str = ''):
    """SSH directly to the customer machine as the customer user.

    Implicitly upgrades the tunnel and starts a ControlMaster if needed.
    Opens a plain SSH shell -- no tmux wrapping.

    sid: session id; resolved through _pick_session when empty.

    When run inside tmux ($TMUX set), a proxy-env window is opened as
    well.  On success this function does not return: the process is
    replaced by ssh via os.execvp.  It exits with status 1 when the
    tunnel upgrade fails or the ControlMaster socket does not answer
    `ssh -O check`.
    """
    sid = _pick_session(sid)
    try:
        port, socks_port, cust_user = _ensure_tunnel_and_master(sid)
    except RuntimeError as e:
        print(f"{PREFIX} ERROR: tunnel upgrade failed: {e}", file=sys.stderr)
        sys.exit(1)

    s = _get_session(sid)
    nets        = s.get('nets', [])
    dns_servers = s.get('dns', [])

    proxy_url  = f'socks5h://127.0.0.1:{socks_port}'
    print(f"Connecting to {sid} as {cust_user}...", flush=True)
    print(f"SOCKS5 proxy: {proxy_url}")
    if os.environ.get('TMUX'):
        _open_proxy_shell(sid, port, socks_port, cust_user, nets, dns_servers)
        print("  (proxy-env window opened in this tmux session)")

    # Verify the master is alive before handing the terminal to ssh, so
    # the user gets a pointer to the log instead of a bare ssh error.
    sock = _master_sock(sid)
    check = subprocess.run(
        ['ssh', '-O', 'check', '-S', str(sock), '127.0.0.1'],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
    )
    if check.returncode != 0:
        print(f"{PREFIX} ERROR: ControlMaster not running (check {SSH_LOG_PATH})",
              file=sys.stderr)
        sys.exit(1)

    dbg = ssh_debug_args('ssh', _session_dir(sid)) or ['-E', SSH_LOG_PATH]
    ssh_cmd = [
        'ssh', '-t',
        *dbg,
        '-S', str(sock),
        '-o', 'ControlMaster=no',
        '-o', 'PasswordAuthentication=no',
        '-o', 'IdentityFile=/dev/null',
        f'{cust_user}@127.0.0.1',
    ]
    # exec replaces the process image without flushing Python's file
    # objects, so anything still buffered (stdout is block-buffered when
    # it is not a TTY) would be lost.  Flush explicitly first.
    sys.stdout.flush()
    sys.stderr.flush()
    os.execvp('ssh', ssh_cmd)


def cmd_collect(sid: str, output: str = ''):
    """Bundle the local per-session debug directory into a tar.zst.

    For bug reports from support.  Includes the regular files and
    directories under /tmp/tunl-{sid}/ (trace.jsonl, lastgasp.jsonl,
    rc scripts, ...), the global SSH log as ssh.log when it exists, and
    a small collect.json with collector metadata.  Sockets and FIFOs in
    the session dir are skipped.  Does NOT include ~/.ssh/ or SSH agent
    data.

    sid:    session id whose local directory is collected.
    output: archive path; defaults to tunl-collect-<sid>-<UTC ts>.tar.zst
            in the current directory.  When the zstd binary is not
            installed the archive is written as .tar.gz instead (a
            trailing '.tar.zst' in the name is rewritten accordingly).

    Exits with status 1 when the session dir does not exist or when
    archiving fails; in the latter case a partially written archive is
    removed.
    """
    sdir = Path(f'/tmp/tunl-{sid}')
    if not sdir.exists():
        print(f"{PREFIX} ERROR: no local session dir at {sdir}", file=sys.stderr)
        print(f"{PREFIX} (nothing to collect -- did you ever join/ssh this session?)",
              file=sys.stderr)
        sys.exit(1)

    ts = time.strftime('%Y%m%d-%H%M%S', time.gmtime())
    if not output:
        output = f'tunl-collect-{sid}-{ts}.tar.zst'

    # Stage a copy so we can exclude special files and add a manifest.
    with tempfile.TemporaryDirectory(prefix='tunl-collect-') as staging_root:
        staging = Path(staging_root) / f'tunl-{sid}'
        staging.mkdir()
        for src in sdir.iterdir():
            # Skip live sockets and FIFOs: neither can be copied as a
            # file, and attempting it only yields a noisy warning.
            try:
                if src.is_socket() or src.is_fifo():
                    continue
            except OSError:
                continue
            try:
                if src.is_dir():
                    shutil.copytree(src, staging / src.name, symlinks=True)
                else:
                    shutil.copy2(src, staging / src.name)
            except OSError as e:
                # Best effort: one unreadable entry must not abort the
                # whole collection.
                print(f"{PREFIX} warning: could not copy {src}: {e}",
                      file=sys.stderr)

        # Include the global SSH log if it exists (may span other sessions).
        if os.path.exists(SSH_LOG_PATH):
            try:
                shutil.copy2(SSH_LOG_PATH, staging / 'ssh.log')
            except OSError:
                pass

        manifest = {
            'sid':            sid,
            'collected_at':   _iso_ts(),
            'collector_host': socket.gethostname(),
            'tunl_version':   VERSION,
            'tunl_commit':    GIT_COMMIT,
            'relay_host':     RELAY_HOST,
            'support_user':   SUPPORT_USER,
        }
        (staging / 'collect.json').write_text(json.dumps(manifest, indent=2))

        if shutil.which('zstd'):
            # tar | zstd > output, wired up by hand (no shell involved).
            cmd = ['tar', 'cf', '-', '-C', str(staging.parent), staging.name]
            with open(output, 'wb') as out_fh:
                tar = subprocess.Popen(cmd, stdout=subprocess.PIPE)
                try:
                    zst = subprocess.Popen(
                        ['zstd', '-q'], stdin=tar.stdout, stdout=out_fh)
                except OSError:
                    # zstd vanished between shutil.which and here, or ENOMEM.
                    # Clean up tar so it does not linger with no reader.
                    tar.stdout.close()
                    tar.terminate()
                    tar.wait()
                    raise
                tar.stdout.close()   # zst owns the reader now
                try:
                    zst.communicate()
                    tar.wait()
                finally:
                    if tar.poll() is None:
                        tar.kill()
                        tar.wait()
                # Report zstd's failure first, otherwise tar's status.
                rc = zst.returncode if zst.returncode else tar.returncode
        else:
            if output.endswith('.tar.zst'):
                output = output[:-len('.tar.zst')] + '.tar.gz'
            cmd = ['tar', 'czf', output, '-C', str(staging.parent), staging.name]
            rc = subprocess.run(cmd).returncode

        if rc != 0:
            # Do not leave a truncated archive behind that could be
            # mistaken for a usable bundle.  Only regular files are
            # removed, so a device or symlink target is never unlinked.
            if os.path.isfile(output) and not os.path.islink(output):
                try:
                    os.unlink(output)
                except OSError:
                    pass
            print(f"{PREFIX} ERROR: tar failed (rc={rc})", file=sys.stderr)
            sys.exit(1)

    size = os.path.getsize(output)
    print(f'{PREFIX} collected {size} bytes to {output}')
    print(f'{PREFIX} attach this to your bug report along with the session ID.')


def cmd_note(sid: str, body: str = ''):
    """Read or append session-scoped notes (async handoff between shifts).

    With a non-empty body the note is POSTed to the relay API; the
    author comes from _resolve_join_display_name(), so no extra
    argument is needed to identify the caller.  With an empty body the
    stored notes are fetched and printed one per line, in the order the
    API returns them (documented as oldest first).

    Exits with status 1 on an API error.
    """
    notes_url = f'/api/v1/sessions/{sid}/notes'

    if body:
        # Append mode.
        author = _resolve_join_display_name()
        status, data = _api_post(notes_url, {'author': author, 'body': body})
        if status == 404:
            print(f"{PREFIX} ERROR: session {sid} not found", file=sys.stderr)
            sys.exit(1)
        # NOTE(review): only statuses >= 400 count as failure here;
        # confirm how _api_post signals a transport-level failure.
        if status >= 400:
            if isinstance(data, dict):
                msg = data.get('error')
            else:
                msg = str(data)
            print(f"{PREFIX} ERROR: {msg}", file=sys.stderr)
            sys.exit(1)
        print(f'note saved for {sid}')
        return

    # Read mode.
    try:
        notes = _api_get(notes_url)
    except RuntimeError as exc:
        print(f"{PREFIX} ERROR: {exc}", file=sys.stderr)
        sys.exit(1)

    if not notes:
        print(f'(no notes for session {sid})')
        return

    for entry in notes:
        stamp = entry.get('ts', 0)
        try:
            when = time.strftime('%Y-%m-%d %H:%M:%S',
                                 time.localtime(float(stamp)))
        except (TypeError, ValueError):
            # Unusable timestamp: still show the note itself.
            when = '?'
        who = entry.get('author') or '?'
        text = entry.get('body', '')
        print('[{}] {}: {}'.format(when, who, text))


# ---------------------------------------------------------------------------
# tunl set-meta
# ---------------------------------------------------------------------------

def cmd_set_meta(sid: str, case: str = '', customer: str = ''):
    """Attach a case and/or customer to an existing session.

    Typical use: a session created ad-hoc turns out to belong to a real
    ticket, and the case number (and/or customer name) is added after
    the fact so reporting and filesystem layout pick it up.

    Either field may be omitted, but not both.  Only the fields that
    were given are sent in the PATCH request and echoed back afterwards
    (using the values from the API response).  Exits with status 1 on
    missing arguments or an API error.
    """
    if not sid:
        print(f"{PREFIX} ERROR: session id is required", file=sys.stderr)
        sys.exit(1)

    # Keep only the non-empty fields, 'case' before 'customer'.
    payload = {
        field: value
        for field, value in (('case', case), ('customer', customer))
        if value
    }
    if not payload:
        print(f"{PREFIX} ERROR: pass --case and/or --customer "
              f"(nothing to patch).", file=sys.stderr)
        sys.exit(1)

    status, data = _api_patch(f'/api/v1/sessions/{sid}', payload)
    if status == 404:
        print(f"{PREFIX} ERROR: session {sid} not found", file=sys.stderr)
        sys.exit(1)
    if status >= 400:
        if isinstance(data, dict):
            msg = data.get('error')
        else:
            msg = str(data)
        print(f"{PREFIX} ERROR: {msg}", file=sys.stderr)
        sys.exit(1)

    print(f"Updated {sid}:")
    for field, label in (('case', '  case:     '), ('customer', '  customer: ')):
        if field in payload:
            print(f"{label}{data.get(field, '')}")


# ---------------------------------------------------------------------------
# tunl cast
# ---------------------------------------------------------------------------

def cmd_cast(sid: str, output: str = ''):
    """Fetch a session's asciinema cast from the relay API.

    output selects the destination: '' means ./<sid>.cast in the
    current directory, '-' streams the raw bytes to stdout, anything
    else is used as the file path.  The API endpoint is documented to
    serve active, closed and archived sessions alike.

    On any non-200 result a file this call created is removed again and
    the process exits with status 1.
    """
    if not sid:
        print(f"{PREFIX} ERROR: session id is required", file=sys.stderr)
        sys.exit(1)

    to_stdout = output == '-'
    if to_stdout:
        sink = sys.stdout.buffer
        dest = '<stdout>'
    else:
        dest = output or f'{sid}.cast'
        try:
            sink = open(dest, 'wb')
        except OSError as exc:
            print(f"{PREFIX} ERROR: cannot open {dest}: {exc}", file=sys.stderr)
            sys.exit(1)

    try:
        status, _ctype, err = _api_download(
            f'/api/v1/sessions/{sid}/cast', sink,
        )
    finally:
        # stdout is not ours to close; a file we opened is.
        if not to_stdout:
            sink.close()

    if status == 200:
        if not to_stdout:
            print(f"saved cast for {sid} to {dest}", file=sys.stderr)
        return

    # Failed; remove any partially-written output file to avoid mistaking
    # a half-download for a real recording.
    if not to_stdout:
        with contextlib.suppress(OSError):
            os.unlink(dest)

    if status == 404:
        detail = f"{err or 'cast not found'} (session id: {sid})"
    elif status == 400:
        detail = f"{err or 'bad request'}"
    elif status == 0:
        detail = f"relay API unreachable: {err}"
    else:
        detail = f"HTTP {status}: {err}"
    print(f"{PREFIX} ERROR: {detail}", file=sys.stderr)
    sys.exit(1)


# Help text for the CLI.  Placeholders: {prog}, {relay}, {rport}, {user},
# {aport}, {socks} -- presumably substituted via str.format() by whoever
# prints the usage (not visible in this part of the file), in which case
# any literal brace added here would need doubling.
# NOTE(review): cmd_close and cmd_detach are defined in this file but have
# no entry under "Commands:" -- confirm whether that omission is intended.
_USAGE = """\
Usage: {prog} <command> [args]

Commands:
  expect [--ttl MINUTES]     create a one-time pre-registration token for a customer
  create-session [<session-id>]  create a session (relay generates ID if omitted)
    --case CASE              case number or reference
    --customer NAME          customer name or organisation
    --ad-hoc                 no case attached yet (mutex with --case/--customer);
                             fill in later via `tunl set-meta`
    [--expires-in SECONDS]   expiry window (default: relay default, typically 1h)
    [--join]                 immediately start join session (waiting for customer)
  list                       list sessions (default: active + pending)
    [--pending]              show only pending (not yet connected)
    [--all]                  show all statuses including closed/archived
    [--status STATUS]        comma-separated: active,pending,closed,archived,all
    [--customer SUBSTR]      filter by customer name substring
    [--case SUBSTR]          filter by case reference substring
    [--after  DATE[TIME]]    sessions with created_at >= DATE (floor to unit)
    [--before DATE[TIME]]    sessions with created_at <  DATE (ceil to next unit)
                             DATE formats: YYYY-MM-DD, YYYY-MM-DD HH,
                             YYYY-MM-DD HH:MM, YYYY-MM-DD HH:MM:SS
    [--limit N]              cap results (default 20 when any filter given)
    [-v]                     show extra detail (timestamps, origin)
    [-vv]                    show all available fields
  pick                       interactively choose a session and action
  cancel [<session-id>]      remove a session from the relay registry
                             (pending: cancels before customer connects;
                              active: removes record, customer process survives)
  show <session-id>          show details for one session
  join [<session-id>]        join a customer's share session (local tmux session)
    [--read-only]            read-only; do not forward keystrokes
    [--debug]                preserve session logs in /tmp/tunl-*/ after disconnect
                             layout: status (top 3r) / main (middle) / chat (bottom 6r)
  ssh [<session-id>]         SSH directly to customer (auto-upgrades tunnel + ControlMaster)
  upgrade <session-id>       request a tunnel upgrade (relay picks the port)
  browser <session-id>       open a browser with SOCKS5 proxy to customer network
    [--profile NAME]         use named persistent profile (created if absent)
                             omit for interactive profile selection
    [--kill]                 terminate a previously launched browser for this session
  collect <session-id>       bundle /tmp/tunl-<session-id>/ + ssh log for a bug report
    [--output FILE]          write archive to FILE (default: tunl-collect-<sid>-<ts>.tar.zst)
  cast <session-id>          download the relay-side asciinema recording
    [--output FILE | -]      write to FILE (default: <sid>.cast); '-' streams to stdout
  set-meta <session-id>      attach case/customer to a session after the fact
    [--case CASE]            case number or reference to associate
    [--customer NAME]        customer name or organisation to associate
  note <session-id> [--]     read/append session-scoped notes (async handoff)
                             no body: print existing notes (oldest first)
                             --: read body from stdin
                             otherwise: remaining args joined with spaces become the body

Environment:
  TUNL_RELAY          relay hostname        (default: {relay})
  TUNL_RELAY_PORT     relay SSH port        (default: {rport})
  TUNL_SUPPORT_USER   relay SSH user        (default: {user})
  TUNL_API_PORT       relay API port        (default: {aport})
  TUNL_SOCKS_PORT     local SOCKS port      (default: {socks})
  TUNL_FORWARD_AGENT  set to 1 to allow SSH agent forwarding (default: disabled)
  TUNL_SSH_LOG        SSH debug log path    (default: /tmp/tunl-ssh.log)
"""


def _note_poll_loop(sid, latest_note_ref, stop_event, poll_interval,
                    api_get=None):
    """Keep latest_note_ref[0] in sync with the session's newest note.

    Fetches /api/v1/sessions/<sid>/notes in a loop until stop_event is
    set.  The first fetch happens right away; before every later fetch
    the loop sleeps on stop_event for poll_interval seconds, so setting
    the event ends the loop without waiting out the interval.

    latest_note_ref is a one-element list used as a mutable cell.
    After each successful fetch, slot 0 holds the last element of the
    returned list, or None when the response is empty or not a list.
    A fetch that raises is traced and leaves the slot untouched; the
    loop then carries on with the next cycle.

    api_get exists for test injection; when None the module-level
    _api_get is used.
    """
    fetch = _api_get if api_get is None else api_get
    notes_path = f'/api/v1/sessions/{sid}/notes'
    polled_once = False
    while True:
        if stop_event.is_set():
            return
        # Only sleep between polls, never before the very first one.
        if polled_once and stop_event.wait(poll_interval):
            return
        polled_once = True
        try:
            reply = fetch(notes_path)
        except Exception as exc:
            trace('chat_window_note_poll_error', err=str(exc))
            continue
        have_notes = isinstance(reply, list) and bool(reply)
        latest_note_ref[0] = reply[-1] if have_notes else None


def _build_session_info_handler(sid, latest_note_ref):
    """Construct the session_info_handler callback for support-side chat.

    sid is the session id; it is shown in the row-1 center label and
    selects the session directory that receives known_hosts.
    latest_note_ref is a single-element list whose slot 0 holds the
    most recent note dict (or None); it is read on every invocation.

    Row 1 L/C/R: connection / mode / viewer count.
    Row 2 priority stack: latest note (M6) -> viewer names ->
          case+customer -> empty.  The latest note wins whenever
          it exists so async-handoff context is never hidden by
          shorter-lived signals.

    Returns a callable ``_handler(payload, pinned, ephemeral,
    history=None)`` where payload is the session-info dict, pinned
    provides set_row1()/set_row2(), and history (optional) gets its
    customer_nick attribute updated.  ephemeral is accepted but unused.
    """
    # Last hostkey blob we attempted to write; avoids rewriting (and
    # re-tracing a failure for) the same blob on every session-info update.
    _last_hostkeys = ['']

    def _size_pair(value):
        """Coerce a payload [cols, rows] field to an int pair.

        Anything that is not a list/tuple of at least two
        int-convertible values yields (0, 0), so a malformed payload
        degrades to "no dimensions shown" instead of raising out of
        the UI callback.
        """
        if isinstance(value, (list, tuple)) and len(value) >= 2:
            try:
                return int(value[0]), int(value[1])
            except (TypeError, ValueError, OverflowError):
                pass
        return 0, 0

    def _handler(payload, pinned, ephemeral, history=None):
        mode = payload.get('mode', 'share')
        tp   = payload.get('tunnel_port')
        mode_label = f'{mode}+tunnel:{tp}' if tp else mode
        # When the broker includes ssh-keyscan output (tunnel_hostkeys), write
        # it to the session's known_hosts so `tunl ssh` works without TOFU.
        # The blob arrives hex-encoded.
        hk = payload.get('tunnel_hostkeys') or ''
        if hk and hk != _last_hostkeys[0]:
            _last_hostkeys[0] = hk
            try:
                kh_path = _session_dir(sid) / 'known_hosts'
                keys = bytes.fromhex(hk).decode('utf-8', 'replace')
                kh_path.write_text(keys)
            except Exception as e:
                # Still best-effort (the UI must keep running), but leave
                # a breadcrumb: without it a later host-key prompt in
                # `tunl ssh` has no visible cause.  Fires at most once
                # per distinct blob because hk was remembered above.
                trace('chat_window_hostkeys_write_failed', err=str(e))
        viewers    = payload.get('viewers', 0) or 0
        vnames     = payload.get('viewer_names') or []
        vsuffix    = 's' if viewers != 1 else ''
        # Dimensions summary: show customer outer terminal if reported, and
        # a /!\ warning when our forced window exceeds it (dotted-border
        # territory for the customer).
        term_cols, term_rows = _size_pair(payload.get('customer_terminal'))
        win_cols, win_rows = _size_pair(payload.get('customer_window'))
        dims = ''
        if term_cols > 0 and term_rows > 0:
            warn = win_cols > term_cols or win_rows > term_rows
            prefix = '/!\\ ' if warn else ''
            dims = f'  {prefix}term:{term_cols}x{term_rows}'
        pinned.set_row1(
            left   = f'● chat  {SYM_TUN if tp else SYM_NTUN} {mode_label}',
            center = f'session {sid}',
            right  = f'{viewers} viewer{vsuffix}{dims}',
        )
        note = latest_note_ref[0]
        if isinstance(note, dict) and note.get('body'):
            author = note.get('author') or '?'
            body   = note.get('body') or ''
            # Keep row 2 short: cap the note body at 100 characters.
            if len(body) > 100:
                body = body[:97] + '...'
            pinned.set_row2(f'note: {author}: {body}')
        elif vnames:
            pinned.set_row2(f'watching: {", ".join(vnames)}')
        else:
            case_s = payload.get('case') or ''
            cust_s = payload.get('customer') or ''
            if case_s or cust_s:
                pinned.set_row2(
                    f'case: {case_s or "(n/a)"}  customer: {cust_s or "(n/a)"}'
                )
            else:
                pinned.set_row2('')
        # Update customer display nick in the history buffer so that
        # messages from who='customer' show a useful label.  The
        # customer-chosen 'customer_nick' (from /nick on the customer
        # side) wins over the company-derived 'customer' field.
        if history is not None:
            chosen = payload.get('customer_nick') or ''
            if chosen:
                history.customer_nick = chosen
            elif payload.get('customer'):
                cn = _first_significant_word(payload['customer'])
                if cn:
                    history.customer_nick = cn
    return _handler


def _chat_window_main(sid: str, display_name: str, read_only: bool,
                      debug: bool, api_get=None):
    """Run the curses chat UI inside a tmux window (internal --_chat_window).

    Spawns an SSH consumer in --chat mode, wires the subprocess stdio to
    run_chat_ui(), and returns when run_chat_ui() returns; the ssh
    subprocess is then terminated (killed if it does not exit within
    2 s).  If ssh cannot be spawned at all, prints an error to stderr
    and exits 1.

    sid          -- session id to attach to.
    display_name -- name passed to the ssh consumer and the chat UI.
    read_only    -- forwarded to the ssh consumer and to run_chat_ui.
    debug        -- accepted for interface compatibility; not read in
                    this function.
    api_get      -- injected for tests so the note-poll daemon thread
                    does not do real network I/O; production passes
                    None and falls back to the module-level _api_get.

    Environment: TUNL_NOTE_POLL_INTERVAL (seconds, default 30) sets the
    note polling period; unparsable, non-positive, NaN or oversized
    values fall back to the default.
    """
    init('support', sid=sid,
         path=str(_session_dir(sid) / 'trace.jsonl'),
         lastgasp_path=str(_session_dir(sid) / 'lastgasp.jsonl'))
    trace('chat_window_begin', sid=sid, name=display_name,
          read_only=read_only)

    ssh_cmd = _ssh_consume_cmd(sid, read_only, chat=True, name=display_name)
    try:
        proc = subprocess.Popen(
            ssh_cmd,
            stdin =subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            bufsize=0,
        )
    except OSError as e:
        trace('chat_window_spawn_failed', err=str(e))
        print(f'{PREFIX} ERROR: could not spawn ssh: {e}', file=sys.stderr)
        sys.exit(1)

    # The bridge (relay-share.py consume --chat) pipes v4 frames straight
    # through; demux them here so wire_parse sees one CTRL payload at a
    # time.  DATA / HELLO / ERROR are not expected on this leg post-
    # session-1; tolerate them as "skip" so future broker changes do
    # not require touching the chat UI.
    _frame_stream = FrameStream(read_some_from_fileobj(proc.stdout))

    def _read_line():
        """Return the next CTRL payload, or None on EOF / error."""
        while True:
            try:
                type_code, payload = _frame_stream.read_message()
            except (EOFError, WireProtocolError):
                return None
            if type_code == TYPE_CTRL:
                return payload
            if type_code == TYPE_ERROR:
                # Surface relay-side ERROR text; treat as EOF for the UI.
                trace('chat_window_relay_error',
                      text=payload.decode('utf-8', errors='replace'))
                return None
            # DATA / HELLO / unknown: ignore and keep reading.

    def _send_bytes(data):
        """Write data to ssh stdin; a dead pipe is silently ignored."""
        try:
            proc.stdin.write(data)
            proc.stdin.flush()
        except OSError:
            # BrokenPipeError is an OSError subclass; the read side
            # will observe EOF and end the UI.
            pass

    # Drain stderr in the background so the ssh subprocess never blocks.
    def _drain_stderr():
        try:
            for line in proc.stderr:
                trace('chat_window_ssh_stderr',
                      line=line.decode('utf-8', errors='replace').rstrip())
        except (OSError, ValueError):
            pass
    threading.Thread(target=_drain_stderr, daemon=True).start()

    stop = threading.Event()

    # M6: poll /api/v1/sessions/<sid>/notes in a background thread and
    # stash the most recent entry for _session_info_handler to show.
    # Polling interval is configurable via TUNL_NOTE_POLL_INTERVAL
    # (seconds, default 30).  Mutations to latest_note_ref[0] are
    # atomic under the GIL; no lock needed for a single-slot cell.
    latest_note_ref = [None]
    raw_interval = os.environ.get('TUNL_NOTE_POLL_INTERVAL', '30')
    try:
        poll_interval = float(raw_interval)
    except (TypeError, ValueError):
        poll_interval = 30.0
    # float() also accepts '0', '-5', 'nan' and 'inf'.  With a
    # non-positive or NaN timeout Event.wait() returns immediately, so
    # the poll thread would hammer the relay API in a hot loop; a
    # timeout above threading.TIMEOUT_MAX raises OverflowError inside
    # the thread and ends polling.  The chained comparison is False for
    # NaN, so one test covers every bad case.
    if not (0 < poll_interval <= threading.TIMEOUT_MAX):
        trace('chat_window_note_poll_interval_invalid',
              value=str(raw_interval))
        poll_interval = 30.0

    threading.Thread(
        target=_note_poll_loop,
        args=(sid, latest_note_ref, stop, poll_interval),
        kwargs={'api_get': api_get},
        daemon=True,
    ).start()

    _session_info_handler = _build_session_info_handler(sid, latest_note_ref)

    try:
        run_chat_ui(_read_line, _send_bytes, display_name,
                    stop_event=stop, readonly=read_only,
                    local_echo_on_send=True,
                    pinned_rows=2,
                    session_info_handler=_session_info_handler,
                    mention_tokens=build_mention_tokens(
                        'support', display_name, SUPPORT_COMPANY),
                    session_id=sid)
    finally:
        # Popen.returncode is only refreshed by poll()/wait(); without
        # this poll() it is still None here and the trace would always
        # say '?', even when ssh died first and that is why the UI
        # ended.  '?' now means "ssh was still running at UI exit".
        rc = proc.poll()
        trace('chat_window_exit', rc=rc if rc is not None else '?')
        stop.set()
        try:
            proc.stdin.close()
        except OSError:
            pass
        try:
            proc.terminate()
            proc.wait(timeout=2)
        except (OSError, subprocess.TimeoutExpired):
            try:
                proc.kill()
            except OSError:
                pass


# ---------------------------------------------------------------------------
# Support-side viewport filter (--_viewport_filter)
# ---------------------------------------------------------------------------

# Auto-flip detection: applications such as vim or less switch to the xterm
# alt-screen with DEC private-mode sequences.  ESC[?1049h / ESC[?1049l is
# the usual form; ESC[?1047h / ESC[?1047l and ESC[?47h / ESC[?47l are the
# older equivalents and are treated the same way.  The patterns are bytes
# regexes because they are searched against raw DATA frame payloads.
_ALT_SCREEN_ENTER_RE = re.compile(rb'\x1b\[\?(?:1049|1047|47)h')
_ALT_SCREEN_EXIT_RE  = re.compile(rb'\x1b\[\?(?:1049|1047|47)l')


def _viewport_filter_main(sid: str, ssh_argv, tmux_sock: str = '',
                          tmux_target: str = '', read_only: bool = False,
                          name: str = '', debug: bool = False):
    """Internal: support-side wrapper that spawns ssh -> consume sid,
    demuxes v4 frames from the broker, writes DATA payloads to its
    own stdout (which is the tmux pane stdin), handles CTRL frames
    locally, and translates stdin keystrokes into v4 CTRL frames on
    ssh.stdin.

    Drives the viewport mode state machine:
      * relaxed (default) -- pane stays at whatever size tmux gave it.
      * strict           -- pane is resized to the broker-forwarded
                            EFFECTIVE_SIZE C R.
    Auto-flip to strict on alt-screen entry (ESC[?1049h / ESC[?47h)
    and back to relaxed on the corresponding exit sequence; set
    TUNL_ALT_SCREEN_STRICT=0 to disable auto-flip.  Manual toggle via
    a side-channel FIFO at <session_dir>/viewport.fifo (commands:
    STRICT / RELAXED / TOGGLE; consumed in session 5's keybindings).

    Exit code:
      * 42 on SESSION_ENDED so cmd_join's pane-died hook fires the
        SESSION CLOSED popup.
      * 0  on clean disconnect (network blip; tmux respawns the pane).
      * 1  on local error.
    """
    init('support', sid=sid,
         path=str(_session_dir(sid) / 'trace.jsonl'),
         lastgasp_path=str(_session_dir(sid) / 'lastgasp.jsonl'))
    # Fall back to $TMUX_PANE when no explicit --tmux-target was
    # passed.  tmux sets TMUX_PANE in the env of every pane process
    # and pane ids are stable across join-pane / break-pane, so the
    # filter always knows which pane it lives in -- a hardcoded
    # window:index would break under user-driven layout changes.
    if not tmux_target:
        tmux_target = os.environ.get('TMUX_PANE', '')
    trace('viewport_filter_begin', sid=sid, name=name,
          read_only=read_only, tmux_target=tmux_target)

    # Greeting on the support pane while we connect.  Replaced by the
    # customer's snapshot once it arrives (the snapshot's CSI 2J / CSI
    # H clears the screen).  Uses \r\n explicitly because later
    # messages run in raw mode (cfmakeraw disables OPOST) and we want
    # consistent line endings throughout.
    _greeting = (
        f'\x1b[1;36m[tunl] connecting to relay '
        f'(session {sid})...\x1b[m\r\n'
        f'\x1b[2m[tunl] waiting for customer\'s tmux snapshot...'
        f'\x1b[m\r\n'
    ).encode('utf-8')
    try:
        sys.stdout.buffer.write(_greeting)
        sys.stdout.buffer.flush()
    except OSError:
        pass

    auto_flip = os.environ.get('TUNL_ALT_SCREEN_STRICT', '1').lower() not in (
        '0', 'false', 'no', 'off')

    try:
        proc = subprocess.Popen(
            ssh_argv,
            stdin =subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            bufsize=0,
        )
    except OSError as e:
        trace('viewport_filter_spawn_failed', err=str(e))
        sys.stderr.write(f'{PREFIX} ERROR: could not spawn ssh: {e}\n')
        sys.exit(1)

    # Drain stderr so the ssh subprocess never blocks; trace each line.
    def _drain_stderr():
        try:
            for line in proc.stderr:
                trace('viewport_filter_ssh_stderr',
                      line=line.decode('utf-8', errors='replace').rstrip())
        except (OSError, ValueError):
            pass
    threading.Thread(target=_drain_stderr, daemon=True).start()

    # Self-pipe so the recv / FIFO threads can wake the stdin loop
    # when SESSION_ENDED arrives or the broker disconnects.  Without
    # it, the wrapper would block on os.read(stdin) indefinitely
    # after the broker is gone.
    _done_r, _done_w = os.pipe()

    def _signal_done():
        try:
            os.write(_done_w, b'\x01')
        except OSError:
            pass

    # Viewport / status state.  All mutated only from the recv thread
    # except `mode` which is also mutated from the FIFO thread; protect
    # with a lock so a manual toggle can't race a SIGWINCH.
    state_lock     = threading.Lock()
    mode           = ['relaxed']      # 'relaxed' | 'strict'
    effective_size = [None]           # last EFFECTIVE_SIZE C R from broker
    alt_screen     = [False]
    pane_dead      = [False]
    restricted     = [False]
    brb            = [False]
    session_ended  = [False]
    # Liveness: monotonic timestamps for last frame in/out on the
    # support<->broker SSH stdio (see docs/share.md "Liveness").  The
    # heartbeat thread reads last_outbound to decide when to emit a
    # CTRL HEARTBEAT; later commits will read last_inbound to drive
    # peer-late indicators.
    last_inbound   = [time.monotonic()]
    last_outbound  = [time.monotonic()]
    heartbeat_stop = threading.Event()
    # Set as soon as the broker sends ANY frame (HELLO, SESSION_INFO,
    # the customer's snapshot DATA, ...).  While clear, a background
    # ticker overwrites a "still waiting NN s elapsed" line so the
    # user sees progress instead of a static greeting.  Any frame
    # type means the broker handshake completed and the wait is over.
    first_frame_event = threading.Event()
    # Serialises the spinner's tick writes against the recv thread's
    # one-time spinner-line clear on the first frame; after that the
    # recv thread drops the lock and writes are uncontended.
    stdout_lock       = threading.Lock()
    # SIGWINCH is dropped while in strict mode (see _on_winch).  We
    # still bump a counter so the strict->relaxed trace breadcrumb
    # captures how many resizes the user did during strict; useful
    # when a "stuck" report comes in.
    winch_ignored_in_strict = [0]

    _DEAD_MSG  = (
        b'\r\n\x1b[1;31m[tunl] Customer shell exited.'
        b' Press r to restart it.\x1b[m\r\n'
    )
    _READONLY_MSG = (
        b'\r\n\x1b[1;33m[tunl] Session is restricted:'
        b' shell keystroke injection disabled (chat still works).\x1b[m\r\n'
    )
    _BRB_MSG = (
        b'\r\n\x1b[1;33m[tunl] Customer is away -- input temporarily frozen.\x1b[m\r\n'
    )
    _BRB_END_MSG = (
        b'\r\n\x1b[1;32m[tunl] Customer is back -- input restored.\x1b[m\r\n'
    )
    _SESSION_ENDED_MSG = (
        '\r\n\x1b[1;31m--- SESSION CLOSED ---\x1b[m\r\n'
    ).encode('utf-8')

    # Pane-border-status (and friends) make the pane smaller than the
    # window it lives in: with `pane-border-status top` the pane is
    # one row shorter than the window.  Detect the overhead once,
    # lazily on the first resize, then add it to subsequent
    # resize-window calls so the rendered pane lands at the requested
    # size rather than (cols, rows-1).
    border_overhead = [None]   # (cols_overhead, rows_overhead) or None

    def _detect_border_overhead():
        if not tmux_target:
            return (0, 0)
        cmd = ['tmux']
        if tmux_sock:
            cmd += ['-S', tmux_sock]
        cmd += ['display-message', '-t', tmux_target, '-p',
                '#{e|-:#{window_width},#{pane_width}} '
                '#{e|-:#{window_height},#{pane_height}}']
        try:
            r = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=2)
            parts = r.stdout.decode('ascii', errors='replace').split()
            if len(parts) == 2:
                return (max(0, int(parts[0])), max(0, int(parts[1])))
        except (OSError, subprocess.SubprocessError, ValueError):
            pass
        return (0, 0)

    def _resize_window(cols: int, rows: int):
        """Resize the local join-session window to (cols, rows) via
        tmux resize-window, compensating for pane-border-status so
        the pane (not the window) ends up at the requested size.
        No-op if tmux_target is empty."""
        if not tmux_target:
            return
        if border_overhead[0] is None:
            border_overhead[0] = _detect_border_overhead()
        co, ro = border_overhead[0]
        win_cols = cols + co
        win_rows = rows + ro
        cmd = ['tmux']
        if tmux_sock:
            cmd += ['-S', tmux_sock]
        cmd += ['resize-window', '-t', tmux_target,
                '-x', str(win_cols), '-y', str(win_rows)]
        try:
            subprocess.run(cmd, stdout=subprocess.DEVNULL,
                           stderr=subprocess.DEVNULL, timeout=2)
            trace('viewport_resize', cols=cols, rows=rows,
                  win_cols=win_cols, win_rows=win_rows,
                  mode=mode[0])
        except (OSError, subprocess.SubprocessError) as e:
            trace('viewport_resize_failed', err=str(e))

    mode_path = _session_dir(sid) / 'viewport.mode'

    def _set_tmux_option(opt: str, val: str):
        """Set a window-level tmux user option on the join window so
        the pane-border-format can show it.  Best-effort -- on a
        wrapper started outside a real tmux session, tmux_target /
        tmux_sock are empty and this is a no-op."""
        if not tmux_target:
            return
        cmd = ['tmux']
        if tmux_sock:
            cmd += ['-S', tmux_sock]
        cmd += ['set-option', '-w', '-t', tmux_target, opt, val]
        try:
            subprocess.run(cmd, stdout=subprocess.DEVNULL,
                           stderr=subprocess.DEVNULL, timeout=1)
        except (OSError, subprocess.SubprocessError):
            pass

    def _publish_mode(m: str):
        """Persist current viewport mode in two places: a side-channel
        file (kept for menu callbacks / tests) and the
        @tunl-viewport window-level tmux user option (read by the
        pane-border-format installed in cmd_join)."""
        try:
            mode_path.write_text(m + '\n')
        except OSError:
            pass
        _set_tmux_option('@tunl-viewport', m)

    _publish_mode(mode[0])

    def _restore_window_auto_fit():
        """Run `tmux resize-window -A` so the join window grows back
        to the attached client's full size, and explicitly reset the
        window-size option to 'latest'.

        Necessary on leaving strict mode for two reasons:

          - The strict-entry `resize-window -x -y` call implicitly
            switches the window-size option to 'manual'.  `-A`
            adjusts the dims but does not flip the policy back; if
            we leave it at 'manual', the user's outer terminal
            resizes no longer auto-propagate to the join window,
            no SIGWINCH fires on the wrapper, and we never re-emit
            our size upstream -- visible as a stuck pane that only
            unsticks after a manual alt-screen toggle.

          - 'latest' (tmux's documented default for window-size)
            adjusts to whichever client most recently became
            active; for a single-client join session that is the
            support's outer terminal, which is exactly what we
            want.

        Best-effort: tmux versions that don't accept a particular
        value for window-size silently ignore the set-option call,
        and the resize-window -A above already covers the common
        case.
        """
        if not tmux_target:
            return
        base = ['tmux']
        if tmux_sock:
            base += ['-S', tmux_sock]
        try:
            subprocess.run(
                base + ['set-option', '-w', '-t', tmux_target,
                        'window-size', 'latest'],
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                timeout=1,
            )
        except (OSError, subprocess.SubprocessError):
            pass
        try:
            subprocess.run(
                base + ['resize-window', '-A', '-t', tmux_target],
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                timeout=1,
            )
            trace('viewport_restore_auto_fit')
        except (OSError, subprocess.SubprocessError) as e:
            trace('viewport_restore_failed', err=str(e))

    def _apply_mode(new_mode: str):
        """Apply mode if it changed; resize when entering strict,
        restore auto-fit and re-sync upstream when leaving."""
        with state_lock:
            if mode[0] == new_mode:
                return
            old_mode = mode[0]
            mode[0] = new_mode
            eff = effective_size[0]
        trace('viewport_mode', mode=new_mode,
              eff=eff if eff else None)
        _publish_mode(new_mode)
        if new_mode == 'strict' and eff is not None:
            _resize_window(*eff)
        elif new_mode == 'relaxed' and old_mode == 'strict':
            # Leaving strict: the window has been pinned by our
            # earlier resize-window call (tmux switches window-size
            # to manual implicitly).  Restore auto-fit so the pane
            # grows back to the attached client's full size, and
            # explicitly re-emit our current pane size upstream so
            # the broker's _consumer_sizes[us] catches up -- the
            # SIGWINCH handler was suppressed during strict, so any
            # outer resize the user did while in strict never made
            # it to the broker.  Without this, the broker keeps
            # forwarding the old strict-time small dims to the
            # customer, and both sides end up with wasted space.
            ignored = winch_ignored_in_strict[0]
            winch_ignored_in_strict[0] = 0
            trace('viewport_leave_strict',
                  ignored_winch=ignored)
            _restore_window_auto_fit()
            _send_winch()
            # The synchronous _send_winch above reads the pane size
            # before tmux has processed `resize-window -A` -- so it
            # sends the strict-pinned dims, not the restored ones.
            # Schedule a second one ~150 ms later to capture the
            # post-restore size; tmux normally also fires SIGWINCH
            # after the resize, but only when the size actually
            # changes, and we want the upstream broker view to
            # converge even when SIGWINCH is suppressed by the
            # outer terminal already matching the restored dims.
            def _deferred_resync():
                time.sleep(0.15)
                if mode[0] == 'relaxed':
                    _send_winch()
            threading.Thread(target=_deferred_resync,
                             name='viewport-resync',
                             daemon=True).start()

    def _send_ctrl(line: str, *bin_args: bytes):
        """Send a v4 CTRL frame to the broker via the ssh subprocess."""
        try:
            proc.stdin.write(_v4_ctrl(line, *bin_args))
            proc.stdin.flush()
            last_outbound[0] = time.monotonic()
        except (OSError, BrokenPipeError):
            pass

    def _send_hello():
        try:
            proc.stdin.write(_v4_hello())
            proc.stdin.flush()
            last_outbound[0] = time.monotonic()
        except (OSError, BrokenPipeError):
            pass

    def _send_winch():
        """Emit the local pane's current size as a RESIZE frame so the
        broker's smallest-wins policy includes us."""
        try:
            cols, rows = os.get_terminal_size(sys.stdout.fileno())
        except OSError:
            return
        _send_ctrl(f'RESIZE {cols} {rows}')

    # Frame demuxer over ssh.stdout.
    stream = FrameStream(read_some_from_fileobj(proc.stdout))

    def _recv():
        """Read frames from broker; dispatch DATA -> stdout, CTRL ->
        local handlers.  Auto-flip viewport mode on alt-screen entry/
        exit when TUNL_ALT_SCREEN_STRICT is enabled."""
        # First read should be the broker's HELLO; we sent ours above.
        # Tolerate either ordering -- the consumer leg is symmetric and
        # the bridge byte-copies frames verbatim, so HELLO arrives in
        # the natural read order.
        try:
            while True:
                try:
                    type_code, payload = stream.read_message()
                except EOFError:
                    break
                except WireProtocolError as e:
                    trace('viewport_protocol_error', err=str(e))
                    sys.stderr.write(
                        f'\n[tunl] protocol error: {e}\n')
                    break

                # Liveness: any successful frame read counts as "peer
                # is alive".
                last_inbound[0] = time.monotonic()

                # Any successful frame read -- HELLO, SESSION_INFO,
                # snapshot DATA -- means the broker is up and
                # responding.  Stop the still-waiting spinner and
                # erase its in-place line under the lock so we do not
                # race a final tick from the spinner thread.
                if not first_frame_event.is_set():
                    with stdout_lock:
                        first_frame_event.set()
                        try:
                            sys.stdout.buffer.write(b'\r\x1b[2K')
                        except OSError:
                            pass

                if type_code == TYPE_DATA:
                    sys.stdout.buffer.write(payload)
                    sys.stdout.buffer.flush()
                    if auto_flip:
                        # Detect alt-screen entry/exit anywhere in the
                        # payload.  Either is rare (one per command at
                        # most), so per-frame regex scan is fine.
                        flip_to = None
                        if _ALT_SCREEN_ENTER_RE.search(payload):
                            flip_to = 'strict'
                            with state_lock:
                                alt_screen[0] = True
                        if _ALT_SCREEN_EXIT_RE.search(payload):
                            flip_to = 'relaxed'
                            with state_lock:
                                alt_screen[0] = False
                        if flip_to:
                            _apply_mode(flip_to)
                    continue
                if type_code == TYPE_HELLO:
                    continue
                if type_code == TYPE_ERROR:
                    reason = payload.decode('utf-8', errors='replace')
                    trace('viewport_relay_error', reason=reason)
                    sys.stderr.write(f'\n[tunl] {reason}\n')
                    break
                if type_code != TYPE_CTRL:
                    continue
                try:
                    line, args = decode_ctrl(payload)
                except WireProtocolError:
                    continue
                parts = line.split()
                if not parts:
                    continue
                verb = parts[0]
                if verb == 'HEARTBEAT':
                    # Idle keepalive from broker; last_inbound already
                    # updated above.
                    continue
                if verb == 'PEER_LOST':
                    # 'PEER_LOST role for_s' optional name binary arg.
                    # Today the broker only sends this for the customer
                    # leg (C4); C6 will extend it to consumer legs.
                    role = parts[1] if len(parts) >= 2 else 'peer'
                    name = (args[0].decode('utf-8', errors='replace')
                            if args else '')
                    label = name or role
                    ts = time.strftime('%H:%M:%S', time.localtime())
                    _set_tmux_option(
                        '@tunl-link', f'{label} lost {ts}')
                    trace('peer_lost', role=role, name=name)
                    continue
                if verb == 'PEER_BACK':
                    role = parts[1] if len(parts) >= 2 else 'peer'
                    name = (args[0].decode('utf-8', errors='replace')
                            if args else '')
                    _set_tmux_option('@tunl-link', '')
                    trace('peer_back', role=role, name=name)
                    continue
                if verb == 'PANE_DEAD':
                    pane_dead[0] = True
                    sys.stdout.buffer.write(_DEAD_MSG)
                    sys.stdout.buffer.flush()
                elif verb == 'READONLY':
                    restricted[0] = True
                    sys.stdout.buffer.write(_READONLY_MSG)
                    sys.stdout.buffer.flush()
                elif verb == 'BRB':
                    brb[0] = True
                    sys.stdout.buffer.write(_BRB_MSG)
                    sys.stdout.buffer.flush()
                elif verb == 'BRB_END':
                    brb[0] = False
                    sys.stdout.buffer.write(_BRB_END_MSG)
                    sys.stdout.buffer.flush()
                elif verb == 'SESSION_ENDED':
                    session_ended[0] = True
                    sys.stdout.buffer.write(_SESSION_ENDED_MSG)
                    sys.stdout.buffer.flush()
                    _signal_done()
                    break
                elif verb == 'EFFECTIVE_SIZE' and len(parts) >= 3:
                    # `EFFECTIVE_SIZE C R [override]` -- the trailing
                    # 'override' marker (added in this commit) tells
                    # us the broker is currently pinned by a
                    # RESIZE_OVERRIDE from some seat (possibly us,
                    # possibly another support).  Surface it via the
                    # @tunl-override tmux user option so the
                    # pane-border-format can render a badge.
                    try:
                        c, r = int(parts[1]), int(parts[2])
                    except ValueError:
                        continue
                    # B2.4 defence-in-depth: clamp before driving
                    # resize-window etc.  Broker already range-checks
                    # but we should not trust unconditionally.
                    if not (1 <= c <= 10000 and 1 <= r <= 10000):
                        continue
                    is_override = (
                        len(parts) >= 4 and parts[3] == 'override')
                    with state_lock:
                        effective_size[0] = (c, r)
                        cur_mode = mode[0]
                    trace('viewport_effective_size', cols=c, rows=r,
                          mode=cur_mode, override=is_override)
                    _set_tmux_option(
                        '@tunl-override',
                        f'{c}x{r}' if is_override else '')
                    if cur_mode == 'strict':
                        _resize_window(c, r)
                elif verb in ('TUNNEL_PORT', 'TUNNEL_FAILED',
                              'TUNNEL_REJECTED'):
                    # Out-of-band tunnel signals; cmd_upgrade prints
                    # them, the wrapper just consumes them.
                    pass
                # Unknown verbs ignored silently.
        finally:
            try:
                sys.stdout.buffer.flush()
            except OSError:
                pass
            # Stop the heartbeat thread (no more outbound channel).
            heartbeat_stop.set()
            # Wake the stdin loop so the wrapper can tear down even if
            # the tmux pane's stdin is still connected.
            _signal_done()

    recv_thread = threading.Thread(target=_recv, name='viewport-recv',
                                   daemon=True)
    recv_thread.start()
    _send_hello()
    _send_winch()

    # Liveness: idle keepalive thread.  Emits a CTRL HEARTBEAT every
    # HEARTBEAT_INTERVAL when nothing else has gone out for that long,
    # so the broker can distinguish "support seat idle but reachable"
    # from "support seat silent and possibly dead".  Stops on EOF
    # (heartbeat_stop set from _recv finally) or on process exit
    # (daemon thread).
    def _heartbeat_loop():
        """Send a CTRL HEARTBEAT whenever the outbound side has been idle.

        Wakes every half interval (at least 1) and emits a heartbeat only
        if nothing has gone out for a full HEARTBEAT_INTERVAL, judged by
        the monotonic timestamp in last_outbound[0].  Returns as soon as
        heartbeat_stop is set.
        """
        period = HEARTBEAT_INTERVAL
        poll = max(1, period // 2)
        while True:
            # wait() returns True once the stop event is set.
            if heartbeat_stop.wait(timeout=poll):
                return
            idle_for = time.monotonic() - last_outbound[0]
            if idle_for >= period:
                _send_ctrl('HEARTBEAT')

    threading.Thread(target=_heartbeat_loop, name='viewport-heartbeat',
                     daemon=True).start()

    # Still-waiting ticker.  Overwrites a single line below the
    # static greeting with elapsed time + a spinner glyph until the
    # first DATA frame arrives.  Without this the support pane sits
    # silently on "[tunl] waiting for customer's tmux snapshot..."
    # for up to PENDING_TTL_MINUTES (an hour by default), which looks
    # like a hang.  share_consume keeps polling for the broker for
    # the same window; the spinner makes that wait tolerable.
    def _waiter_spinner():
        """Redraw a one-line "still waiting" ticker until the first frame.

        Once per second, overwrites the current line with a spinner glyph
        and the elapsed mm:ss.  Stops when first_frame_event is set or
        when stdout can no longer be written.
        """
        glyphs = '|/-\\'
        # Monotonic clock: the elapsed display must not jump (or go
        # negative) when the wall clock is stepped while we wait.
        started = time.monotonic()
        tick = 0
        while not first_frame_event.is_set():
            elapsed = int(time.monotonic() - started)
            mins, secs = divmod(elapsed, 60)
            line = (f'\r\x1b[2K\x1b[2m[tunl] still waiting '
                    f'{glyphs[tick % len(glyphs)]}  '
                    f'{mins:02d}:{secs:02d} elapsed'
                    f'\x1b[m').encode('utf-8')
            with stdout_lock:
                # Re-check under the lock so we never draw over output
                # written after the first frame was signalled.
                if first_frame_event.is_set():
                    return
                try:
                    sys.stdout.buffer.write(line)
                    sys.stdout.buffer.flush()
                except OSError:
                    return
            tick += 1
            if first_frame_event.wait(timeout=1.0):
                return
    threading.Thread(target=_waiter_spinner, name='viewport-waiter',
                     daemon=True).start()

    # Manual viewport toggle via side-channel FIFO.  cmd_join's
    # session 5 keybindings will write 'STRICT' / 'RELAXED' / 'TOGGLE'
    # one per line to this FIFO.
    fifo_path = _session_dir(sid) / 'viewport.fifo'
    try:
        if not fifo_path.exists():
            os.mkfifo(str(fifo_path), 0o600)
    except OSError:
        pass

    def _fifo_reader():
        """Apply viewport-mode commands read line by line from the FIFO.

        Recognised lines (case-insensitive, surrounding whitespace
        stripped) are STRICT, RELAXED and TOGGLE; anything else is
        ignored.  Returns quietly if the FIFO cannot be opened or read.
        """
        # O_RDWR rather than O_RDONLY: the read end then never sees EOF
        # when a writer closes, so one reader survives many short-lived
        # writers (each tmux run-shell write opens, writes, closes).
        try:
            fifo_fd = os.open(str(fifo_path), os.O_RDWR)
        except OSError:
            return
        try:
            with os.fdopen(fifo_fd, 'rb', buffering=0) as stream:
                for line in stream:
                    word = line.strip().decode(
                        'ascii', errors='replace').upper()
                    if word == 'TOGGLE':
                        # Snapshot the mode under the lock, flip outside it.
                        with state_lock:
                            current = mode[0]
                        _apply_mode(
                            'relaxed' if current == 'strict' else 'strict')
                    elif word in ('STRICT', 'RELAXED'):
                        _apply_mode(word.lower())
        except (OSError, ValueError):
            pass

    threading.Thread(target=_fifo_reader, name='viewport-fifo',
                     daemon=True).start()

    # SIGWINCH from tmux (the local pane resized): re-emit our dims so
    # the broker's smallest-wins picks min over all consumers.  The
    # wrapper IS the pane process, so the kernel delivers SIGWINCH to
    # us when the pane resizes.
    #
    # Strict mode is special: we are deliberately driving the pane to
    # match the broker's EFFECTIVE_SIZE, so our pane size is a result
    # of broker policy, not an independent constraint.  Reporting it
    # back upstream re-enters the smallest-wins computation and -- in
    # the presence of any pane-shrink overhead such as
    # pane-border-status (which eats one row from every resize-window)
    # -- creates a one-row-per-round-trip feedback loop.  Skip the
    # upstream RESIZE while we are in strict.
    def _on_winch(_signo, _frame):
        """SIGWINCH handler: report our new size unless in strict mode.

        In strict mode the event is only counted in
        winch_ignored_in_strict[0]; otherwise _send_winch() is called
        after state_lock has been released.
        """
        # NOTE(review): this runs as a signal handler.  If state_lock is
        # a non-reentrant lock and the interrupted thread can be holding
        # it, acquiring it here would deadlock -- confirm.
        with state_lock:
            in_strict = mode[0] == 'strict'
            if in_strict:
                winch_ignored_in_strict[0] += 1
        if not in_strict:
            _send_winch()

    try:
        signal.signal(signal.SIGWINCH, _on_winch)
    except (OSError, ValueError):
        pass

    # Put the pane pty in raw mode so ALL keystrokes -- including
    # Ctrl-C, Ctrl-H, Ctrl-Z, etc. -- arrive as bytes and get
    # forwarded to the customer.  Without this, the pty's line
    # discipline interprets them locally: Ctrl-C delivers SIGINT
    # and kills the wrapper, Ctrl-H is treated as backspace and
    # never leaves the support side, the kernel echoes everything
    # back into the pane.  cfmakeraw also disables ISIG, so SIGINT
    # is no longer generated by the typed key.
    fd = sys.stdin.fileno()
    saved_tty_attrs = None
    if os.isatty(fd):
        try:
            saved_tty_attrs = termios.tcgetattr(fd)
            tty.setraw(fd)
        except (termios.error, OSError):
            saved_tty_attrs = None

    # Stdin keystroke loop.  Block on read; translate to v4 CTRL
    # frames.  Wake on _done_r so SESSION_ENDED / disconnect can
    # tear us down without waiting for the tmux pane's stdin to
    # close.  Local UI-level keys (ESC R force-resize, ESC Z zoom,
    # 'r' on PANE_DEAD respawn) get their own verbs; everything
    # else is K.
    try:
        while True:
            r, _, _ = select.select([fd, _done_r], [], [])
            if _done_r in r:
                break
            try:
                chunk = os.read(fd, 4096)
            except OSError:
                break
            if not chunk:
                break
            if read_only:
                continue
            if pane_dead[0] and any(
                    b in (ord('r'), ord('R'), 0x0d, 0x0a) for b in chunk):
                pane_dead[0] = False
                _send_ctrl('RESPAWN')
                continue
            if chunk == b'\x1bZ':
                _send_ctrl('ZOOM')
                continue
            if chunk == b'\x1bR':
                try:
                    cols, rows = os.get_terminal_size(sys.stdout.fileno())
                except OSError:
                    continue
                _send_ctrl(f'RESIZE_OVERRIDE {cols} {rows}')
                continue
            if pane_dead[0] or restricted[0] or brb[0]:
                continue
            _send_ctrl('K', chunk)
    except KeyboardInterrupt:
        pass
    finally:
        if saved_tty_attrs is not None:
            try:
                termios.tcsetattr(fd, termios.TCSADRAIN, saved_tty_attrs)
            except (termios.error, OSError):
                pass
        try:
            os.close(_done_r)
        except OSError:
            pass
        try:
            os.close(_done_w)
        except OSError:
            pass

    # Tear down.
    try:
        proc.stdin.close()
    except OSError:
        pass
    try:
        proc.terminate()
        proc.wait(timeout=2)
    except (OSError, subprocess.TimeoutExpired):
        try:
            proc.kill()
        except OSError:
            pass
    recv_thread.join(timeout=1.0)
    trace('viewport_filter_exit', session_ended=session_ended[0],
          ssh_returncode=proc.returncode)
    if session_ended[0]:
        sys.exit(42)
    # The relay-side consumer exits 42 when the API confirms the
    # session is closed (no broker socket, no producer to wait for).
    # Propagate it so cmd_join's pane-died hook fires the SESSION
    # CLOSED popup and Enter on the dead pane will not respawn into
    # another doomed snapshot wait.  Surface the same banner the
    # wire-driven SESSION_ENDED path uses, since stderr from the SSH
    # subprocess is drained into the trace log only.
    if proc.returncode == 42:
        try:
            sys.stdout.buffer.write(_SESSION_ENDED_MSG)
            sys.stdout.buffer.flush()
        except OSError:
            pass
        sys.exit(42)


def main(argv=None):
    """Parse the command line and dispatch to the matching handler.

    ``argv`` is the argument list without the program name; when None it
    defaults to ``sys.argv[1:]``.

    Dispatch order:

    1. Internal ``--_viewport_filter`` / ``--_chat_window`` /
       ``--_cleanup`` / ``--_proxy_shell`` entry points.  These return
       (or exit) before ``_ensure_relay_host_key()`` is called.
    2. Help (also shown, with exit status 1, when no arguments are
       given) and version.
    3. User commands, after ``_ensure_relay_host_key()``.

    Usage errors are reported on stderr and end in ``sys.exit(1)``.
    """
    if argv is None:
        argv = sys.argv[1:]

    prog = os.path.basename(sys.argv[0])
    first = argv[0] if argv else None

    def _usage_text():
        # Render the usage banner with the configured relay parameters.
        return _USAGE.format(
            prog=prog, relay=RELAY_HOST, rport=RELAY_PORT,
            user=SUPPORT_USER, aport=RELAY_API_PORT, socks=SOCKS_PORT,
        )

    def _require_sid(usage):
        # Return argv[1], or print `usage` and exit 1 when it is missing.
        if len(argv) < 2:
            print(usage, file=sys.stderr)
            sys.exit(1)
        return argv[1]

    def _init_session_trace(sid):
        # Point the tracer at the per-session trace / lastgasp files.
        init('support', sid=sid,
             path=str(_session_dir(sid) / 'trace.jsonl'),
             lastgasp_path=str(_session_dir(sid) / 'lastgasp.jsonl'))

    def _parse_opts(tokens, value_opts, flag_opts, strict=True,
                    converters=None):
        # Parse `--opt VALUE` / `--flag` tokens into {dest: value}.
        #
        # value_opts / flag_opts map option spelling -> dest key; a
        # repeated option keeps its last value.  converters optionally
        # maps an option spelling to a callable applied to its raw value
        # as soon as it is seen.  With strict=True any other token is a
        # usage error (exit 1); with strict=False it is skipped.
        parsed = {}
        idx = 0
        while idx < len(tokens):
            tok = tokens[idx]
            if tok in value_opts and idx + 1 < len(tokens):
                idx += 1
                raw = tokens[idx]
                conv = converters.get(tok) if converters else None
                parsed[value_opts[tok]] = conv(raw) if conv else raw
            elif tok in flag_opts:
                parsed[flag_opts[tok]] = True
            elif strict:
                if tok in value_opts:
                    # Known option, but it was the last token: say so
                    # instead of claiming the option is unknown.
                    print(f"Option {tok} requires a value",
                          file=sys.stderr)
                else:
                    print(f"Unknown option: {tok}", file=sys.stderr)
                sys.exit(1)
            idx += 1
        return parsed

    def _expires_in(raw):
        # Converter for --expires-in: integer or usage error.
        try:
            return int(raw)
        except ValueError:
            print(f"ERROR: --expires-in requires an integer, got '{raw}'",
                  file=sys.stderr)
            sys.exit(1)

    # Internal: support-side viewport filter wrapping the ssh consumer.
    if first == '--_viewport_filter':
        # Args:
        #   --_viewport_filter SID
        #   [--tmux-sock PATH] [--tmux-target SESSION:WINDOW]
        #   [--read-only] [--debug] [--name NAME]
        #   -- ssh-argv...
        if len(argv) < 2:
            sys.exit(1)
        sid  = argv[1]
        rest = argv[2:]
        # Split on '--': everything before is filter options, after is
        # the ssh subprocess argv.
        if '--' in rest:
            split = rest.index('--')
            opts, ssh_argv = rest[:split], rest[split + 1:]
        else:
            opts, ssh_argv = rest, []
        # Lenient: unrecognised filter options are skipped.
        parsed = _parse_opts(
            opts,
            {'--tmux-sock': 'tmux_sock',
             '--tmux-target': 'tmux_target',
             '--name': 'name'},
            {'--read-only': 'read_only', '--debug': 'debug'},
            strict=False)
        if not ssh_argv:
            sys.stderr.write(
                '--_viewport_filter: missing -- <ssh argv>\n')
            sys.exit(1)
        _viewport_filter_main(sid, ssh_argv,
                              tmux_sock=parsed.get('tmux_sock', ''),
                              tmux_target=parsed.get('tmux_target', ''),
                              read_only=parsed.get('read_only', False),
                              name=parsed.get('name', ''),
                              debug=parsed.get('debug', False))
        return

    # Internal: run the curses chat UI inside a join-session window.
    if first == '--_chat_window':
        # Args: --_chat_window SID --name NAME [--read-only] [--debug]
        if len(argv) < 2:
            sys.exit(1)
        sid = argv[1]
        parsed = _parse_opts(
            argv[2:],
            {'--name': 'name'},
            {'--read-only': 'read_only', '--debug': 'debug'},
            strict=False)
        _chat_window_main(sid,
                          parsed.get('name', ''),
                          parsed.get('read_only', False),
                          parsed.get('debug', False))
        return

    # Internal: clean up per-session temp files and ControlMaster when a join
    # session ends (fired from the tmux session-closed hook).
    if first == '--_cleanup':
        if len(argv) < 2:
            sys.exit(1)
        sid = argv[1]
        _init_session_trace(sid)
        trace('cleanup_begin', sid=sid)
        _cleanup_session_local(sid)
        trace('cleanup_done', sid=sid)
        return

    # Internal: open a proxy shell with tunnel + ControlMaster lazily started.
    # Runs inside a tmux window (spawned from the join menu); execs into bash.
    if first == '--_proxy_shell':
        if len(argv) < 2:
            sys.exit(1)
        sid = argv[1]
        _init_session_trace(sid)
        trace('proxy_shell_begin', sid=sid)
        try:
            port, socks_port, cust_user = _ensure_tunnel_and_master(sid)
        except RuntimeError as e:
            trace('proxy_shell_failed', sid=sid, err=str(e))
            print(f"{PREFIX} ERROR: tunnel upgrade failed: {e}", file=sys.stderr)
            sys.exit(1)
        s = _get_session(sid)
        nets = s.get('nets', [])
        dns_servers = s.get('dns', [])
        script = _write_proxy_env_script(sid, port, socks_port, cust_user, nets, dns_servers)
        # execvp replaces this process on success and raises OSError on
        # failure, so the return below is not reached in normal operation.
        os.execvp('bash', ['bash', '--rcfile', script])
        return

    if not argv or first in ('--help', '-h', 'help'):
        print(_usage_text())
        # Explicit help request is a success; a bare invocation is not.
        sys.exit(0 if argv else 1)

    if first in ('--version', 'version'):
        commit = f' ({GIT_COMMIT})' if GIT_COMMIT != 'dev' else ''
        print(f'{prog} {VERSION}{commit}')
        sys.exit(0)

    cmd = argv[0]

    _ensure_relay_host_key()

    if cmd == 'list':
        import argparse
        ap = argparse.ArgumentParser(prog='tunl list', add_help=False)
        ap.add_argument('--status',   default=None)
        ap.add_argument('--pending',  action='store_true')
        ap.add_argument('--all',      action='store_true', dest='all_status')
        ap.add_argument('--customer', default=None)
        ap.add_argument('--case',     default=None)
        ap.add_argument('--before',   default=None)
        ap.add_argument('--after',    default=None)
        ap.add_argument('--limit',    type=int, default=None)
        ap.add_argument('-v', '--verbose', action='count', default=0)
        # parse_known_args: unrecognised arguments are ignored here.
        args, _ = ap.parse_known_args(argv[1:])
        if args.all_status and args.pending:
            print("error: --all and --pending are mutually exclusive",
                  file=sys.stderr)
            sys.exit(1)
        # Precedence: --all, then --pending, then an explicit --status.
        st = None
        if args.all_status:
            st = 'all'
        elif args.pending:
            st = 'pending'
        elif args.status:
            st = args.status
        cmd_list(
            status=st,
            customer=args.customer,
            case=args.case,
            before=_parse_tunl_datetime(args.before, round_up=True)
                   if args.before else None,
            after=_parse_tunl_datetime(args.after, round_up=False)
                  if args.after else None,
            limit=args.limit,
            verbose=args.verbose,
        )
    elif cmd == 'pick':
        cmd_pick()
    elif cmd == 'show':
        cmd_show(_require_sid("Usage: tunl show <session-id>"))
    elif cmd == 'browser':
        sid = _require_sid(
            "Usage: tunl browser <session-id> [--profile NAME] [--kill]")
        parsed = _parse_opts(argv[2:],
                             {'--profile': 'profile'},
                             {'--kill': 'kill'})
        cmd_browser(sid,
                    profile=parsed.get('profile'),
                    kill=parsed.get('kill', False))
    elif cmd == 'join':
        # A leading option in the session-id slot means "no session id".
        sid       = argv[1] if len(argv) >= 2 and not argv[1].startswith('-') else ''
        read_only = '--read-only' in argv[1:]
        debug     = '--debug' in argv[1:]
        cmd_join(sid, read_only=read_only, debug=debug)
    elif cmd == 'close':
        sid = argv[1] if len(argv) >= 2 else ''
        cmd_close(sid)
    elif cmd == 'detach':
        sid = argv[1] if len(argv) >= 2 else ''
        cmd_detach(sid)
    elif cmd == 'cancel':
        sid = argv[1] if len(argv) >= 2 else ''
        cmd_cancel(sid)
    elif cmd == 'ssh':
        sid = argv[1] if len(argv) >= 2 and not argv[1].startswith('-') else ''
        cmd_ssh(sid)
    elif cmd == 'upgrade':
        cmd_upgrade(_require_sid("Usage: tunl upgrade <session-id>"))
    elif cmd == 'create-session':
        # session-id is optional -- relay generates one if omitted.
        sid  = ''
        rest = argv[1:]
        # If the first arg doesn't look like an option, treat it as the session ID.
        if rest and not rest[0].startswith('--'):
            sid  = rest[0]
            rest = rest[1:]
        parsed = _parse_opts(
            rest,
            {'--case': 'case_num',
             '--customer': 'customer',
             '--expires-in': 'expires_in'},
            {'--ad-hoc': 'ad_hoc', '--join': 'join'},
            converters={'--expires-in': _expires_in})
        cmd_create_session(sid,
                           case_num=parsed.get('case_num', ''),
                           customer=parsed.get('customer', ''),
                           expires_in=parsed.get('expires_in'),
                           join=parsed.get('join', False),
                           ad_hoc=parsed.get('ad_hoc', False))
    elif cmd == 'expect':
        cmd_expect(argv[1:])
    elif cmd == 'collect':
        sid = _require_sid(
            "Usage: tunl collect <session-id> [--output FILE]")
        parsed = _parse_opts(argv[2:], {'--output': 'output'}, {})
        cmd_collect(sid, output=parsed.get('output', ''))
    elif cmd == 'note':
        sid  = _require_sid("Usage: tunl note <session-id> [body...]")
        rest = argv[2:]
        if rest == ['--']:
            # A lone '--' means: take the note body from stdin.
            body = sys.stdin.read().strip()
        elif rest:
            body = ' '.join(rest).strip()
        else:
            body = ''
        cmd_note(sid, body)
    elif cmd == 'cast':
        sid = _require_sid(
            "Usage: tunl cast <session-id> [--output FILE | -]")
        parsed = _parse_opts(argv[2:],
                             {'--output': 'output', '-o': 'output'},
                             {})
        cmd_cast(sid, output=parsed.get('output', ''))
    elif cmd == 'set-meta':
        sid = _require_sid(
            "Usage: tunl set-meta <session-id> [--case CASE] [--customer NAME]")
        parsed = _parse_opts(argv[2:],
                             {'--case': 'case', '--customer': 'customer'},
                             {})
        cmd_set_meta(sid,
                     case=parsed.get('case', ''),
                     customer=parsed.get('customer', ''))
    else:
        print(f"Unknown command: {cmd}", file=sys.stderr)
        print(_usage_text(), file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    # Script entry point: map top-level failures to exit statuses.
    try:
        main()
    except KeyboardInterrupt:
        # Interrupt from the user is treated as a clean exit.
        sys.exit(0)
    except (RuntimeError, OSError) as exc:
        print(f"{PREFIX} ERROR: {exc}", file=sys.stderr)
        sys.exit(1)
