#!/bin/sh
# SPDX-License-Identifier: GPL-3.0-or-later
# Copyright (C) 2026 LINBIT HA-Solutions GmbH
# Author: Lars Ellenberg
#
# linbit-tunl -- LINBIT support tunnel client
#
# Opens an SSH reverse tunnel to the LINBIT relay server so that LINBIT
# support engineers can reach this system.  The customer runs this script,
# observes the session in a shared tmux window, and presses Ctrl-C to revoke
# access at any time.
#
# Usage: linbit-tunl.py [--case CASE] [--token TOKEN] [SESSION_ID]
# Requires: Python 3.6+, ssh, optional: tmux.
''':'; for x in python3 python /usr/libexec/platform-python; do
"$x" -c "import sys; sys.exit(0 if sys.version_info >= (3,6) else 1)" \
>/dev/null 2>&1 && exec "$x" "$0" "$@"; done;
>&2 echo "Python 3.6 or later is required"; exit 1; ':'''

import enum
import fcntl
import hashlib
import http.client
import ipaddress
import json
import os
import pwd
import queue
import re
import secrets
import shlex
import shutil
import signal
import socket
import stat
import subprocess
import sys
import tempfile
import threading
import time
import urllib.error
import urllib.request
from pathlib import Path

# --- BEGIN INLINE tunl_trace --------------------------------------------------
# Everything between the BEGIN/END markers is the inlineable tracer body.
# It is copied verbatim into linbit-tunl.py and tunl.py so those single-file
# deliverables stay single-file.  tests/test_inline_tracer.py asserts the
# copies match.  If you need to change tracer behaviour, edit this file and
# re-run `make sync-inline-tracer`.
import collections
import hashlib
import json
import os
import signal
import struct
import threading
import time

try:
    _mono_ns = time.monotonic_ns          # novermin   Python 3.7+
except AttributeError:                    # 3.6 fallback (customer hosts)
    def _mono_ns():
        return int(time.monotonic() * 1_000_000_000)


# ---------------------------------------------------------------------------
# Advanced-tracing aspect toggles (Options C, D, E in docs/debuggability-review)
#
# Semantics: each aspect has its own env var.  If set to '1', enabled; to '0',
# explicitly disabled.  If unset, falls back to TUNL_TRACE_ALL.  This lets
# a reproducer do `TUNL_TRACE_ALL=1 ...` to switch everything on, then add
# e.g. `TUNL_SSH_TRACE=v` to dial back one aspect.  Intended for short
# test runs where no redaction or log rotation is wanted.
# ---------------------------------------------------------------------------

def tracing_enabled(aspect):
    """Decide whether one advanced-tracing aspect is switched on.

    aspect is one of 'b', 'wire', 'tmux' (any other value raises KeyError).
    'b' is the Phase 2 JSON tracer, i.e. the same switch as TUNL_TRACE=1.
    The aspect's own variable wins when it is exactly '1' or '0'; anything
    else defers to TUNL_TRACE_ALL.  SSH verbosity lives in ssh_trace_level().
    """
    env_name = {
        'b':    'TUNL_TRACE',
        'wire': 'TUNL_TRACE_WIRE',
        'tmux': 'TUNL_TRACE_TMUX',
    }[aspect]
    setting = os.environ.get(env_name, '')
    if setting in ('1', '0'):
        return setting == '1'
    return os.environ.get('TUNL_TRACE_ALL') == '1'


def ssh_trace_level():
    """Report the verbose-SSH level (Option E) as 'v', 'vv', 'vvv' or ''.

    TUNL_SSH_TRACE (case-insensitive) takes precedence: 'v'/'vv'/'vvv' are
    used as-is, '1' means 'vvv', '0' means off.  Any other or missing value
    falls back to TUNL_TRACE_ALL=1 -> 'vvv', otherwise ''.
    """
    explicit = {
        'v':   'v',
        'vv':  'vv',
        'vvv': 'vvv',
        '1':   'vvv',
        '0':   '',
    }
    requested = os.environ.get('TUNL_SSH_TRACE', '').lower()
    if requested in explicit:
        return explicit[requested]
    return 'vvv' if os.environ.get('TUNL_TRACE_ALL') == '1' else ''


def ssh_debug_args(purpose, session_dir):
    """Build the extra ssh arguments for verbose tracing ([] when it is off).

    purpose names the call site ('main', 'upgrade', 'master', 'consume', ...)
    and is embedded in the -E log file name so that concurrent ssh
    invocations write separate logs.  The log is placed in session_dir.
    """
    level = ssh_trace_level()
    if not level:
        return []
    log_name = f'ssh.{purpose}.{level}.log'
    log_path = os.path.join(str(session_dir), log_name)
    return [f'-{level}', '-E', log_path]


class WireTap:
    """Capture every byte crossing a socket / pipe boundary (Option C).

    Records are binary-framed for fast append and exact replay:
        1 byte  direction    '>' (local sends) or '<' (local receives)
        8 bytes monotonic ns since tap open (big-endian uint64)
        4 bytes payload length (big-endian uint32)
        N bytes payload

    One WireTap per communication channel (per-consumer UDS, customer SSH
    stdio, etc.).  Not redacted.  No size cap: the caller is expected to
    use these only for short reproducer runs.

    The file handle is only read and replaced while holding self._lock, so
    a close() from one thread cannot make a concurrent write() raise.
    """

    _HEADER = struct.Struct('>BQI')

    def __init__(self, path):
        self._lock = threading.Lock()
        self._t0   = _mono_ns()
        try:
            os.makedirs(os.path.dirname(path) or '.', exist_ok=True)
            # Unbuffered: each record reaches the file as soon as it is written.
            self._fh = open(path, 'wb', buffering=0)
        except OSError:
            self._fh = None

    def write(self, direction, data):
        """Record one chunk.  direction is '>' (egress) or '<' (ingress).

        data may be bytes or str (UTF-8 encoded).  Never raises: a bad
        direction (not a single char < 256), an oversized payload (>= 4 GiB),
        an I/O error or a concurrently closed tap all drop the record.
        """
        if self._fh is None or not data:
            return
        if isinstance(data, str):
            data = data.encode('utf-8', errors='replace')
        try:
            hdr = self._HEADER.pack(
                ord(direction), _mono_ns() - self._t0, len(data))
        except (TypeError, struct.error):
            return
        with self._lock:
            # Re-read under the lock: close() may have run since the check above.
            fh = self._fh
            if fh is None:
                return
            try:
                fh.write(hdr)
                fh.write(data)
            except (OSError, ValueError):
                pass

    def close(self):
        """Close the capture file (idempotent).  Later write() calls are no-ops."""
        with self._lock:
            fh, self._fh = self._fh, None
        if fh is not None:
            try:
                fh.close()
            except OSError:
                pass


def _disabled_tap():
    """Build a WireTap that records nothing (used when wire tracing is off).

    WireTap.__init__ is bypassed so no file is created; because _fh is None,
    every write() returns immediately and close() has nothing to close.
    """
    tap = WireTap.__new__(WireTap)
    tap._fh   = None
    tap._t0   = 0
    tap._lock = threading.Lock()
    return tap


def open_wire_tap(path):
    """Open a real WireTap at path when TUNL_TRACE_WIRE / TUNL_TRACE_ALL allow it.

    Otherwise return a no-op tap, so callers can write() unconditionally.
    """
    if tracing_enabled('wire'):
        return WireTap(path)
    return _disabled_tap()


def frame_digest(payload):
    """Fingerprint payload for logs without recording its content.

    str input is UTF-8 encoded first; bytes are hashed as-is.  The result is
    the first 16 hex digits (64 bits) of the SHA-256, or '' for None.
    """
    if payload is None:
        return ''
    raw = (payload.encode('utf-8', errors='replace')
           if isinstance(payload, str) else payload)
    full_hex = hashlib.sha256(raw).hexdigest()
    return full_hex[:16]


def _iso_ts():
    # UTC, millisecond precision, ends with Z.
    t = time.time()
    ms = int((t - int(t)) * 1000)
    return time.strftime('%Y-%m-%dT%H:%M:%S', time.gmtime(t)) + f'.{ms:03d}Z'


class Tracer:
    """Structured JSON-lines tracer for one process.

    Every event is appended to the in-memory ring buffer (cheap, always).
    It is additionally written to `path` only while the tracer is enabled
    and the on-disk byte cap has not been reached; when the cap is hit a
    single 'trace_capped' marker record is written and disk output stops.

    Environment (explicit constructor arguments take precedence):
      TUNL_TRACE=1     start enabled
      TUNL_TRACE_RING  ring buffer length, default 500
      TUNL_TRACE_MAX   on-disk byte cap, default 50 MiB
    Unparseable or negative environment values fall back to the defaults
    instead of raising -- a debugging aid must not take the process down.
    """

    _DEFAULT_RING = 500
    _DEFAULT_MAX  = 50 * 1024 * 1024

    def __init__(self, comp, sid=None, path=None,
                 ring_size=None, max_bytes=None):
        self.comp        = comp
        self.sid         = sid
        self.path        = path
        self.enabled     = os.environ.get('TUNL_TRACE') == '1'
        if ring_size is None:
            ring_size = self._env_int('TUNL_TRACE_RING', self._DEFAULT_RING)
        if max_bytes is None:
            max_bytes = self._env_int('TUNL_TRACE_MAX', self._DEFAULT_MAX)
        self.ring        = collections.deque(maxlen=ring_size)
        self.max_bytes   = max_bytes
        self._bytes      = 0
        self._fh         = None
        # Reentrant on purpose: the SIGUSR1 handler installed by
        # setup_signals() calls toggle() -> trace(), and Python runs signal
        # handlers in the main thread between bytecodes -- possibly while
        # that same thread is inside trace() holding this lock.  A plain
        # Lock would deadlock the process there.
        self._lock       = threading.RLock()
        self._capped     = False
        if self.enabled and self.path:
            self._open()

    @staticmethod
    def _env_int(name, default):
        """Return the non-negative integer in env var `name`, else `default`."""
        raw = os.environ.get(name)
        if raw is None:
            return default
        try:
            value = int(raw)
        except ValueError:
            return default
        return value if value >= 0 else default

    def _open(self):
        """Open self.path for appending; leave _fh None if that fails."""
        with self._lock:
            if self._fh is not None:
                return
            try:
                # Line-buffered so every record is on disk immediately.
                os.makedirs(os.path.dirname(self.path) or '.', exist_ok=True)
                self._fh = open(self.path, 'a', buffering=1)
            except OSError:
                self._fh = None

    def set_sid(self, sid):
        self.sid = sid

    def set_path(self, path):
        """Set the trace file path; opens it now if enabled and nothing is open yet."""
        self.path = path
        if self.enabled and self._fh is None:
            self._open()

    def trace(self, event, **fields):
        """Emit one event.  Always appended to the ring; written to disk
        only if enabled and under the byte cap.  Never raises.
        """
        try:
            rec = {
                'ts':    _iso_ts(),
                'sid':   self.sid,
                'comp':  self.comp,
                'event': event,
            }
            rec.update(fields)
            line = json.dumps(rec, separators=(',', ':'),
                              default=_json_fallback) + '\n'
        except Exception:
            return
        self.ring.append(line)
        if not self.enabled or self._capped:
            return
        with self._lock:
            # Re-read under the lock: close() may have run concurrently.
            fh = self._fh
            if fh is None:
                return
            try:
                if self._bytes >= self.max_bytes:
                    if not self._capped:
                        self._capped = True
                        # json.dumps (not string concatenation) so an odd
                        # comp value still yields a valid JSON record.
                        marker = json.dumps(
                            {'ts': _iso_ts(), 'comp': self.comp,
                             'event': 'trace_capped'},
                            separators=(',', ':'), default=_json_fallback)
                        fh.write(marker + '\n')
                    return
                fh.write(line)
                # json.dumps escapes to ASCII, so characters == bytes here.
                self._bytes += len(line)
            except (OSError, ValueError):
                # Best effort: disk full, or file closed underneath us.
                pass

    def toggle(self, enabled=None):
        """Flip or set the enabled flag.  SIGUSR1 calls this with None."""
        if enabled is None:
            enabled = not self.enabled
        self.enabled = enabled
        if enabled and self._fh is None and self.path:
            self._open()
        try:
            self.trace('trace_toggle', enabled=enabled)
        except Exception:
            pass

    def dump_ring(self, path):
        """Write the current ring buffer to path (overwrite).  Never raises."""
        try:
            os.makedirs(os.path.dirname(path) or '.', exist_ok=True)
            with open(path, 'w') as f:
                f.writelines(list(self.ring))
        except OSError:
            pass

    def setup_signals(self, lastgasp_path=None):
        """Install SIGUSR1 (toggle) and SIGUSR2 (dump ring) handlers.

        Must be called from the main thread.  Safe to call multiple times.
        """
        def _sigusr1(signum, frame):
            self.toggle()

        def _sigusr2(signum, frame):
            if lastgasp_path:
                self.dump_ring(lastgasp_path)

        try:
            signal.signal(signal.SIGUSR1, _sigusr1)
            signal.signal(signal.SIGUSR2, _sigusr2)
        except (OSError, ValueError):
            # Not the main thread or signals unavailable.  Skip silently.
            pass

    def flush_on_exit(self, lastgasp_path):
        """Return a callable suitable for atexit: dumps the ring if nonempty."""
        def _dump():
            if self.ring:
                self.dump_ring(lastgasp_path)
        return _dump

    def close(self):
        """Close the trace file (idempotent); the ring buffer is kept."""
        with self._lock:
            fh, self._fh = self._fh, None
        if fh is not None:
            try:
                fh.close()
            except OSError:
                pass


def _json_fallback(obj):
    """Coerce non-JSON-native objects to something serialisable."""
    if isinstance(obj, (bytes, bytearray)):
        try:
            return obj.decode('utf-8')
        except UnicodeDecodeError:
            return obj.hex()
    if isinstance(obj, (set, frozenset)):
        return sorted(obj)
    return repr(obj)


# ---------------------------------------------------------------------------
# Module-level convenience: a shared singleton for simple cases.
# ---------------------------------------------------------------------------

_default = None
_default_lock = threading.Lock()


def init(comp, sid=None, path=None, ring_size=None, max_bytes=None,
         lastgasp_path=None, install_signals=True):
    """Create the module-level default tracer and return it.

    The first call builds the Tracer and (optionally) installs the
    SIGUSR1/SIGUSR2 handlers.  Later calls only update sid / path, so init()
    is idempotent per process; ring_size / max_bytes are then ignored.

    Exception: if something traced before init() ran, get() will already
    have created a placeholder tracer with comp 'unknown' and no signal
    handlers.  That placeholder is adopted here: its comp is set and the
    handlers are installed (if requested).  Without that, SIGUSR1 would keep
    its default action and terminate the process instead of toggling tracing.
    """
    global _default
    with _default_lock:
        if _default is None:
            _default = Tracer(comp, sid=sid, path=path,
                              ring_size=ring_size, max_bytes=max_bytes)
            if install_signals:
                _default.setup_signals(lastgasp_path=lastgasp_path)
            return _default
        if sid is not None:
            _default.set_sid(sid)
        if path is not None:
            _default.set_path(path)
        if _default.comp == 'unknown' and comp != 'unknown':
            # Placeholder from get(): promote it to the real component.
            _default.comp = comp
            if install_signals:
                _default.setup_signals(lastgasp_path=lastgasp_path)
        return _default


def get():
    """Return the process-wide default tracer, creating a placeholder if needed.

    When init() has not been called yet, a Tracer with comp 'unknown' and no
    path is created (the lock is taken and the global re-checked so a racing
    init() is not clobbered).  Having no path it writes nothing to disk, but
    it still records events in its ring buffer and reads TUNL_TRACE.
    """
    global _default
    tracer = _default
    if tracer is None:
        with _default_lock:
            if _default is None:
                _default = Tracer('unknown')
            tracer = _default
    return tracer


def trace(event, **fields):
    """Emit `event` with `fields` on the process-wide default tracer (see get())."""
    tracer = get()
    tracer.trace(event, **fields)
# --- END INLINE tunl_trace ----------------------------------------------------

# --- BEGIN INLINE chatui ------------------------------------------------------
# Everything between BEGIN/END markers is the inlineable body, copied
# verbatim into linbit-tunl.py and tunl.py.  tests/test_inline_chatui.py
# asserts the copies match byte-for-byte.

import hashlib as _cu_hashlib
import queue as _cu_queue
import re as _cu_re
import textwrap as _cu_textwrap
import time as _cu_time


# ---------------------------------------------------------------------------
# Wire protocol helpers.  Consumer side of tunl share-mode protocol v4.
#
# wire_parse(payload) decodes the body of a single CTRL frame; wire_encode
# (kind, text) returns the bytes of a complete v4 frame ready for the wire.
# Symbols decode_ctrl, encode_ctrl, encode_message, TYPE_CTRL,
# WireProtocolError are provided by the wire codec at module scope.
# ---------------------------------------------------------------------------

def wire_parse(payload):
    """Parse one inbound CTRL frame payload.

    payload is the CTRL frame body: bytes of '<verb-line>\\n[<binary-args>...]'.
    Accepts bytes; str is coerced to UTF-8 for backward-compatible callers.

    Returns an event dict:
        {'kind': 'chat',          'who': str, 'ts': str, 'body': str}
        {'kind': 'system',        'ts': str, 'body': str}        (SCHAT from '[tunl]')
        {'kind': 'note',          'who': str, 'ts': str, 'body': str}  (M7)
        {'kind': 'session_info',  'payload': dict}
        {'kind': 'ephemeral',     'lifecycle': str, 'body': str}
        {'kind': 'session_ended'}
        {'kind': 'readonly'}
        {'kind': 'readwrite'}
        {'kind': 'brb'}
        {'kind': 'brb_end'}
        {'kind': 'heartbeat'}
        {'kind': 'unknown',       'raw': str}
    Never raises on malformed input; returns kind='unknown'.  In particular a
    SESSION_INFO whose JSON is not an object (list, scalar, too deeply
    nested) is reported as 'unknown', so 'payload' is always a dict.
    """
    if isinstance(payload, str):
        payload = payload.encode('utf-8', errors='replace')
    if not payload:
        return {'kind': 'unknown', 'raw': ''}
    try:
        line, args = decode_ctrl(payload)
    except WireProtocolError:
        return {'kind': 'unknown', 'raw': repr(payload)}
    parts = line.split()
    if not parts:
        return {'kind': 'unknown', 'raw': line}
    verb = parts[0]

    # Argument-less state verbs map 1:1 onto an event kind.
    simple_kind = {
        'SESSION_ENDED': 'session_ended',
        'READONLY':      'readonly',
        'READWRITE':     'readwrite',
        'BRB':           'brb',
        'BRB_END':       'brb_end',
        'HEARTBEAT':     'heartbeat',
    }.get(verb)
    if simple_kind is not None:
        return {'kind': simple_kind}

    if verb == 'SESSION_INFO':
        # 1 binary arg = JSON blob (UTF-8).
        if args:
            import json as _cu_json
            try:
                payload_obj = _cu_json.loads(args[0].decode('utf-8', 'replace'))
            except (ValueError, RecursionError):
                # ValueError covers JSONDecodeError; RecursionError is raised
                # for pathologically nested input from the peer.
                return {'kind': 'unknown', 'raw': line}
            if not isinstance(payload_obj, dict):
                return {'kind': 'unknown', 'raw': line}
            return {'kind': 'session_info', 'payload': payload_obj}

    if verb == 'EPHEMERAL':
        # 'EPHEMERAL <lifecycle>'; 0 or 1 binary arg (body).
        # 'clear' lifecycle has no body.
        if len(parts) == 2:
            lifecycle = parts[1]
            body = args[0].decode('utf-8', errors='replace') if args else ''
            return {'kind': 'ephemeral', 'lifecycle': lifecycle, 'body': body}

    if verb == 'CCHAT':
        # 'CCHAT <ts>' + 1 binary arg = msg
        if len(parts) == 2 and args:
            ts = parts[1]
            body = args[0].decode('utf-8', errors='replace')
            return {'kind': 'chat', 'who': 'customer', 'ts': ts, 'body': body}

    if verb == 'SCHAT':
        # 'SCHAT <ts>' + 2 binary args = (name, msg)
        if len(parts) == 2 and len(args) >= 2:
            ts = parts[1]
            name = args[0].decode('utf-8', errors='replace')
            body = args[1].decode('utf-8', errors='replace')
            # The broker uses the literal sender name '[tunl]' for
            # system messages (joins, resizes, etc.).
            if name == '[tunl]':
                return {'kind': 'system', 'ts': ts, 'body': body}
            return {'kind': 'chat', 'who': name, 'ts': ts, 'body': body}

    if verb == 'SNOTE':
        # 'SNOTE <ts>' + 2 binary args = (author, body)
        if len(parts) == 2 and len(args) >= 2:
            ts = parts[1]
            author = args[0].decode('utf-8', errors='replace')
            body   = args[1].decode('utf-8', errors='replace')
            return {'kind': 'note', 'who': author, 'ts': ts, 'body': body}

    return {'kind': 'unknown', 'raw': line}


def wire_encode(kind, text):
    """Build a complete outbound v4 CTRL frame (header + payload bytes).

    kind selects the verb: 'chat', 'customer_chat', 'name', 'customer_nick'
    or 'note'; any other value raises ValueError.  text (str, UTF-8 encoded,
    or bytes) travels as the frame's single binary argument.
    """
    data = text if isinstance(text, bytes) else text.encode('utf-8', errors='replace')
    verb_table = (
        ('chat',          'CHAT'),
        ('customer_chat', 'CUSTOMER_CHAT'),
        ('name',          'NAME'),
        ('customer_nick', 'CUSTOMER_NICK'),
        ('note',          'NOTE'),
    )
    for candidate, verb in verb_table:
        if kind == candidate:
            break
    else:
        raise ValueError(f'unknown wire kind: {kind!r}')
    return encode_message(TYPE_CTRL, encode_ctrl(verb, binary_args=(data,)))


# ---------------------------------------------------------------------------
# Nick abbreviation and column-width helpers.
# ---------------------------------------------------------------------------

def _compute_nick_abbrevs(nicks):
    """Return {raw_nick: display_nick} using the shortest unique word prefix.

    If two nicks share the same W-word prefix, both get W+1 words.
    Identical full names (same seat) are left as-is (same abbreviation).
    """
    nicks = list(nicks)
    words = [n.split() for n in nicks]
    result = {}
    for i, nick in enumerate(nicks):
        wlist = words[i]
        abbrev = nick  # fallback: full name unchanged
        for w_count in range(1, len(wlist) + 1):
            candidate = ' '.join(wlist[:w_count])
            if all(' '.join(words[j][:w_count]) != candidate
                   for j in range(len(nicks)) if j != i):
                abbrev = candidate
                break
        result[nick] = abbrev
    return result


def _nick_col_width(display_nicks):
    """Tab-stop-aligned column width for the nick field.

    Rounds up to the nearest 4 so the column widens in discrete steps
    (reducing visual churn when participants join/leave).
    """
    if not display_nicks:
        return 8
    max_w = max(len(n) for n in display_nicks)
    w = ((max_w + 4) // 4) * 4   # round up to next multiple of 4
    return min(max(w, 8), 20)


def _first_significant_word(company):
    """Return the first significant word from a company/org name, or None."""
    if not company:
        return None
    for word in _cu_re.split(r'[\s_@\-]+', company):
        w = word.strip()
        if len(w) > 2 and w.lower() not in _MENTION_STOPLIST:
            return w
    parts = company.split()
    return parts[0] if parts else None


# ---------------------------------------------------------------------------
# HistoryBuffer -- bounded deque of structured row dicts plus a scroll index.
# ---------------------------------------------------------------------------

class HistoryBuffer:
    """Store chat events and render them as structured row dicts for the viewport.

    The viewport shows at most `height` rows of `width` columns.  Messages
    are appended as event dicts (from wire_parse) or local notes; call
    lines_for_viewport(height, width) to get the renderable row dicts.

    Each row dict has: {ts, nick_display, nick_color_key, body, kind,
    is_continuation}.  Callers flatten to strings only at display time.

    Scrolling: scroll_offset is the number of wrapped rows scrolled up from
    the bottom.  Zero == stuck to the live tail; any positive number ==
    paused in scrollback.  NOTE(review): because the offset is measured from
    the bottom, rows appended while paused still shift the visible window
    toward newer content by the number of rows added.

    Display settings; changing any of them invalidates the wrap cache:
      show_ts       -- if False, render without the leading 'HH:MM '
      filter_re     -- if set, only events whose nick+body matches are shown
      filter_kinds  -- if set, only events of these kinds (plus 'local') are shown
      customer_nick -- if set, substitutes who='customer' with this string
    """

    def __init__(self, max_events=2000):
        self._events        = []       # list of event dicts
        self._max_events    = max_events
        self.scroll_offset  = 0        # 0 = at bottom; N = N rows scrolled up
        self.show_ts        = True
        self.filter_re      = None     # compiled regex or None
        self.filter_kinds   = None     # frozenset of allowed kinds or None
        self.customer_nick  = None     # display name for who='customer'
        self.my_nick        = None     # local-echo override; /nick on the
                                       # consumer side updates this so future
                                       # self-tagged messages use the new name
        # Cached wrap: (width, show_ts, filter_id, kinds_id, customer_nick)
        self._wrap_key      = None
        self._wrapped       = []       # flat list of row dicts
        self._nick_col_w    = 8        # computed by _rewrap; used by renderer
        # Width passed to the most recent _rewrap(); scroll() re-wraps at
        # this width when the cache was invalidated (80 before first render).
        self._last_width    = 80

    def append(self, ev):
        """Append an event.  ev is a dict with at minimum {'ts','who','body'}
        for chat events, or {'ts','body','kind':'system'|'note'} for others.
        Trims to max_events.  Invalidates the wrap cache.
        """
        self._events.append(ev)
        excess = len(self._events) - self._max_events
        if excess > 0:
            del self._events[:excess]
        self._wrap_key = None  # force re-wrap

    def __len__(self):
        return len(self._events)

    def set_show_ts(self, on: bool):
        self.show_ts = bool(on)
        self._wrap_key = None

    def set_filter(self, pattern):
        """Set a case-insensitive filter regex; pass None / '' to clear.

        An invalid pattern clears the filter rather than raising.
        """
        if not pattern:
            self.filter_re = None
        else:
            try:
                self.filter_re = _cu_re.compile(pattern, _cu_re.IGNORECASE)
            except _cu_re.error:
                self.filter_re = None
        self._wrap_key = None

    def set_kind_filter(self, kinds):
        """Restrict scrollback to the given event kinds.  'local' events
        (client-side feedback like '/search set' acknowledgements) are
        always preserved so users can see their commands take effect.
        Pass None or an empty collection to clear the filter."""
        if not kinds:
            self.filter_kinds = None
        else:
            self.filter_kinds = frozenset(kinds)
        self._wrap_key = None

    def is_filtered(self):
        return (self.filter_re is not None) or (self.filter_kinds is not None)

    def _rewrap(self, width):
        """Rebuild self._wrapped for `width` columns unless the cache is current."""
        self._last_width = width
        key = (width, self.show_ts, id(self.filter_re), id(self.filter_kinds),
               self.customer_nick)
        if key == self._wrap_key:
            return

        w = max(width, 10)

        # Pass 1: collect distinct non-customer nicks; compute abbreviations.
        raw_nicks = set()
        for ev in self._events:
            who = ev.get('who', '')
            if who and who != 'customer' and ev.get('kind') not in ('system', 'local'):
                raw_nicks.add(who)
        abbrevs = _compute_nick_abbrevs(raw_nicks)

        # Customer display nick (substituted for who='customer').
        cust_display = self.customer_nick or 'customer'

        # Nick column width: sized from all display nicks in this history.
        # Include fixed tokens ([tunl], --, *note*) in the sizing.
        all_display = set(abbrevs.values()) | {cust_display, '[tunl]', '--'}
        # Note authors (wrapped in *) count as their abbrev + 2 chars.
        for ev in self._events:
            if ev.get('kind') == 'note':
                who = ev.get('who', '')
                if who:
                    all_display.add(f'*{abbrevs.get(who, who)}*')
        self._nick_col_w = _nick_col_width(all_display)
        nick_col_w = self._nick_col_w

        # Prefix width: ts (6) + nick column.  Body wraps within remainder.
        ts_w    = 6 if self.show_ts else 0
        prefix_w = ts_w + nick_col_w
        body_w  = max(10, w - prefix_w)
        indent  = ' ' * min(prefix_w, w // 2)

        self._wrapped = []

        for ev in self._events:
            kind = ev.get('kind', 'chat')
            if (self.filter_kinds is not None and kind != 'local'
                    and kind not in self.filter_kinds):
                continue

            ts = ev.get('ts', '') or ''

            if kind == 'system':
                nick_display  = '[tunl]'
                nick_color_key = 'tunl'
                body = ev.get('body', '')
            elif kind == 'note':
                who = ev.get('who', 'note')
                abbr = abbrevs.get(who, who)
                nick_display  = f'*{abbr}*'
                nick_color_key = 'note'
                body = ev.get('body', '')
            elif kind == 'local':
                nick_display  = '--'
                nick_color_key = '--'
                body = ev.get('body', '')
            else:
                who = ev.get('who', '?')
                if who == 'customer':
                    nick_display  = cust_display
                    nick_color_key = 'customer'
                else:
                    nick_display  = abbrevs.get(who, who)
                    nick_color_key = who
                body = ev.get('body', '')

            # Filter on nick + body (no timestamp -- timestamp matching is
            # rarely useful and confuses searches for numbers).
            if self.filter_re is not None:
                if not self.filter_re.search(f'{nick_display} {body}'):
                    continue

            body_lines = _cu_textwrap.wrap(body, width=body_w,
                                           break_long_words=True,
                                           break_on_hyphens=False)
            if not body_lines:
                body_lines = ['']

            self._wrapped.append({
                'ts':            ts if self.show_ts else '',
                'nick_display':  nick_display,
                'nick_color_key': nick_color_key,
                'body':          body_lines[0],
                'kind':          kind,
                'is_continuation': False,
            })
            for bl in body_lines[1:]:
                self._wrapped.append({
                    'ts':            '',
                    'nick_display':  '',
                    'nick_color_key': '',
                    'body':          indent + bl,
                    'kind':          kind,
                    'is_continuation': True,
                })

        self._wrap_key = key

    def lines_for_viewport(self, height, width):
        """Return (rows, at_bottom_flag).

        rows is a list of up to `height` row dicts, topmost first.  Each dict
        has: {ts, nick_display, nick_color_key, body, kind, is_continuation}.
        at_bottom is True iff the view is not scrolled up.
        """
        self._rewrap(width)
        total = len(self._wrapped)
        h = max(height, 1)
        if total <= h:
            return list(self._wrapped), True
        # scroll_offset is how many rows scrolled up from the bottom.
        offset = max(0, min(self.scroll_offset, total - h))
        end   = total - offset
        start = max(0, end - h)
        return self._wrapped[start:end], offset == 0

    def scroll(self, delta, height=1):
        """Positive delta -> scroll up (older).  Negative -> scroll down.

        The new offset is clamped to [0, wrapped_rows - height].  If the wrap
        cache was invalidated (e.g. by append()), it is rebuilt at the width
        of the most recent render so the clamp matches the rows on screen.
        """
        if self._wrap_key is None:
            self._rewrap(self._last_width)
        max_off = max(0, len(self._wrapped) - max(height, 1))
        self.scroll_offset = max(0, min(self.scroll_offset + delta, max_off))

    def scroll_to_bottom(self):
        self.scroll_offset = 0


# ---------------------------------------------------------------------------
# InputBuffer -- editable single-line input with readline-ish keybindings.
# ---------------------------------------------------------------------------

class InputBuffer:
    """A mutable single-line text buffer plus cursor position.  Not curses-aware.

    All keybinding helpers take no arguments and mutate state in place.
    history_prev / history_next walk the submission history; the line being
    typed when browsing starts is remembered and restored when history_next
    walks past the newest entry (as readline does), so an accidental Up/Down
    does not discard a half-written message.
    """

    _HISTORY_MAX = 200   # submissions kept in self.history

    def __init__(self):
        self.text    = ''
        self.cursor  = 0
        self.history = []      # submission history (oldest first)
        self._hidx   = None    # current history index (None = live editing)
        self._draft  = ''      # live line saved when history browsing starts

    def insert(self, ch):
        """Insert a single character (str) or short string at the cursor."""
        if not ch:
            return
        self._hidx = None
        self.text = self.text[:self.cursor] + ch + self.text[self.cursor:]
        self.cursor += len(ch)

    def backspace(self):
        if self.cursor > 0:
            self._hidx = None
            self.text = self.text[:self.cursor - 1] + self.text[self.cursor:]
            self.cursor -= 1

    def delete(self):
        if self.cursor < len(self.text):
            self._hidx = None
            self.text = self.text[:self.cursor] + self.text[self.cursor + 1:]

    def cursor_left(self):
        if self.cursor > 0:
            self.cursor -= 1

    def cursor_right(self):
        if self.cursor < len(self.text):
            self.cursor += 1

    def cursor_home(self):
        self.cursor = 0

    def cursor_end(self):
        self.cursor = len(self.text)

    def kill_to_start(self):       # ctrl-u
        self._hidx = None
        self.text = self.text[self.cursor:]
        self.cursor = 0

    def kill_to_end(self):         # ctrl-k
        self._hidx = None
        self.text = self.text[:self.cursor]

    def kill_word_left(self):      # ctrl-w
        if self.cursor == 0:
            return
        self._hidx = None
        i = self.cursor - 1
        # Skip trailing spaces, then one word.
        while i > 0 and self.text[i].isspace():
            i -= 1
        while i > 0 and not self.text[i - 1].isspace():
            i -= 1
        self.text = self.text[:i] + self.text[self.cursor:]
        self.cursor = i

    def submit(self):
        """Return the current text and clear the buffer.  Empty -> ''.

        Non-empty submissions are appended to history (capped at 200).
        """
        out = self.text
        if out:
            self.history.append(out)
            if len(self.history) > self._HISTORY_MAX:
                del self.history[:-self._HISTORY_MAX]
        self.text   = ''
        self.cursor = 0
        self._hidx  = None
        self._draft = ''
        return out

    def history_prev(self):
        """Show the previous (older) history entry; saves the live line first."""
        if not self.history:
            return
        if self._hidx is None:
            # Entering history browsing: remember what was being typed.
            self._draft = self.text
            self._hidx = len(self.history) - 1
        else:
            self._hidx = max(0, self._hidx - 1)
        self.text   = self.history[self._hidx]
        self.cursor = len(self.text)

    def history_next(self):
        """Show the next (newer) entry; past the newest, restore the saved line."""
        if self._hidx is None:
            return
        self._hidx += 1
        if self._hidx >= len(self.history):
            self._hidx  = None
            self.text   = self._draft
            self._draft = ''
        else:
            self.text = self.history[self._hidx]
        self.cursor = len(self.text)


# ---------------------------------------------------------------------------
# NickColors -- stable nick -> color-pair index (1..8).
# ---------------------------------------------------------------------------

class NickColors:
    """Stable mapping from a nick to a curses color-pair index.

    Pair 0 (the terminal default) is never returned.  Nicks listed in
    _RESERVED get fixed indices 1 / 2.  Every other nick is hashed onto
    3..palette_size.  With the default palette_size=8, results lie in
    [1, 8].
    """

    # Reserve index 1 for 'customer' and 2 for 'tunl' system messages.
    # Keys are looked up after stripping '*[]' from the matched nick token,
    # so '[tunl]' -> 'tunl' and '*note*' -> 'note'.
    _RESERVED = {'customer': 1, 'tunl': 2, 'note': 2}

    def __init__(self, palette_size=8):
        # Floor of 3 keeps the hashed range (3..palette) non-empty while
        # leaving 1 and 2 for the reserved names.
        self._palette = max(3, palette_size)

    def index_for(self, nick):
        """Return the color-pair index for `nick`.

        The result is deterministic: the same nick always maps to the same
        index (first byte of its SHA-1 digest).
        """
        reserved = self._RESERVED.get(nick)
        if reserved is not None:
            return reserved
        digest = _cu_hashlib.sha1(nick.encode('utf-8', 'replace')).digest()
        hashed_slots = self._palette - 2
        return 3 + digest[0] % hashed_slots


# ---------------------------------------------------------------------------
# Pinned row rendering -- status info that sits above the scrollback.
# Customer uses 1 row; support uses 2.  Content is free-form; the owner
# (caller) is responsible for keeping slots populated.  This class is
# protocol-agnostic: it knows how to lay out strings in `width` columns.
# ---------------------------------------------------------------------------

def fit_lcr(left, center, right, width):
    """Place left / center / right into exactly `width` columns.

    Priority when space is tight: keep left, then right, then center.
    Left is truncated (never dropped) so the user always sees the most
    important slot.  When left is empty, an over-long right slot is
    truncated instead of the row being blanked.

    A single space is kept between adjacent non-empty slots.  An empty
    slot needs no separator, so e.g. a right slot with nothing on its left
    may fill the whole width.

    Returns '' when width <= 0, otherwise a string of exactly `width`
    characters.  None slots are treated as empty.
    """
    if width <= 0:
        return ''
    L, C, R = str(left or ''), str(center or ''), str(right or '')

    # Center is shown only if it fits with one space per non-empty neighbour.
    c_gaps = (1 if L else 0) + (1 if R else 0)
    if C and len(L) + len(C) + len(R) + c_gaps <= width:
        slack = width - len(L) - len(C) - len(R)
        lgap = slack // 2
        if L and lgap == 0:
            # slack == 1 with only L beside C: the single space goes left.
            lgap = 1
        rgap = slack - lgap
        return L + (' ' * lgap) + C + (' ' * rgap) + R

    # Left + right, right-aligned; a separator only when both are present.
    lr_gap = 1 if (L and R) else 0
    if len(L) + len(R) + lr_gap <= width:
        return L + ' ' * (width - len(L) - len(R)) + R

    # Left and right cannot both fit: keep left (truncated).  Fall back to
    # right only when there is no left text at all.
    return (L or R)[:width].ljust(width)


class PinnedRows:
    """Mutable state for the 1 or 2 status rows pinned above the scrollback.

    Row 1 has left / center / right slots (connection state, session meta,
    viewer count).  Row 2 (support side only) is a single free-form string
    used for transient context (latest note -> ticket URL -> viewer names).

    `scroll_status` is set by the curses layer when the user has scrolled
    or is filtering.  It replaces the right slot of row 1 while set.
    """

    def __init__(self, rows=1):
        self.rows          = rows if rows in (1, 2) else 1
        self.row1_left     = ''
        self.row1_center   = ''
        self.row1_right    = ''
        self.row2_body     = ''
        self.scroll_status = None   # 'scrolled' | 'filtered' | None

    def set_row1(self, left=None, center=None, right=None):
        """Update the given row-1 slots; slots passed as None are kept."""
        for attr, value in (('row1_left', left),
                            ('row1_center', center),
                            ('row1_right', right)):
            if value is not None:
                setattr(self, attr, str(value))

    def set_row2(self, body=''):
        """Replace the row-2 text; a falsy body clears it."""
        self.row2_body = str(body or '')

    def render(self, width):
        """Return one string per pinned row, each exactly `width` wide.

        Every string is empty when width <= 0.
        """
        if width <= 0:
            return ['', ''][:self.rows]
        status = self.scroll_status
        if status == 'filtered':
            right_slot = '[filtered -- /filter to clear]'
        elif status:
            right_slot = f'[{status}]'
        else:
            right_slot = self.row1_right
        lines = [fit_lcr(self.row1_left, self.row1_center, right_slot, width)]
        if self.rows == 2:
            lines.append(self.row2_body[:width].ljust(width))
        return lines


# ---------------------------------------------------------------------------
# Ephemeral slot -- a one-shot overlay that eats the top scrollback row
# until cleared.  Two lifecycles:
#   state     -- caller explicitly clears when the underlying condition
#                resolves (e.g. 'Shell exited' cleared when the shell
#                respawns).
#   keystroke -- any subsequent input keystroke dismisses the ephemeral
#                (cheap user acknowledgement).
# ---------------------------------------------------------------------------

class Ephemeral:
    """One-shot overlay text with a dismissal policy.

    lifecycle is 'state' (cleared explicitly by the caller), 'keystroke'
    (cleared by the next on_keystroke()), or None when empty.
    """

    def __init__(self):
        self.clear()

    def set(self, text, lifecycle='state'):
        """Show `text`; unknown lifecycles fall back to 'state'."""
        self.text = str(text or '')
        if lifecycle not in ('state', 'keystroke'):
            lifecycle = 'state'
        self.lifecycle = lifecycle

    def clear(self):
        """Remove the overlay and forget its lifecycle."""
        self.text, self.lifecycle = '', None

    def on_keystroke(self):
        """Dismiss the overlay if it was set with lifecycle='keystroke'."""
        if self.lifecycle != 'keystroke':
            return
        self.clear()

    def active(self):
        """True while there is overlay text to show."""
        return self.text != ''


# ---------------------------------------------------------------------------
# SlashCommands -- simple dispatch table for leading-'/' input.
# ---------------------------------------------------------------------------

class SlashCommands:
    """Registry of slash commands keyed by their '/name'.

    Each handler is called as fn(args_str) and may return a 'local' event
    dict to append to the scrollback, or None for silent commands.
    dispatch() reports whether a line was consumed as a command.
    """

    def __init__(self):
        self._cmds = {}

    def register(self, name, fn, help_text=''):
        """Add or replace command `name`; a missing leading '/' is added."""
        key = name if name.startswith('/') else '/' + name
        self._cmds[key] = (fn, help_text)

    def names(self):
        """All registered '/names', sorted."""
        return sorted(self._cmds)

    def help_text(self):
        """One '/name  description' line per command, sorted by name."""
        rows = []
        for key in sorted(self._cmds):
            rows.append(f'{key}  {self._cmds[key][1]}')
        return '\n'.join(rows)

    @staticmethod
    def _local(body):
        """Build a UTC-timestamped 'local' scrollback event."""
        return {'kind': 'local',
                'ts': _cu_time.strftime('%H:%M', _cu_time.gmtime()),
                'body': body}

    def dispatch(self, line):
        """Parse '/cmd args...'.  Return (handled, event_or_None).

        Lines not starting with '/' are not handled; the caller should send
        them as chat.  Unknown commands and handler exceptions are handled
        and reported through a 'local' event.
        """
        if not line.startswith('/'):
            return (False, None)
        head, *tail = line.split(None, 1)
        args = tail[0] if tail else ''
        entry = self._cmds.get(head)
        if entry is None:
            return (True, self._local(f'unknown command: {head} (try /help)'))
        handler = entry[0]
        try:
            result = handler(args)
        except Exception as e:
            return (True, self._local(f'{head} failed: {e!r}'))
        return (True, result)


# ---------------------------------------------------------------------------
# Tiny utilities re-used by the curses layer (kept pure so they're tested).
# ---------------------------------------------------------------------------

# Generic words (legal-form suffixes, articles, filler nouns) that
# build_mention_tokens() drops when splitting `extra_company` into mention
# tokens, so that e.g. '@group' or '@solutions' alone does not count as
# addressing this side.  Entries are lowercase; words of <= 2 characters
# listed here (e.g. 'ag', 'co') are already excluded by the length check.
_MENTION_STOPLIST = frozenset({
    'corp', 'ltd', 'llc', 'inc', 'gmbh', 'ag', 'sa', 'co', 'bv', 'nv',
    'the', 'and', 'for', 'of', 'de', 'group', 'solutions', 'services', 'systems',
})


def build_mention_tokens(role, my_name='', extra_company=''):
    """Return the lowercase tokens that mark a message as addressed to us.

    role          -- 'support' or 'customer'; always included when non-empty
    my_name       -- display name; every word longer than 2 chars is included
    extra_company -- company / org name; words longer than 2 chars that are
                     not on _MENTION_STOPLIST are included

    Names are split on whitespace, '_', '@' and '-'.  Returns a frozenset.
    """
    def _words(text):
        return (piece.lower().strip()
                for piece in _cu_re.split(r'[\s_@-]+', text or ''))

    found = {role.lower()} if role else set()
    found.update(w for w in _words(my_name) if len(w) > 2)
    found.update(w for w in _words(extra_company)
                 if len(w) > 2 and w not in _MENTION_STOPLIST)
    return frozenset(found)


def highlight_mentions(body, tokens):
    """Return (body, mentions_you:bool).

    tokens -- frozenset of lowercase strings returned by build_mention_tokens().

    A mention is '@<token>' matched case-insensitively.  The '@' must not
    directly follow a word character, so e-mail addresses such as
    'ops@acme.com' do not count.  The token must not be followed by a word
    character, so '@acme' does not match inside '@acmecorp'.  This matches
    the '@' recognised by tab-completion, which only treats '@' at a word
    start as a mention.

    Empty tokens are ignored.  An empty token set never highlights.  body
    is returned unchanged.
    """
    if not tokens:
        return body, False
    for token in tokens:
        if not token:
            continue
        # (?!\w) instead of \b so tokens ending in punctuation still match.
        pattern = r'(?<!\w)@' + _cu_re.escape(token) + r'(?!\w)'
        if _cu_re.search(pattern, body, _cu_re.IGNORECASE):
            return body, True
    return body, False


def now_hhmm():
    """Current UTC time as an 'HH:MM' string, the format used for wire ts."""
    stamp = _cu_time.gmtime()
    return '%02d:%02d' % (stamp.tm_hour, stamp.tm_min)


# ---------------------------------------------------------------------------
# Thin curses wrapper.  Included in the INLINE block so the single-file
# deliverables carry their own copy and don't need the relay/ directory.
# ---------------------------------------------------------------------------
import curses
import locale
import threading


def run_chat_ui(read_line, send_bytes, my_name, stop_event=None,
                readonly=False, wire_out_kind='chat',
                send_name=True, local_echo_on_send=False,
                local_echo_who=None,
                pinned_rows=1, session_info_handler=None,
                mention_tokens=frozenset(),
                customer_nick=None,
                extra_commands=None,
                session_id=''):
    """Render the chat UI until the transport closes or /quit is typed.

    Parameters:
        read_line           -- callable() -> bytes or None  (blocking read
                               of one inbound line; return None on EOF).
                               An exception raised here is treated as EOF.
        send_bytes          -- callable(bytes) -> None      (write outbound
                               frame; bytes already framed by run_chat_ui)
        my_name             -- str  (this client's display name; used for
                               bold-marking own messages)
        stop_event          -- optional threading.Event; set to request exit
        readonly            -- if True, outbound CHAT is suppressed
        wire_out_kind       -- 'chat' for consumer, or 'customer_chat' for
                               the customer producer
        send_name           -- True for consumers; customer side sets False
        local_echo_on_send  -- True when the broker does not echo back
                               (customer side with CUSTOMER_CHAT)
        local_echo_who      -- nick used in the local-echo event's 'who'
                               field; defaults to my_name.  Customer side
                               passes 'customer' so own messages get the
                               reserved yellow color rather than the hash
                               color for their display name.
        pinned_rows         -- 1 for customer layout, 2 for support
        session_info_handler-- optional callable(payload, pinned, ephemeral)
                               invoked on each inbound session_info frame.
                               Mutates the pinned rows / ephemeral slot to
                               reflect the new state.  Caller-site logic
                               lives here; this module stays generic.
        mention_tokens      -- frozenset from build_mention_tokens(); messages
                               containing '@<token>' ring the bell.
        customer_nick       -- display name substituted for who='customer';
                               None falls back to 'customer'.  Support side
                               also updates this dynamically via
                               session_info_handler receiving history.
        extra_commands      -- optional callable(cmds, history) invoked
                               after the default slash commands are
                               registered.  Lets callers add side-specific
                               commands (e.g. customer /brb, /back) without
                               leaking them into the canonical default set.
        session_id          -- optional session identifier appended to the
                               initial 'chat connected' banner line.

    stop_event is always set on return, which also stops the reader thread
    at its next loop check.
    """
    try:
        locale.setlocale(locale.LC_ALL, '')
    except locale.Error:
        # LANG/LC_* naming a locale that is not installed on this host must
        # not abort the chat; continue with the current (C) locale.
        pass
    stop_event = stop_event or threading.Event()
    in_q = _cu_queue.Queue()

    def _reader():
        # Runs in a daemon thread: any uncaught exception here would end the
        # thread silently and leave the UI waiting forever, so transport
        # errors are mapped to EOF and parse errors to an 'unknown' event.
        while not stop_event.is_set():
            try:
                line = read_line()
            except Exception:
                line = None
            if line is None:
                in_q.put({'kind': 'session_ended', '_eof': True})
                return
            try:
                ev = wire_parse(line)
            except Exception:
                raw = (line.decode('utf-8', 'replace')
                       if isinstance(line, bytes) else str(line))
                ev = {'kind': 'unknown', 'raw': raw}
            in_q.put(ev)

    t = threading.Thread(target=_reader, daemon=True)
    t.start()

    # Consumer-side announces itself to the broker (idempotent).
    # Customer side skips this: it is the producer, not a consumer.
    if send_name and my_name:
        try:
            send_bytes(wire_encode('name', my_name))
        except Exception:
            pass   # best effort; the chat still works without the NAME frame

    def _inner(stdscr):
        _curses_main(stdscr, in_q, send_bytes, my_name, stop_event, readonly,
                     wire_out_kind, local_echo_on_send, local_echo_who,
                     pinned_rows, session_info_handler, mention_tokens,
                     customer_nick, extra_commands, session_id)

    try:
        curses.wrapper(_inner)
    finally:
        stop_event.set()


def _curses_main(stdscr, in_q, send_bytes, my_name, stop_event, readonly,
                 wire_out_kind='chat', local_echo_on_send=False,
                 local_echo_who=None,
                 pinned_rows=1, session_info_handler=None,
                 mention_tokens=frozenset(),
                 customer_nick=None,
                 extra_commands=None,
                 session_id=''):
    """Curses event loop behind run_chat_ui(); parameters mirror it.

    Each iteration does three things:
      1. Redraw if something changed.
      2. Drain every parsed inbound event from `in_q` into the scrollback,
         pinned rows or ephemeral slot.
      3. Wait up to 80 ms for one keystroke and pass it to _handle_key().

    Returns when `stop_event` is set (e.g. by /quit), on KeyboardInterrupt,
    or after a short linger following an EOF 'session_ended' event.
    """
    try:
        curses.curs_set(1)
    except curses.error:
        # Terminals that cannot change cursor visibility raise here; the
        # chat is usable without it, so do not abort the whole UI.
        pass
    stdscr.nodelay(False)
    stdscr.timeout(80)   # ms; controls inbound-queue drain cadence

    # Color setup.
    try:
        curses.start_color()
        curses.use_default_colors()
        palette = [curses.COLOR_YELLOW,    # 1 -- customer
                   curses.COLOR_CYAN,      # 2 -- [tunl] system
                   curses.COLOR_GREEN,     # 3..
                   curses.COLOR_MAGENTA,
                   curses.COLOR_BLUE,
                   curses.COLOR_RED,
                   curses.COLOR_WHITE,
                   curses.COLOR_YELLOW]
        for i, c in enumerate(palette, start=1):
            try:
                curses.init_pair(i, c, -1)
            except curses.error:
                pass
        has_colors = curses.has_colors()
    except curses.error:
        has_colors = False

    history = HistoryBuffer()
    history.customer_nick = customer_nick
    inp     = InputBuffer()
    nicks   = NickColors()
    cmds    = SlashCommands()
    pinned  = PinnedRows(rows=pinned_rows)
    ephem   = Ephemeral()
    _register_default_commands(cmds, history, stop_event,
                               send_bytes=send_bytes,
                               wire_out_kind=wire_out_kind)
    if extra_commands is not None:
        try:
            extra_commands(cmds, history)
        except Exception:
            pass

    # Seed the screen with a tiny banner.  Including the session id
    # makes it copy-pastable from either pane on the support side, so
    # whoever needs to read it to a third party (a customer on the
    # phone, a colleague over chat) can do so without flipping windows.
    sid_part = f' | session {session_id}' if session_id else ''
    history.append({'kind': 'local', 'ts': now_hhmm(),
                    'body': f'chat connected as {my_name or "anonymous"}'
                            f'{sid_part}'
                            f'{" (read-only)" if readonly else ""}'})

    redraw_needed = True
    mentions_bell = False

    while not stop_event.is_set():
        # Reflect scroll / filter state into the pinned row right slot.
        _, at_bottom = history.lines_for_viewport(1, 80)  # cheap check
        if not at_bottom:
            pinned.scroll_status = 'scrolled'
        elif history.is_filtered():
            pinned.scroll_status = 'filtered'
        else:
            pinned.scroll_status = None

        if redraw_needed:
            _render(stdscr, history, inp, nicks, has_colors,
                    mentions_bell, my_name, pinned, ephem)
            mentions_bell = False
            redraw_needed = False

        # Drain inbound queue.
        try:
            while True:
                ev = in_q.get_nowait()
                if ev.get('kind') == 'session_info':
                    payload = ev.get('payload', {}) or {}
                    history.session_info = payload  # /who reads this
                    if session_info_handler is not None:
                        try:
                            session_info_handler(payload,
                                                 pinned, ephem,
                                                 history=history)
                        except Exception:
                            pass
                    redraw_needed = True
                    continue
                if ev.get('kind') == 'ephemeral':
                    lifecycle = ev.get('lifecycle', 'state')
                    if lifecycle == 'clear':
                        ephem.clear()
                    else:
                        ephem.set(ev.get('body', ''), lifecycle=lifecycle)
                    redraw_needed = True
                    continue
                if ev.get('kind') == 'session_ended':
                    history.append({'kind': 'local', 'ts': now_hhmm(),
                                    'body': '-- session ended --'})
                    ephem.set('session ended', lifecycle='keystroke')
                    redraw_needed = True
                    if ev.get('_eof'):
                        # Transport closed.  We return before reaching the
                        # render at the top of the loop, so paint now;
                        # otherwise the user never sees the message.
                        try:
                            _render(stdscr, history, inp, nicks, has_colors,
                                    mentions_bell, my_name, pinned, ephem)
                        except curses.error:
                            pass
                        # Linger briefly so the user sees it, then exit.
                        stdscr.timeout(2000)
                        try:
                            stdscr.getch()
                        except curses.error:
                            pass
                        return
                    continue
                if ev.get('kind') == 'readonly':
                    history.append({'kind': 'local', 'ts': now_hhmm(),
                                    'body': '-- read-only mode --'})
                    redraw_needed = True
                    continue
                if ev.get('kind') == 'readwrite':
                    history.append({'kind': 'local', 'ts': now_hhmm(),
                                    'body': '-- interactive mode restored --'})
                    redraw_needed = True
                    continue
                if ev.get('kind') == 'brb':
                    history.append({'kind': 'local', 'ts': now_hhmm(),
                                    'body': '-- customer is away; input frozen --'})
                    redraw_needed = True
                    continue
                if ev.get('kind') == 'brb_end':
                    history.append({'kind': 'local', 'ts': now_hhmm(),
                                    'body': '-- customer is back; input restored --'})
                    redraw_needed = True
                    continue
                if ev.get('kind') == 'heartbeat':
                    continue
                if ev.get('kind') in ('chat', 'system', 'note'):
                    body = ev.get('body', '')
                    _, hit = highlight_mentions(body, mention_tokens)
                    if hit and ev.get('who') != my_name:
                        mentions_bell = True
                    history.append(ev)
                    redraw_needed = True
                    continue
                # 'unknown' and anything else we just tuck away.
                history.append({'kind': 'local', 'ts': now_hhmm(),
                                'body': f'? {ev.get("raw", ev)}'})
                redraw_needed = True
        except _cu_queue.Empty:
            pass

        # Wait for keystroke (timeout already set above).
        # get_wch returns a str (UTF-8-decoded printable, possibly multibyte)
        # or int (control codes / curses KEY_*).  curses.error means "no
        # input within the timeout" -- the getch() equivalent of -1.
        # Using getch + chr(k) here would treat each UTF-8 byte of an
        # umlaut as a separate latin-1 char, double-encoding on submit.
        try:
            k = stdscr.get_wch()
        except curses.error:
            continue
        except KeyboardInterrupt:
            return
        redraw_needed = True

        # Any keystroke dismisses a keystroke-lifecycle ephemeral.
        ephem.on_keystroke()

        # Handle terminal resize first.
        if k == curses.KEY_RESIZE:
            try:
                h, w = stdscr.getmaxyx()
                curses.resize_term(h, w)
            except curses.error:
                pass
            continue

        _handle_key(k, inp, history, cmds, send_bytes, stdscr, readonly,
                    my_name, wire_out_kind, local_echo_on_send, local_echo_who,
                    mention_tokens)


def _register_default_commands(cmds, history, stop_event, send_bytes=None,
                               wire_out_kind='chat'):
    """Register the built-in slash commands on `cmds`.

    cmds          -- SlashCommands registry to populate
    history       -- scrollback buffer that the commands read and mutate
    stop_event    -- threading.Event; /quit sets it
    send_bytes    -- optional callable(bytes) used by /note, /me and /nick.
                     None means "no transport", and /note and /me say so.
    wire_out_kind -- outbound chat kind used by /me.  'chat' marks the
                     consumer side and selects the /nick wire frame.
    """
    def _help(args):
        return {'kind': 'local', 'ts': now_hhmm(),
                'body': 'commands: ' + ' '.join(cmds.names())}

    def _quit(args):
        stop_event.set()
        return None

    def _clear(args):
        # Local view only; reaches into HistoryBuffer internals.
        history._events = []
        history._wrap_key = None
        return None

    def _notime(args):
        history.set_show_ts(not history.show_ts)
        return {'kind': 'local', 'ts': now_hhmm(),
                'body': f'timestamps: '
                        f'{"on" if history.show_ts else "off"}'}

    def _search(args):
        pat = args.strip()
        if pat:
            # Validate before installing: an invalid pattern must leave the
            # current filter (and scroll position) untouched instead of
            # becoming the active filter while we report it as invalid.
            try:
                _cu_re.compile(pat)
            except _cu_re.error as e:
                return {'kind': 'local', 'ts': now_hhmm(),
                        'body': f'invalid regex: {e}'}
        history.set_filter(pat or None)
        history.scroll_to_bottom()
        if not pat:
            return {'kind': 'local', 'ts': now_hhmm(),
                    'body': 'scrollback filter cleared'}
        return {'kind': 'local', 'ts': now_hhmm(),
                'body': f'scrollback filtered to /{pat}/ (case-insensitive)'}

    def _events(args):
        # Toggle the pre-canned "system notices only" view.
        if history.filter_kinds and 'system' in history.filter_kinds:
            history.set_kind_filter(None)
            body = 'events filter off (full scrollback)'
        else:
            history.set_kind_filter({'system', 'note', 'session_ended'})
            body = 'events filter on (system notices + notes only)'
        history.scroll_to_bottom()
        return {'kind': 'local', 'ts': now_hhmm(), 'body': body}

    def _note(args):
        body = args.strip()
        if not body:
            return {'kind': 'local', 'ts': now_hhmm(),
                    'body': 'usage: /note <text>'}
        if send_bytes is None:
            return {'kind': 'local', 'ts': now_hhmm(),
                    'body': 'no transport; /note cannot be submitted'}
        try:
            send_bytes(wire_encode('note', body))
        except Exception as e:
            return {'kind': 'local', 'ts': now_hhmm(),
                    'body': f'/note failed: {e!r}'}
        # The broker echoes an SNOTE back; no local echo needed.
        return None

    def _who(args):
        info = getattr(history, 'session_info', None) or {}
        cust = info.get('customer_nick') or info.get('customer') or 'customer'
        viewer_names = info.get('viewer_names') or []
        n_viewers = info.get('viewers')
        parts = [f'customer: {cust}']
        if viewer_names:
            # str() guards against non-string entries in the payload.
            parts.append('viewers: ' + ', '.join(str(n) for n in viewer_names))
        elif n_viewers:
            parts.append(f'viewers: {n_viewers}')
        else:
            parts.append('no viewers connected yet')
        return {'kind': 'local', 'ts': now_hhmm(),
                'body': ' | '.join(parts)}

    def _me(args):
        body = args.strip()
        if not body:
            return {'kind': 'local', 'ts': now_hhmm(),
                    'body': 'usage: /me <action>'}
        if send_bytes is None:
            return {'kind': 'local', 'ts': now_hhmm(),
                    'body': 'no transport; /me cannot be sent'}
        try:
            send_bytes(wire_encode(wire_out_kind, '* ' + body))
        except Exception as e:
            return {'kind': 'local', 'ts': now_hhmm(),
                    'body': f'/me failed: {e!r}'}
        # Local echo is handled normally (by local_echo_on_send when
        # set, or via broker echo).
        return None

    def _nick(args):
        new_nick = args.strip()
        if not new_nick:
            return {'kind': 'local', 'ts': now_hhmm(),
                    'body': 'usage: /nick <name>'}
        # Sanity: keep it printable (no control / escape characters that
        # would reach other terminals), no whitespace, reasonable length.
        if (any(c.isspace() for c in new_nick)
                or not new_nick.isprintable()
                or len(new_nick) > 32):
            return {'kind': 'local', 'ts': now_hhmm(),
                    'body': 'nick must be one printable word, <=32 chars'}
        if wire_out_kind == 'chat':
            # Consumer side: tell the broker our new name; the
            # existing 'name' wire kind already triggers re-broadcast
            # (NAME ctrl frame) so peers and the customer see it.
            # Also update history.my_nick so the next local echo on
            # this seat uses the new name -- otherwise our own pane
            # keeps tagging our messages with the original my_name
            # until the wrapper restarts, while the customer and
            # other consumers already see the new one.
            if send_bytes is not None:
                try:
                    send_bytes(wire_encode('name', new_nick))
                except Exception as e:
                    return {'kind': 'local', 'ts': now_hhmm(),
                            'body': f'/nick failed: {e!r}'}
            history.my_nick = new_nick
            return {'kind': 'local', 'ts': now_hhmm(),
                    'body': f'now known as {new_nick}'}
        # Customer side: send CUSTOMER_NICK so the broker can fan it
        # out via SESSION_INFO; consumers substitute the chosen nick
        # for the company-derived 'customer' label.  Update the local
        # history so the customer's own pane labels their messages
        # the same way as well.
        history.customer_nick = new_nick
        if send_bytes is not None:
            try:
                send_bytes(wire_encode('customer_nick', new_nick))
            except Exception as e:
                return {'kind': 'local', 'ts': now_hhmm(),
                        'body': f'/nick failed: {e!r}'}
        return {'kind': 'local', 'ts': now_hhmm(),
                'body': f'now known as {new_nick}'}

    cmds.register('help',   _help,   'show this help')
    cmds.register('quit',   _quit,   'disconnect')
    cmds.register('clear',  _clear,  'clear scrollback (not history on relay)')
    cmds.register('notime', _notime, 'toggle HH:MM timestamps')
    cmds.register('search', _search, 'filter scrollback by regex; /search to clear')
    cmds.register('filter', _search, 'alias for /search')
    cmds.register('events', _events, 'toggle system-notices-only view')
    cmds.register('note',   _note,   'append a session-scoped note (live relay)')
    cmds.register('who',    _who,    'show customer + connected viewers')
    cmds.register('me',     _me,     'send an emote-style chat line')
    cmds.register('nick',   _nick,   'set your display name')


def _longest_common_prefix(strings):
    if not strings:
        return ''
    out = ''
    for chars in zip(*strings):
        if all(c == chars[0] for c in chars):
            out += chars[0]
        else:
            break
    return out


def _gather_nicks(history, my_name, mention_tokens):
    """Collect known display names from history, my_name, and mention tokens."""
    nicks = set()
    for ev in getattr(history, '_events', []):
        who = ev.get('who')
        if isinstance(who, str) and who.strip():
            nicks.add(who.strip())
    if my_name:
        nicks.add(my_name)
    cn = getattr(history, 'customer_nick', None)
    if cn:
        nicks.add(cn)
    for t in (mention_tokens or frozenset()):
        if t:
            nicks.add(str(t))
    return sorted(nicks)


def _complete_token(text, cursor, cmds, history, my_name, mention_tokens):
    """Tab completion.  Return (new_text, new_cursor, matches_or_None).

    Two modes:
      - slash command: when the current line starts with '/' and the cursor
        is within the first token, complete against cmds.names().
      - @-mention: find the nearest preceding '@' (at BOL or after space);
        complete the word against known nicks.

    When there is exactly one match, complete fully (append space if the
    char after the cursor is not already a space).  When multiple matches
    share a longer common prefix, extend to that prefix.  When prefix
    cannot be extended, return (unchanged, matches) so the caller can
    display the list.
    """
    if cursor > len(text):
        cursor = len(text)
    prefix = text[:cursor]
    suffix = text[cursor:]

    # Slash command completion only for the first token.
    if prefix.startswith('/') and ' ' not in prefix:
        token_start = 0
        candidates = list(cmds.names())
    else:
        at_idx = -1
        for i in range(len(prefix) - 1, -1, -1):
            if prefix[i] == '@':
                if i == 0 or prefix[i - 1].isspace():
                    at_idx = i
                break
            if prefix[i].isspace():
                break
        if at_idx < 0:
            return (text, cursor, None)
        token_start = at_idx
        candidates = ['@' + n for n in _gather_nicks(history, my_name, mention_tokens)]

    token = prefix[token_start:]
    matches = [c for c in candidates if c.startswith(token)]
    if not matches:
        return (text, cursor, None)
    if len(matches) == 1:
        completion = matches[0]
        if not suffix.startswith(' '):
            completion += ' '
        new_text = prefix[:token_start] + completion + suffix
        new_cursor = token_start + len(completion)
        return (new_text, new_cursor, None)
    lcp = _longest_common_prefix(matches)
    if lcp == token:
        return (text, cursor, matches)
    new_text = prefix[:token_start] + lcp + suffix
    new_cursor = token_start + len(lcp)
    return (new_text, new_cursor, matches)


def _handle_key(k, inp, history, cmds, send_bytes, stdscr, readonly,
                my_name='', wire_out_kind='chat', local_echo_on_send=False,
                local_echo_who=None, mention_tokens=frozenset()):
    """Apply one keypress from the chat input loop.

    Checked in this order: scrollback paging (PgUp/PgDn), line-editor
    movement and editing keys, Ctrl-U/K/W kills, Ctrl-C (ignored), TAB
    completion, Enter (slash-command dispatch or chat send), and finally
    insertion of printable characters.  Anything else is ignored.

    Args:
        k: key as returned by curses get_wch -- an int key code or a
            one-character str.
        inp: input-line editor; this function calls its cursor_*,
            history_prev/next, backspace, delete, kill_*, insert and submit
            methods and reads/writes .text and .cursor.
        history: scrollback model; receives scroll(), append(event_dict)
            and scroll_to_bottom(); .my_nick is read for local echo.
        cmds: slash-command registry; cmds.dispatch(line) must return
            (handled, event_or_None).  Also passed to _complete_token.
        send_bytes: callable that receives the wire_encode() output for an
            outgoing chat line.
        stdscr: curses window; only its height is used (page size).
        readonly: when true, non-command lines are not sent and a local
            "read-only" notice is appended instead.
        my_name, mention_tokens: forwarded to _complete_token for
            @-mention completion; my_name is also a local-echo fallback.
        wire_out_kind: kind argument passed to wire_encode().
        local_echo_on_send: when true, the sent line is appended to history
            as a 'chat' event after send_bytes() returns.
        local_echo_who: nick used for the local echo when history.my_nick
            is falsy.

    Returns None.  Exceptions raised while encoding/sending (or echoing) a
    chat line are caught and shown as a local 'send failed' row.
    """
    # get_wch returns control characters (Enter, Tab, Backspace, Ctrl-*) as
    # a one-char str; the special-key handlers below compare with int codes.
    # Normalize a single-char control str back to its ord so those handlers
    # keep working.  Printable input stays a str and falls through to the
    # str branch at the bottom.
    if isinstance(k, str) and len(k) == 1:
        c = ord(k)
        if c < 32 or c == 127:
            k = c
    # Scrollback keys -- operate on the history viewport.
    # Page size: window height minus 3 reserved rows (presumably input and
    # surrounding chrome -- NOTE(review): confirm), never less than 1.
    h, _ = stdscr.getmaxyx()
    page = max(1, h - 3)
    if k == curses.KEY_PPAGE:
        history.scroll(+page, page);   return
    if k == curses.KEY_NPAGE:
        history.scroll(-page, page);   return
    if k == curses.KEY_HOME:
        inp.cursor_home();             return
    if k == curses.KEY_END:
        inp.cursor_end();              return
    if k == curses.KEY_LEFT:
        inp.cursor_left();             return
    if k == curses.KEY_RIGHT:
        inp.cursor_right();            return
    if k == curses.KEY_UP:
        inp.history_prev();            return
    if k == curses.KEY_DOWN:
        inp.history_next();            return
    # Terminals differ on what Backspace sends: KEY_BACKSPACE, DEL or ^H.
    if k in (curses.KEY_BACKSPACE, 127, 8):
        inp.backspace();               return
    if k == curses.KEY_DC:
        inp.delete();                  return
    if k == 21:                # ctrl-u
        inp.kill_to_start();           return
    if k == 11:                # ctrl-k
        inp.kill_to_end();             return
    if k == 23:                # ctrl-w
        inp.kill_word_left();          return
    if k == 3:                 # ctrl-c: ignore (use /quit to exit)
        return
    if k == 9:                 # TAB: complete slash command or @mention
        new_text, new_cursor, matches = _complete_token(
            inp.text, inp.cursor, cmds, history, my_name, mention_tokens,
        )
        if (new_text, new_cursor) != (inp.text, inp.cursor):
            inp.text, inp.cursor = new_text, new_cursor
        elif matches and len(matches) > 1:
            # Nothing could be extended: list (at most 8) candidates locally.
            shown = ' '.join(matches[:8])
            suffix = '' if len(matches) <= 8 else f' ... ({len(matches)} total)'
            history.append({'kind': 'local', 'ts': now_hhmm(),
                            'body': f'completions: {shown}{suffix}'})
        return
    if k in (10, 13, curses.KEY_ENTER):
        line = inp.submit()
        if not line.strip():
            return
        # Slash command?  Dispatched before the read-only check, so
        # commands keep working in read-only mode.
        handled, ev = cmds.dispatch(line)
        if handled:
            if ev is not None:
                history.append(ev)
            history.scroll_to_bottom()
            return
        if readonly:
            history.append({'kind': 'local', 'ts': now_hhmm(),
                            'body': '-- read-only; message not sent --'})
            return
        # Send as chat.  The local echo only happens if send_bytes() did
        # not raise.
        try:
            send_bytes(wire_encode(wire_out_kind, line))
            if local_echo_on_send:
                history.append({'kind': 'chat',
                                'who':  history.my_nick or local_echo_who
                                        or my_name or 'you',
                                'ts':   now_hhmm(),
                                'body': line})
        except Exception as e:
            history.append({'kind': 'local', 'ts': now_hhmm(),
                            'body': f'send failed: {e!r}'})
        history.scroll_to_bottom()
        return
    # Printable char (or multibyte sequence).
    # get_wch returns a str directly for printable input (UTF-8 decoded
    # using the active locale); int returns are control codes that
    # didn't match any of the cases above.
    if isinstance(k, str):
        if k.isprintable():
            inp.insert(k)
        return
    # Remaining ints: insert only if they map to a printable code point;
    # out-of-range values (e.g. unknown KEY_* codes) are dropped.
    try:
        ch = chr(k)
    except (ValueError, OverflowError):
        return
    if ch.isprintable():
        inp.insert(ch)


def _render(stdscr, history, inp, nicks, has_colors, bell, my_name,
            pinned=None, ephem=None):
    """Redraw the whole chat screen.

    Draws, top to bottom: the scrollback (bottom-aligned), an optional
    ephemeral notice row, the input row (horizontally scrolled so the
    cursor stays visible) and the pinned status rows.  It then optionally
    beeps, parks the cursor on the input row and refreshes.  Windows
    smaller than 4 rows or 20 columns only get a 'terminal too small'
    notice.

    Args:
        stdscr: curses window to draw on.
        history: scrollback model; lines_for_viewport(height, width) must
            return (rows, at_bottom), and its _nick_col_w is read.
        inp: input-line editor; .text and .cursor are read.
        nicks, has_colors, my_name: passed through to _addstr_colored.
        bell: when true, curses.beep() is called (curses.error ignored).
        pinned: optional pinned-rows model exposing .rows and
            render(width) -> list of str.
        ephem: optional ephemeral notice exposing active() and .text.

    Every curses.error is swallowed.  If one escapes an inner write, the
    rest of the redraw, including refresh(), is skipped for this frame.
    """
    try:
        stdscr.erase()
        h, w = stdscr.getmaxyx()
        if h < 4 or w < 20:
            stdscr.addstr(0, 0, 'terminal too small')
            stdscr.refresh()
            return

        # Layout (top to bottom):
        #   [scrollback region]   rows 0 .. hist_h-1
        #   [ephemeral, if set]   row  hist_h-1  (eats last scrollback row)
        #   [input row]           row  h - pinned_h - 1
        #   [pinned rows ...]     rows h - pinned_h .. h-1  (primary at h-1)
        pinned_h = pinned.rows if pinned is not None else 0
        hist_h   = max(1, h - pinned_h - 1)   # 1 for input
        input_row = h - pinned_h - 1

        zone_attr = _zone_attr(has_colors)

        # Scrollback region: rows 0..hist_h-1.
        # Ephemeral occupies the bottom-most scrollback row (above input).
        eph_active = ephem is not None and ephem.active()
        avail_h = hist_h - (1 if eph_active else 0)
        # Width w - 1 keeps text out of the last column.
        rows, _at_bottom = history.lines_for_viewport(avail_h, w - 1)
        nick_col_w = history._nick_col_w
        # Bottom-align: push messages down so the latest row is directly
        # above the ephemeral (or input when no ephemeral is active).
        row_y = max(0, avail_h - len(rows))
        for row in rows:
            _addstr_colored(stdscr, row_y, 0, row, nicks, has_colors,
                            my_name, nick_col_w)
            row_y += 1
        if eph_active:
            _paint_tinted_row(stdscr, hist_h - 1, w,
                              f'[!] {ephem.text}',
                              _ephemeral_attr(has_colors))

        # Input row (tinted zone).  Horizontal-scroll the buffer so cursor
        # stays visible.  Cursor placement is deferred to the end of the
        # render so subsequent writes do not move it off the input line.
        prompt = '> '
        max_in = max(1, w - 1 - len(prompt))
        start  = max(0, inp.cursor - max_in + 1)
        visible = inp.text[start:start + max_in]
        input_cursor_col = len(prompt) + (inp.cursor - start)
        _paint_tinted_row(stdscr, input_row, w, prompt + visible, zone_attr)

        # Pinned rows at bottom: rendered in reverse so the primary row
        # (row 1) lands at h-1 (most prominent) and supplementary rows
        # sit above it.
        # NOTE(review): assumes render() returns at most pinned.rows rows;
        # any extra row would be painted over the input row / scrollback.
        if pinned is not None:
            rendered = pinned.render(w - 1)
            for i, row in enumerate(reversed(rendered)):
                _paint_tinted_row(stdscr, h - 1 - i, w, row, zone_attr)

        if bell:
            try:
                curses.beep()
            except curses.error:
                pass

        # Place the cursor last so no intervening write can move it.
        try:
            stdscr.move(input_row, min(input_cursor_col, w - 1))
        except curses.error:
            pass

        stdscr.refresh()
    except curses.error:
        pass


def _paint_tinted_row(stdscr, y, w, content, attr):
    """Paint row `y` with `attr` applied to the full width `w`.

    Writes `content` (truncated / padded to w-1 chars) into cols 0..w-2 via
    addstr, then uses chgat to extend `attr` to the last column without
    actually writing a character there.  Writing col w-1 on the bottom row
    would scroll the window; chgat only changes attributes.
    """
    try:
        body = content[:w - 1].ljust(w - 1)
        stdscr.addstr(y, 0, body, attr)
    except curses.error:
        pass
    try:
        stdscr.chgat(y, 0, w, attr)
    except curses.error:
        pass


def _zone_attr(has_colors):
    """Attribute for pinned / input rows.  Subtle reverse-video when
    colors are available; bold otherwise."""
    if has_colors:
        try:
            return curses.A_REVERSE
        except AttributeError:
            return curses.A_NORMAL
    return curses.A_BOLD | curses.A_REVERSE


def _ephemeral_attr(has_colors):
    """Attribute for the ephemeral overlay row -- louder than pinned."""
    if has_colors:
        return curses.A_BOLD | curses.A_REVERSE
    return curses.A_STANDOUT


def _row_to_str(row, nick_col_w):
    """Flatten a structured row dict to a plain string (no-color terminals)."""
    if row['is_continuation']:
        return row['body']
    ts   = (row['ts'] + ' ') if row['ts'] else ''
    nick = row['nick_display'].ljust(nick_col_w)
    return ts + nick + row['body']


def _addstr_colored(stdscr, y, x, row, nicks, has_colors, my_name, nick_col_w=8):
    """Write a structured row dict at (y, x).

    Coloring rules:
    - [tunl] system rows: whole row in cyan.
    - '--' local rows: whole row dim.
    - Chat / note rows: ts plain, nick colored, body terminal-default.
    - Continuation rows: indented body, no coloring.
    """
    try:
        _, w = stdscr.getmaxyx()
        budget = max(0, w - x - 1)

        if row['is_continuation']:
            stdscr.addnstr(y, x, row['body'], budget)
            return

        if not has_colors:
            stdscr.addnstr(y, x, _row_to_str(row, nick_col_w), budget)
            return

        ck   = row['nick_color_key']
        ts   = (row['ts'] + ' ') if row['ts'] else ''
        nick = row['nick_display']
        body = row['body']

        # [tunl]: whole row in system color.
        if ck == 'tunl':
            line = _row_to_str(row, nick_col_w)
            stdscr.addnstr(y, x, line, budget,
                           curses.color_pair(nicks.index_for('tunl')))
            return
        # '--' local: whole row dim.
        if ck == '--':
            stdscr.addnstr(y, x, _row_to_str(row, nick_col_w), budget,
                           curses.A_DIM)
            return

        # Chat / note: ts uncolored, nick colored, body terminal-default.
        idx  = nicks.index_for(ck)
        attr = curses.color_pair(idx)
        if my_name and ck == my_name:
            attr |= curses.A_BOLD

        col = x
        if ts and col < w - 1:
            stdscr.addnstr(y, col, ts, min(len(ts), w - col - 1))
            col += len(ts)
        nick_padded = nick.ljust(nick_col_w)
        if col < w - 1:
            stdscr.addnstr(y, col, nick_padded,
                           min(len(nick_padded), w - col - 1), attr)
            col += len(nick_padded)
        if body and col < w - 1:
            stdscr.addnstr(y, col, body, w - col - 1)
    except curses.error:
        pass
# --- END INLINE chatui --------------------------------------------------------


# --- BEGIN INLINE wire --------------------------------------------------------
# Everything between BEGIN/END markers is the inlineable body, copied
# verbatim into linbit-tunl.py and tunl.py.  tests/test_inline_blocks.py
# asserts the copies match byte-for-byte.

import struct as _wi_struct


# Protocol identifier carried as the payload of the HELLO frame.
WIRE_VERSION = b"linbit-tunl-share/4"

# Size limits, enforced on send by encode_frame()/encode_message() and on
# receive by FrameStream.
MAX_FRAGMENT = 256 * 1024        # 256 KiB per fragment payload
MAX_MESSAGE = 4 * 1024 * 1024    # 4 MiB per reassembled logical message

# Frame header = 1 type byte + 4-byte big-endian payload length.  The type
# byte packs the frame type (low 6 bits) and fragmentation marker (high 2).
# Frame types (low 6 bits of type byte).
TYPE_DATA = 0x00
TYPE_CTRL = 0x01
TYPE_HELLO = 0x02
TYPE_ERROR = 0x03

# Fragmentation (high 2 bits of type byte).  A logical message is either a
# single SINGLE frame, or START, zero or more CONT, then END -- all frames
# of one message carrying the same type code.
FRAG_SINGLE = 0
FRAG_START = 1
FRAG_CONT = 2
FRAG_END = 3

# Human-readable names, used only in error messages.
_TYPE_NAMES = {
    TYPE_DATA: "DATA",
    TYPE_CTRL: "CTRL",
    TYPE_HELLO: "HELLO",
    TYPE_ERROR: "ERROR",
}
_FRAG_NAMES = {
    FRAG_SINGLE: "SINGLE",
    FRAG_START: "START",
    FRAG_CONT: "CONT",
    FRAG_END: "END",
}

# Types that must always fit in one SINGLE frame: encode_message() refuses
# to fragment them and FrameStream.read_message() rejects a START for them.
_NO_FRAGMENT_TYPES = (TYPE_HELLO, TYPE_ERROR)


class WireProtocolError(Exception):
    """Raised on any wire-level protocol violation.

    Raised by decode_ctrl() and by FrameStream.read_frame()/read_message()
    for malformed or out-of-sequence input.  The intended handling is for
    the caller to close the connection cleanly and, if customer-facing, show
    a friendly message; the relay-side log keeps the detail."""


def encode_frame(type_code, frag, payload):
    """Encode exactly one wire frame: a 5-byte header followed by `payload`.

    Header layout: one type byte (fragment marker in the high 2 bits, frame
    type in the low 6 bits), then the payload length as a 4-byte big-endian
    unsigned int.  No splitting happens here; encode_message() is the
    helper that fragments payloads larger than MAX_FRAGMENT.

    Raises ValueError when type_code does not fit in 6 bits, frag is not a
    FRAG_* marker, or the payload is longer than MAX_FRAGMENT.
    """
    if type_code < 0 or type_code > 0x3F:
        raise ValueError("type code must fit in 6 bits")
    if frag not in (FRAG_SINGLE, FRAG_START, FRAG_CONT, FRAG_END):
        raise ValueError("frag must be SINGLE/START/CONT/END")
    size = len(payload)
    if size > MAX_FRAGMENT:
        raise ValueError(
            "fragment payload {0} exceeds MAX_FRAGMENT {1}".format(
                size, MAX_FRAGMENT))
    header = _wi_struct.pack(">BI", (frag << 6) | type_code, size)
    return header + bytes(payload)


def encode_message(type_code, payload):
    """Encode one logical message, fragmenting it when necessary.

    A payload of at most MAX_FRAGMENT bytes becomes a single FRAG_SINGLE
    frame.  A larger one (up to MAX_MESSAGE) is cut into MAX_FRAGMENT-sized
    chunks sent as START, CONT..., END.  Returns all frame bytes
    concatenated, ready for the wire.

    Raises ValueError when a type in _NO_FRAGMENT_TYPES (HELLO, ERROR) would
    need fragmenting, or when the payload exceeds MAX_MESSAGE.
    """
    total = len(payload)
    if total <= MAX_FRAGMENT:
        return encode_frame(type_code, FRAG_SINGLE, payload)
    if type_code in _NO_FRAGMENT_TYPES:
        raise ValueError(
            "type {0} cannot be fragmented".format(_TYPE_NAMES.get(type_code, type_code)))
    if total > MAX_MESSAGE:
        raise ValueError(
            "message size {0} exceeds MAX_MESSAGE {1}".format(total, MAX_MESSAGE))
    # total > MAX_FRAGMENT here, so there are always at least two chunks:
    # the first is START and the last is END.
    chunks = [payload[i:i + MAX_FRAGMENT] for i in range(0, total, MAX_FRAGMENT)]
    last = len(chunks) - 1
    frames = []
    for idx, chunk in enumerate(chunks):
        if idx == 0:
            frag = FRAG_START
        elif idx == last:
            frag = FRAG_END
        else:
            frag = FRAG_CONT
        frames.append(encode_frame(type_code, frag, chunk))
    return b"".join(frames)


def encode_ctrl(line, binary_args=()):
    """Serialize a CTRL payload.

    Layout: `line` (verb plus space-separated text args) as UTF-8, a b"\\n",
    then one trailer per entry of `binary_args`.  Each trailer is a 4-byte
    big-endian length followed by the bytes.  Raises ValueError if `line`
    contains a newline or NUL.
    """
    if any(bad in line for bad in ("\n", "\0")):
        raise ValueError("ctrl line must not contain NUL or newline")
    parts = [line.encode("utf-8"), b"\n"]
    for arg in binary_args:
        blob = bytes(arg)
        parts.append(_wi_struct.pack(">I", len(blob)))
        parts.append(blob)
    return b"".join(parts)


def decode_ctrl(payload):
    """Split a CTRL payload into its verb-line and binary trailers.

    Expected layout: UTF-8 verb-line, b"\\n", then zero or more trailers of
    a 4-byte big-endian length followed by that many bytes.

    Returns (line, args): line is a str, args a list of bytes.  Raises
    WireProtocolError if the newline is missing, the verb-line contains NUL
    or is not valid UTF-8, or a trailer is truncated.
    """
    newline_at = payload.find(b"\n")
    if newline_at < 0:
        raise WireProtocolError("ctrl payload missing verb-line newline")
    head = payload[:newline_at]
    # B2.9 (2026-05-06 audit): encode_ctrl refuses NUL in the verb-line, so
    # decode must as well.  Accepting it let a hostile peer smuggle NUL into
    # the verb token and have the per-verb dispatcher quietly drop the
    # frame; the v4 spec calls for a protocol error (close the connection).
    if b"\x00" in head:
        raise WireProtocolError("ctrl verb-line contains NUL byte")
    try:
        line = head.decode("utf-8")
    except UnicodeDecodeError as e:
        raise WireProtocolError("ctrl verb-line not valid UTF-8: {0}".format(e))
    args = []
    total = len(payload)
    pos = newline_at + 1
    # Walk the trailers with an offset instead of re-slicing the tail.
    while pos < total:
        remaining = total - pos
        if remaining < 4:
            raise WireProtocolError("ctrl arg trailer truncated (length header)")
        (size,) = _wi_struct.unpack_from(">I", payload, pos)
        if size > remaining - 4:
            raise WireProtocolError(
                "ctrl arg length {0} exceeds remaining payload {1}".format(
                    size, remaining - 4))
        start = pos + 4
        args.append(bytes(payload[start:start + size]))
        pos = start + size
    return line, args


class FrameStream:
    """Stream-oriented frame reader.

    Wraps a `read_some(n)` callable that returns up to n bytes (or b'' on
    EOF) -- e.g. socket.recv or a pipe.read.  Buffers internally to deliver
    exact-length reads and assembles fragmented messages.

    Use read_frame() for raw frame access (e.g. to forward DATA payloads
    immediately without buffering all fragments) or read_message() for the
    common case of "wait until I have a complete logical message".

    Reassembly state is only updated by read_message(); frames obtained
    through read_frame() bypass it."""

    def __init__(self, read_some):
        self._read_some = read_some
        # Set by _read_exact() on EOF; not read anywhere in this class.
        self._eof = False
        # Type code of the fragmented message being reassembled, or None
        # when no START is outstanding.
        self._reasm_type = None
        # Payload bytes collected since the outstanding START frame.
        self._reasm_buf = bytearray()

    def _read_exact(self, n):
        """Return exactly n bytes, looping over short reads.

        Raises EOFError (and sets self._eof) as soon as read_some returns an
        empty chunk before n bytes have been collected."""
        buf = bytearray()
        while len(buf) < n:
            chunk = self._read_some(n - len(buf))
            if not chunk:
                self._eof = True
                raise EOFError("connection closed before {0} bytes read".format(n))
            buf += chunk
        return bytes(buf)

    def read_frame(self):
        """Read one wire frame.  Returns (type_code, frag, payload).

        Raises EOFError whenever the stream ends before the whole frame has
        arrived.  A clean close at a frame boundary and a truncated header
        or payload both raise it; the two cases are not distinguished.
        Raises WireProtocolError when the header announces more than
        MAX_FRAGMENT payload bytes; this is checked before any payload is
        read.  Unknown type codes are not rejected here."""
        # Header: type byte (frag in high 2 bits, type in low 6) + u32 BE length.
        header = self._read_exact(5)
        type_byte = header[0]
        frag = (type_byte >> 6) & 0x3
        type_code = type_byte & 0x3F
        length = _wi_struct.unpack(">I", header[1:5])[0]
        if length > MAX_FRAGMENT:
            raise WireProtocolError(
                "fragment length {0} exceeds MAX_FRAGMENT {1}".format(
                    length, MAX_FRAGMENT))
        payload = self._read_exact(length) if length else b""
        return type_code, frag, payload

    def read_message(self):
        """Read frames until a complete logical message is available.
        Returns (type_code, payload).  Raises WireProtocolError on
        protocol violation (mid-sequence start, type-code mismatch,
        fragmented HELLO/ERROR, oversize reassembled message, etc.).
        EOFError from read_frame() propagates unchanged."""
        while True:
            type_code, frag, payload = self.read_frame()
            if frag == FRAG_SINGLE:
                if self._reasm_type is not None:
                    raise WireProtocolError(
                        "SINGLE frame mid-sequence (was assembling type {0})".format(
                            self._reasm_type))
                return type_code, payload
            if frag == FRAG_START:
                if self._reasm_type is not None:
                    raise WireProtocolError(
                        "START frame mid-sequence (was assembling type {0})".format(
                            self._reasm_type))
                if type_code in _NO_FRAGMENT_TYPES:
                    raise WireProtocolError(
                        "type {0} must not be fragmented".format(
                            _TYPE_NAMES.get(type_code, type_code)))
                # Defensive only: read_frame() already caps the payload at
                # MAX_FRAGMENT, which is smaller than MAX_MESSAGE.
                if len(payload) > MAX_MESSAGE:
                    raise WireProtocolError(
                        "reassembled message exceeds MAX_MESSAGE")
                self._reasm_type = type_code
                self._reasm_buf = bytearray(payload)
                continue
            # CONT or END
            if self._reasm_type is None:
                raise WireProtocolError(
                    "{0} frame without preceding START".format(_FRAG_NAMES[frag]))
            if type_code != self._reasm_type:
                raise WireProtocolError(
                    "type-code mismatch within fragmented sequence "
                    "(START was {0}, this is {1})".format(
                        self._reasm_type, type_code))
            # Check before appending so a hostile peer cannot grow the
            # buffer past MAX_MESSAGE.
            if len(self._reasm_buf) + len(payload) > MAX_MESSAGE:
                raise WireProtocolError(
                    "reassembled message exceeds MAX_MESSAGE")
            self._reasm_buf += payload
            if frag == FRAG_END:
                t = self._reasm_type
                buf = bytes(self._reasm_buf)
                self._reasm_type = None
                self._reasm_buf = bytearray()
                return t, buf


def read_some_from_socket(sock):
    """Return a read_some(n) callable backed by `sock.recv`.

    Any OSError from recv (reset, timeout, closed socket) is reported as
    b'', i.e. as EOF, so FrameStream sees a plain end of stream.
    """
    def _read(n):
        try:
            data = sock.recv(n)
        except OSError:
            data = b""
        return data
    return _read


def read_some_from_fileobj(f):
    """Return a read_some(n) callable backed by the binary file-like `f`.

    Calls f.read(n).  Any falsy result (b'' at EOF, or None from a
    non-blocking stream with no data) and any OSError are normalized to
    b''.
    """
    def _read(n):
        try:
            return f.read(n) or b""
        except OSError:
            return b""
    return _read


# --- END INLINE wire ----------------------------------------------------------


def _v4_data(raw: bytes) -> bytes:
    """Wrap raw tmux control-mode bytes for the broker as a v4 DATA message.

    encode_message() fragments payloads larger than MAX_FRAGMENT."""
    message = encode_message(TYPE_DATA, raw)
    return message


def _v4_ctrl(line: str, *bin_args: bytes) -> bytes:
    """Build a v4 CTRL message.

    `line` is the verb plus text args; every extra positional argument is
    appended as a length-prefixed binary trailer (see encode_ctrl)."""
    ctrl_payload = encode_ctrl(line, bin_args)
    return encode_message(TYPE_CTRL, ctrl_payload)


def _v4_hello() -> bytes:
    """Build the v4 HELLO message announcing WIRE_VERSION."""
    greeting = WIRE_VERSION
    return encode_message(TYPE_HELLO, greeting)


# ---------------------------------------------------------------------------
# Configuration (override via environment)
# ---------------------------------------------------------------------------
def _env_int(name: str, default: int) -> int:
    v = os.environ.get(name)
    if v is None or v == '':
        return default
    try:
        return int(v)
    except ValueError:
        sys.exit(f"ERROR: {name}={v!r} is not an integer")

# Relay endpoint.  Each value can be overridden from the environment
# (TUNL_RELAY, TUNL_RELAY_PORT, TUNL_USER, TUNL_API).  A non-integer
# TUNL_RELAY_PORT aborts at import time via _env_int().
RELAY_HOST       = os.environ.get("TUNL_RELAY", "tunl.linbit.com")
RELAY_PORT       = _env_int("TUNL_RELAY_PORT", 443)
RELAY_USER       = os.environ.get("TUNL_USER",           "tunl")
# Base URL of the relay's HTTPS API (presumably used with urllib.request
# elsewhere in the file -- NOTE(review): confirm).
RELAY_API        = os.environ.get("TUNL_API", "https://tunl.linbit.com")
def _default_authkeys():
    try:
        return os.path.join(pwd.getpwuid(os.getuid()).pw_dir,
                            '.ssh', 'authorized_keys')
    except Exception:
        return os.path.expanduser('~/.ssh/authorized_keys')
# authorized_keys file managed by this script (presumably where the support
# key line from _format_authkeys_line() is installed -- NOTE(review):
# confirm).  TUNL_AUTHKEYS_FILE overrides the per-user default.
AUTHKEYS_FILE    = os.environ.get("TUNL_AUTHKEYS_FILE", _default_authkeys())
# Local endpoint behind the reverse tunnel; the defaults point at sshd on
# localhost:22 (presumably the forward target -- NOTE(review): confirm).
TARGET_HOST      = os.environ.get("TUNL_TARGET_HOST",    "localhost")
TARGET_PORT      = _env_int("TUNL_TARGET_PORT", 22)
# Replaced at package build time with the relay's actual host CA public key.
# If the placeholder is not substituted, SSH will reject any relay host key as
# unverifiable -- a hard fail, not a silent MITM.
# NOTE(review): the default below is already a concrete ed25519 key, not an
# obvious placeholder, so the wording above may be stale -- confirm against
# the packaging step.  TUNL_RELAY_CA_PUBKEY overrides it at runtime.
RELAY_CA_PUBKEY = os.environ.get(
    "TUNL_RELAY_CA_PUBKEY",
    "ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAILHlzJ/Pzvs1xTetEr+VkGurdCbJfvTjSu3BrRQ/eSaF linbit-tunl-host-ca",
)

VERSION              = "0.3.0"
GIT_COMMIT           = "8723da1"
# Port range for the reverse-tunnel listener on the relay (presumably
# inclusive bounds -- NOTE(review): confirm against the picker).
TUNNEL_PORT_MIN      = 30000
TUNNEL_PORT_MAX      = 39999
# Reconnect back-off bounds, presumably in seconds -- NOTE(review): confirm.
RECONNECT_DELAY_MIN  = 1
RECONNECT_DELAY_MAX  = 30
# Idle keepalive cadence on the customer<->broker SSH stdio.  Mirrors
# relay/share_common.HEARTBEAT_INTERVAL.  Sent only when no other
# outbound traffic has happened in this many seconds.  See
# docs/share.md "Liveness".
HEARTBEAT_INTERVAL   = 3
# Soft late threshold: status row flips to "relay: no contact Ns ago"
# when nothing has arrived from the broker for this long.  Mirrors
# relay/share_common.PEER_STALE_AFTER.
PEER_STALE_AFTER     = 7
PROTOCOL_VERSION     = "linbit-tunl/1"
PREFIX               = "[linbit-tunl]"
# TUNL_ASCII=1/yes/true (case-insensitive) swaps the Unicode status glyphs
# below for plain ASCII, for terminals/fonts without those symbols.
_ASCII = os.environ.get('TUNL_ASCII', '').lower() in ('1', 'yes', 'true')
SYM_ON   = '*' if _ASCII else '\u25cf'   # ● connected
SYM_OFF  = 'o' if _ASCII else '\u25cb'   # ○ disconnected
SYM_TUN  = '^' if _ASCII else '\u25c6\u2191'  # ◆↑ tunnel active
SYM_NTUN = '-' if _ASCII else '\u25c7'   # ◇ no tunnel
# Compression is on unless TUNL_COMPRESS is exactly "0".
COMPRESS             = os.environ.get("TUNL_COMPRESS", "1") != "0"
CUSTOMER_COMPANY     = os.environ.get("TUNL_CUSTOMER_COMPANY", "")

# Session IDs are four lowercase ASCII words joined by '-'
# (adjective-noun-verb-noun, see the word lists below).
# NOTE(review): `$` also matches just before a trailing "\n", so
# "a-b-c-d\n" passes .match(); use fullmatch()/\Z if callers rely on this
# regex to reject newlines.
SESSION_ID_RE = re.compile(r'^[a-z]+-[a-z]+-[a-z]+-[a-z]+$')
# Loose check: some whitespace-separated token (options may precede it)
# starts with an ssh-/ecdsa-/sk- key-type prefix.  Strict parsing is done
# by _extract_bare_key().
KEY_LINE_RE   = re.compile(r'^(?:\S+\s+)*(ssh-|ecdsa-|sk-)')

# B2.1 / B4.2 / B4.3 fix (2026-05-06 audit): authoritative authorized_keys
# parser on the customer side.  Whatever the relay sends in SUPPORT_KEY,
# we pass it through this filter and rebuild the line ourselves with our
# own `from=` clause.  A malicious relay that tries to inject extra
# options or extra key lines via `\n`-padded payloads cannot beat this
# parser -- embedded NUL/CR/LF is a hard reject, and relay-supplied
# options are silently dropped.
# Key-type tokens _extract_bare_key() accepts (exact match, no certs).
_AUTHKEYS_KEY_TYPE_NAMES = (
    'ssh-ed25519', 'ssh-rsa', 'ssh-dss',
    'ecdsa-sha2-nistp256', 'ecdsa-sha2-nistp384', 'ecdsa-sha2-nistp521',
    'sk-ssh-ed25519@openssh.com',
    'sk-ecdsa-sha2-nistp256@openssh.com',
)
# Key blob must be non-empty standard base64 with optional '=' padding.
_AUTHKEYS_B64_RE = re.compile(r'^[A-Za-z0-9+/]+=*$')

# `from=` clause we install with each support key.  Today this is fixed
# at relay-side loopback (the connection arrives via the reverse tunnel
# on the customer host's local interface).  When/if the script grows
# support for "around the corner" tunnelling -- where the SSH connection
# the support engineer makes terminates not on this host but on a
# downstream node reachable from here -- this becomes the customer's
# view of where the connection originates, not the relay's.
AUTHKEYS_FROM_CLAUSE = 'from="127.0.0.1,::1"'


def _extract_bare_key(line):
    """Reduce an authorized_keys-style line to `<type> <b64> [<comment>]`.

    Counterpart of relay/share_common.extract_bare_key() (the broker's
    copy).  The first whitespace-separated token that is exactly one of
    _AUTHKEYS_KEY_TYPE_NAMES is the key type, and the token after it must
    match _AUTHKEYS_B64_RE.  Anything after the blob is kept as a comment,
    re-joined with single spaces.  Tokens before the key type (from=,
    command=, ...) are dropped because the customer side rebuilds the
    options it wants.

    Returns the bare-key string, or None when `line` is not a non-empty
    str, contains NUL/CR/LF, has no recognised key-type token, has no blob
    after it, or the blob is not base64.
    """
    if not isinstance(line, str) or not line:
        return None
    if any(bad in line for bad in ('\n', '\r', '\0')):
        return None
    tokens = line.split()
    start = next(
        (i for i, tok in enumerate(tokens) if tok in _AUTHKEYS_KEY_TYPE_NAMES),
        None,
    )
    if start is None or start + 1 >= len(tokens):
        return None
    if not _AUTHKEYS_B64_RE.match(tokens[start + 1]):
        return None
    # split() never yields empty tokens, so this is "<type> <b64>" plus
    # " <comment>" when a comment is present.
    return ' '.join(tokens[start:])


def _format_authkeys_line(bare_key):
    """Return the authorized_keys line to install for `bare_key`.

    `bare_key` must already have gone through _extract_bare_key(), so it is
    a single line with no options.  The only option added is the
    customer-controlled AUTHKEYS_FROM_CLAUSE; nothing is re-validated here.
    """
    return '{} {}'.format(AUTHKEYS_FROM_CLAUSE, bare_key)


def _key_fingerprint_label(bare_key):
    """Bounded, terminal-safe label for a bare-key line.

    Used in chat-pane announcements where the relay-controlled key
    blob would otherwise reach the TTY.  Returns `<type> ...<last-8-of-b64>`,
    a short token that cannot embed terminal-control sequences.
    """
    parts = bare_key.split()
    ktype = parts[0] if parts else 'ssh-key'
    b64 = parts[1] if len(parts) > 1 else ''
    return f'{ktype} ...{b64[-8:]}' if b64 else ktype


def _sanitize_text(data, *, max_bytes, allow_tab=False, allow_newlines=False):
    """Mirror of relay/share_common.sanitize_text -- strip control bytes
    from text-shaped payloads received from the relay before they reach
    a TTY, curses surface, or local file.

    B2.5: a malicious or buggy relay can put terminal-escape sequences in
    STATUS, CHAT, etc.  We accept text but never raw control bytes.
    Strips C0 (0x00-0x1F, except optional tab / CR-LF), DEL (0x7F), C1
    (0x80-0x9F).  Caps at max_bytes after stripping (truncates cleanly
    on a UTF-8 boundary).
    """
    if isinstance(data, (bytes, bytearray)):
        s = bytes(data).decode('utf-8', errors='replace')
    elif isinstance(data, str):
        s = data
    else:
        return ''
    out = []
    for ch in s:
        cp = ord(ch)
        if cp < 0x20:
            if cp == 0x09 and allow_tab:
                out.append(ch)
            elif cp in (0x0A, 0x0D) and allow_newlines:
                out.append(ch)
            continue
        if cp == 0x7F or 0x80 <= cp <= 0x9F:
            continue
        out.append(ch)
    cleaned = ''.join(out)
    encoded = cleaned.encode('utf-8')
    if len(encoded) <= max_bytes:
        return cleaned
    truncated = encoded[:max_bytes]
    while truncated:
        try:
            return truncated.decode('utf-8')
        except UnicodeDecodeError:
            truncated = truncated[:-1]
    return ''

# ---------------------------------------------------------------------------
# Session ID word lists: adjective-noun-verb-noun
# Four categories of roughly 170-185 words each, curated for:
#   - phonetic distinctness within each category (no near-homophones,
#     no words that sound like a letter or digit)
#   - 1-3 syllables; internationally recognizable common English
#   - sentence-plausible order: e.g. "swift-fox-jumps-barrel"
# Entropy: about 180^4 ~= 1.0e9 combinations (~30 bits); 200^4 = 1.6e9 is only an upper bound
# Curation informed by EFF large wordlist methodology:
#   J. Bonneau, https://www.eff.org/deeplinks/2016/07/new-wordlists-random-passphrases
# ---------------------------------------------------------------------------

_WORDS_ADJ = """
aged amber arid arctic ashen azure balmy bare barren bitter bleak blunt
bold brave bright brisk brittle broad bronze brown buoyant calm carved
chalky charred clear cobalt coarse cold compact cool cozy crimson crisp
crooked curved damp dark deep dense dewy dim distant dry dull dusty
eager earthy emerald even faint fair fertile fierce fine firm flat foggy
fragile free fresh frozen full gentle gleaming glowing golden grand great
green grey grim hard heavy hidden high hollow huge icy jade jagged keen
kind large lean lone long loud loyal mild misty murky narrow neat new
noble notched old open orange oval pale parched patched pink plain
polished proud pure quick quiet rapid raw red rich rigid ringed rough
round rugged rust scarlet scoured seared sheer sharp short silent silver
slim slow small smooth soft solid solemn spare stark steep stern still
stout straight strange striped strong sudden swift tall tan thick thin
tidal tiny tough true vast velvet warm weathered white wide wild wise
worn young hazy rosy shadowed slanted tapered tipped toothed veiled washed
dappled lanky muddy sandy scorched speckled sunny tawny
"""

_WORDS_NOUN_SUBJECT = """
acorn ant ash badger bat bear beaver beetle birch bison boar briar
brook broom buck bull buzzard calf carp cat chestnut cliff clover cod
colt coot cormorant cove crab crane crest crow cub daisy deer dormouse
dove duck dune eagle eel elk elm falcon fawn fern finch fish
fly fox frog glen goat goose gorge grouse grove gull hare hawk
hazel heather heron herring hill hobby horse hound ibis ivy jackdaw jay
juniper kestrel kingfisher kite knot lamb larch lark laurel lion loch lynx
magpie mallard maple marsh meadow merlin mink mire mole moor moose moss
moth mouse mule nettle newt nightingale nightjar oak osprey otter owl ox
partridge peak pear perch peregrine pigeon pike pine pond puffin quail
rabbit ram raven reed robin rook rose rush sage salmon seal shark sheep
shrew skunk skylark slate snail snake snipe sparrow spider spruce squirrel
stag starling stoat stork stream swamp swan swift teal thicket thistle
thorn thrush toad trout tulip turtle vale viper vole wasp weasel whale
willow wolf woodcock woodpecker worm wren yak
"""

_WORDS_VERB = """
anchors bends binds blows bolts borders breaks bridges brews builds burns
calls carries carves catches chases circles clears climbs clips coats
coils collects cracks crawls crosses curves cuts digs dips dives drains
draws drives drifts drops edges extends fences fills finds fires fishes
flanks flips floats folds follows forms frees gains gathers glides grabs
grasps grips grows guards guides hangs holds hunts hurls joins jumps
keeps knots lands laps lashes layers leans leaps leads lifts links loads
locks loops marks meets mends moves nails nests notches opens packs pegs
picks pierces pins plants powers presses pulls pushes races raises reads
reaches reels rides rings rolls routes rounds rows runs scales scores
seeks sends sets shapes shelters shifts shields shores shows sifts
signals skips skirts slides slips slopes smooths snares sorts spans
speeds spills spins splices spots stacks stalks starts stays steers stirs
stops stores stretches streams strikes strips sweeps swings tackles takes
taps tears tests threads throws ties tightens tilts tips topples touches
tows traces tracks traps tugs uncurls unfolds vaults wades waits walks
warms watches weaves wedges winds wraps
"""

_WORDS_NOUN_OBJECT = """
anchor anvil arch arrow axle axe badge bag bale barn barrel basket batch
beam belt bench blade block bolt book boot bottle bough box brace branch
brick bridge bucket bundle cage candle cap cart case chain chest chisel
chord clamp clasp cleat cloak cloth coat coil collar cord crate crop
cross crust cube cup dart deck dish dock dome door dowel draft drum dune
fence field file flask flint floor flute forge fork frame funnel gate
gear globe grain grate grid groove gulf handle hatch hay helm hinge hive
hook horn hull jar joint joist keel keg key knot lamp lance latch
lattice ledge lever lid link lock log loop mast mesh mill mold mound net
oar pail peg pin pipe pivot plank plate plug pole post pot pulley rack
rail rake ramp ring rivet rod roll rope rung sail shaft shard shelf shell
shield sill slab slate sling slot spike spool spring spur stake step
strap strut stud strip tack timber tong tool track trap tray trench
trough trunk tube vat vault washer wedge weir well wheel wick wire yoke
"""

_WORDS_ADJ_LIST  = [w for w in _WORDS_ADJ.split()          if w]
_WORDS_NOUN_SUBJ = [w for w in _WORDS_NOUN_SUBJECT.split() if w]
_WORDS_VERB_LIST = [w for w in _WORDS_VERB.split()         if w]
_WORDS_NOUN_OBJ  = [w for w in _WORDS_NOUN_OBJECT.split()  if w]


# ---------------------------------------------------------------------------
# SSH stderr debug logger
# ---------------------------------------------------------------------------

class _SshStderrLogger:
    """Background thread: reads an SSH process stderr pipe, writes timestamped
    lines to an append-only log file, and buffers the last N lines so callers
    can retrieve the most recent error without blocking.

    Usage:
        logger = _SshStderrLogger(proc.stderr, '/tmp/linbit-tunl-abc123/ssh.log')
        ...
        last_err = logger.last_error()   # non-blocking
        logger.join()                    # wait for the thread to finish
    """

    _MAX_BUFFER = 30

    def __init__(self, stderr_pipe, log_path):
        self._buf  = []
        self._lock = threading.Lock()
        self._thread = threading.Thread(
            target=self._run, args=(stderr_pipe, log_path), daemon=True,
        )
        self._thread.start()

    def _run(self, pipe, log_path):
        try:
            with open(log_path, 'a') as f:
                for raw in pipe:
                    ts   = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
                    line = raw.decode('utf-8', errors='replace').rstrip('\n')
                    f.write(f'{ts} {line}\n')
                    f.flush()
                    with self._lock:
                        self._buf.append(line)
                        if len(self._buf) > self._MAX_BUFFER:
                            self._buf.pop(0)
        except Exception as e:
            trace('ssh_stderr_logger_error', err=repr(e))

    def last_error(self):
        """Return the last non-blank stderr line, or ''."""
        with self._lock:
            return next((l for l in reversed(self._buf) if l.strip()), '')

    def join(self, timeout=1.0):
        self._thread.join(timeout=timeout)


class _StderrTee:
    """Tee sys.stderr to an append-only log file (debug.log in the session dir).

    Use as a context manager around outer-process work so diagnostics survive
    even when the tmux session dies before the user can read them.
    """

    def __init__(self, log_path):
        self._orig = sys.stderr
        self._log = open(log_path, 'a')
        ts = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
        self._log.write(f'--- {ts} ---\n')
        self._log.flush()

    def __enter__(self):
        sys.stderr = self
        return self

    def __exit__(self, *exc):
        sys.stderr = self._orig
        self._log.close()

    def write(self, s):
        self._orig.write(s)
        self._log.write(s)
        self._log.flush()

    def flush(self):
        self._orig.flush()
        self._log.flush()

    def fileno(self):
        return self._orig.fileno()

    def isatty(self):
        return self._orig.isatty()


# ---------------------------------------------------------------------------
# Pure helpers (testable without SSH or filesystem)
# ---------------------------------------------------------------------------

def validate_session_id(sid):
    """Return True iff *sid*, converted with str(), matches SESSION_ID_RE."""
    return SESSION_ID_RE.match(str(sid)) is not None


def generate_session_id():
    """Return a random adj-noun-verb-noun session ID (e.g. 'swift-fox-jumps-barrel').

    One word is drawn with secrets.choice() from each pool, in that order.
    """
    pools = (_WORDS_ADJ_LIST, _WORDS_NOUN_SUBJ, _WORDS_VERB_LIST, _WORDS_NOUN_OBJ)
    return "-".join(secrets.choice(pool) for pool in pools)


def pick_port():
    """Return a uniformly random port in [TUNNEL_PORT_MIN, TUNNEL_PORT_MAX]."""
    span = TUNNEL_PORT_MAX - TUNNEL_PORT_MIN + 1
    return TUNNEL_PORT_MIN + secrets.randbelow(span)


def _current_user() -> str:
    """Username of the effective user running this script.

    Uses the effective UID (getuid) so that under sudo the reported name
    matches the home directory where authorized_keys will be written.
    Falls back to $USER / $LOGNAME only if the passwd lookup fails.
    """
    try:
        return pwd.getpwuid(os.getuid()).pw_name
    except Exception:
        pass
    for var in ('USER', 'LOGNAME'):
        u = os.environ.get(var, '').strip()
        if u:
            return u
    return ''


def gather_context(session_id, session_port, case_num="", token=""):
    """Build the SSH 'command' string that relay-command will parse.

    The result is space-separated: the protocol version, then
    id= host= ips= port= case= user= nets= dns= fields, and a trailing
    token= field only when *token* is non-empty.  List-valued fields are
    comma-joined; ips falls back to "unknown" when no address is found.
    """
    host = _fqdn()
    ips = ",".join(_local_ips()) or "unknown"
    nets = ",".join(_local_networks())
    dns = ",".join(_nameservers())
    user = _current_user()
    fields = [
        f"{PROTOCOL_VERSION}",
        f"id={session_id}",
        f"host={host}",
        f"ips={ips}",
        f"port={session_port}",
        f"case={case_num}",
        f"user={user}",
        f"nets={nets}",
        f"dns={dns}",
    ]
    if token:
        fields.append(f"token={token}")
    return " ".join(fields)


def parse_key_block(lines, session_id):
    """Extract SSH public keys from between the relay's greeting markers.

    Lines are stripped before comparison.  Only lines after the begin marker
    that match KEY_LINE_RE are collected.  Returns the collected list as soon
    as the end marker is seen (possibly empty), or None if it never appears.
    """
    begin_marker = f"# linbit-tunl-keys-begin {session_id}"
    end_marker = f"# linbit-tunl-keys-end {session_id}"
    collected = []
    in_block = False
    for raw in lines:
        text = raw.strip()
        if text == begin_marker:
            in_block = True
        elif text == end_marker:
            return collected
        elif in_block and KEY_LINE_RE.match(text):
            collected.append(text)
    return None  # end marker never seen


def _fqdn():
    """Return socket.getfqdn(), or socket.gethostname() if that raises."""
    try:
        name = socket.getfqdn()
    except Exception:
        name = socket.gethostname()
    return name


def _local_ips():
    try:
        result = subprocess.run(
            ['ip', '-4', '-o', 'addr', 'show', 'scope', 'global'],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        return re.findall(r'inet (\d+\.\d+\.\d+\.\d+)', result.stdout)
    except Exception:
        return []


def _local_networks():
    """Return directly-connected network CIDRs, e.g. ['10.0.1.0/24']."""
    try:
        result = subprocess.run(
            ['ip', '-4', '-o', 'addr', 'show', 'scope', 'global'],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        seen = []
        for m in re.findall(r'inet (\d+\.\d+\.\d+\.\d+/\d+)', result.stdout):
            try:
                cidr = str(ipaddress.ip_interface(m).network)
                if cidr not in seen:
                    seen.append(cidr)
            except ValueError:
                pass
        return seen
    except Exception:
        return []


def _nameservers():
    """Return nameserver IPs from /etc/resolv.conf."""
    try:
        servers = []
        with open('/etc/resolv.conf') as fh:
            for line in fh:
                parts = line.split()
                if parts and parts[0] == 'nameserver' and len(parts) >= 2:
                    servers.append(parts[1])
        return servers
    except OSError:
        return []


# ---------------------------------------------------------------------------
# authorized_keys management
# ---------------------------------------------------------------------------

def authkeys_ensure(path):
    """Make sure the authorized_keys file at *path* exists with mode 0600.

    Missing parent directories are created.  The innermost one (normally
    ~/.ssh) is created with mode 0700: sshd's StrictModes rejects keys when
    that directory is group- or world-writable, which a default-mode mkdir
    under a permissive umask (e.g. 002) would produce.  Directories that
    already exist are left untouched.  The file is created if absent and is
    always chmod'ed to 0600.
    """
    p = Path(path)
    # mode only applies to the leaf; intermediate parents get default perms.
    p.parent.mkdir(mode=0o700, parents=True, exist_ok=True)
    if not p.exists():
        p.touch(mode=0o600)
    p.chmod(0o600)


def _authkeys_strip_blocks(path, is_begin, is_end):
    """Rewrite *path* without the marker blocks recognised by is_begin/is_end.

    Each line is compared after rstrip().  A begin marker, every line up to
    the next end marker, and the end marker itself are dropped.  An unclosed
    begin marker drops everything after it, which errs on the side of
    removing access.

    The file is decoded as UTF-8 with surrogateescape and read/written with
    newline='', so unrelated lines come back byte-for-byte.  That includes
    non-UTF-8 key comments and CRLF endings, whatever the locale's default
    encoding is.  The file is only rewritten when something was removed; its
    mode is reset to 0600 either way.  A missing file is ignored.
    """
    p = Path(path)
    if not p.exists():
        return
    with open(path, encoding='utf-8', errors='surrogateescape', newline='') as fh:
        text = fh.read()
    kept = []
    skipping = False
    for line in text.splitlines(True):
        s = line.rstrip()
        if is_begin(s):
            skipping = True
        elif is_end(s):
            skipping = False
        elif not skipping:
            kept.append(line)
    new_text = "".join(kept)
    if new_text != text:
        with open(path, 'w', encoding='utf-8', errors='surrogateescape',
                  newline='') as fh:
            fh.write(new_text)
    p.chmod(0o600)


def authkeys_remove_session(path, session_id):
    """Remove one session's marker block from the authorized_keys file."""
    begin = f"# linbit-tunl-begin {session_id}"
    end = f"# linbit-tunl-end {session_id}"
    _authkeys_strip_blocks(path, lambda s: s == begin, lambda s: s == end)


def authkeys_remove_all(path):
    """Remove all linbit-tunl marker blocks (stale entries from prior sessions)."""
    _authkeys_strip_blocks(
        path,
        lambda s: s.startswith('# linbit-tunl-begin '),
        lambda s: s.startswith('# linbit-tunl-end '),
    )


def authkeys_add_session(path, session_id, keys):
    """Append a marker block with the given keys to the authorized_keys file.

    `keys` must be a list of fully-formed authorized_keys lines that
    have already passed the strict customer-side parser
    (_extract_bare_key + _format_authkeys_line).  As a belt-and-suspenders
    measure we still re-validate here: any line that no longer round-trips
    through _extract_bare_key (e.g. someone passes raw, untrusted strings
    via this entry point in the future) is dropped.

    The block always starts on a fresh line.  If the existing file does not
    end with a newline, one is inserted first.  Otherwise the begin marker
    would be glued onto the customer's last line, authkeys_remove_session()
    (which compares whole lines) would never recognise it, and the session's
    keys would survive removal.
    """
    authkeys_ensure(path)
    authkeys_remove_session(path, session_id)  # idempotent
    safe_lines = []
    for line in keys:
        if line is None:
            continue
        if '\n' in line or '\r' in line or '\0' in line:
            # Hard reject: a single bad line discards JUST that line so a
            # legitimate caller does not get its whole batch dropped.
            continue
        if _extract_bare_key(line) is None:
            continue
        safe_lines.append(line)
    block = (
        f"# linbit-tunl-begin {session_id}\n"
        + "".join(line + "\n" for line in safe_lines)
        + f"# linbit-tunl-end {session_id}\n"
    )
    lead = ''
    with open(path, 'rb') as fh:
        fh.seek(0, os.SEEK_END)
        if fh.tell() > 0:
            fh.seek(-1, os.SEEK_END)
            if fh.read(1) != b'\n':
                lead = '\n'
    with open(path, 'a', encoding='utf-8') as fh:
        fh.write(lead + block)


# ---------------------------------------------------------------------------
# Relay host key verification
# ---------------------------------------------------------------------------

def setup_known_hosts(relay_host, host_ca_pubkey, dest_dir=None):
    """Write a known_hosts file that trusts the relay through its host CA.

    The single line is "@cert-authority <relay_host> <host_ca_pubkey>".  Any
    relay host certificate signed by that CA is therefore accepted, and
    rotating the relay's host key needs no script update.

    The file is <dest_dir>/known_hosts when dest_dir is given.  Otherwise it
    is a fresh mkstemp() file named linbit-tunl-knownhosts.*.  Returns its path.
    """
    if not dest_dir:
        fd, kh_path = tempfile.mkstemp(prefix="linbit-tunl-knownhosts.")
        os.close(fd)
    else:
        kh_path = os.path.join(dest_dir, 'known_hosts')
    entry = "@cert-authority {} {}\n".format(relay_host, host_ca_pubkey)
    Path(kh_path).write_text(entry)
    return kh_path


# ---------------------------------------------------------------------------
# Quoting helpers for nested command string construction
# ---------------------------------------------------------------------------

def _sq(s):
    """Shell-quote a value for safe interpolation into a shell command string."""
    return shlex.quote(str(s))


def _tmux_sq(s):
    """Shell-quote and escape '#' for use inside a tmux run-shell argument.

    tmux expands #{...} in run-shell arguments before passing to sh -c.
    Doubling '#' prevents accidental format expansion.
    """
    return shlex.quote(str(s)).replace('#', '##')


def _tmux_fmt(s):
    """Escape a value for embedding in a tmux format/style string.

    Prevents #{...} and #[...] interpretation by doubling '#'.
    """
    return str(s).replace('#', '##')


# ---------------------------------------------------------------------------
# tmux helpers
# ---------------------------------------------------------------------------

def _session_hash(session_id):
    """Short deterministic hash of the session ID (privacy-preserving)."""
    return hashlib.sha256(session_id.encode()).hexdigest()[:12]


def _strict_mkdir_or_reuse(path):
    """mkdir(path, 0o700) or reuse an existing dir we own; refuse anything else.

    Closes a TOCTOU/symlink-attack window where another local user pre-creates
    a session directory under a guessed session-id and gets the customer's
    SSH ControlMaster socket, ephemeral private key, and SSH_ASKPASS file
    written into a directory the attacker can read (B1.4 in the
    2026-05-06 security audit; B1.8 follows transitively).

    Behaviour:
      - mkdir succeeds              -- chmod to 0o700 (defeat unusual umask)
      - path exists, is a dir we own -- tighten mode if needed, accept
      - path is a symlink            -- refuse
      - path is not a directory      -- refuse
      - path is owned by another uid -- refuse

    Refusal aborts the customer process with a friendly, actionable
    message and a non-zero exit; this is an "expected" failure (the
    audit-mandated security check fired) so we do not keep artifacts.

    Raises:
      SystemExit -- for each refusal case above.
      OSError    -- from os.mkdir for anything other than FileExistsError
                    (e.g. permission denied), and from os.lstat / os.chmod.
    """
    try:
        os.mkdir(path, 0o700)
        # mkdir's mode argument is masked by the umask; chmod makes it exact.
        os.chmod(path, 0o700)
        return
    except FileExistsError:
        pass
    # lstat, not stat: a symlink must be seen as a symlink and never followed.
    st = os.lstat(path)
    if stat.S_ISLNK(st.st_mode):
        raise SystemExit(
            f"{PREFIX} {path}: is a symlink; refusing to follow.  "
            f"Remove it and re-run.")
    if not stat.S_ISDIR(st.st_mode):
        raise SystemExit(
            f"{PREFIX} {path}: not a directory; refusing to use it.  "
            f"Remove it and re-run.")
    # Ownership is compared with the effective UID, i.e. the uid that will
    # own the sockets and key files this process writes into the directory.
    if st.st_uid != os.geteuid():
        raise SystemExit(
            f"{PREFIX} {path}: owned by uid {st.st_uid}, not "
            f"{os.geteuid()}.  Refusing to share a session directory "
            f"with another user.  Remove or rename it and re-run.")
    # Our own directory left with looser permissions: tighten it to 0700.
    if (st.st_mode & 0o777) != 0o700:
        os.chmod(path, 0o700)


def _session_dir(session_id):
    """Return /tmp/linbit-tunl-<hash>, created (or safely reused) on each call.

    Creation and reuse go through _strict_mkdir_or_reuse(), so repeated
    calls are idempotent.
    """
    path = '/tmp/linbit-tunl-' + _session_hash(session_id)
    _strict_mkdir_or_reuse(path)
    return path


def _choose_cast_path():
    """Pick a persistent location for the session recording (.cast file).

    Tries /var/log/linbit-tunl/, ~/.local/share/linbit-tunl/, /tmp/ in order.
    Returns the path (parent directory created if needed).
    """
    ts = time.strftime('%Y%m%d-%H%M%S', time.gmtime())
    host = socket.gethostname().split('.')[0][:32]
    fname = f'linbit-tunl_{ts}_{host}.cast'
    candidates = [
        '/var/log/linbit-tunl',
        os.path.expanduser('~/.local/share/linbit-tunl'),
    ]
    for d in candidates:
        try:
            os.makedirs(d, mode=0o700, exist_ok=True)
            test = os.path.join(d, '.write-test')
            fd = os.open(test, os.O_CREAT | os.O_WRONLY, 0o600)
            os.close(fd)
            os.unlink(test)
            return os.path.join(d, fname)
        except OSError:
            continue
    return os.path.join('/tmp', fname)


def _tmux_local_name(session_id):
    """Return the local tmux session name 'linbit-<hash>' for a session ID.

    The short hash keeps the real session ID out of ps output visible to
    other users.  The mapping is deterministic, so a restarted linbit-tunl
    finds and re-attaches to the same tmux session.
    """
    return 'linbit-' + _session_hash(session_id)


def _tmux_sock_path(session_id):
    """Return the customer-side tmux socket path (tmux.sock in the session dir)."""
    session_dir = _session_dir(session_id)
    return os.path.join(session_dir, 'tmux.sock')


def _ctl_sock_path(session_id):
    """Return the SSH ControlMaster socket path (ssh-ctl.sock in the session dir)."""
    session_dir = _session_dir(session_id)
    return os.path.join(session_dir, 'ssh-ctl.sock')


def _tmux_is_available():
    """Return True if tmux can be found and executed."""
    try:
        subprocess.run(['tmux', '-V'], stdout=subprocess.PIPE,
                       stderr=subprocess.PIPE, check=True)
        return True
    except (FileNotFoundError, subprocess.CalledProcessError):
        return False


# Minimum tmux version for share mode.  Share mode uses refresh-client -C
# (2.9+), window-level window-size option (3.1+), and "-f read-only"
# attach flags (2.6+).  The window-level window-size is the hardest
# requirement, hence 3.1.  Supported distros as of 2026: RHEL 9 (3.2a),
# Ubuntu 22.04+ (3.2a), Debian 12 (3.3, 3.5 via backports).
_TMUX_MIN_SHARE = (3, 1)


def _tmux_version():
    """Return tmux version as a (major, minor) tuple, or None if tmux is
    unavailable or its version string does not parse.  Suffix letters
    (e.g. '3.2a') are ignored for comparison purposes."""
    try:
        r = subprocess.run(['tmux', '-V'], stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE, check=True, universal_newlines=True,
                           timeout=5)
    except (FileNotFoundError, subprocess.CalledProcessError,
            subprocess.TimeoutExpired):
        return None
    m = re.match(r'tmux\s+(\d+)\.(\d+)', r.stdout)
    if not m:
        return None
    return (int(m.group(1)), int(m.group(2)))


def _tmux_share_capable():
    """Return True only if tmux runs and its version is at least _TMUX_MIN_SHARE."""
    if _tmux_is_available():
        version = _tmux_version()
        if version is not None:
            return version >= _TMUX_MIN_SHARE
    return False


def _tmux(session_id, *args, check=False):
    """Run `tmux -f /dev/null -S <session socket> ARGS...` and return the result.

    stdout/stderr are captured as bytes.  With check=True a non-zero exit
    raises subprocess.CalledProcessError.
    """
    argv = ['tmux', '-f', '/dev/null', '-S', _tmux_sock_path(session_id)]
    argv.extend(args)
    return subprocess.run(argv, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE, check=check)


def kill_tmux_session(session_id):
    """Kill this session's local tmux session (the exit status is ignored)."""
    _tmux(session_id, 'kill-session', '-t', _tmux_local_name(session_id))


# ---------------------------------------------------------------------------
# CA certificate support
# ---------------------------------------------------------------------------

def _generate_ephemeral_keypair(tmp_dir):
    """Generate a throwaway ed25519 keypair in tmp_dir.

    Returns (privkey_path, pubkey_str).  ssh-keygen creates two files:
    privkey_path (mode 600) and privkey_path + '.pub'.
    """
    key_path = os.path.join(tmp_dir, "tunl-ephem")
    subprocess.run(
        ['ssh-keygen', '-t', 'ed25519', '-N', '', '-f', key_path],
        stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True,
    )
    pubkey = Path(key_path + '.pub').read_text().strip()
    return key_path, pubkey


_API_TIMEOUT_DEFAULT = 2.0


def _api_timeout():
    """Timeout (seconds) for the cert-signing API call.

    Default 2s -- the call is fail-fast: if the relay's API is reachable
    it answers in milliseconds; if it isn't, we want to fall back to
    KI/PAM token auth quickly rather than block the (re-)connect.
    Override with TUNL_API_TIMEOUT (e.g. when relay sits behind a slow
    link or proxy that does need more headroom).
    """
    raw = os.environ.get('TUNL_API_TIMEOUT')
    if not raw:
        return _API_TIMEOUT_DEFAULT
    try:
        v = float(raw)
        return v if v > 0 else _API_TIMEOUT_DEFAULT
    except (TypeError, ValueError):
        return _API_TIMEOUT_DEFAULT


def _request_certificate(pubkey, session_id, token, relay_api, mode="tunnel"):
    """Ask the relay's signing API for a CA certificate over *pubkey*.

    Sends a JSON POST {pubkey, session_id, token, mode} to
    <relay_api>/api/v1/sign (a trailing '/' on relay_api is tolerated).  The
    request uses the _api_timeout() timeout.  Returns the "certificate" field
    of the JSON reply.

    HTTP error statuses surface as urllib.error.HTTPError.  Network failures
    surface as OSError / urllib.error.URLError.  A reply without a
    "certificate" key raises KeyError.

    mode='share' requests a cert with force-command=relay-share.py produce
    so it works for the tunl-share sshd user.
    """
    endpoint = relay_api.rstrip('/') + '/api/v1/sign'
    request_body = {
        "pubkey": pubkey,
        "session_id": session_id,
        "token": token,
        "mode": mode,
    }
    req = urllib.request.Request(
        endpoint,
        data=json.dumps(request_body).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=_api_timeout()) as resp:
        reply = json.loads(resp.read())
    return reply["certificate"]


# ---------------------------------------------------------------------------
# SSH tunnel
# ---------------------------------------------------------------------------

def _stdout_reader(proc, out_queue):
    """Thread: read lines from proc.stdout and put to out_queue."""
    try:
        for raw_line in proc.stdout:
            out_queue.put(raw_line.decode(errors='replace').rstrip())
    finally:
        out_queue.put(None)  # EOF sentinel


def open_tunnel(session_id, context, known_hosts_path,
                relay_user, relay_host, relay_port,
                session_port, target_host, target_port,
                privkey_path=None, ki_env=None,
                key_timeout=15, status_queue=None):
    """Start SSH reverse tunnel.  Returns (proc, keys) or (None, []).

    Runs ssh to relay_user@relay_host:relay_port with the reverse forward
    `-R session_port:target_host:target_port` and `context` as the remote
    command.  It then reads ssh's stdout for up to `key_timeout` seconds,
    waiting for the relay's "# linbit-tunl-keys-end <session_id>" marker.

    Auth is either:
      privkey_path -- path to an ed25519 private key, used with BatchMode=yes
                      and IdentitiesOnly=yes (ssh also loads a CA cert from
                      the adjacent privkey_path + '-cert.pub' if present).
      ki_env       -- dict for keyboard-interactive PAM token auth; no
                      IdentityFile, NumberOfPasswordPrompts=1.  Two variants:
                        {'_SSHPASS_CMD': ..., '_KI_PASSWORD': pw}
                            run ssh under `sshpass -d FD`.  The password goes
                            through an inherited pipe, so it never appears in
                            the environment or argv.  If '_KI_PASSWORD' is
                            absent, the value of 'SSHPASS' is used.
                        {'SSH_ASKPASS': path, 'SSH_ASKPASS_REQUIRE': 'force'}
                            keys not starting with '_' are exported into
                            ssh's environment, and ssh runs in a new session
                            (no controlling terminal).

    If status_queue is given, a daemon thread (_stderr_reader) routes ssh's
    stderr.  Otherwise proc.stderr is not read here.
    NOTE(review): presumably the caller drains it in that case -- confirm.

    Returns:
      (proc, keys) once the end marker arrives.  proc is the running ssh and
        a daemon thread keeps draining its stdout.  keys is
        parse_key_block()'s result, or [] if that returned nothing.
      (None, []) if ssh exits or the marker does not arrive in time.  ssh is
        then terminated (killed after 3 s) and reaped.
    """
    ssh_cmd = [
        'ssh',
        '-p', str(relay_port),
        '-o', 'StrictHostKeyChecking=yes',
        '-o', f'UserKnownHostsFile={known_hosts_path}',
        '-o', 'ServerAliveInterval=15',
        '-o', 'ServerAliveCountMax=4',
        '-o', 'ExitOnForwardFailure=yes',
        '-o', 'ConnectTimeout=30',
        '-R', f'{session_port}:{target_host}:{target_port}',
    ]
    if COMPRESS:
        ssh_cmd += ['-C']
    if privkey_path:
        ssh_cmd += [
            '-o', 'BatchMode=yes',
            '-o', f'IdentityFile={privkey_path}',
            '-o', 'IdentitiesOnly=yes',
        ]
    # keyboard-interactive: no IdentityFile/BatchMode; token supplied via ki_env.
    # Limit to one password prompt: retrying with the wrong session-id or token
    # never helps, and without this the client may spin through multiple rounds.
    if ki_env:
        ssh_cmd += ['-o', 'NumberOfPasswordPrompts=1']
    ssh_cmd += [f'{relay_user}@{relay_host}', context]

    env = dict(os.environ)
    new_session = False
    pass_fds = ()
    r_fd = None

    if ki_env and '_SSHPASS_CMD' in ki_env:
        # Feed password via inherited pipe fd so it never appears in env/argv.
        pw = ki_env.get('_KI_PASSWORD')
        if pw is None:
            pw = ki_env.get('SSHPASS', '')
        r_fd, w_fd = os.pipe()
        try:
            os.write(w_fd, (pw + '\n').encode())
        except BaseException:
            os.close(r_fd)
            raise
        finally:
            os.close(w_fd)
        ssh_cmd = ['sshpass', '-d', str(r_fd)] + ssh_cmd
        pass_fds = (r_fd,)
    elif ki_env:
        env.update({k: v for k, v in ki_env.items() if not k.startswith('_')})
        # SSH_ASKPASS_REQUIRE=force only works without a controlling terminal.
        # start_new_session runs setsid() in the child; unlike preexec_fn it
        # is safe while other threads are running.
        new_session = True

    try:
        proc = subprocess.Popen(
            ssh_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=env,
            start_new_session=new_session,
            pass_fds=pass_fds,
        )
    finally:
        # The child has its own copy (or Popen failed): close ours either way.
        if r_fd is not None:
            os.close(r_fd)

    out_q = queue.Queue()
    t = threading.Thread(target=_stdout_reader, args=(proc, out_q), daemon=True)
    t.start()

    if status_queue is not None:
        threading.Thread(
            target=_stderr_reader, args=(proc, status_queue), daemon=True,
        ).start()

    # Collect lines until we see the end marker or time out.  A monotonic
    # clock keeps wall-clock jumps from shortening or extending the wait.
    end_marker = f"# linbit-tunl-keys-end {session_id}"
    lines = []
    deadline = time.monotonic() + key_timeout
    while time.monotonic() < deadline:
        try:
            line = out_q.get(timeout=0.3)
        except queue.Empty:
            if proc.poll() is not None:
                break
            continue
        if line is None:  # EOF
            break
        lines.append(line)
        if line.strip() == end_marker:
            keys = parse_key_block(lines, session_id) or []
            return proc, keys

    # Timeout or SSH died without sending the end marker.
    proc.terminate()
    try:
        proc.wait(timeout=3)
    except subprocess.TimeoutExpired:
        proc.kill()
        try:
            proc.wait(timeout=3)   # reap, so no zombie is left behind
        except subprocess.TimeoutExpired:
            pass
    return None, []


# ---------------------------------------------------------------------------
# Status display
# ---------------------------------------------------------------------------

def _stderr_reader(proc, status_queue):
    """Thread: read lines from proc.stderr and route by content.

    Lines starting with '# linbit-tunl-status' are relay heartbeats: parsed
    into a field dict and put on status_queue.  All other lines (SSH warnings,
    errors) are forwarded to sys.stderr so the customer sees them.  A None
    sentinel is put on the queue when stderr reaches EOF.
    """
    PREFIX = '# linbit-tunl-status '
    try:
        for raw_line in proc.stderr:
            line = raw_line.decode(errors='replace').rstrip()
            if line.startswith(PREFIX):
                fields = {}
                for token in line[len(PREFIX):].split():
                    k, _, v = token.partition('=')
                    if k:
                        fields[k] = v
                status_queue.put(fields)
            else:
                print(line, file=sys.stderr)
    finally:
        status_queue.put(None)  # EOF sentinel


class StatusLine:
    """Overwrites a single terminal line in place.

    Each update() writes a carriage return plus the new text to stdout.  The
    text is padded with spaces to the previous text's length so leftovers
    are erased.  The line shows the caller's message, the support-connection
    state (connected / time since last seen / waiting), and the time since
    the last relay contact recorded by touch().  Ages are whole seconds of
    wall-clock time.
    """

    def __init__(self, prefix):
        now = int(time.time())
        self._prefix = prefix
        self._last_len = 0               # length of the previously written text
        self._started = now
        self._contact = now              # last relay contact, see touch()
        self._support_active = False
        self._last_support = None        # timestamp of last observed support conn

    @staticmethod
    def _ago(seconds):
        """Format an age as 'Ns ago' below one minute, else 'Mm SSs ago'."""
        if seconds < 60:
            return f"{seconds}s ago"
        return f"{seconds // 60}m {seconds % 60:02d}s ago"

    def touch(self):
        """Record relay contact at the current time."""
        self._contact = int(time.time())

    def set_support(self, active):
        """Call each tick with the current support-connection state."""
        self._support_active = bool(active)
        if self._support_active:
            self._last_support = int(time.time())

    def update(self, msg):
        """Redraw the line with *msg* left-aligned in a 14-column field."""
        now = int(time.time())
        since_contact = now - self._contact
        relay_age = "just now" if since_contact < 5 else self._ago(since_contact)

        if self._support_active:
            support_part = "SUPPORT CONNECTED  "
        elif self._last_support is None:
            support_part = "Waiting for support...  "
        else:
            support_part = f"Last support: {self._ago(now - self._last_support)}  "

        text = (f"{self._prefix} {msg:<14}  {support_part}"
                f"Last relay contact: {relay_age}")
        sys.stdout.write("\r" + text.ljust(self._last_len))
        sys.stdout.flush()
        self._last_len = len(text)


# ---------------------------------------------------------------------------
# Share mode helpers
# ---------------------------------------------------------------------------

# Fallback share-mode row overhead, returned by _share_fixed_overhead() when
# the live layout cannot be queried.  Breakdown: the chat pane is 6 rows, plus
# 1 border row for the chat pane title, plus 1 border row taken from the main
# pane by pane-border-status=top.  That makes 8 rows of vertical space that
# the main pane cannot use.  The main pane's own inner rows are what
# consumers see as the shell content.
_SHARE_FIXED_OVERHEAD = 8


def _discover_share_panes(sock=None, session_name=None):
    """Find panes tagged with @linbit-role custom options.

    With sock + session_name: query a specific tmux server and session.
    Without: use $TMUX and default to the current session.

    Scans every window in the target session because since M5 the
    daemon's own pane is broken off to a hidden background window so
    only main + chat are visible.  The hidden pane carries the
    '@linbit-role=inner' tag so outer launchers can still find it.

    Returns {role: pane_id} dict.  Accepted roles are 'main', 'chat',
    'inner' (M5+), and 'status' (pre-M5, still tolerated so that
    sessions started by an older customer script continue to work).
    Returns None if no tagged panes are found.
    """
    cmd = ['tmux']
    if sock:
        cmd += ['-S', sock]
    # -s selects every pane in the session (not just the current window).
    if session_name:
        cmd += ['list-panes', '-s', '-t', session_name]
    else:
        cmd += ['list-panes', '-s']
    cmd += ['-F', '#{pane_id} #{@linbit-role}']
    r = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
                       universal_newlines=True)
    if r.returncode != 0:
        return None
    panes = {}
    for line in r.stdout.strip().splitlines():
        parts = line.split(None, 1)
        # 'status' is tolerated for backwards compatibility with panes
        # created before M5.  New sessions tag 'main', 'chat', 'inner'.
        if len(parts) == 2 and parts[1] in ('status', 'main', 'chat', 'inner'):
            panes[parts[1]] = parts[0]
    return panes if panes else None


def _share_fixed_overhead(main_pane_id):
    """Return rows of the window not used by the main pane (chat pane + borders).

    Computed live as window_height - pane_height for *main_pane_id*, so zoom
    and unzoom transitions stay accurate.  If tmux's answer is not two
    integers, the compile-time _SHARE_FIXED_OVERHEAD is returned instead.
    """
    r = subprocess.run(
        ['tmux', 'display-message', '-t', main_pane_id,
         '-p', '#{pane_height} #{window_height}'],
        stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, universal_newlines=True,
    )
    fields = r.stdout.strip().split()
    if len(fields) != 2:
        return _SHARE_FIXED_OVERHEAD
    try:
        pane_rows, window_rows = map(int, fields)
    except ValueError:
        return _SHARE_FIXED_OVERHEAD
    return window_rows - pane_rows


def _share_write_to_pane(pane_id, text):
    """Write one line of text directly to a pane's TTY (output, not input).

    Appends a CR+LF after `text` so it scrolls into the pane naturally.
    Used for the legacy chat-pane writes done by TUNNEL_CONFIRMED.
    """
    try:
        r = subprocess.run(
            ['tmux', 'display-message', '-t', pane_id, '-p', '#{pane_tty}'],
            stdout=subprocess.PIPE, universal_newlines=True, timeout=2,
        )
        tty_path = r.stdout.strip()
        if tty_path:
            with open(tty_path, 'w') as f:
                f.write(text + '\r\n')
    except Exception:
        pass


def _share_forward_keys(pane_id, hex_str, tmux_sock=None):
    """Inject raw bytes into a pane via send-keys -l or -H.

    Uses send-keys -l (literal string) for printable UTF-8.  tmux's argv
    parser (cmd_parse_from_arguments) strips a trailing ';' from each token
    as a command separator; a trailing '\\;' is the documented escape.
    Mid-string ';' and all backslashes are passed through unchanged.
    Falls back to send-keys -H (raw hex, tmux 3.2+) for control characters,
    NUL, and non-UTF-8 byte sequences.
    """
    try:
        data = bytes.fromhex(hex_str)
    except ValueError:
        return
    if not data:
        return
    sock_opt = ['-S', tmux_sock] if tmux_sock else []
    try:
        if not any(b < 0x20 or b == 0x7f for b in data):
            try:
                text = data.decode('utf-8')
                # tmux's argv parser strips a trailing ';' as a command separator
                # (and drops the arg entirely if it was only ';').  A '\' before
                # the trailing ';' is the documented escape.  Mid-string ';' and
                # standalone '\' are left untouched by tmux's parser and need no
                # escaping.
                if text.endswith(';'):
                    text = text[:-1] + '\\;'
                subprocess.run(
                    ['tmux'] + sock_opt + ['send-keys', '-l', '-t', pane_id, '--', text],
                    stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=2,
                )
                return
            except UnicodeDecodeError:
                pass
        subprocess.run(
            ['tmux'] + sock_opt + ['send-keys', '-H', '-t', pane_id] + [f'{b:02x}' for b in data],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, timeout=2,
        )
    except subprocess.TimeoutExpired:
        pass


def _encode_output(raw: bytes) -> str:
    """Encode raw bytes as a tmux control-mode ``%output`` payload.

    Printable ASCII (0x20-0x7E) passes through unchanged.  Every other
    byte -- control characters, DEL, bytes >= 0x80 and the backslash
    itself -- becomes a three-digit octal escape, a backslash followed by
    NNN.  Backslash is encoded as octal 134, exactly as tmux does, so any
    decoder that understands genuine tmux ``%output`` lines also decodes
    these snapshots.  A doubled-backslash encoding would be ambiguous:
    literal screen text such as a backslash followed by '033' could
    otherwise be decoded as an ESC byte.
    """
    return ''.join(
        chr(b) if 0x20 <= b <= 0x7E and b != 0x5C else f'\\{b:03o}'
        for b in raw
    )


_PAUSE_RE = re.compile(rb'^%pause (%\d+)(?:\s|$)')


def _parse_pause_pane(line: bytes):
    """Extract the pane id (bytes, e.g. b'%3') from a tmux ``%pause`` line.

    Any other line yields None.  tmux emits ``%pause %paneID`` once a
    control client configured with ``pause-after=N`` has failed to take a
    pane's output for N seconds.  Delivery resumes after the client sends
    ``refresh-client -A '%paneID:continue'``.
    """
    match = _PAUSE_RE.match(line)
    return match.group(1) if match else None


def _share_send_snapshot(session_name, tmux_sock, ssh_proc, panes, lock):
    """Capture the current tmux screen and inject it as a fake ``%output`` line.

    The visible screen (with escape sequences), the cursor position and
    whether the pane is in alternate-screen mode are rendered as a repaint:
    soft reset, clear, home, optional alternate-screen switch, screen
    contents, final cursor position.  The repaint is wrapped in a tmux
    control-mode ``%output`` line inside a v4 DATA frame and written to the
    SSH process's stdin under *lock*, so the broker handles it like any
    other pane output.

    Best-effort: if tmux cannot be run, times out or the capture fails,
    nothing is sent; a write error on the SSH pipe is ignored.

    Args:
        session_name: tmux target used when no 'main' pane is known.
        tmux_sock: optional tmux server socket path (``-S``).
        ssh_proc: subprocess whose stdin carries the broker stream.
        panes: role -> pane id mapping; 'main' selects the captured pane.
        lock: lock serialising writes to ``ssh_proc.stdin``.
    """
    main_pane = panes.get('main', '')
    target = main_pane or session_name
    tmux = ['tmux', '-S', tmux_sock] if tmux_sock else ['tmux']

    def run_tmux(args, **kwargs):
        # stderr is discarded so tmux errors never leak onto the daemon's
        # stderr; the timeout keeps a wedged tmux from stalling the caller.
        return subprocess.run(tmux + args, stdout=subprocess.PIPE,
                              stderr=subprocess.DEVNULL, timeout=5, **kwargs)

    try:
        # capture-pane -a errors out when no alternate screen exists, so its
        # exit status tells whether a full-screen application is active.
        alt_screen = run_tmux(
            ['capture-pane', '-t', target, '-p', '-e', '-a']).returncode == 0
        screen = run_tmux(['capture-pane', '-t', target, '-p', '-e'])
        if screen.returncode != 0:
            return
        cursor = run_tmux(
            ['display-message', '-t', target, '-p', '#{cursor_y} #{cursor_x}'],
            universal_newlines=True)
    except (subprocess.SubprocessError, OSError):
        return

    row, col = 1, 1   # 1-based terminal coordinates; top-left by default
    if cursor.returncode == 0:
        parts = cursor.stdout.split()
        if len(parts) == 2:
            try:
                row, col = int(parts[0]) + 1, int(parts[1]) + 1
            except ValueError:
                pass

    # capture-pane -p uses LF as line separator; raw terminal mode needs
    # CR+LF.  Normalise first so existing CR+LF pairs are not doubled.
    raw_screen = screen.stdout.replace(b'\r\n', b'\n').replace(b'\n', b'\r\n')
    # capture-pane always appends a trailing newline; emitting it at the
    # last terminal row would scroll the screen up one line before the
    # cursor-position sequence fires.  cursor_seq positions the cursor.
    if raw_screen.endswith(b'\r\n'):
        raw_screen = raw_screen[:-2]

    preamble = b'\x1b[!p\x1b[2J\x1b[H'
    if alt_screen:
        preamble += b'\x1b[?1049h'
    cursor_seq = f'\x1b[{row};{col}H'.encode('ascii')
    raw = preamble + raw_screen + cursor_seq
    pane_id = main_pane or '%0'
    line = (f'%output {pane_id} ' + _encode_output(raw) + '\n').encode('utf-8')
    # Wrap as a v4 DATA frame: snapshots are tmux control-mode bytes
    # (the broker's %output parser handles them like any other DATA).
    frame = _v4_data(line)
    try:
        with lock:
            ssh_proc.stdin.write(frame)
            ssh_proc.stdin.flush()
    except (OSError, ValueError):
        # OSError: the SSH side is gone (e.g. BrokenPipeError);
        # ValueError: stdin has already been closed.
        pass


_AUTH_ERROR_PHRASES = (
    'Permission denied',
    'Authentication failed',
    'Too many authentication failures',
)


def _is_auth_error(reason):
    """Classify an SSH failure reason string.

    Returns True when *reason* contains one of the credential-rejection
    phrases in _AUTH_ERROR_PHRASES.  Returns False for anything else, such
    as a transient network failure.
    """
    for phrase in _AUTH_ERROR_PHRASES:
        if phrase in reason:
            return True
    return False


# ---------------------------------------------------------------------------
# ShareSession -- customer-side share session daemon
# ---------------------------------------------------------------------------

class _ConnState(enum.Enum):
    """Relay connection lifecycle.

    INIT -> CONNECTING -> CONNECTED, passing through RECONNECTING between
    attempts; every non-terminal state may move to DONE, which has no way
    out.  The permitted moves are listed in _CONN_STATE_TRANSITIONS.
    """
    INIT         = 'init'
    CONNECTING   = 'connecting'
    CONNECTED    = 'connected'
    RECONNECTING = 'reconnecting'
    DONE         = 'done'


class _TunnelPerm(enum.Enum):
    """Tunnel permission states (moves listed in _TUNNEL_PERM_TRANSITIONS).

    BLOCKED      restricted mode; immutable -- no transitions out.
    CONFIRM      default; an UPGRADE request shows a confirmation popup and
                 the customer decides.
    ALLOWED      pre-granted or tunnel active; UPGRADE is accepted without
                 a popup.
    AUTO_REJECT  customer blocked requests; UPGRADE is rejected at once.
    """
    BLOCKED     = 'blocked'
    CONFIRM     = 'confirm'
    ALLOWED     = 'allowed'
    AUTO_REJECT = 'auto_reject'


# Permitted tunnel-permission moves; the setter warns about anything else.
_TUNNEL_PERM_TRANSITIONS = {
    _TunnelPerm.CONFIRM:     frozenset((_TunnelPerm.ALLOWED, _TunnelPerm.AUTO_REJECT)),
    _TunnelPerm.ALLOWED:     frozenset((_TunnelPerm.CONFIRM, _TunnelPerm.AUTO_REJECT)),
    _TunnelPerm.AUTO_REJECT: frozenset((_TunnelPerm.CONFIRM,)),
    _TunnelPerm.BLOCKED:     frozenset(),
}

# Permitted connection-state moves; the setter warns about anything else.
_CONN_STATE_TRANSITIONS = {
    _ConnState.INIT:         frozenset((_ConnState.CONNECTING, _ConnState.DONE)),
    _ConnState.CONNECTING:   frozenset((_ConnState.CONNECTED, _ConnState.RECONNECTING,
                                        _ConnState.DONE)),
    _ConnState.CONNECTED:    frozenset((_ConnState.RECONNECTING, _ConnState.DONE)),
    _ConnState.RECONNECTING: frozenset((_ConnState.CONNECTING, _ConnState.DONE)),
    _ConnState.DONE:         frozenset(),
}


class ShareSession:
    """Customer-side share session daemon.

    Connects to the relay (tunl@relay) carrying the tmux control-mode stream
    over SSH stdio.  Handles SNAPSHOT/KEYS/RESIZE/STATUS/UPGRADE/SUPPORT_KEY/
    TUNNEL_CONFIRMED commands from the relay broker.

    Uses SSH ControlMaster so that a tunnel upgrade can add a -R port forward
    on the existing authenticated connection without reconnecting.
    """

    def __init__(self, session_id, case_num='', token='', user='',
                 panes=None, restricted=False, tunnel_port=None):
        """Initialise per-session state; does not connect to the relay.

        Args:
            session_id: share session id; the control-socket path, tmux
                socket path, tmux target name and session directory are all
                derived from it.
            case_num: support case number shown in the status header.
            token: optional auth token; passed to certificate signing and,
                when non-empty, used instead of session_id as the KI/PAM
                password.
            user: local user name; defaults to _current_user().
            panes: mapping of pane role (e.g. 'main', 'chat') to tmux pane
                id; defaults to an empty dict.
            restricted: start with tunnel permission BLOCKED (observe only).
            tunnel_port: when not None, tunnel access starts pre-granted
                (ALLOWED) and an immediate upgrade to this port follows.
        """
        self.session_id    = session_id
        self.case_num      = case_num
        self.token         = token
        self.user          = user or _current_user()
        self._ready        = False   # set True by main() when pre-connected via fork
        self._panes        = panes or {}
        # restricted takes precedence over tunnel_port; a given port
        # pre-grants the tunnel, otherwise the customer confirms each request.
        self._tunnel_perm  = (_TunnelPerm.BLOCKED if restricted
                              else _TunnelPerm.ALLOWED if tunnel_port is not None
                              else _TunnelPerm.CONFIRM)
        self._brb          = False
        self._tunnel_port  = tunnel_port   # non-None triggers immediate upgrade
        self._conn_state   = _ConnState.INIT
        self.ssh_proc      = None
        self.tmux_proc     = None
        self.known_hosts_path = None
        # Auth material filled in by _setup_key(): cert key path, or the
        # KI/PAM environment (sshpass markers or SSH_ASKPASS script).
        self.privkey_path  = None
        self.ki_env        = None
        self._key_tmp_dir  = None
        # ControlMaster socket shared with _run_upgrade's `ssh -O forward`.
        self._ctl_socket   = _ctl_sock_path(session_id)
        self._pending_keys      = []
        self._tmux_name         = _tmux_local_name(session_id)
        self._tmux_sock         = _tmux_sock_path(session_id)
        self._connected_since   = None   # HH:MM:SS string when relay last connected
        self._last_disconnected = None   # HH:MM:SS string when relay last dropped
        self._session_dir       = _session_dir(session_id)
        self._ssh_log_path      = os.path.join(self._session_dir, 'ssh.log')
        # Option C: wire tap over the customer<->broker SSH stdio.  Returns
        # a disabled no-op unless TUNL_TRACE_WIRE / TUNL_TRACE_ALL is set.
        self._wire_tap          = open_wire_tap(
            os.path.join(self._session_dir, 'wire.broker.bin'))
        # Option D: parallel tmux control-mode tap (see main forward loop).
        self._tmux_tap_fh       = None   # opened lazily when tracing_enabled('tmux')
        self._stderr_logger     = None   # active _SshStderrLogger, if any
        self._status_is_stdout  = False  # True when daemon IS the status pane process
        self._write_line_ref    = [None] # mutable; updated each SSH connect for FIFO thread
        self._chat_fifo         = None   # path to outbound text FIFO, if created
        self._chat_in_fifo      = None   # path to inbound SCHAT FIFO, if created
        self._broker_info       = {}     # last SESSION_INFO payload from broker
                                         # (case, customer, viewers, ...);
                                         # merged into _emit_status_info so
                                         # /who in the chat pane has parity
                                         # with the support side
        self._chat_in_fh        = None   # non-blocking write handle, lazily opened
        self._chat_ring         = collections.deque(maxlen=500)  # replay on respawn
        self._ctl_fifo          = os.path.join(self._session_dir, 'ctl.fifo')
        self._fixed_overhead    = 0      # row overhead (status+chat+borders); set after pane creation
        self._last_resize_c     = 0      # last RESIZE columns from broker
        self._last_resize_r     = 0      # last RESIZE rows from broker
        self._attach_mode       = False  # True when running inside an existing tmux session
        self._debug             = False  # True with --debug; preserves session dir
        # Terminal-size poller: started at most once; (0, 0) means
        # "nothing reported yet" so the first real size is always sent.
        self._terminal_poller_started = False
        self._last_reported_terminal  = (0, 0)
        # Liveness tracking (see docs/share.md "Liveness").  Monotonic
        # timestamps; updated by every successful read from / write to
        # the broker on the active SSH connection.  Reset whenever a
        # new connect attempt begins.
        self._last_inbound_from_broker = 0.0
        self._last_outbound_to_broker  = 0.0
        self._heartbeat_stop_event     = None  # threading.Event for the per-connection thread
        # Resolve TUNL_PREFIX once.  _install_share_keybindings calls
        # _apply_tmux_prefix and overwrites _prefix_key with whatever
        # tmux ended up accepting; status bar / cheat-sheet rendering
        # picks up the live label via _prefix_label.
        self._prefix_key, self._prefix_label = _resolve_tmux_prefix()

    def _set_conn_state(self, new):
        """Move the connection state machine to *new*.

        A move not listed in _CONN_STATE_TRANSITIONS is reported on stderr
        but applied anyway.
        """
        current = self._conn_state
        permitted = _CONN_STATE_TRANSITIONS.get(current, frozenset())
        if new not in permitted:
            print(f'{PREFIX} WARNING: invalid state transition {current.value} -> {new.value}',
                  file=sys.stderr)
        self._conn_state = new

    def _set_tunnel_perm(self, new):
        """Move the tunnel-permission state machine to *new*.

        A move not listed in _TUNNEL_PERM_TRANSITIONS is reported on stderr.
        It is still applied, with one exception.  BLOCKED (restricted mode)
        is documented as immutable, so any attempt to leave it is refused and
        the state stays BLOCKED.  Leaving it would silently re-enable tunnel
        UPGRADE handling in a session the customer started as observe-only.
        """
        old = self._tunnel_perm
        if new not in _TUNNEL_PERM_TRANSITIONS.get(old, frozenset()):
            print(f'{PREFIX} WARNING: invalid tunnel perm transition {old.value} -> {new.value}',
                  file=sys.stderr)
            if old is _TunnelPerm.BLOCKED:
                # Restricted mode is a one-way customer decision; enforce it.
                return
        self._tunnel_perm = new

    def _handle_zoom_toggle(self):
        """Toggle zoom on the main pane and resize the window to match."""
        main_pane = self._panes.get('main')
        if not main_pane:
            return
        _tmux = ['tmux', '-S', self._tmux_sock] if self._tmux_sock else ['tmux']
        zr = subprocess.run(
            _tmux + ['display-message', '-t', self._tmux_name,
                     '-p', '#{window_zoomed_flag}'],
            stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, universal_newlines=True,
        )
        was_zoomed = zr.stdout.strip() == '1'
        subprocess.run(
            _tmux + ['resize-pane', '-t', main_pane, '-Z'],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
        )
        self._resize_after_zoom(was_zoomed, _tmux)

    def _resize_after_zoom(self, was_zoomed, _tmux=None):
        """Adjust window dimensions after a zoom state change."""
        if not self._last_resize_r:
            return
        if _tmux is None:
            _tmux = ['tmux', '-S', self._tmux_sock] if self._tmux_sock else ['tmux']
        c = self._last_resize_c or 80
        r = self._last_resize_r
        if was_zoomed:
            total_h = r + self._fixed_overhead
            subprocess.run(
                _tmux + ['resize-window', '-t', self._tmux_name,
                         '-x', str(c), '-y', str(total_h)],
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
            )
        else:
            subprocess.run(
                _tmux + ['resize-window', '-t', self._tmux_name,
                         '-x', str(c), '-y', str(r)],
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
            )

    @property
    def _mode_label(self):
        if self._tunnel_port:
            return 'TUNNEL'
        return {
            _TunnelPerm.BLOCKED:     'RESTRICTED',
            _TunnelPerm.CONFIRM:     'SHARE',
            _TunnelPerm.ALLOWED:     'SHARE',
            _TunnelPerm.AUTO_REJECT: 'SHARE',
        }[self._tunnel_perm]

    @property
    def _tunnel_sym(self):
        if self._tunnel_port:
            return f' {SYM_TUN}'
        return ''

    def _status_header(self):
        """Build the bold status header.

        The header carries the mode label, the tunnel marker, the session id
        and, if set, the case number.
        """
        pieces = [f'\x1b[1m[LINBIT {self._mode_label}]{self._tunnel_sym}  {self.session_id}']
        if self.case_num:
            pieces.append(f'  Case: {self.case_num}')
        pieces.append('\x1b[m')
        return ''.join(pieces)

    @property
    def _mode_summary(self):
        """Dimmed one-line description of the current mode.

        The line also names the prefix-key binding that changes the mode.
        """
        label = self._prefix_label
        perm = self._tunnel_perm
        if perm == _TunnelPerm.BLOCKED:
            return '\x1b[2mMode: restricted -- support can observe only\x1b[m'
        port = self._tunnel_port
        if port:
            return (f'\x1b[2mMode: share+tunnel -- port {port} active'
                    f' ({label} U to revoke)\x1b[m')
        if perm == _TunnelPerm.ALLOWED:
            return f'\x1b[2mMode: share -- tunnel pre-granted ({label} U to revoke)\x1b[m'
        if perm == _TunnelPerm.AUTO_REJECT:
            return f'\x1b[2mMode: share -- tunnel requests blocked ({label} u to unblock)\x1b[m'
        return f'\x1b[2mMode: share -- {label} u to pre-grant tunnel access\x1b[m'

    def _refresh_status_header(self):
        """Redraw status row 1 (the header) and refresh tmux's mode labels."""
        header = self._status_header()
        self._status_row(1, header)
        self._update_tmux_mode_labels()

    def _write_chat_in_fifo(self, data: bytes):
        """Write one line into chat.in.fifo for the curses chat pane.

        Lazily opens a persistent write handle (O_RDWR so the open does
        not block on a reader; the chat pane opens its side RDWR too).
        On write failure the handle is closed and reopened on the next
        call.  On reopen, the ring buffer (_chat_ring) is replayed so a
        respawned chat pane sees the full history.
        """
        if not self._chat_in_fifo:
            return
        if not hasattr(self, '_chat_ring'):
            self._chat_ring = collections.deque(maxlen=500)
        reopened = False
        if self._chat_in_fh is None:
            fd = None
            try:
                fd = os.open(self._chat_in_fifo, os.O_RDWR | os.O_NONBLOCK)
                # Switch to blocking writes once a peer exists; non-blocking
                # is only needed to return from open() when nobody's reading.
                import fcntl as _fcntl
                flags = _fcntl.fcntl(fd, _fcntl.F_GETFL)
                _fcntl.fcntl(fd, _fcntl.F_SETFL, flags & ~os.O_NONBLOCK)
                self._chat_in_fh = os.fdopen(fd, 'wb', buffering=0)
                fd = None   # ownership transferred to _chat_in_fh
                reopened = True
            except OSError as e:
                trace('chat_in_fifo_open_failed', err=str(e))
                if fd is not None:
                    try:
                        os.close(fd)
                    except OSError:
                        pass
                return
        if reopened and self._chat_ring:
            # Replay history so a respawned chat pane sees previous messages.
            try:
                for past in self._chat_ring:
                    self._chat_in_fh.write(past)
            except (OSError, BrokenPipeError) as e:
                trace('chat_in_fifo_replay_failed', err=str(e))
                try:
                    self._chat_in_fh.close()
                except OSError:
                    pass
                self._chat_in_fh = None
                return
        self._chat_ring.append(data)
        try:
            self._chat_in_fh.write(data)
        except (OSError, BrokenPipeError) as e:
            trace('chat_in_fifo_write_failed', err=str(e))
            try:
                self._chat_in_fh.close()
            except OSError:
                pass
            self._chat_in_fh = None

    _ANSI_RE = re.compile(r'\x1b\[[^a-zA-Z]*[a-zA-Z]')

    def _status_row(self, row, text):
        """Record one of three status-row strings and route it to the
        chat pane's pinned header (chat-ui-merge M4/M5).

        row in {1, 2, 3}: header, connection state, mode/ephemerals.

        As of M5 there is no dedicated in-window status pane; the chat
        pane's pinned rows carry this information.  The tunnel-only
        daemon path (no tmux) still falls back to stderr.
        """
        if self._status_is_stdout:
            sys.stdout.write(f'\r\x1b[{row};1H\x1b[2K{text}')
            sys.stdout.flush()
            return
        if not hasattr(self, '_status_cache'):
            self._status_cache = {1: '', 2: '', 3: ''}
        self._status_cache[row] = text
        # Chat pane's pinned header: route the status update as a
        # SESSION_INFO frame that the curses UI's session_info_handler
        # splits into left/center/right slots.
        if self._chat_in_fifo:
            try:
                self._emit_status_info()
            except Exception:
                pass
        if not self._panes:
            if sys.stderr.isatty():
                print(f'\r\x1b[1m[linbit-tunl]\x1b[m {text}', file=sys.stderr)
            else:
                plain = self._ANSI_RE.sub('', text).strip()
                if plain:
                    print(f'[linbit-tunl] {plain}', file=sys.stderr)

    def _emit_status_info(self):
        """Publish the cached status rows to the chat pane's pinned header.

        The rows are sent as a v4 SESSION_INFO CTRL frame with a JSON body,
        written to chat.in.fifo.

        The JSON payload starts from the broker's last SESSION_INFO fields
        (case, customer, viewers, ...), so /who in the chat pane matches the
        support side.  The local fields header / connection / mode /
        session_id are layered on top and win on overlap.  ANSI escapes are
        stripped from the rows so embedded colours do not collide with the
        chat UI's own row attribute.
        """
        rows = self._status_cache

        def plain(row):
            return self._ANSI_RE.sub('', rows.get(row, '')).strip()

        # getattr keeps tests that build ShareSession via __new__ (without
        # __init__) working.
        broker_fields = getattr(self, '_broker_info', None) or {}
        payload = dict(broker_fields,
                       header=plain(1),
                       connection=plain(2),
                       mode=plain(3),
                       session_id=self.session_id)
        ctrl = encode_ctrl('SESSION_INFO', (json.dumps(payload).encode('utf-8'),))
        self._write_chat_in_fifo(encode_message(TYPE_CTRL, ctrl))

    def _emit_ephemeral(self, body: str, lifecycle: str = 'state'):
        """Overlay a transient message on the chat pane's top scrollback
        row.  lifecycle in {'state', 'keystroke', 'clear'}."""
        if not self._chat_in_fifo:
            return
        bin_args = ()
        if body:
            bin_args = (body.encode('utf-8'),)
        frame = encode_message(
            TYPE_CTRL, encode_ctrl(f'EPHEMERAL {lifecycle}', bin_args))
        self._write_chat_in_fifo(frame)

    def _poll_terminal_size_loop(self):
        """Report the customer's real terminal size to the broker.

        Every 2 s, until the connection state is DONE, runs tmux
        `list-clients` and sends `TERMINAL_SIZE <cols> <rows>` whenever the
        result changes.  Clients flagged control-mode or ignore-size are
        skipped, which excludes our own SSH control-mode client.  With
        several real clients, the smallest width and the smallest height
        win, so no client sees dotted borders.

        While disconnected (no write_line in _write_line_ref), the cached
        size is reset.  The broker resets its own state on reconnect, so
        the first poll afterwards re-sends the current size.
        """
        list_clients = (
            (['tmux', '-S', self._tmux_sock] if self._tmux_sock else ['tmux'])
            + ['list-clients',
               '-F', '#{client_flags}|#{client_width}|#{client_height}']
        )
        while self._conn_state != _ConnState.DONE:
            time.sleep(2)
            send = self._write_line_ref[0]
            if send is None:
                self._last_reported_terminal = (0, 0)
                continue
            try:
                result = subprocess.run(
                    list_clients, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                    universal_newlines=True, timeout=2,
                )
            except (subprocess.SubprocessError, OSError):
                continue
            if result.returncode != 0:
                continue
            widths, heights = [], []
            for entry in result.stdout.splitlines():
                fields = entry.split('|')
                if len(fields) != 3:
                    continue
                flags, width_s, height_s = fields
                if 'control-mode' in flags or 'ignore-size' in flags:
                    continue
                try:
                    width, height = int(width_s), int(height_s)
                except ValueError:
                    continue
                if width > 0 and height > 0:
                    widths.append(width)
                    heights.append(height)
            if not widths:
                continue
            size = (min(widths), min(heights))
            if size == self._last_reported_terminal:
                continue
            try:
                send(_v4_ctrl(f'TERMINAL_SIZE {size[0]} {size[1]}'))
                self._last_reported_terminal = size
                trace('terminal_size_sent', cols=size[0], rows=size[1])
            except (OSError, AttributeError):
                pass

    def _start_terminal_poller(self):
        if self._terminal_poller_started:
            return
        self._terminal_poller_started = True
        threading.Thread(
            target=self._poll_terminal_size_loop, daemon=True,
        ).start()

    def cleanup(self):
        """Tear down the share session; later calls are no-ops.

        Steps, in order:
          * mark the connection DONE;
          * stop the tmux and SSH child processes (terminate, then kill
            after a grace period);
          * drop this session's authorized_keys entries;
          * remove the local control socket, FIFOs and key material;
          * show the disconnected state and close the chat pane;
          * unless in attach mode, kill the private tmux server.
        """
        if self._conn_state == _ConnState.DONE:
            return
        self._conn_state = _ConnState.DONE

        print(f"\nEnding share session {self.session_id}...",
              file=sys.stderr)

        # tmux commands must target this session's server: pane ids such as
        # '%3' are only unique per server, so omitting -S could act on an
        # unrelated pane in the user's default tmux server.
        tmux = ['tmux', '-S', self._tmux_sock] if self._tmux_sock else ['tmux']

        if self.tmux_proc is not None:
            self.tmux_proc.terminate()
            try:
                self.tmux_proc.wait(timeout=2)
            except subprocess.TimeoutExpired:
                self.tmux_proc.kill()
            self.tmux_proc = None

        if self.ssh_proc is not None:
            self.ssh_proc.terminate()
            try:
                self.ssh_proc.wait(timeout=3)
            except subprocess.TimeoutExpired:
                self.ssh_proc.kill()
            self.ssh_proc = None

        authkeys_remove_session(AUTHKEYS_FILE, self.session_id)

        # The customer's tmux session is not touched at this point; see the
        # end of this method for when the tmux server itself is torn down.

        try:
            os.unlink(self._ctl_socket)
        except OSError:
            pass

        if self._key_tmp_dir:
            shutil.rmtree(self._key_tmp_dir, ignore_errors=True)
            self._key_tmp_dir = None

        if self.known_hosts_path:
            try:
                os.unlink(self.known_hosts_path)
            except OSError:
                pass   # already gone; nothing to clean up
            self.known_hosts_path = None

        if self._chat_fifo:
            try:
                os.unlink(self._chat_fifo)
            except OSError:
                pass
            self._chat_fifo = None
        if self._chat_in_fh is not None:
            try:
                self._chat_in_fh.close()
            except OSError:
                pass
            self._chat_in_fh = None
        if self._chat_in_fifo:
            try:
                os.unlink(self._chat_in_fifo)
            except OSError:
                pass
            self._chat_in_fifo = None
        if self._ctl_fifo:
            try:
                os.unlink(self._ctl_fifo)
            except OSError:
                pass

        # Update status pane to show disconnected state with reconnect hint.
        self._status_row(2, '\x1b[1;31m●\x1b[m relay: disconnected')
        self._status_row(
            3,
            f'\x1b[2mTo reconnect: '
            f'{os.path.basename(sys.argv[0])} {self.session_id}\x1b[m',
        )

        # Close the chat pane -- customer has no way to close it themselves
        # after the relay disconnects and the chat loop is still running.
        chat_pane = self._panes.get('chat')
        if chat_pane:
            subprocess.run(
                tmux + ['kill-pane', '-t', chat_pane],
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
            )

        if self.ki_env:
            askpass = self.ki_env.get('SSH_ASKPASS')
            if askpass:
                try:
                    os.unlink(askpass)
                except OSError:
                    pass
            self.ki_env = None

        # Kill the private tmux server -- including the main pane's shell --
        # so it does not linger after session cleanup.  In attach mode we do
        # not own the server, so the customer's session and shell are left
        # alive (only the chat pane was closed above).
        if not self._attach_mode and self._tmux_sock:
            subprocess.run(
                ['tmux', '-S', self._tmux_sock, 'kill-server'],
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
            )

    def _setup_key(self):
        """Set up SSH auth for the relay (cert preferred, KI/PAM fallback).

        First an ephemeral keypair is generated under
        <session_dir>/sharekey and the relay API is asked to sign it.  On
        success, privkey_path names the key and its certificate is written
        next to it as <key>-cert.pub.

        Cert signing is fail-fast and never retries: if the API is not
        reachable, not speaking HTTP (e.g. an SSH listener on the same
        port), or rejects us, we drop straight to KI/PAM token auth.
        Retrying the cert path would only add latency to (re-)connect.

        On fallback the key directory is removed.  Key paths recorded by an
        earlier successful call are cleared too, so _build_ssh_cmd does not
        keep pointing ssh at deleted files.  The fallback then sets ki_env:
          * sshpass markers when sshpass is installed;
          * otherwise SSH_ASKPASS, pointing at a 0700 helper script in the
            session directory.
        """
        key_dir = os.path.join(self._session_dir, 'sharekey')
        _strict_mkdir_or_reuse(key_dir)
        reason = None
        try:
            key_path, pubkey = _generate_ephemeral_keypair(key_dir)
            cert = _request_certificate(
                pubkey=pubkey,
                session_id=self.session_id,
                token=self.token,
                relay_api=RELAY_API,
            )
        except urllib.error.HTTPError as e:
            reason = f"API returned HTTP {e.code}"
        except urllib.error.URLError as e:
            # Wraps connect failures, TLS handshake errors, socket
            # timeouts, name resolution -- everything urlopen normalises.
            reason = f"API not reachable ({e.reason})"
        except http.client.HTTPException as e:
            # Response that does not parse as HTTP, e.g. an SSH banner
            # coming back from sshd-on-443.  BadStatusLine lands here.
            reason = f"API not speaking HTTP ({e.__class__.__name__})"
        except (OSError, json.JSONDecodeError) as e:
            reason = f"API call failed ({e})"
        except Exception as e:
            # Unexpected (e.g. ssh-keygen subprocess failure during
            # ephemeral keypair generation).  Still fall back -- the
            # user wants to get connected.
            reason = f"cert signing failed ({e})"
        else:
            cert_path = str(key_path) + '-cert.pub'
            with open(cert_path, 'w') as f:
                f.write(cert + '\n')
            self.privkey_path = str(key_path)
            self._key_tmp_dir = key_dir
            return

        shutil.rmtree(key_dir, ignore_errors=True)
        # key_dir is gone and we chose token auth: forget any key from an
        # earlier successful call, otherwise _build_ssh_cmd would keep
        # preferring a deleted IdentityFile (with BatchMode=yes).
        self.privkey_path = None
        self._key_tmp_dir = None
        print(f"{PREFIX} Cert signing skipped: {reason}; using token auth.",
              file=sys.stderr)

        # KI/PAM fallback: explicit --token if given, otherwise the session ID.
        # The relay's validate-token.py accepts both 6-digit tokens (consumed)
        # and 4-word session IDs (validated against pending/active registry, not
        # consumed -- so reconnects keep working).
        password = self.token if self.token else self.session_id
        if shutil.which('sshpass'):
            # sshpass feeds the password via an inherited pipe fd; the password
            # never appears in the environment or process list.
            self.ki_env = {'_KI_PASSWORD': password, '_SSHPASS_CMD': 'sshpass'}
        else:
            askpass_path = os.path.join(self._session_dir, 'askpass.sh')
            # O_EXCL|O_NOFOLLOW so a pre-existing file or symlink at this
            # path makes us fail closed rather than silently reusing
            # attacker-controlled storage (B1.7 in the 2026-05-06 audit).
            # Mode 0o700 is set at creation time; no separate chmod step.
            askpass_flags = (os.O_WRONLY | os.O_CREAT | os.O_EXCL
                             | os.O_NOFOLLOW)
            try:
                askpass_fd = os.open(askpass_path, askpass_flags, 0o700)
            except FileExistsError:
                # Stale askpass from a prior run inside our (already
                # vetted) session dir.  Unlink and retry once.
                os.unlink(askpass_path)
                askpass_fd = os.open(askpass_path, askpass_flags, 0o700)
            with os.fdopen(askpass_fd, 'w') as askpass:
                # printf, not echo: some /bin/sh echo builtins (e.g. dash)
                # interpret backslash escapes and a leading '-n'.
                askpass.write(
                    f"#!/bin/sh\nprintf '%s\\n' {shlex.quote(password)}\n")
            self.ki_env = {
                'SSH_ASKPASS':         askpass_path,
                'SSH_ASKPASS_REQUIRE': 'force',
            }

    def _build_ssh_cmd(self, context):
        """Return the ssh argv for the share connection to the relay.

        The connection acts as, or reuses, a ControlMaster on _ctl_socket
        with a 30 s persist time.  This lets _run_upgrade add a port forward
        later without authenticating again.

        Auth options follow what _setup_key produced:
          * certificate key: BatchMode, only that identity;
          * KI/PAM token: no BatchMode and a single password prompt, with
            the token fed externally;
          * nothing: BatchMode, so ssh never prompts.

        *context* is the last argv element, placed after the destination.
        """
        argv = [
            'ssh',
            '-o', f'UserKnownHostsFile={self.known_hosts_path}',
            '-o', 'StrictHostKeyChecking=yes',
            '-o', 'ConnectTimeout=15',
            '-o', 'ServerAliveInterval=15',
            '-o', 'ServerAliveCountMax=4',
            '-o', 'ControlMaster=auto',
            '-S', self._ctl_socket,
            '-o', 'ControlPersist=30s',
            '-p', str(RELAY_PORT),
        ]
        if self.privkey_path:
            auth_opts = ('BatchMode=yes',
                         f'IdentityFile={self.privkey_path}',
                         'IdentitiesOnly=yes')
        elif self.ki_env:
            # BatchMode would disable keyboard-interactive entirely, and a
            # wrong session-id/token never succeeds on retry.
            auth_opts = ('IdentitiesOnly=yes', 'NumberOfPasswordPrompts=1')
        else:
            auth_opts = ('BatchMode=yes',)
        for opt in auth_opts:
            argv.extend(('-o', opt))
        if COMPRESS:
            argv.append('-C')
        # Advanced tracing (Option E): -vvv / -E log, before the destination.
        argv.extend(ssh_debug_args('main', self._session_dir))
        argv.extend((f'{RELAY_USER}@{RELAY_HOST}', context))
        return argv

    def _run_upgrade(self, port, write_line_fn):
        """Add a -R port forward on the existing ControlMaster connection.

        Avoids a new auth round by running ``ssh -O forward`` on the live
        control socket.  The forward maps relay port *port* to
        TARGET_HOST:TARGET_PORT.  Beforehand, ssh-keyscan checks that an
        SSH server actually answers on the target.

        The result is reported through *write_line_fn* as v4 CTRL frames.
        On success it sends LINBIT_TUNNEL_PORT <port>, then
        LINBIT_TUNNEL_HOSTKEYS so the support side can pre-populate
        known_hosts.  Every failure sends LINBIT_TUNNEL_FAILED, including
        ssh-keyscan or ssh being unavailable or timing out, so the peer is
        never left without an answer.
        """
        trace('upgrade_begin', port=port, target=f'{TARGET_HOST}:{TARGET_PORT}')

        def report_failure():
            trace('frame_send', frame='LINBIT_TUNNEL_FAILED')
            write_line_fn(_v4_ctrl('LINBIT_TUNNEL_FAILED'))

        # Pre-check: verify sshd is actually listening on the target port.
        try:
            keyscan = subprocess.run(
                ['ssh-keyscan', '-p', str(TARGET_PORT), '-T', '3', TARGET_HOST],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=15,
            )
        except (subprocess.SubprocessError, OSError) as e:
            trace('upgrade_keyscan_error', err=str(e))
            report_failure()
            return
        if not keyscan.stdout.strip():
            trace('upgrade_no_sshd', port=TARGET_PORT, host=TARGET_HOST)
            report_failure()
            return

        try:
            r = subprocess.run(
                [
                    'ssh', '-S', self._ctl_socket,
                    *ssh_debug_args('upgrade', self._session_dir),
                    '-O', 'forward',
                    '-R', f'{port}:{TARGET_HOST}:{TARGET_PORT}',
                    f'{RELAY_USER}@{RELAY_HOST}',
                ],
                stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=30,
            )
        except (subprocess.SubprocessError, OSError) as e:
            trace('upgrade_error', err=str(e))
            report_failure()
            return
        if r.returncode != 0:
            trace('upgrade_failed', rc=r.returncode,
                  stderr=r.stderr.decode('utf-8', 'replace').strip()[:200])
            report_failure()
            return

        self._tunnel_port = port
        trace('upgrade_ok', port=port)
        trace('frame_send', frame='LINBIT_TUNNEL_PORT', port=port)
        write_line_fn(_v4_ctrl(f'LINBIT_TUNNEL_PORT {port}'))
        # Forward host keys so the support side can pre-populate known_hosts.
        keys_blob = keyscan.stdout.decode('utf-8', 'replace').strip().encode('utf-8')
        write_line_fn(_v4_ctrl('LINBIT_TUNNEL_HOSTKEYS', keys_blob))
        self._refresh_status_header()
        # Notify on this session's tmux server, not the user's default one.
        tmux = ['tmux', '-S', self._tmux_sock] if self._tmux_sock else ['tmux']
        subprocess.run(
            tmux + ['display-message', f'{SYM_TUN} Tunnel active (port {port})'],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
        )

    def _do_grant_tunnel(self):
        """Loosen tunnel permission one step (customer pressed Ctrl-b u).

        AUTO_REJECT -> CONFIRM: tunnel requests are no longer auto-rejected,
            but each one still needs the customer's approval.
        CONFIRM -> ALLOWED: the next UPGRADE is accepted without asking.
        Any other state is left untouched.

        On a transition, LINBIT_MODE_ALLOW_TUNNEL is sent to the relay (if a
        writer is registered), the status header and row 3 are refreshed, and
        a tmux toast is shown.
        """
        # current state -> (next state, status-row text, tmux toast)
        transitions = {
            _TunnelPerm.AUTO_REJECT: (
                _TunnelPerm.CONFIRM,
                '\x1b[32m[tunl] Tunnel requests unblocked.\x1b[m',
                'Tunnel requests unblocked',
            ),
            _TunnelPerm.CONFIRM: (
                _TunnelPerm.ALLOWED,
                '\x1b[32m[tunl] Tunnel access pre-granted.'
                ' Next support request will open tunnel automatically.\x1b[m',
                'Tunnel pre-granted',
            ),
        }
        step = transitions.get(self._tunnel_perm)
        if step is None:
            return
        next_perm, row_text, toast = step

        self._set_tunnel_perm(next_perm)
        send_frame = self._write_line_ref[0]
        if send_frame:
            send_frame(_v4_ctrl('LINBIT_MODE_ALLOW_TUNNEL'))
        self._refresh_status_header()
        self._status_row(3, row_text)
        subprocess.run(
            ['tmux', 'display-message', toast],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

    def _do_revoke_tunnel(self):
        """Revoke tunnel access mid-session (customer pressed Ctrl-b U).

        Acts when tunnel permission is ALLOWED or AUTO_REJECT, or whenever a
        tunnel is currently up (self._tunnel_port set) regardless of the
        permission state -- _do_block_tunnel relies on this call to tear down
        any active tunnel.

        Permission: ALLOWED/CONFIRM drop to CONFIRM; AUTO_REJECT and BLOCKED
        are kept as they are, because a revoke must never loosen a stricter
        setting.

        If a tunnel is up: sends LINBIT_TUNNEL_PORT 0, then kills the
        ControlMaster with `ssh -O exit` so the reconnect loop re-establishes
        the share without -R.  The share session and every reverse-forwarded
        channel ride on that master connection, so `-O exit` tears all of
        them down.

        This session's support keys are always removed from AUTHKEYS_FILE,
        even if `ssh -O exit` fails to start or times out.
        """
        perm = self._tunnel_perm
        tunnel_up = bool(self._tunnel_port)
        if not tunnel_up and perm not in (_TunnelPerm.ALLOWED,
                                          _TunnelPerm.AUTO_REJECT):
            return
        # Send TUNNEL_PORT 0 before killing the connection so the broker
        # notifies consumers that the tunnel is closed.
        wl = self._write_line_ref[0]
        if wl and tunnel_up:
            wl(_v4_ctrl('LINBIT_TUNNEL_PORT 0'))
        # Revoking must never widen access: a blocked/restricted state stays.
        if perm not in (_TunnelPerm.AUTO_REJECT, _TunnelPerm.BLOCKED):
            self._set_tunnel_perm(_TunnelPerm.CONFIRM)
        exit_failed = False
        exit_log = None
        if tunnel_up:
            # -v -E <log>: always capture ssh's own debug output so a
            # silent failure of `-O exit` is diagnosable after the
            # fact.  Capturing stderr via PIPE in addition because -E
            # only catches ssh-level messages, not pre-handshake
            # errors (control socket missing, permission denied, etc.).
            exit_log = os.path.join(self._session_dir, 'ssh.exit.log')
            try:
                r = subprocess.run(
                    ['ssh', '-v', '-E', exit_log,
                     '-S', self._ctl_socket, '-O', 'exit',
                     f'{RELAY_USER}@{RELAY_HOST}'],
                    stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                    universal_newlines=True, timeout=10,
                )
            except (OSError, subprocess.TimeoutExpired) as e:
                # Do not abort here: the support keys below must still go.
                exit_failed = True
                trace('revoke_ssh_exit', rc=None, err=str(e)[:300],
                      log=exit_log)
            else:
                trace('revoke_ssh_exit', rc=r.returncode,
                      stderr=(r.stderr or '').strip()[:300],
                      log=exit_log)
            self._tunnel_port = None
        authkeys_remove_session(AUTHKEYS_FILE, self.session_id)
        # No wl() for LINBIT_MODE_REVOKE_TUNNEL here -- the connection is
        # likely dead after -O exit.  Reconnect will re-establish clean.
        self._refresh_status_header()
        if exit_failed:
            # The master may still be carrying the forward; say so.
            self._status_row(3,
                '\x1b[1;31m[tunl] Tunnel revoked, but closing the relay'
                f' connection failed -- see {exit_log}\x1b[m')
        else:
            self._status_row(3,
                '\x1b[33m[tunl] Tunnel revoked. Reconnecting to relay without tunnel.\x1b[m')
        subprocess.run(['tmux', 'display-message', 'Tunnel revoked'],
                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    def _do_block_tunnel(self):
        """Revoke tunnel and block all further upgrade requests (AUTO_REJECT state).

        No-op in restricted mode (BLOCKED).  If a tunnel is up it is torn
        down first via _do_revoke_tunnel.  Then the permission becomes
        AUTO_REJECT, LINBIT_MODE_REVOKE_TUNNEL is sent (if a writer is
        registered), and the customer is told how to unblock.
        """
        if self._tunnel_perm == _TunnelPerm.BLOCKED:
            return

        if self._tunnel_port:
            # Sends TUNNEL_PORT 0 and kills the ControlMaster.
            self._do_revoke_tunnel()
        self._set_tunnel_perm(_TunnelPerm.AUTO_REJECT)

        sender = self._write_line_ref[0]
        if sender:
            sender(_v4_ctrl('LINBIT_MODE_REVOKE_TUNNEL'))
        self._refresh_status_header()

        unblock_hint = f' Press {self._prefix_label} u to unblock.\x1b[m'
        self._status_row(3, '\x1b[31m[tunl] Tunnel requests blocked.' + unblock_hint)
        subprocess.run(
            ['tmux', 'display-message', 'Tunnel requests blocked'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

    def _do_brb_on(self):
        """Freeze key input from support (BRB mode). Enforce on customer side."""
        if self._tunnel_port:
            self._status_row(3,
                '\x1b[33m[tunl] BRB not available while tunnel is active.'
                ' Revoke tunnel first.\x1b[m')
            return
        if self._brb:
            return
        self._brb = True
        wl = self._write_line_ref[0]
        if wl:
            wl(_v4_ctrl('LINBIT_BRB'))
        self._refresh_status_header()
        subprocess.run(['tmux', 'display-message', 'BRB: support input frozen'],
                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    def _do_brb_off(self):
        """Restore key input from support."""
        if not self._brb:
            return
        self._brb = False
        wl = self._write_line_ref[0]
        if wl:
            wl(_v4_ctrl('LINBIT_BRB_END'))
        self._refresh_status_header()
        subprocess.run(['tmux', 'display-message', 'Back: support input restored'],
                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    def _update_tmux_mode_labels(self):
        """Refresh the tmux status bar after a mid-session mode change."""
        name = self._tmux_name
        if not name:
            return
        status_left = (
            f'[LINBIT {self._mode_label}]{self._tunnel_sym}  {self.session_id}'
            f'  {self._prefix_label}: Enter=menu c=chat u=tunnel Q=end session'
        )
        subprocess.run(
            ['tmux', 'set-option', '-t', name, 'status-left', status_left],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
        )

    def _do_prefix_changed(self, key):
        """Propagate a new tmux prefix key to all customer-visible chrome.

        Called after the set-prefix.py helper has applied *key* to tmux and
        re-rendered the cheat-sheet popup.  The status bar, pane-border
        labels and the mode-summary row in the chat pane's pinned header
        still show the old key, because they were set as one-shot options
        or rendered text.  This stores the key and its display label,
        redraws the status header, re-applies status-left, and re-applies
        the pane-border labels on the main pane (if one exists), so no
        reconnect is needed.
        """
        self._prefix_key = key
        self._prefix_label = _tmux_key_display(key)
        self._refresh_status_header()
        self._update_tmux_mode_labels()
        panes = self._panes or {}
        main_pane = panes.get('main')
        if main_pane:
            _apply_pane_border_labels(self, main_pane)

    def _run_cmd_forwarder(self, ssh_proc, sock_lock, write_line=None):
        """Read broker commands from ssh_proc.stdout and execute them.

        Reads v4 frames; the broker sends CTRL frames whose verb-line is
        one of: HEARTBEAT, SNAPSHOT, SESSION_INFO, KEYS <role>,
        RESIZE C R [override], STATUS, UPGRADE N, SUPPORT_KEY,
        TUNNEL_CONFIRMED, CHAT, SCHAT, CCHAT, SNOTE, ZOOM, RESPAWN main,
        TERMINATE.  Every KEYS frame (for any pane) is dropped in
        restricted mode (_TunnelPerm.BLOCKED) and during BRB; UPGRADE is
        gated by _TunnelPerm.  The broker enforces these too -- this check
        is defense-in-depth.  Unknown verbs are dropped.

        Runs in a dedicated thread; returns when ssh_proc.stdout closes,
        the HELLO handshake fails, the relay sends an ERROR frame, or a
        TERMINATE arrives.  write_line: callable(bytes) that writes raw v4
        frame bytes to ssh_proc.stdin under sock_lock.  If not provided, a
        default implementation using sock_lock is used; it silently drops
        writes once the pipe is broken or closed.

        self._panes may be falsy (no-tmux fallback); pane lookups tolerate
        that rather than killing this thread with an AttributeError.
        """
        if write_line is None:
            def write_line(data):
                try:
                    with sock_lock:
                        ssh_proc.stdin.write(data)
                        ssh_proc.stdin.flush()
                        self._wire_tap.write('>', data)
                except (OSError, ValueError):
                    # OSError: broken pipe.  ValueError: stdin already
                    # closed (same pair _read_some tolerates below).
                    pass

        def _pane(role):
            # self._panes is falsy in the no-tmux fallback (see the CHAT
            # branch); a bare .get() on None would kill this thread.
            return (self._panes or {}).get(role)

        # Tap-aware FrameStream.  Use a small loop around proc.stdout
        # reads; ssh_proc.stdout is line-buffered binary, but the
        # underlying read1 chunks fine for FrameStream.
        def _read_some(n):
            try:
                chunk = ssh_proc.stdout.read1(n) if hasattr(
                    ssh_proc.stdout, 'read1') else ssh_proc.stdout.read(n)
            except (OSError, ValueError):
                return b''
            if chunk:
                self._wire_tap.write('<', chunk)
                self._last_inbound_from_broker = time.monotonic()
            return chunk if chunk else b''

        stream = FrameStream(_read_some)

        # HELLO handshake -- we send ours, broker sends back.
        try:
            write_line(_v4_hello())
            type_code, frag, payload = stream.read_frame()
        except (EOFError, WireProtocolError, OSError) as e:
            trace('producer_hello_fail', err=str(e))
            return
        if type_code == TYPE_ERROR:
            reason = payload.decode('utf-8', errors='replace')
            trace('producer_hello_relay_error', reason=reason)
            self._status_row(3,
                f'\x1b[1;31m[tunl] relay rejected handshake: {reason}\x1b[m')
            return
        if type_code != TYPE_HELLO or frag != 0:
            trace('producer_hello_unexpected', type_code=type_code, frag=frag)
            return
        if payload != WIRE_VERSION:
            trace('producer_hello_mismatch',
                  expected=WIRE_VERSION.decode(),
                  got=repr(payload))
            return

        while True:
            try:
                type_code, payload = stream.read_message()
            except (EOFError, WireProtocolError):
                break
            if type_code == TYPE_HELLO:
                continue   # repeat HELLO post-handshake -- ignore
            if type_code == TYPE_ERROR:
                reason = payload.decode('utf-8', errors='replace')
                trace('producer_relay_error_frame', reason=reason)
                break
            if type_code == TYPE_DATA:
                # Broker doesn't send DATA frames to producer; ignore.
                continue
            if type_code != TYPE_CTRL:
                continue
            try:
                line, args = decode_ctrl(payload)
            except WireProtocolError:
                trace('frame_parse_fail', frame='CTRL', dir='from_broker')
                continue
            parts = line.split()
            if not parts:
                continue
            verb = parts[0]

            if verb == 'HEARTBEAT':
                # Idle keepalive from the broker; last_inbound is already
                # updated by _read_some.
                continue

            if verb == 'SNAPSHOT':
                _share_send_snapshot(
                    self._tmux_name, self._tmux_sock,
                    ssh_proc, self._panes, sock_lock,
                )
            elif verb == 'SESSION_INFO':
                # Broker-supplied session metadata (case, customer,
                # customer_nick, viewers, viewer_names, ...).  Cache
                # the parsed payload; _emit_status_info merges it into
                # the chat-pane SESSION_INFO so /who has the same view
                # the support side gets.
                if not args:
                    continue
                try:
                    info = json.loads(
                        args[0].decode('utf-8', errors='replace'))
                except (ValueError, RecursionError):
                    # ValueError: malformed JSON.  RecursionError: hostile,
                    # deeply nested JSON -- must not kill this thread.
                    info = None
                if not isinstance(info, dict):
                    # Only a JSON object can be merged; anything else is
                    # treated like a parse failure.
                    self._broker_info = {}
                    continue
                self._broker_info = info
                if self._chat_in_fifo:
                    try:
                        self._emit_status_info()
                    except Exception:
                        pass
            elif verb in ('SCHAT', 'CCHAT', 'SNOTE'):
                # Backlog replay + live SNOTE fan-out arrive verbatim
                # as v4 CTRL frames.  Forward the encoded bytes
                # straight to chat.in.fifo so the chat pane's
                # FrameStream consumes them uniformly.
                in_fifo = getattr(self, '_chat_in_fifo', None)
                if in_fifo:
                    self._write_chat_in_fifo(
                        encode_message(TYPE_CTRL, payload))
            elif verb == 'CHAT':
                # 2 binary args: name, msg.
                # B2.5 / B4.5: the relay forwards consumer chat verbatim;
                # in the no-tmux fallback below we print() to a real
                # terminal, which would execute embedded escapes.  Strip
                # control bytes here regardless of which downstream path
                # is taken.
                if len(args) < 2:
                    continue
                name_b, msg_b = args[0], args[1]
                if not msg_b:
                    continue
                clean_name = _sanitize_text(name_b, max_bytes=80) or 'support'
                clean_msg  = _sanitize_text(msg_b,  max_bytes=4096)
                if not clean_msg:
                    continue
                ts = time.strftime('%H:%M', time.gmtime())
                in_fifo = getattr(self, '_chat_in_fifo', None)
                if in_fifo:
                    # Render as SCHAT in the chat pane (the chat UI
                    # treats SCHAT as "support says ..." which is what
                    # the broker means by CHAT to producer).
                    frame = encode_message(
                        TYPE_CTRL,
                        encode_ctrl(
                            f'SCHAT {ts}',
                            (clean_name.encode('utf-8'),
                             clean_msg.encode('utf-8'))))
                    self._write_chat_in_fifo(frame)
                elif not self._panes:
                    if sys.stdout.isatty():
                        print(f'\x1b[1;36m[{ts}][{clean_name}]\x1b[m '
                              f'{clean_msg}', flush=True)
                    else:
                        print(f'[{ts}][{clean_name}] {clean_msg}',
                              flush=True)
            elif (verb == 'KEYS'
                  and self._tunnel_perm != _TunnelPerm.BLOCKED
                  and not self._brb):
                # 'KEYS <role>' + 1 binary arg = key bytes.
                if len(parts) != 2 or not args:
                    continue
                role = parts[1]
                pane_id = _pane(role)
                if pane_id:
                    _share_forward_keys(pane_id, args[0].hex(),
                                        tmux_sock=self._tmux_sock)
            elif verb == 'RESIZE':
                # 'RESIZE C R [override]'
                if len(parts) < 3:
                    continue
                try:
                    c, r = int(parts[1]), int(parts[2])
                except ValueError:
                    continue
                # B2.4 (2026-05-06 audit): clamp dimensions before
                # feeding them to refresh-client / resize-window.  A
                # malicious or buggy relay should not be able to pin
                # the customer to a 1x1 viewport or send absurdly
                # large values that have caused tmux crashes upstream.
                # 1..10000 covers any realistic terminal.
                if not (1 <= c <= 10000 and 1 <= r <= 10000):
                    continue
                attrs = set(parts[3:])
                self._last_resize_c = c
                self._last_resize_r = r
                _tmux = ['tmux', '-S', self._tmux_sock]
                zr = subprocess.run(
                    _tmux + ['display-message', '-t', self._tmux_name,
                             '-p', '#{window_zoomed_flag}'],
                    stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
                    universal_newlines=True,
                )
                zoomed = zr.stdout.strip() == '1'
                total_h = r if zoomed else r + self._fixed_overhead
                if 'override' in attrs:
                    # Override: pin window unconditionally.  Flip to
                    # window-size=manual so resize-window sticks even
                    # if it exceeds the interactive client.
                    subprocess.run(
                        _tmux + ['set-option', '-w', '-t', self._tmux_name,
                                 'window-size', 'manual'],
                        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                    )
                    subprocess.run(
                        _tmux + ['resize-window', '-t', self._tmux_name,
                                 '-x', str(c), '-y', str(total_h)],
                        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                    )
                else:
                    # Normal: advertise dims via refresh-client so
                    # tmux's smallest-wins picks min(producer,
                    # interactive).  Also restore window-size=smallest
                    # in case a previous override flipped to manual.
                    subprocess.run(
                        _tmux + ['set-option', '-w', '-t', self._tmux_name,
                                 'window-size', 'smallest'],
                        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                    )
                    try:
                        self.tmux_proc.stdin.write(
                            f'refresh-client -C {c}x{total_h}\n'
                            .encode('ascii')
                        )
                        self.tmux_proc.stdin.flush()
                    except (OSError, ValueError, AttributeError):
                        # OSError: broken pipe; ValueError: stdin closed;
                        # AttributeError: no control-mode tmux_proc.
                        pass
            elif verb == 'ZOOM':
                self._handle_zoom_toggle()
            elif verb == 'STATUS':
                # 1 binary arg: text.
                # B2.5: relay-supplied STATUS is rendered in our own
                # status pane.  Strip control bytes so a hostile relay
                # cannot escape-inject the customer's terminal.
                msg = _sanitize_text(args[0] if args else b'', max_bytes=512)
                self._emit_ephemeral(msg, lifecycle='keystroke')
            elif verb == 'UPGRADE':
                # 'UPGRADE N'
                if len(parts) != 2:
                    continue
                try:
                    port = int(parts[1])
                except ValueError:
                    continue
                # B2.3 (2026-05-06 audit): defence-in-depth -- the
                # broker has already range-checked the port and the
                # relay sshd's permitlisten policy enforces the same
                # range, but the customer should not rely on either.
                # Reject out-of-range values here so a misbehaving or
                # compromised relay cannot drive ssh -O forward into
                # an attacker-chosen port.
                if not (TUNNEL_PORT_MIN <= port <= TUNNEL_PORT_MAX):
                    write_line(_v4_ctrl(
                        'LINBIT_TUNNEL_REJECTED',
                        f'requested tunnel port {port} outside allowed '
                        f'range [{TUNNEL_PORT_MIN},{TUNNEL_PORT_MAX}]'
                        .encode('ascii')))
                    continue
                if self._tunnel_perm == _TunnelPerm.BLOCKED:
                    write_line(_v4_ctrl(
                        'LINBIT_TUNNEL_REJECTED',
                        b'tunnel upgrade not permitted: session is restricted'))
                    self._status_row(3,
                        '\x1b[33m[tunl] Support requested tunnel access'
                        ' -- rejected (restricted mode).\x1b[m')
                elif self._tunnel_perm == _TunnelPerm.AUTO_REJECT:
                    write_line(_v4_ctrl(
                        'LINBIT_TUNNEL_REJECTED',
                        b'tunnel upgrade not permitted: '
                        b'customer has blocked tunnel requests'))
                    self._status_row(3,
                        f'\x1b[31m[tunl] Support requested tunnel'
                        f' -- blocked (press {self._prefix_label} u to unblock).\x1b[m')
                elif self._tunnel_perm == _TunnelPerm.CONFIRM:
                    _menu_fifo = _tmux_sq(self._ctl_fifo)
                    subprocess.run([
                        'tmux', 'display-menu',
                        '-T', '#[bold]Support requests tunnel access#[nobold]',
                        'Grant tunnel access', 'g',
                        f'run-shell "printf CONFIRM_UPGRADE_{port}\\\\n >> {_menu_fifo}"',
                        'Deny', 'd',
                        f'run-shell "printf DENY_UPGRADE\\\\n >> {_menu_fifo}"',
                    ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                else:  # ALLOWED
                    t = threading.Thread(
                        target=self._run_upgrade,
                        args=(port, write_line),
                        daemon=True,
                    )
                    t.start()
            elif verb == 'SUPPORT_KEY':
                # The relay-supplied line is hostile-by-construction: a
                # compromised relay could embed `\n` to inject extra lines
                # into authorized_keys, or option strings (command=, from=*,
                # etc.) to widen the key's scope.  _extract_bare_key drops
                # all options and rejects newline-injection.  We rebuild
                # the installable line ourselves with our own from= clause.
                raw = (args[0].decode('utf-8', errors='replace')
                       if args else '')
                bare = _extract_bare_key(raw)
                if bare is None:
                    trace('support_key_rejected', reason='malformed',
                          raw_len=len(raw))
                else:
                    self._pending_keys.append(bare)
            elif verb == 'RESPAWN':
                if len(parts) >= 2 and parts[1] == 'main':
                    pane_id = _pane('main')
                    if pane_id:
                        subprocess.run(
                            ['tmux', '-S', self._tmux_sock,
                             'respawn-pane', '-t', pane_id],
                            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                        )
                        self._status_row(3, self._mode_summary)
            elif verb == 'TERMINATE':
                self._status_row(2,
                    '\x1b[1;31m[tunl] Session closed by support.\x1b[m')
                if self._attach_mode:
                    # M5: the inner runs in a hidden background window
                    # (break-pane -d); kill both the main-pane window
                    # and our own hidden window so nothing lingers.
                    main_pane = _pane('main')
                    if main_pane:
                        subprocess.run(
                            ['tmux', 'kill-window', '-t', main_pane],
                            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                        )
                    subprocess.run(
                        ['tmux', 'kill-window'],
                        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                    )
                else:
                    subprocess.run(
                        ['tmux', '-S', self._tmux_sock, 'kill-server'],
                        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
                    )
                break
            elif verb == 'TUNNEL_CONFIRMED':
                if self._pending_keys:
                    chat_pane = _pane('chat')
                    install_lines = [
                        _format_authkeys_line(k) for k in self._pending_keys
                    ]
                    for bare in self._pending_keys:
                        if chat_pane:
                            # Show a fingerprint, never the raw key bytes.
                            # The relay controls the key contents so a chat
                            # pane that interpolated them verbatim would be
                            # a terminal-escape injection vector.
                            label = _key_fingerprint_label(bare)
                            _share_write_to_pane(
                                chat_pane,
                                f'[tunl] temporarily registering SSH key: {label}',
                            )
                    try:
                        authkeys_add_session(
                            AUTHKEYS_FILE, self.session_id, install_lines
                        )
                    except OSError as e:
                        msg = f'[tunl] ERROR: cannot update {AUTHKEYS_FILE}: {e}'
                        self._status_row(3, f'\x1b[1;31m{msg}\x1b[m')
                        if chat_pane:
                            _share_write_to_pane(chat_pane, msg)
                    self._pending_keys = []
            # Everything else: dropped (trust enforcement).

    def _connect_ssh(self):
        """Start SSH to the relay with ControlMaster.

        Builds the relay context string (protocol version, session id,
        host, IPs, mode, user, networks, DNS, optional case/token), spawns
        ssh (wrapped in sshpass when the KI env asks for it), then waits up
        to 20 s for the ControlMaster socket and a further 2 s to catch a
        ForceCommand that exits right after auth.

        Sets self.ssh_proc (and self._stderr_logger) on success.  Returns
        (True, '') or (False, reason); on failure both attributes are reset
        to None.
        """
        mode = {_TunnelPerm.BLOCKED:     "restricted",
                _TunnelPerm.CONFIRM:     "share",
                _TunnelPerm.ALLOWED:     "share",
                _TunnelPerm.AUTO_REJECT: "share"}[self._tunnel_perm]
        nets = ",".join(_local_networks())
        dns  = ",".join(_nameservers())
        context = (
            f'{PROTOCOL_VERSION} id={self.session_id}'
            f' host={_fqdn()}'
            f' ips={",".join(_local_ips())}'
            f' mode={mode}'
            f' user={self.user}'
            f' nets={nets}'
            f' dns={dns}'
        )
        if self.case_num:
            context += f' case={self.case_num}'
        if self.token:
            context += f' token={self.token}'

        # Remove stale socket from a previous (dead) ControlMaster so that
        # SSH creates a fresh one rather than failing with "Connection refused".
        if os.path.exists(self._ctl_socket):
            try:
                os.unlink(self._ctl_socket)
            except OSError:
                pass

        ssh_cmd = self._build_ssh_cmd(context)
        env = dict(os.environ)
        env.pop('SSH_AUTH_SOCK', None)
        new_session = False
        pass_fds = ()
        r_fd = None
        use_sshpass = bool(self.ki_env) and '_SSHPASS_CMD' in self.ki_env

        if use_sshpass:
            # Hand the password to sshpass over a pipe fd instead of argv/env.
            pw = self.ki_env.get('_KI_PASSWORD', '')
            r_fd, w_fd = os.pipe()
            try:
                os.write(w_fd, (pw + '\n').encode())
            except OSError:
                os.close(r_fd)
                raise
            finally:
                os.close(w_fd)
            ssh_cmd = ['sshpass', '-d', str(r_fd)] + ssh_cmd
            pass_fds = (r_fd,)
        elif self.ki_env:
            env.update({k: v for k, v in self.ki_env.items() if not k.startswith('_')})
            # Detach from the controlling terminal (setsid), presumably so ssh
            # uses the askpass helper from ki_env instead of prompting on
            # the tty -- TODO confirm.  start_new_session is the documented,
            # thread-safe replacement for preexec_fn=os.setsid.
            new_session = True

        try:
            trace('ssh_spawn', target=f'{RELAY_USER}@{RELAY_HOST}:{RELAY_PORT}',
                  auth=('sshpass' if use_sshpass
                        else 'ki' if self.ki_env else 'cert'))
            self.ssh_proc = subprocess.Popen(
                ssh_cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=env,
                start_new_session=new_session,
                pass_fds=pass_fds,
            )
        except OSError as e:
            trace('ssh_spawn_failed', err=str(e))
            return False, f'SSH failed to start: {e}'
        finally:
            # The child holds its own copy; close ours on success AND failure.
            if r_fd is not None:
                os.close(r_fd)
        trace('ssh_spawned', pid=self.ssh_proc.pid)

        self._stderr_logger = _SshStderrLogger(self.ssh_proc.stderr, self._ssh_log_path)

        # Wait for the ControlMaster socket to appear (created by SSH once the
        # relay's ForceCommand starts) or for SSH to exit (auth failure etc.).
        deadline = time.monotonic() + 20
        while time.monotonic() < deadline:
            if self.ssh_proc.poll() is not None:
                break
            if os.path.exists(self._ctl_socket):
                break
            time.sleep(0.1)

        # Case 1: SSH exited -- auth failure or connection refused.
        if self.ssh_proc.poll() is not None:
            rc = self.ssh_proc.returncode
            self._stderr_logger.join(timeout=2.0)
            last = self._stderr_logger.last_error()
            reason = last or f'connection failed (ssh exit {rc})'
            trace('ssh_connect_failed', phase='early_exit', rc=rc, reason=reason)
            self.ssh_proc = None
            self._stderr_logger = None
            return False, reason

        # Case 2: 20s elapsed without the socket appearing.  Stop ssh FIRST
        # so stderr reaches EOF; joining the logger while ssh is still
        # alive would just burn the timeout and miss ssh's final message.
        if not os.path.exists(self._ctl_socket):
            self.ssh_proc.terminate()
            try:
                self.ssh_proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self.ssh_proc.kill()
            self._stderr_logger.join(timeout=2.0)
            last = self._stderr_logger.last_error()
            reason = last or 'connection timed out (no control socket)'
            trace('ssh_connect_failed', phase='ctl_socket_timeout', reason=reason)
            self.ssh_proc = None
            self._stderr_logger = None
            return False, reason

        # Case 3: socket appeared -- auth succeeded, ForceCommand started.
        # ControlPersist=30s keeps the socket alive after ssh_proc exits, so
        # socket presence alone does not prove the ForceCommand is still running.
        # Poll briefly to catch a ForceCommand that dies right after auth (e.g.
        # relay rejected the session because it was not pre-registered).
        deadline2 = time.monotonic() + 2.0
        while time.monotonic() < deadline2:
            if self.ssh_proc.poll() is not None:
                # Bounded like Cases 1/2: a lingering holder of the stderr
                # pipe must not hang the connect path.
                self._stderr_logger.join(timeout=2.0)
                reason = self._stderr_logger.last_error() or 'relay rejected session'
                trace('ssh_connect_failed', phase='force_cmd_early_exit',
                      rc=self.ssh_proc.returncode, reason=reason)
                self.ssh_proc = None
                self._stderr_logger = None
                return False, reason
            time.sleep(0.1)

        trace('ssh_connected', pid=self.ssh_proc.pid)
        return True, ''

    def _connect_tmux_ctrl(self):
        """Attach local tmux in control-mode on the relay-spliced stdio.

        Requires self.ssh_proc to be set (call _connect_ssh first).
        Returns (True, '') or (False, reason).

        If tmux cannot be started, the SSH side is torn down too.
        ssh_proc is terminated, and both ssh_proc and _stderr_logger are
        reset to None.  This is the same state the failure paths of
        _connect_ssh leave behind, so the caller's reconnect loop starts
        from scratch.

        Phase 4 sizing model: the producer participates in tmux's
        smallest-wins window sizing via `refresh-client -C`.  We attach
        with `-f read-only` (dropping the ignore-size that comes with
        `-r`) so the control-mode client is counted once it has
        explicitly declared its size.  Immediately after attach, we
        set `window-size smallest` on the shared window and advertise
        the current window dims so behaviour pre-first-broker-RESIZE
        matches the pre-phase-4 state.

        `pause-after=2` makes tmux pause output to this control client
        if it cannot be delivered for 2 s.  When the SSH pipe to the
        relay stalls (flaky customer link, slow relay) tmux drops
        accumulated output for the producer instead of blocking the
        panes themselves, so the customer's shell stays responsive.
        On `%pause`, the forwarder loop sends `refresh-client -A
        <pane>:continue`; tmux re-syncs by retransmitting pane
        contents.
        """
        try:
            self.tmux_proc = subprocess.Popen(
                ['tmux', '-S', self._tmux_sock, '-C',
                 'attach-session', '-f', 'read-only,pause-after=2',
                 '-t', self._tmux_name],
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.DEVNULL,
            )
        except OSError as e:
            trace('tmux_attach_failed', err=str(e))
            self.ssh_proc.terminate()
            self.ssh_proc = None
            # Drop the logger bound to the terminated ssh's stderr so no
            # later code mistakes it for the next connection's logger.
            self._stderr_logger = None
            return False, f'tmux attach failed: {e}'

        # Used when the dims query below fails or returns garbage.  The
        # broker drives the correct dims on its first RESIZE anyway.
        fallback_dims = '200x50'
        # Query current window dims for the initial refresh-client.  The
        # query should not fail, because we just opened the session.
        try:
            dims = subprocess.run(
                ['tmux', '-S', self._tmux_sock, 'display-message',
                 '-p', '-t', self._tmux_name,
                 '-F', '#{window_width}x#{window_height}'],
                stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
                universal_newlines=True, timeout=2,
            ).stdout.strip()
        except (subprocess.SubprocessError, OSError):
            # SubprocessError also covers TimeoutExpired from timeout=2.
            dims = fallback_dims
        else:
            if not re.fullmatch(r'\d+x\d+', dims):
                dims = fallback_dims

        setup = (
            f'set-option -w -t {self._tmux_name} window-size smallest\n'
            f'refresh-client -C {dims}\n'
        ).encode('ascii')
        try:
            self.tmux_proc.stdin.write(setup)
            self.tmux_proc.stdin.flush()
        except OSError:
            # BrokenPipeError is an OSError subclass, raised when tmux has
            # already exited.  The caller's read loop over tmux stdout
            # then simply reaches EOF.
            pass
        return True, ''

    def _try_connect(self):
        """Bring up the relay link: SSH first, then local tmux control-mode.

        Returns the (ok, reason) pair from whichever step decided the
        outcome.  If SSH could not be established, that is (False, reason)
        and tmux is never attached.  Otherwise it is whatever attaching
        tmux returned.
        """
        ssh_ok, ssh_reason = self._connect_ssh()
        if ssh_ok:
            return self._connect_tmux_ctrl()
        return False, ssh_reason

    def run(self):
        """Main loop of the tunnel daemon: connect, forward, reconnect.

        The steps below run in this order:

        * Call authkeys_remove_all(AUTHKEYS_FILE).  When not pre-connected
          (``self._ready`` false), set up known_hosts and the SSH key.
        * Install SIGINT/SIGTERM/SIGHUP handlers.  Each handler runs
          cleanup() under a 10 s SIGKILL watchdog, then re-delivers the
          signal with the default disposition.
        * Create the control FIFO (``self._ctl_fifo``) and start a daemon
          thread that dispatches the one-line commands written to it.
          Writers include tmux hotkeys, set-prefix.py and /nick.
        * If there are no tmux panes and stdin is a TTY, start a thread
          that forwards typed lines as CUSTOMER_CHAT frames.
        * Draw the initial status rows, then loop forever.  Each pass
          connects (or only attaches tmux, if main() handed over a live
          ssh_proc) and forwards tmux control-mode lines to the relay as
          v4 DATA frames.  On disconnect it tears down and retries with
          exponential backoff, starting at RECONNECT_DELAY_MIN and
          doubling up to RECONNECT_DELAY_MAX.

        The method returns in only two cases:

        (a) A connection attempt fails with an auth error before any
            connection ever succeeded.  It shows the error for 10 s and
            calls cleanup() first.
        (b) BrokenPipeError or KeyboardInterrupt escapes the forwarding
            loop.

        NOTE(review): path (b) leaves the loop without the per-connection
        teardown below it and without cleanup().  Confirm the caller
        handles that.
        """
        # Runs before anything else; presumably clears keys left behind by
        # an earlier session (see authkeys_remove_all) -- confirm.
        authkeys_remove_all(AUTHKEYS_FILE)
        if not self._ready:
            # Not pre-connected (no-tmux fallback or 6-digit token path).
            self.known_hosts_path = setup_known_hosts(
                RELAY_HOST, RELAY_CA_PUBKEY, dest_dir=self._session_dir)
            self._setup_key()

        def _on_signal(sig, _frame):
            # Watchdog: if cleanup() ever exceeds CLEANUP_DEADLINE_SECS,
            # SIGKILL ourselves so the customer is never stuck staring
            # at a hung "ending session..." prompt.  cleanup() already
            # has bounded waits on its child processes; the watchdog is
            # a backstop, not the primary timeout.
            CLEANUP_DEADLINE_SECS = 10
            def _watchdog():
                time.sleep(CLEANUP_DEADLINE_SECS)
                os.kill(os.getpid(), signal.SIGKILL)
            threading.Thread(target=_watchdog, daemon=True).start()
            self.cleanup()
            # Restore the default action and re-send the same signal, so
            # the process dies with the conventional "killed by <sig>"
            # status instead of a normal exit.
            signal.signal(sig, signal.SIG_DFL)
            os.kill(os.getpid(), sig)

        signal.signal(signal.SIGINT,  _on_signal)
        signal.signal(signal.SIGTERM, _on_signal)
        signal.signal(signal.SIGHUP,  _on_signal)

        # Control FIFO: tmux hotkeys write GRANT_TUNNEL / REVOKE_TUNNEL here.
        ctl_fifo = self._ctl_fifo
        try:
            os.mkfifo(ctl_fifo, 0o600)
        except FileExistsError:
            pass

        def _ctl_reader():
            # One command per line.  Unknown commands are ignored.  The
            # thread exits when the state becomes DONE or on any I/O error
            # opening or reading the FIFO.
            while self._conn_state != _ConnState.DONE:
                try:
                    # Opening a FIFO for reading blocks until some writer
                    # opens it.  Iteration ends once every writer has
                    # closed, and the outer loop then re-opens it.
                    with open(ctl_fifo) as fh:
                        got_data = False
                        for line in fh:
                            got_data = True
                            cmd = line.strip()
                            if cmd == 'GRANT_TUNNEL':
                                self._do_grant_tunnel()
                            elif cmd == 'REVOKE_TUNNEL':
                                self._do_revoke_tunnel()
                            elif cmd == 'BLOCK_TUNNEL':
                                self._do_block_tunnel()
                            elif cmd.startswith('CONFIRM_UPGRADE_'):
                                # Suffix is the tunnel port; malformed
                                # ports are silently ignored.  The upgrade
                                # runs off-thread so the FIFO keeps draining.
                                try:
                                    port = int(cmd[len('CONFIRM_UPGRADE_'):])
                                except ValueError:
                                    pass
                                else:
                                    wl = self._write_line_ref[0]
                                    if wl:
                                        t = threading.Thread(
                                            target=self._run_upgrade,
                                            args=(port, wl),
                                            daemon=True,
                                        )
                                        t.start()
                            elif cmd == 'DENY_UPGRADE':
                                wl = self._write_line_ref[0]
                                if wl:
                                    wl(_v4_ctrl(
                                        'LINBIT_TUNNEL_REJECTED',
                                        b'customer denied tunnel request'))
                                self._status_row(3,
                                    '\x1b[33m[tunl] Tunnel request denied.\x1b[m')
                            elif cmd.startswith('PREFIX_CHANGED '):
                                # set-prefix.py applied a new tmux
                                # prefix and asks the daemon to refresh
                                # the labels it owns (status bar, pane
                                # borders, mode-summary row).
                                new_key = cmd[len('PREFIX_CHANGED '):].strip()
                                if new_key:
                                    self._do_prefix_changed(new_key)
                            elif cmd == 'BRB_ON':
                                self._do_brb_on()
                            elif cmd == 'BRB_OFF':
                                self._do_brb_off()
                            elif cmd.startswith('NICK '):
                                # Customer's /nick command: forward the
                                # chosen handle to the broker as a
                                # CUSTOMER_NICK frame.  /nick has already
                                # validated the name is one whitespace-free
                                # word <= 32 chars.
                                nick = cmd[len('NICK '):]
                                wl = self._write_line_ref[0]
                                if wl and nick:
                                    # Hard cap of 80 bytes on the encoded
                                    # payload, as a second guard.
                                    nick_b = nick.encode('utf-8',
                                                         errors='replace')[:80]
                                    trace('frame_send',
                                          frame='CUSTOMER_NICK',
                                          len=len(nick_b))
                                    wl(_v4_ctrl('CUSTOMER_NICK', nick_b))
                                elif nick:
                                    trace('frame_dropped',
                                          frame='CUSTOMER_NICK',
                                          reason='no_ssh_connection')
                            elif cmd == 'ZOOM_TOGGLE':
                                if self._last_resize_r and self._panes:
                                    _tx = (['tmux', '-S', self._tmux_sock]
                                           if self._tmux_sock else ['tmux'])
                                    zr = subprocess.run(
                                        _tx + ['display-message', '-p',
                                               '#{window_zoomed_flag}'],
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.DEVNULL, universal_newlines=True,
                                    )
                                    is_zoomed = zr.stdout.strip() == '1'
                                    # Presumably the hotkey toggles zoom
                                    # before writing ZOOM_TOGGLE.  If so,
                                    # the pre-toggle state is the inverse
                                    # of the current flag -- confirm.
                                    self._resize_after_zoom(
                                        was_zoomed=not is_zoomed, _tmux=_tx)
                        # A writer that opened and closed without writing
                        # yields no lines.  Back off briefly so a tight
                        # open/EOF cycle does not spin the CPU.
                        if not got_data:
                            time.sleep(0.2)
                except (OSError, IOError):
                    break

        threading.Thread(target=_ctl_reader, daemon=True).start()

        # No-tmux tunnel mode: forward stdin lines as customer chat messages.
        if not self._panes and sys.stdin.isatty():
            def _stdin_chat():
                hint = '\x1b[2m(Type a message and press Enter to chat with support.)\x1b[m'
                if sys.stderr.isatty():
                    print(f'\x1b[1m[linbit-tunl]\x1b[m {hint}', file=sys.stderr)
                else:
                    print('[linbit-tunl] Type a message and press Enter to chat.',
                          file=sys.stderr)
                # Ends on EOF, on a read error, or once the state is DONE.
                # Lines typed while disconnected are echoed locally but
                # not sent, because _write_line_ref[0] is None then.
                while self._conn_state != _ConnState.DONE:
                    try:
                        line = sys.stdin.readline()
                    except OSError:
                        break
                    if not line:
                        break
                    text = line.rstrip('\n')
                    if not text:
                        continue
                    wl = self._write_line_ref[0]
                    if wl:
                        wl(_v4_ctrl('CUSTOMER_CHAT', text.encode('utf-8')))
                    # Echo back with green timestamp as "received" confirmation.
                    ts = time.strftime('%H:%M', time.gmtime())
                    if sys.stdout.isatty():
                        print(f'\x1b[1;32m[{ts}][you]\x1b[m {text}', flush=True)
                    else:
                        print(f'[{ts}][you] {text}', flush=True)
            threading.Thread(target=_stdin_chat, daemon=True).start()

        # Announce session in tmux status bar (ephemeral popup).
        # NOTE(review): unlike the ZOOM_TOGGLE path, this call does not
        # pass `-S self._tmux_sock`.  tmux therefore picks the server from
        # $TMUX or the default socket -- confirm that is intended.
        subprocess.run(
            ['tmux', 'display-message', f'[tunl] Session: {self.session_id}'],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
        )

        # Disable echo and drain stdin when we are the status pane process.
        # Typed keystrokes would otherwise be echoed into the pane and pushed
        # into scroll-back history on Enter.
        # NOTE(review): the original termios attributes are not saved or
        # restored in this method -- confirm the pane is discarded with the
        # session.
        if self._status_is_stdout and sys.stdin.isatty():
            try:
                import termios
                fd = sys.stdin.fileno()
                attrs = termios.tcgetattr(fd)
                attrs[3] &= ~(termios.ECHO | termios.ICANON)
                termios.tcsetattr(fd, termios.TCSANOW, attrs)

                def _drain_stdin():
                    while True:
                        try:
                            if not os.read(fd, 256):
                                break
                        except OSError:
                            break
                threading.Thread(target=_drain_stdin, daemon=True).start()
            except Exception:
                pass

        # Draw initial pinned header via _status_row so it routes to the
        # chat pane's SESSION_INFO frame (or stderr on no-tmux path).
        header = self._status_header()
        if self._status_is_stdout:
            sys.stdout.write(
                '\x1b[H\x1b[2J'
                f'\x1b[1;1H\x1b[2K{header}'
                f'\x1b[2;1H\x1b[2K\x1b[1;33m●\x1b[m relay: connecting...'
                f'\x1b[3;1H\x1b[2K\x1b[2mSSH log: {self._ssh_log_path}\x1b[m'
            )
            sys.stdout.flush()
        else:
            self._status_row(1, header)
            self._status_row(2, '\x1b[1;33m●\x1b[m relay: connecting...')
            self._status_row(3, f'\x1b[2mSSH log: {self._ssh_log_path}\x1b[m')

        # Reconnect loop.  backoff doubles after every failed attempt and
        # every disconnect, capped at RECONNECT_DELAY_MAX.  It resets to
        # RECONNECT_DELAY_MIN on each successful connect.
        backoff = RECONNECT_DELAY_MIN
        ever_connected = self._ready   # True if pre-connected by main() via fork
        while True:
            self._set_conn_state(_ConnState.CONNECTING)
            if self.ssh_proc is not None:
                # Pre-connected (fork path): inherit live ssh_proc; only attach tmux.
                ok, conn_err = self._connect_tmux_ctrl()
            else:
                ok, conn_err = self._try_connect()
            if not ok:
                if _is_auth_error(conn_err) and not ever_connected:
                    # Credential rejected on first attempt -- retrying won't help.
                    # (Pre-flight in main() should have caught this for session IDs;
                    # this covers 6-digit tokens and any bypassed pre-flight.)
                    self._status_row(
                        2,
                        f'\x1b[1;31m● relay: auth failed -- {conn_err}\x1b[m',
                    )
                    self._status_row(
                        3,
                        '\x1b[1;31mSession ID not recognized. Check and try again.\x1b[m',
                    )
                    time.sleep(10)
                    self.cleanup()
                    return
                lost = self._last_disconnected or ''
                lost_str = f'  lost {lost}' if lost else ''
                err_note = f'\x1b[1;31m{conn_err}\x1b[m  ' if conn_err else ''
                self._set_conn_state(_ConnState.RECONNECTING)
                self._status_row(
                    2,
                    f'\x1b[1;33m●\x1b[m relay: {err_note}reconnecting ({backoff}s){lost_str}',
                )
                time.sleep(backoff)
                backoff = min(backoff * 2, RECONNECT_DELAY_MAX)
                continue

            self._set_conn_state(_ConnState.CONNECTED)
            ever_connected = True
            backoff = RECONNECT_DELAY_MIN
            self._connected_since = time.strftime('%H:%M:%S', time.gmtime())
            self._status_row(
                2,
                f'\x1b[1;32m●\x1b[m relay: connected  since {self._connected_since}',
            )
            self._status_row(3, self._mode_summary)

            # Serializes all writes to ssh stdin (this loop, heartbeat,
            # FIFO-triggered frames) so frames are never interleaved.
            sock_lock = threading.Lock()

            def _write_line(data):
                # Best-effort frame writer.  AttributeError covers
                # self.ssh_proc having been reset to None by teardown
                # while another thread still holds this callable.
                try:
                    with sock_lock:
                        self.ssh_proc.stdin.write(data)
                        self.ssh_proc.stdin.flush()
                        self._wire_tap.write('>', data)
                        self._last_outbound_to_broker = time.monotonic()
                except (OSError, AttributeError):
                    pass

            # Expose write_line to the FIFO reader thread for CUSTOMER_CHAT.
            self._write_line_ref[0] = _write_line

            # Liveness: prime timestamps for this connection, start the
            # heartbeat thread.  The thread emits a CTRL HEARTBEAT every
            # HEARTBEAT_INTERVAL when the channel is otherwise idle and
            # exits when stop_event is set (on disconnect).
            self._last_inbound_from_broker = time.monotonic()
            self._last_outbound_to_broker  = time.monotonic()
            stop_event = threading.Event()
            self._heartbeat_stop_event = stop_event
            hb_frame = _v4_ctrl('HEARTBEAT')

            def _heartbeat_loop():
                interval = HEARTBEAT_INTERVAL
                # Check at half the interval (min 1 s) so an idle
                # channel gets a heartbeat at most ~1.5 intervals apart.
                tick = max(1, interval // 2)
                while not stop_event.wait(timeout=tick):
                    if (time.monotonic() - self._last_outbound_to_broker
                            >= interval):
                        _write_line(hb_frame)

            threading.Thread(target=_heartbeat_loop,
                             name='broker-heartbeat',
                             daemon=True).start()

            # Late-detection: poll once per second; when the peer has
            # been silent for >= PEER_STALE_AFTER, render a mosh-style
            # "no contact Ns ago" line in row 2.  When traffic resumes,
            # restore the "connected since X" line.  See
            # docs/share.md "Liveness".
            late_state = ['fresh']  # 'fresh' or 'stale'

            def _late_watcher():
                while not stop_event.wait(timeout=1.0):
                    if self._conn_state != _ConnState.CONNECTED:
                        continue
                    age = int(time.monotonic() -
                              self._last_inbound_from_broker)
                    if age >= PEER_STALE_AFTER:
                        if late_state[0] != 'stale':
                            late_state[0] = 'stale'
                            trace('peer_stale', age_s=age)
                        # Re-render every tick so the seconds counter
                        # advances; the row write is idempotent in the
                        # chat-pane status header path.
                        self._status_row(
                            2,
                            f'\x1b[1;33m●\x1b[m relay: '
                            f'no contact {age}s ago')
                    else:
                        if late_state[0] == 'stale':
                            late_state[0] = 'fresh'
                            trace('peer_back', age_s=age)
                            self._status_row(
                                2,
                                f'\x1b[1;32m●\x1b[m relay: connected '
                                f'since {self._connected_since}')

            threading.Thread(target=_late_watcher,
                             name='broker-late-watcher',
                             daemon=True).start()

            fwd_thread = threading.Thread(
                target=self._run_cmd_forwarder,
                args=(self.ssh_proc, sock_lock, _write_line),
                daemon=True,
            )
            fwd_thread.start()

            # When SSH exits (auth failure, network drop, etc.) terminate the
            # local tmux control-mode process so the stdout iterator below
            # unblocks promptly instead of waiting for the next tmux event.
            # The procs are captured in locals because teardown resets the
            # attributes to None while this thread may still be running.
            _ssh_proc_ref = self.ssh_proc
            _tmux_proc_ref = self.tmux_proc

            def _watch_ssh():
                _ssh_proc_ref.wait()
                if _tmux_proc_ref is not None:
                    try:
                        _tmux_proc_ref.terminate()
                    except OSError:
                        pass

            threading.Thread(target=_watch_ssh, daemon=True).start()

            if self._tunnel_port:
                def _auto_upgrade():
                    # Small delay to let the broker finish its handshake
                    # before we inject LINBIT_TUNNEL_PORT into the stream.
                    time.sleep(2)
                    self._run_upgrade(self._tunnel_port, _write_line)
                threading.Thread(target=_auto_upgrade, daemon=True).start()

            # Only forward %output from the main pane.  Chat is forwarded
            # as CUSTOMER_CHAT by the FIFO reader; status pane output (cursor-
            # positioning escape sequences for the 3-row status area) would
            # corrupt the consumer's display if forwarded as raw VT bytes.
            _main_pane_b = self._panes.get('main', '').encode('ascii')

            # Option D: tee the raw tmux control-mode stream (pre-filter) to a
            # parallel file so we can see frames tmux emitted that we dropped.
            if tracing_enabled('tmux') and self._tmux_tap_fh is None:
                try:
                    self._tmux_tap_fh = open(
                        os.path.join(self._session_dir, 'tmux-control.log'),
                        'ab', buffering=0)
                except OSError:
                    self._tmux_tap_fh = None

            # Forwarding loop: runs until tmux stdout hits EOF (tmux exited,
            # possibly terminated by _watch_ssh) or a write to ssh fails.
            # Note the two different `break`s below:
            #   - `except OSError: break` leaves only this `for` loop and
            #     falls through to the per-connection teardown and reconnect.
            #   - `except (BrokenPipeError, KeyboardInterrupt): break` leaves
            #     the outer `while True`, so run() returns.
            try:
                for line in self.tmux_proc.stdout:
                    if self._tmux_tap_fh is not None:
                        try:
                            self._tmux_tap_fh.write(line)
                        except OSError:
                            pass
                    if _main_pane_b:
                        # tmux emits pane data as `%output PANE PAYLOAD` for
                        # plain control clients and as `%extended-output
                        # PANE AGE [...] : PAYLOAD` for clients with the
                        # pause-after flag set (we set it -- see attach
                        # above).  Both carry pane-id as the first arg;
                        # only forward main-pane data.  Status / chat /
                        # inner panes are filtered out so their VT bytes
                        # do not corrupt the consumer's view of the main
                        # pane.
                        if line.startswith(b'%output '):
                            parts = line.split(None, 2)
                        elif line.startswith(b'%extended-output '):
                            parts = line.split(None, 2)
                        else:
                            parts = None
                        if (parts is not None and len(parts) >= 2
                                and parts[1].rstrip(b'\r\n') != _main_pane_b):
                            continue
                    # Wrap each control-mode line in a v4 DATA frame.
                    # The broker reassembles DATA payloads and runs the
                    # %output / %layout-change parsers as before.
                    frame = _v4_data(line)
                    try:
                        with sock_lock:
                            self.ssh_proc.stdin.write(frame)
                            self.ssh_proc.stdin.flush()
                            self._wire_tap.write('>', frame)
                            self._last_outbound_to_broker = time.monotonic()
                    except OSError:
                        break
                    # Reading a `%pause %N` notification means tmux
                    # paused output for that pane after the SSH pipe
                    # stalled.  We've just successfully forwarded a
                    # frame, so the pipe is moving again -- resume.
                    paused_pane = _parse_pause_pane(line)
                    if paused_pane is not None:
                        try:
                            self.tmux_proc.stdin.write(
                                b"refresh-client -A '" + paused_pane
                                + b":continue'\n")
                            self.tmux_proc.stdin.flush()
                        except (OSError, BrokenPipeError):
                            pass
            except (BrokenPipeError, KeyboardInterrupt):
                # NOTE(review): exits run() without the teardown below.
                # BrokenPipeError from the ssh write is already caught by
                # the inner `except OSError`.  KeyboardInterrupt should not
                # normally arrive, because SIGINT is handled by _on_signal.
                break

            # --- Per-connection teardown: tmux first, then ssh. ---
            if self.tmux_proc is not None:
                self.tmux_proc.terminate()
                self.tmux_proc.wait()
                self.tmux_proc = None

            disc_err = ''
            if self.ssh_proc is not None:
                # Closing stdin signals EOF to the relay side; then reap
                # ssh and collect the last stderr line for the status row.
                try:
                    self.ssh_proc.stdin.close()
                except OSError:
                    pass
                self.ssh_proc.wait()
                if self._stderr_logger:
                    self._stderr_logger.join()
                    disc_err = self._stderr_logger.last_error()
                    self._stderr_logger = None
                trace('ssh_disconnected', rc=self.ssh_proc.returncode,
                      last_err=disc_err)
                self.ssh_proc = None

            # Stop the per-connection heartbeat thread; it has no
            # producer to write to anymore and would only spin uselessly.
            # The same event also stops this connection's late watcher.
            if self._heartbeat_stop_event is not None:
                self._heartbeat_stop_event.set()
                self._heartbeat_stop_event = None
            self._write_line_ref[0] = None   # FIFO thread must not use dead connection
            self._set_conn_state(_ConnState.RECONNECTING)
            self._last_disconnected = time.strftime('%H:%M:%S', time.gmtime())
            lost_str = f'  lost {self._last_disconnected}'
            err_note = f'\x1b[1;31m{disc_err}\x1b[m  ' if disc_err else ''
            self._status_row(
                2,
                f'\x1b[1;33m●\x1b[m relay: {err_note}reconnecting ({backoff}s){lost_str}',
            )
            time.sleep(backoff)
            backoff = min(backoff * 2, RECONNECT_DELAY_MAX)


# ---------------------------------------------------------------------------
# Share mode tmux launchers
# ---------------------------------------------------------------------------

def _run_propagate_stop(cmd, **kwargs):
    """Run *cmd* to completion, stopping ourselves whenever the child stops.

    Why this is needed: tmux suspend-client delivers SIGTSTP to its own PID
    only, not to the process group, so this parent never sees it.  Waiting
    with WUNTRACED lets us notice the child entering the stopped state, and
    we then SIGSTOP ourselves to match.  A later shell 'fg' sends SIGCONT to
    the whole process group, which resumes both processes with no explicit
    forwarding.

    *kwargs* are passed unchanged to subprocess.Popen.  Returns the Popen
    object with ``returncode`` filled in:

    * the exit status for a normal exit;
    * minus the signal number if the child was killed by a signal;
    * 255 if the child could not be waited for (ChildProcessError).
    """
    child = subprocess.Popen(cmd, **kwargs)
    rc = None
    while rc is None:
        try:
            _, wstatus = os.waitpid(child.pid, os.WUNTRACED)
        except ChildProcessError:
            rc = 255
            continue
        if os.WIFSTOPPED(wstatus):
            # Mirror the child's suspension.  Execution resumes here on
            # SIGCONT, and we go back to waiting for the child.
            os.kill(os.getpid(), signal.SIGSTOP)
        elif os.WIFEXITED(wstatus):
            rc = os.WEXITSTATUS(wstatus)
        elif os.WIFSIGNALED(wstatus):
            rc = -os.WTERMSIG(wstatus)
    child.returncode = rc
    return child


def _token_file_path(session_id):
    """Return the path of the cleartext token file for *session_id*.

    The file is always named ``token`` and lives directly inside the
    per-session directory returned by ``_session_dir``.
    """
    session_root = _session_dir(session_id)
    return os.path.join(session_root, 'token')


def _write_token_file(session_id, token):
    """Store *token* in the per-session token file, created with mode 0o600.

    An empty or None *token* is a no-op.  Any existing token file is left
    untouched in that case.

    Rationale (B1.5 in the 2026-05-06 audit): a token passed on argv is
    exposed through /proc/<pid>/cmdline, which is world-readable on stock
    Linux.  The file lives in the strictly created, owner-only (0o700)
    session directory.  It is opened with O_CREAT|O_EXCL|O_NOFOLLOW and
    its mode is set at creation, so a planted symlink cannot redirect the
    write elsewhere.

    A leftover file from an earlier run in the same (already vetted)
    session directory is unlinked and recreated.  This lets a re-run with
    --token reuse the path without tripping O_EXCL.
    """
    if not token:
        return
    path = _token_file_path(session_id)
    open_flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL | os.O_NOFOLLOW

    def _create():
        return os.open(path, open_flags, 0o600)

    try:
        fd = _create()
    except FileExistsError:
        # unlink() removes a symlink itself rather than its target.
        os.unlink(path)
        fd = _create()
    with os.fdopen(fd, 'w') as fh:
        fh.write(token)


def _read_token_file(session_id):
    """Return the token saved by `_write_token_file`, stripped of whitespace.

    Returns '' when no token file exists for *session_id*.
    """
    path = _token_file_path(session_id)
    try:
        fh = open(path, 'r')
    except FileNotFoundError:
        return ''
    with fh:
        return fh.read().strip()


def _build_inner_cmd(session_id, err_path, case_num='', token='', user='',
                     restricted=False, tunnel_port=None,
                     attach=False, debug=False, daemon=False):
    """Return the argv list that re-executes this script as the --_inner worker.

    The token never appears in the returned argv: argv would expose it via
    /proc/<pid>/cmdline for the whole session (B1.5 in the 2026-05-06
    audit).  As a side effect, the token is written to the per-session
    token file before the argv is built, and the inner process reads it
    back from there.

    Options are appended in this fixed order: --case, --user,
    --restricted, --tunnel-port, --attach, --debug, --daemon.  Empty
    strings, False and a None tunnel_port are omitted.
    """
    _write_token_file(session_id, token)
    argv = [sys.executable, os.path.abspath(sys.argv[0]),
            '--_inner', session_id, err_path]
    for opt, value in (('--case', case_num), ('--user', user)):
        if value:
            argv.extend((opt, value))
    if restricted:
        argv.append('--restricted')
    if tunnel_port is not None:
        argv.extend(('--tunnel-port', str(tunnel_port)))
    for flag, enabled in (('--attach', attach),
                          ('--debug', debug),
                          ('--daemon', daemon)):
        if enabled:
            argv.append(flag)
    return argv


def _tmux_at_own_socket(session_id):
    """Tell whether $TMUX refers to this session's dedicated tmux socket.

    $TMUX has the form ``<socket-path>,<pid>,<session>``.  Only the first
    comma-separated field is compared.  An unset or empty $TMUX yields
    False without consulting ``_tmux_sock_path``.
    """
    tmux_var = os.environ.get('TMUX') or ''
    if tmux_var:
        socket_path, _sep, _rest = tmux_var.partition(',')
        return socket_path == _tmux_sock_path(session_id)
    return False


def _launch_in_tmux_attach(session_id, case_num='', token='', user='',
                            restricted=False, tunnel_port=None,
                            debug=False):
    """Create a linbit-tunl window inside the current tmux session.

    Used with --attach, or when $TMUX already points at our dedicated
    socket (a re-run from inside the linbit-tunl tmux).  If an existing
    window is found via its @linbit-role tags, its inner pane is respawned
    with the new inner command and that window is selected.  Otherwise a
    new window is created.  These tmux commands target whichever server
    $TMUX selects, because no -S is given.

    Returns True on success.  Returns False if tmux rejects the
    respawn-pane or new-window command, i.e. it exits non-zero.  Failure
    of the cosmetic select-window step is not treated as an error.
    """
    name = _tmux_local_name(session_id)
    err_path = os.path.join(_session_dir(session_id), 'err')
    # Remove a stale error file so a later read reflects only this run.
    try:
        os.unlink(err_path)
    except FileNotFoundError:
        pass

    inner_cmd = _build_inner_cmd(
        session_id, err_path, case_num=case_num, token=token, user=user,
        restricted=restricted, tunnel_port=tunnel_port, attach=True, debug=debug,
    )

    existing = _discover_share_panes() or {}
    # Prefer 'inner' (M5+); fall back to 'status' for pre-M5 sessions.
    inner_pane = existing.get('inner') or existing.get('status')
    if inner_pane:
        print('Reconnecting in existing window...', file=sys.stderr)
        rc = subprocess.run(
            ['tmux', 'respawn-pane', '-k', '-t', inner_pane] + inner_cmd,
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
        ).returncode
        if rc != 0:
            return False
        # Select the window that hosts the visible main pane, not the
        # hidden inner pane.
        target = existing.get('main') or inner_pane
        subprocess.run(['tmux', 'select-window', '-t', target],
                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        return True

    print('Creating linbit-tunl window...', file=sys.stderr)
    rc = subprocess.run(
        ['tmux', 'new-window', '-n', name, '--'] + inner_cmd,
        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
    ).returncode
    return rc == 0


def _launch_in_tmux_share_detached(session_id, case_num='', token='', user='',
                                    restricted=False,
                                    tunnel_port=None, debug=False):
    """Start a detached tmux session for non-interactive environments.

    Used when tmux is available but stdout is not a TTY (containers, systemd
    units, cron jobs).  Creates the tmux session with new-session -d, then
    blocks by polling has-session until the inner process kills the session.

    While waiting, SIGTERM is turned into SystemExit so that an interrupted
    wait tears the tmux session down via kill_tmux_session(); the previous
    SIGTERM handler is reinstated once the wait is over.  Afterwards, any
    error text the inner process left in the session's ``err`` file is
    echoed to stderr, followed by the recording path if one was recorded.

    Raises subprocess.CalledProcessError if new-session fails.
    """
    sdir = _session_dir(session_id)
    sock = _tmux_sock_path(session_id)
    name = _tmux_local_name(session_id)
    err_path = os.path.join(sdir, 'err')
    try:
        os.unlink(err_path)
    except FileNotFoundError:
        pass

    inner_cmd = _build_inner_cmd(
        session_id, err_path, case_num=case_num, token=token, user=user,
        restricted=restricted, tunnel_port=tunnel_port, debug=debug, daemon=True,
    )

    _tf = ['-f', '/dev/null']
    print(f'{PREFIX} No TTY; starting detached tmux session.',
          file=sys.stderr)
    subprocess.run(
        ['tmux', *_tf, '-S', sock, 'new-session', '-d', '-s', name,
         '-x', '220', '-y', '50', '--'] + inner_cmd,
        check=True,
    )

    def _sigterm_handler(_signum, _frame):
        raise SystemExit(0)

    # Scope the SIGTERM -> SystemExit conversion to the wait loop instead of
    # leaving a process-global handler installed after we return.
    prev_sigterm = signal.signal(signal.SIGTERM, _sigterm_handler)
    try:
        while True:
            time.sleep(2)
            rc = subprocess.run(
                ['tmux', *_tf, '-S', sock, 'has-session', '-t', name],
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
            ).returncode
            if rc != 0:
                break
    except (KeyboardInterrupt, SystemExit):
        # Interrupted while the session was still up: kill it ourselves.
        kill_tmux_session(session_id)
    finally:
        # signal.signal() returns None when the old handler was not
        # installed from Python; there is nothing to reinstall then.
        if prev_sigterm is not None:
            signal.signal(signal.SIGTERM, prev_sigterm)

    err_msg = ''
    try:
        with open(err_path) as f:
            err_msg = f.read().strip()
        os.unlink(err_path)
    except FileNotFoundError:
        pass
    if err_msg:
        for line in err_msg.splitlines():
            print(f'{PREFIX} {line}', file=sys.stderr)

    cast_path = ''
    try:
        with open(os.path.join(sdir, 'cast_path')) as f:
            cast_path = f.read().strip()
    except OSError:
        pass
    if cast_path and os.path.isfile(cast_path):
        print(f'{PREFIX} Session recording: {cast_path}', file=sys.stderr)
        print(f'{PREFIX}   Replay with: asciinema play -i 2 {cast_path}',
              file=sys.stderr)


def _launch_in_tmux_share_outer(session_id, case_num='', token='', user='',
                                 restricted=False, tunnel_port=None,
                                 debug=False):
    """Run the linbit-tunl tmux session attached to this terminal.

    Starts (or re-attaches to) a tmux session whose window runs this script
    in --_inner mode; the inner process does the SSH connect, pane layout
    and daemon relay loop.  Blocks until the tmux client exits (detach or
    session death).

    If the inner process hits an SSH error it writes the session's err file
    before killing the session, and that text is reported on stderr.  A
    session that lasted under 5 s without an err file is also treated as
    a failed start.

    The work is done by _launch_in_tmux_share_outer_body() while wrapped in
    _StderrTee(<session dir>/debug.log).

    Returns:
        True on clean exit (detach or normal teardown), False on error, or
        None when tmux is not usable here.
    """
    if _tmux_share_capable():
        session_dir = _session_dir(session_id)
        log_path = os.path.join(session_dir, 'debug.log')
        with _StderrTee(log_path):
            return _launch_in_tmux_share_outer_body(
                session_id, session_dir,
                case_num=case_num, token=token, user=user,
                restricted=restricted, tunnel_port=tunnel_port, debug=debug,
            )
    return None


def _launch_in_tmux_share_outer_body(session_id, sdir, case_num='', token='',
                                      user='', restricted=False,
                                      tunnel_port=None, debug=False):
    """Do the work of _launch_in_tmux_share_outer for session *session_id*.

    If the session's dedicated tmux server already has our session, the
    daemon pane found via _discover_share_panes() is respawned (when there
    is one) and we attach; otherwise a new tmux session sized to the
    current terminal (220x50 if there is none) is started running the
    inner command.  Once the tmux client returns, the tmux server is
    killed and the outcome is reported:

    * error text left in ``sdir/err`` is echoed and marks the run abnormal;
    * a run shorter than 5 s is abnormal; the last 10 lines of
      ``sdir/ssh.log`` are shown, or a generic notice if there is no log
      and no error text;
    * ``sdir`` is deleted unless *debug* is set or the run was abnormal;
    * the recording path read from ``sdir/cast_path`` is printed if the
      file it names exists.

    Returns True for a clean run, False for an abnormal one.
    """
    sock = _tmux_sock_path(session_id)
    name = _tmux_local_name(session_id)
    err_path = os.path.join(sdir, 'err')
    # Stale error text from an earlier run must not be reported for this one.
    try:
        os.unlink(err_path)
    except FileNotFoundError:
        pass

    inner_cmd = _build_inner_cmd(
        session_id, err_path, case_num=case_num, token=token, user=user,
        restricted=restricted, tunnel_port=tunnel_port, debug=debug,
    )

    try:
        term = os.get_terminal_size()
    except OSError:
        cols, rows = 220, 50
    else:
        cols, rows = term.columns, term.lines

    # Every tmux call targets our private socket and skips the user's config.
    tmux_base = ['tmux', '-f', '/dev/null', '-S', sock]
    quiet = {'stdout': subprocess.DEVNULL, 'stderr': subprocess.DEVNULL}
    probe = subprocess.run(tmux_base + ['has-session', '-t', name], **quiet)

    started = time.monotonic()
    if probe.returncode != 0:
        _run_propagate_stop(
            tmux_base + ['new-session', '-s', name,
                         '-x', str(cols), '-y', str(rows), '--']
            + inner_cmd)
    else:
        panes = _discover_share_panes(sock, name)
        # 'inner' is the M5+ role tag; pre-M5 sessions used 'status'.
        daemon_pane = (panes.get('inner') or panes.get('status')) if panes else None
        if daemon_pane:
            print('Reconnecting to existing session...', file=sys.stderr)
            subprocess.run(
                tmux_base + ['respawn-pane', '-k', '-t', daemon_pane]
                + inner_cmd,
                **quiet)
        else:
            print('Attaching to existing session '
                  '(daemon will start on next reconnect).',
                  file=sys.stderr)
        _run_propagate_stop(tmux_base + ['attach-session', '-t', name])
    elapsed = time.monotonic() - started

    subprocess.run(tmux_base + ['kill-server'], **quiet)

    err_msg = ''
    try:
        with open(err_path) as fh:
            err_msg = fh.read().strip()
        os.unlink(err_path)
    except FileNotFoundError:
        pass

    abnormal = bool(err_msg)
    for line in err_msg.splitlines():
        print(f'{PREFIX} {line}', file=sys.stderr)

    if elapsed < 5.0:
        # Too short to have been a real session: show why SSH gave up.
        try:
            with open(os.path.join(sdir, 'ssh.log')) as fh:
                ssh_lines = fh.read().strip().splitlines()
        except OSError:
            ssh_lines = []
        if ssh_lines:
            tail = '\n'.join('  ' + ln for ln in ssh_lines[-10:])
            print(f'{PREFIX} SSH log (last lines):\n{tail}', file=sys.stderr)
        elif not abnormal:
            print(f'{PREFIX} Session ended after {elapsed:.1f}s.'
                  ' SSH connection was not established.', file=sys.stderr)
        abnormal = True

    try:
        with open(os.path.join(sdir, 'cast_path')) as fh:
            cast_path = fh.read().strip()
    except OSError:
        cast_path = ''

    if not (debug or abnormal):
        shutil.rmtree(sdir, ignore_errors=True)
    else:
        print(f'{PREFIX} Session logs preserved in {sdir}', file=sys.stderr)

    if cast_path and os.path.isfile(cast_path):
        print(f'{PREFIX} Session recording: {cast_path}', file=sys.stderr)
        print(f'{PREFIX}   Replay with: asciinema play -i 2 {cast_path}',
              file=sys.stderr)

    return not abnormal


def _inner_main(session_id, err_path, argv):
    """Running inside a tmux window: connect SSH, build pane layout, run daemon.

    Called when argv[0] == '--_inner'.  *argv* is scanned for --case,
    --user and --tunnel-port (each taking the next argument) and the flags
    --restricted, --attach, --debug and --daemon.  Unknown arguments, and a
    value-taking option in last position, are ignored.

    Prints connection progress to stdout (visible in the tmux window).  On
    SSH failure for a fresh session, writes err_path and kills the session
    (or the window in --attach mode), then exits with status 1.  On a
    reconnect it continues into _launch_in_tmux_share_inner(), whose
    reconnect loop (sess.run()) handles retries.
    """
    case_num = ''
    user = ''
    restricted = False
    tunnel_port = None
    attach = False
    debug = False
    daemon = False

    # Token comes via the per-session token file, not argv (B1.5 in the
    # 2026-05-06 audit -- passing on argv leaks to /proc/<pid>/cmdline
    # for the entire session).  _build_inner_cmd writes the file in
    # the outer process before launching us.
    token = _read_token_file(session_id)

    i = 0
    while i < len(argv):
        arg = argv[i]
        has_value = i + 1 < len(argv)
        if arg == '--case' and has_value:
            i += 1
            case_num = argv[i]
        elif arg == '--user' and has_value:
            i += 1
            user = argv[i]
        elif arg == '--restricted':
            restricted = True
        elif arg == '--tunnel-port' and has_value:
            i += 1
            tunnel_port = int(argv[i])
        elif arg == '--attach':
            attach = True
        elif arg == '--debug':
            debug = True
        elif arg == '--daemon':
            daemon = True
        i += 1

    # Single source for the mode label so the trace record and the banner
    # agree (the trace used to test tunnel_port for truthiness, so an
    # explicit port 0 was traced as a non-tunnel mode).
    mode_str = ('tunnel' if tunnel_port is not None
                else 'restricted' if restricted
                else 'share')

    sess = ShareSession(session_id, case_num=case_num, token=token,
                        user=user, restricted=restricted,
                        tunnel_port=tunnel_port)
    sess._attach_mode = attach
    sess._debug = debug
    sess._daemon_mode = daemon
    sess.known_hosts_path = setup_known_hosts(
        RELAY_HOST, RELAY_CA_PUBKEY, dest_dir=sess._session_dir)
    sess._setup_key()
    init('customer', sid=session_id,
         path=os.path.join(sess._session_dir, 'trace.jsonl'),
         lastgasp_path=os.path.join(sess._session_dir, 'lastgasp.jsonl'))
    trace('customer_inner_start', mode=mode_str,
          tunnel_port=tunnel_port, attach=attach, debug=debug)

    existing = _discover_share_panes()
    is_reconnect = bool(existing and existing.get('main'))

    print(f'Session: {session_id}')
    print(f'Relay:   {RELAY_HOST}:{RELAY_PORT}')
    print(f'Mode:    {mode_str}')
    print('Connecting to relay...', end='', flush=True)

    ok, reason = sess._connect_ssh()

    if not ok:
        print(' FAILED')
        if is_reconnect:
            print(f'{reason} -- will retry')
        else:
            print(f'{PREFIX} ERROR: {reason}')
            auth_err = _is_auth_error(reason)
            if auth_err:
                print(f'{PREFIX} Session ID not recognized. '
                      'Check and try again.')
            # The outer launcher reads this file after tmux exits and
            # echoes it on the customer's terminal.
            try:
                with open(err_path, 'w') as f:
                    f.write(f'{reason}\n')
                    if auth_err:
                        f.write('Session ID not recognized. '
                                'Check and try again.\n')
            except OSError:
                pass
            _kill = 'kill-window' if attach else 'kill-server'
            subprocess.run(['tmux', _kill],
                           stdout=subprocess.DEVNULL,
                           stderr=subprocess.DEVNULL)
            sys.exit(1)
    else:
        print(' OK')
        sess._ready = True

    _launch_in_tmux_share_inner(sess)


def _start_share_fifo_reader(sess, fifo_path):
    """Start the outbound chat-FIFO reader thread.

    The thread reads one text line at a time from the outbound FIFO and
    translates it to a wire frame:

    - ``PANE_DEAD:``     -> ``PANE_DEAD main`` to the relay (sentinel written
                            by the pane-died hook), plus a local ephemeral
                            status notice.
    - ``RESPAWN_LOCAL:`` -> clear the ephemeral notice (written by a tmux
                            binding after respawn-pane).
    - anything else      -> ``CUSTOMER_CHAT hex`` to the relay.

    Drops frames with a trace breadcrumb when no SSH write is attached
    (pre-connect or post-disconnect), or when the write itself raises
    OSError.  Only an OSError on the FIFO itself ends the thread: a failed
    relay write must not take customer chat down for the rest of the
    session.  The thread is a daemon thread.
    """
    def _write(fn, frame, wire):
        """Hand *wire* to the SSH writer; trace and drop it if the write fails."""
        try:
            fn(wire)
        except OSError as e:
            trace('frame_dropped', frame=frame, reason='write_failed',
                  err=str(e))

    def _fifo_reader():
        while True:
            try:
                # Opening a FIFO for reading blocks until a writer shows up;
                # iteration ends when the last writer closes, then we reopen.
                with open(fifo_path, 'rb') as f:
                    got_data = False
                    for line_bytes in f:
                        got_data = True
                        line = line_bytes.rstrip(b'\n')
                        if not line:
                            continue
                        fn = sess._write_line_ref[0]
                        if line == b'PANE_DEAD:':
                            trace('pane_dead_fifo_recv', fn_attached=fn is not None)
                            if fn is not None:
                                trace('frame_send', frame='PANE_DEAD', dir='to_relay')
                                _write(fn, 'PANE_DEAD', _v4_ctrl('PANE_DEAD main'))
                            else:
                                trace('frame_dropped', frame='PANE_DEAD',
                                      reason='no_ssh_connection')
                            # Restart is Enter on the dead shell pane; there is
                            # no Ctrl-b r binding any more.
                            sess._emit_ephemeral(
                                'Shell exited.  Press Enter in the shell pane to restart.',
                                lifecycle='state',
                            )
                        elif line == b'RESPAWN_LOCAL:':
                            trace('respawn_local_fifo_recv')
                            sess._emit_ephemeral('', lifecycle='clear')
                            sess._status_row(3, sess._mode_summary)
                        else:
                            if fn is not None:
                                trace('frame_send', frame='CUSTOMER_CHAT',
                                      len=len(line), digest=frame_digest(line))
                                _write(fn, 'CUSTOMER_CHAT',
                                       _v4_ctrl('CUSTOMER_CHAT', line))
                            else:
                                trace('frame_dropped', frame='CUSTOMER_CHAT',
                                      reason='no_ssh_connection', len=len(line))
                    if not got_data:
                        # A writer opened and closed without data: back off
                        # instead of spinning on reopen.
                        time.sleep(0.2)
            except OSError as e:
                trace('fifo_reader_exit', err=str(e))
                break

    threading.Thread(target=_fifo_reader, daemon=True).start()


def _discover_or_create_share_panes(shell, chat_cmd):
    """Ensure main + chat panes exist; return (main_id, chat_id).

    Two code paths:

    * **Reconnect.** ``_discover_share_panes`` found an existing main
      pane.  Keep it (customer's shell is preserved), respawn the chat
      pane (its FIFO fd is stale from the previous daemon), or create
      a chat pane if the session was a pre-M5 layout without one.

    * **Fresh.** Carve main + chat from the current single-pane window
      and break the inner (daemon) pane off to a hidden background
      window tagged ``@linbit-role=inner`` so it is findable on
      reconnect.

    New pane ids come from ``split-window -P -F '#{pane_id}'``, which
    prints the id of the pane tmux just created.  A follow-up
    ``display-message -p`` instead reports whatever tmux considers the
    "current" pane, which for a command issued from inside tmux may be
    the issuing pane rather than the fresh split.

    Raises subprocess.CalledProcessError if a split-window fails.
    """
    def _split(target, cmd, *extra):
        """Split *target* vertically to run *cmd*; return the new pane's id."""
        r = subprocess.run(
            ['tmux', 'split-window', '-P', '-F', '#{pane_id}',
             '-t', target, '-v', *extra, cmd],
            check=True, stdout=subprocess.PIPE, universal_newlines=True)
        return r.stdout.strip()

    existing = _discover_share_panes()

    if existing and 'main' in existing:
        main_id = existing['main']
        chat_id = existing.get('chat')

        if chat_id:
            subprocess.run(
                ['tmux', 'respawn-pane', '-k', '-t', chat_id, chat_cmd],
                stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
            )
        else:
            chat_id = _split(main_id, chat_cmd, '-l', '9')
        return main_id, chat_id

    # Fresh layout: this process runs in the only pane of the window.
    r = subprocess.run(['tmux', 'display-message', '-p', '#{pane_id}'],
                       stdout=subprocess.PIPE, universal_newlines=True)
    inner_id = r.stdout.strip()

    main_id = _split(inner_id, shell)
    chat_id = _split(main_id, chat_cmd, '-l', '9')

    # Break the inner pane off to a hidden background window so only
    # main + chat remain visible.  The inner keeps running there; its
    # own tmux command invocations still target the visible panes by
    # their pane IDs.  Tag with @linbit-role=inner so outer launchers
    # can respawn it on reconnect.
    #
    # Design choice: the daemon process intentionally lives as a tmux
    # pane (not a forked detached process).  Tmux doubles as both
    # process supervisor and discovery mechanism: kill-session reaps
    # the daemon as a side effect, and reconnect locates it by
    # `list-panes -F` filtering on @linbit-role.  Moving the daemon
    # outside tmux would replace those with explicit PID files,
    # signal-based shutdown, and a separate log channel for stdout
    # / stderr -- meaningful plumbing for negligible gain.
    subprocess.run(
        ['tmux', 'set-option', '-p', '-t', inner_id,
         '@linbit-role', 'inner'],
        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
    )
    subprocess.run(
        ['tmux', 'break-pane', '-d', '-n', '~linbit-daemon', '-s', inner_id],
        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
    )
    return main_id, chat_id


def _tmux_key_display(key):
    """Render a tmux key spec (e.g. 'C-a', 'M-Space', 'F12') as a label.

    'C-' becomes 'Ctrl-', 'M-' becomes 'Alt-', 'S-' becomes 'Shift-'.
    Other modifiers and the base key are passed through verbatim.
    """
    parts = key.split('-')
    out = []
    for m in parts[:-1]:
        out.append({'C': 'Ctrl', 'M': 'Alt', 'S': 'Shift'}.get(m, m))
    out.append(parts[-1])
    return '-'.join(out)


def _resolve_tmux_prefix():
    """Return (tmux_key, display_label) for the configured tmux prefix.

    The key is $TUNL_PREFIX with surrounding whitespace removed; when the
    variable is unset or blank the tmux default 'C-b' is used.  The label
    is the human-readable form from _tmux_key_display().
    """
    key = os.environ.get('TUNL_PREFIX', '').strip()
    if not key:
        key = 'C-b'
    return key, _tmux_key_display(key)


def _apply_tmux_prefix(prefix_key):
    """Set -g prefix on the implicit ($TMUX) tmux server, plus send-prefix.

    Returns the key that ended up applied: the requested one if tmux
    accepted it, 'C-b' otherwise.  Logs a warning on rejection -- a bad
    TUNL_PREFIX must not abort the session.
    """
    if prefix_key == 'C-b':
        return 'C-b'
    r = subprocess.run(
        ['tmux', 'set-option', '-g', 'prefix', prefix_key],
        stderr=subprocess.PIPE, universal_newlines=True,
    )
    if r.returncode != 0:
        msg = (r.stderr or '').strip() or 'rejected by tmux'
        print(f"{PREFIX} TUNL_PREFIX={prefix_key!r} {msg}; using Ctrl-b.",
              file=sys.stderr)
        return 'C-b'
    subprocess.run(
        ['tmux', 'bind-key', '-T', 'prefix', prefix_key, 'send-prefix'],
        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
    )
    return prefix_key


def _render_cheat_sheet(template, prefix_label):
    """Substitute the active prefix label into a cheat-sheet template.

    Templates are written with 'Ctrl-b' as the placeholder; no-op when
    prefix_label is the default.
    """
    if prefix_label == 'Ctrl-b':
        return template
    return template.replace('Ctrl-b', prefix_label)


# Customer-facing cheat sheet shown by the Ctrl-b ? display-popup.  The text
# spells the prefix as the literal "Ctrl-b"; _render_cheat_sheet() swaps in
# the active prefix label before it is written to the session's
# cheat-sheet.txt, and an unrendered copy is stored as the template that the
# generated set-prefix.py helper re-renders when the prefix is changed.
# Keep the hotkey list in sync with _install_share_keybindings().
_CUSTOMER_CHEAT_SHEET = """\
LINBIT tunl -- customer cheat sheet
====================================

Ctrl-b is the tmux prefix.  All "Ctrl-b <X>" combos: hold Ctrl, tap b,
release Ctrl, tap X.

Hotkeys:
  Ctrl-b Q          end the session for good (orderly shutdown, confirm)
  Ctrl-b c          toggle focus between chat and shell pane
  Ctrl-b C-c        same toggle (chord variant)
  Ctrl-b u          grant / unblock tunnel access
  Ctrl-b U          revoke tunnel access
  Ctrl-b X          block all future tunnel requests
  Ctrl-b L  /  Ctrl-b Ctrl-l
                    reset pane layout
  Ctrl-b z          toggle local zoom on the focused pane
  F1  /  Ctrl-b Enter
                    open the action menu
  Ctrl-b ?          show this cheat sheet

When the shell pane shows "Shell exited (status N)":
  Enter             restart the shell
  q                 end the session for good

Chat pane (the lower split, curses-based input):
  /help             list available slash commands
  /brb  /  /back    freeze / restore support's keystroke forwarding
  /who              show customer + connected viewers
  /me <action>      send an emote-style chat line
  /nick <name>      set a local display name
  /search <regex>   filter scrollback;  /search alone clears
  /quit             disconnect chat (same effect as Ctrl-b Q)
  Tab               complete @nick or /command
  PgUp  /  PgDn     scroll chat history
"""


def _write_set_prefix_helper(sess, template_path, cheat_path):
    """Generate set-prefix.py in the session dir and return its path.

    The helper, invoked from the "Change tmux prefix..." menu entry's
    command-prompt, applies a new prefix to the running tmux server,
    re-renders the cheat-sheet file from its template, and notifies
    the daemon via PREFIX_CHANGED on the ctl FIFO so it can refresh
    the status bar, pane-border labels, and mode-summary row.

    Kept as an external Python script (not a shell helper) so the
    label conversion uses the same logic _tmux_key_display does in
    the parent process -- no duplicated awk/sed acrobatics.
    """
    helper = os.path.join(sess._session_dir, 'set-prefix.py')
    body = (
        f"#!{sys.executable}\n"
        "import subprocess, sys\n"
        f"TEMPLATE = {template_path!r}\n"
        f"CHEAT    = {cheat_path!r}\n"
        f"CTL_FIFO = {sess._ctl_fifo!r}\n"
        "key = sys.argv[1].strip() if len(sys.argv) > 1 else ''\n"
        "if not key:\n"
        "    subprocess.run(['tmux', 'display-message',\n"
        "                    'No prefix given; nothing changed.'])\n"
        "    sys.exit(0)\n"
        "r = subprocess.run(['tmux', 'set-option', '-g', 'prefix', key],\n"
        "                   stderr=subprocess.PIPE, universal_newlines=True)\n"
        "if r.returncode != 0:\n"
        "    err = (r.stderr or '').strip() or 'rejected by tmux'\n"
        "    subprocess.run(['tmux', 'display-message', f'Prefix {key!r} {err}'])\n"
        "    sys.exit(0)\n"
        "subprocess.run(['tmux', 'bind-key', '-T', 'prefix', key, 'send-prefix'])\n"
        "parts = key.split('-')\n"
        "label = '-'.join([{'C': 'Ctrl', 'M': 'Alt', 'S': 'Shift'}.get(m, m)\n"
        "                  for m in parts[:-1]] + [parts[-1]])\n"
        "with open(TEMPLATE) as fh:\n"
        "    tpl = fh.read()\n"
        "with open(CHEAT, 'w') as fh:\n"
        "    fh.write(tpl if label == 'Ctrl-b' else tpl.replace('Ctrl-b', label))\n"
        "# Notify the daemon so it can refresh status bar / pane labels /\n"
        "# mode-summary row.  Failure here is non-fatal -- the binding\n"
        "# is already in effect and the user can manually reconnect.\n"
        "try:\n"
        "    with open(CTL_FIFO, 'w') as fh:\n"
        "        fh.write(f'PREFIX_CHANGED {key}\\n')\n"
        "except OSError:\n"
        "    pass\n"
        "subprocess.run(['tmux', 'display-message',\n"
        "                f'Prefix is now {label}.'])\n"
    )
    with open(helper, 'w') as fh:
        fh.write(body)
    # 0o700: owner rwx is enough -- run-shell exec's the path as the
    # customer's uid.  Tightening from 0o755 closes B1.9 in the
    # 2026-05-06 audit (cosmetic on a healthy host because the
    # parent dir is 0o700, but defence-in-depth if that changes).
    os.chmod(helper, 0o700)
    return helper


def _install_share_keybindings(sess, main_id, chat_id, fifo_path):
    """Bind the Ctrl-b hotkeys and build the Ctrl-b Enter menu.

    Hotkeys target our own PID (the daemon is this process) so Ctrl-b
    Q delivers SIGINT directly.  The display-menu items run the same
    shell commands as the hotkeys, for mouse-first users.

    There is no separate "force kill" key: the SIGINT handler runs
    cleanup() with bounded waits and a watchdog escalates to SIGKILL
    if cleanup ever exceeds its deadline.  One button, predictable.

    Quoting: a hotkey bound as ``bind-key ... run-shell CMD`` hands CMD to
    tmux as a ready-made argument, so /bin/sh receives it verbatim.  A
    display-menu item, however, is a command *string* that tmux parses
    again when the item is chosen.  Wrapping CMD in double quotes there
    breaks as soon as CMD contains a double quote (the chat toggle does)
    or a backslash escape (tmux consumes one escape level and the shell
    another, so a printf newline escape degrades to a literal 'n').  Menu
    items therefore go through ``_menu_run``, which single-quotes the
    whole command; tmux does no escape processing inside single quotes,
    so each menu entry runs exactly the shell text of its hotkey.
    """
    pid = os.getpid()
    quiet = {'stdout': subprocess.DEVNULL, 'stderr': subprocess.DEVNULL}

    def _bind(*args):
        """Run ``tmux bind-key ARGS``, discarding tmux's output."""
        subprocess.run(['tmux', 'bind-key', *args], **quiet)

    def _menu_run(shell_cmd):
        """Return a tmux command string that run-shells *shell_cmd* verbatim."""
        return f'run-shell {shlex.quote(shell_cmd)}'

    # Cheat-sheet popup: write a project-specific reference into the
    # session dir; bind Ctrl-b ? to a display-popup that shows it.
    # This shadows tmux's default Ctrl-b ? (list-keys) -- the list of
    # raw tmux bindings is rarely what a customer wants; a focused
    # cheat sheet is.  Single source of truth: _CUSTOMER_CHEAT_SHEET.
    #
    # Template file is kept alongside the rendered cheat sheet so the
    # set-prefix.py helper (bound to "Change tmux prefix..." in the
    # menu) can re-render with a new prefix label mid-session.
    cheat_path     = os.path.join(sess._session_dir, 'cheat-sheet.txt')
    template_path  = os.path.join(sess._session_dir, 'cheat-sheet.template.txt')
    with open(template_path, 'w') as fh:
        fh.write(_CUSTOMER_CHEAT_SHEET)
    with open(cheat_path, 'w') as fh:
        fh.write(_render_cheat_sheet(_CUSTOMER_CHEAT_SHEET, sess._prefix_label))
    set_prefix_path = _write_set_prefix_helper(sess, template_path, cheat_path)
    cheat_cmd = (
        f'cat {shlex.quote(cheat_path)};'
        f' printf "\\n[press Enter to close]";'
        f' read _'
    )
    _bind('-T', 'prefix', '?',
          'display-popup', '-E', '-T', ' Customer cheat sheet ',
          '-h', '85%', '-w', '80%', cheat_cmd)

    # Ctrl-b Q ends the relay session (customer's lifecycle action).
    # Wrap in confirm-before because the action is irreversible: the
    # session closes, archive hook fires, support disconnects.
    # "; true" keeps run-shell from reporting a failure if the daemon
    # is already gone.
    end_session_cmd = f'run-shell "kill -INT {pid} 2>/dev/null; true"'
    end_session_confirm = (
        f'confirm-before -p "End the session for good? (y/n) " '
        f'{shlex.quote(end_session_cmd)}'
    )
    _bind('-T', 'prefix', 'Q', end_session_confirm)

    # Ctrl-b c and Ctrl-b C-c: toggle between the main shell pane
    # and the chat pane.  c overrides the default "new-window" --
    # intentional on the customer side; we don't want a noob
    # accidentally opening invisible-to-support windows.  C-c
    # mirrors the support-side chord for muscle-memory parity.
    toggle_chat = (
        f'if [ "$(tmux display-message -p \'#{{pane_id}}\')" = {_tmux_sq(chat_id)} ];'
        f' then tmux select-pane -t {_tmux_sq(main_id)};'
        f' else tmux select-pane -t {_tmux_sq(chat_id)}; fi'
    )
    for _toggle_key in ('c', 'C-c'):
        _bind('-T', 'prefix', _toggle_key, 'run-shell', toggle_chat)

    # Ctrl-b u: grant/unblock tunnel access (CONFIRM -> ALLOWED, or AUTO_REJECT -> CONFIRM).
    # Ctrl-b U: revoke tunnel access (kills ControlMaster, reconnects without -R).
    # Ctrl-b X: block all future tunnel requests (no popup, auto-reject).
    # All are always bound; the daemon ignores requests that do not fit
    # the current mode.  The same shell text is reused by the menu.
    ctl_fifo = _tmux_sq(sess._ctl_fifo)
    grant_sh = f'printf "GRANT_TUNNEL\\n" >> {ctl_fifo}'
    revoke_sh = f'printf "REVOKE_TUNNEL\\n" >> {ctl_fifo}'
    block_sh = f'printf "BLOCK_TUNNEL\\n" >> {ctl_fifo}'
    _bind('-T', 'prefix', 'u', 'run-shell', grant_sh)
    _bind('-T', 'prefix', 'U', 'run-shell', revoke_sh)
    _bind('-T', 'prefix', 'X', 'run-shell', block_sh)

    # Dead-pane affordances on the root table:
    #   Enter -- if pane_dead, respawn it (and tell the daemon to
    #            clear the status notice); on a live pane, pass through.
    #   q     -- if pane_dead, end the session for good (SIGINT to the
    #            daemon, same command as Ctrl-b Q); on a live pane, pass
    #            through unchanged so 'q' still reaches the live shell.
    # The pane's remain-on-exit-format (not set in this function) is
    # where these hints are shown to the customer.  Ctrl-b r is
    # intentionally NOT bound any more -- it falls through to tmux's
    # default refresh-client, which is what tmux users reflexively
    # expect when they press it.
    fifo_arg = _tmux_sq(fifo_path)
    # Use `echo` (always emits a trailing newline) rather than printf with
    # an embedded \n: this run-shell is nested inside an if-shell, so tmux
    # parses its double-quoted argument once before passing it to the
    # shell.  An unquoted "\n" survives that round as "\n" but the shell
    # then strips the backslash (unquoted \n -> n), leaving printf with a
    # literal "RESPAWN_LOCAL:n" and no newline -- which the fifo reader
    # eventually picks up as a chat line and forwards to support.  echo
    # has no escapes to lose.
    respawn_cmd = (
        f'respawn-pane ; '
        f'run-shell "echo RESPAWN_LOCAL: >> {fifo_arg}"'
    )
    end_cmd = end_session_cmd
    _bind('-T', 'root', 'Enter',
          'if', '-F', '#{pane_dead}', respawn_cmd, 'send-keys Enter')
    _bind('-T', 'root', 'q',
          'if', '-F', '#{pane_dead}', end_cmd, 'send-keys q')

    # Ctrl-b C-l (and Ctrl-b L): reconstruct the canonical pane topology.
    # break-pane + join-pane rebuilds the split structure from scratch,
    # so this recovers from arbitrary layout cycling (even-vertical, tiled,
    # etc.) at whatever the current window size happens to be.
    reset_layout = (
        f"tmux if -F #{{window_zoomed_flag}} 'resize-pane -Z' '';"
        # -s names the source pane; -t on break-pane is the destination
        # WINDOW.  The earlier form "break-pane -d -t chat_id" was
        # silently reinterpreted by tmux (chat_id is not a window index),
        # and each iteration consumed a few rows from main instead of
        # restoring a clean 6-row chat.
        f" tmux break-pane -d -s {_tmux_sq(chat_id)};"
        f" tmux join-pane -v -t {_tmux_sq(main_id)} -s {_tmux_sq(chat_id)} -l 6;"
        f" tmux set-option -w pane-border-status top;"
        f" tmux select-pane -t {_tmux_sq(main_id)}"
    )
    for rl_key in ('C-l', 'L'):
        _bind('-T', 'prefix', rl_key, 'run-shell', reset_layout)

    # Ctrl-b Enter: main menu (display-menu).
    # "Change tmux prefix..." opens a command-prompt; %1 (the user's
    # input) is substituted by tmux into the run-shell template.  The
    # set-prefix.py helper does the actual work and reports the result
    # via tmux display-message.
    set_prefix_action = (
        f"command-prompt -p 'New tmux prefix (e.g. C-a, M-Space):'"
        f" {shlex.quote(f'run-shell {shlex.quote(set_prefix_path)} %1')}"
    )
    menu_items = [
        '-T', f'#[bold]{_tmux_fmt(sess.session_id)}#[nobold]',
        'Go to chat',              'c', _menu_run(toggle_chat),
        'Reset layout',            'L', _menu_run(reset_layout),
        'Grant/unblock tunnel',    'u', _menu_run(grant_sh),
        'Revoke tunnel',           'U', _menu_run(revoke_sh),
        'Block tunnel requests',   'X', _menu_run(block_sh),
        '',                        '',  '',
        'Change tmux prefix...',   'p', set_prefix_action,
        '',                        '',  '',
        'End session for good',    'Q', end_session_confirm,
    ]
    # F1 (root) and Ctrl-b Enter (prefix) both open the menu.  F1 is
    # the universal "help/menu" key and works without the prefix; the
    # prefix path stays as backup for tmux-savvy users and for
    # terminals that intercept F1 (some configurations of iTerm,
    # gnome-terminal, etc.).
    for _table, _key in (('root', 'F1'), ('prefix', 'Enter')):
        _bind('-T', _table, _key, 'display-menu', *menu_items)


def _install_share_status_bar(sess, main_id):
    """Configure the tmux status line and name the main window.

    Three options are set without an explicit target:
    - status-left: mode label, tunnel marker, session id and a key hint
      for the current prefix;
    - status-style: white on blue, bold;
    - status-right: empty.

    The window is then renamed to the session id.  The rename is aimed
    explicitly at *main_id*'s window, because the implicit "current"
    window would be the hidden daemon window.
    """
    quiet = {'stdout': subprocess.DEVNULL, 'stderr': subprocess.DEVNULL}
    key_hint = f'  {sess._prefix_label}: Enter=menu c=chat u=tunnel Q=end session'
    left = f'[LINBIT {sess._mode_label}]{sess._tunnel_sym}  {sess.session_id}' + key_hint
    settings = (
        ('status-left', left),
        ('status-style', 'bg=blue,fg=white,bold'),
        ('status-right', ''),
    )
    for name, value in settings:
        subprocess.run(['tmux', 'set-option', name, value], **quiet)
    subprocess.run(['tmux', 'rename-window', '-t', main_id, sess.session_id], **quiet)


def _install_share_zoom_hook(sess, main_id):
    """Track Ctrl-b z zoom via a window-layout-changed hook.

    A small shell script, ``<session_dir>/zoom-hook.sh``, is written and
    registered as the window-layout-changed hook of *main_id*'s window.
    It does the following:

    - On zoom, it turns pane-border-status off.  The PTY height then
      matches the layout-reported height; otherwise layout_cell_is_top
      in tmux makes the pane one row taller than the broker expects.
    - On unzoom, it turns pane-border-status back to ``top``.
    - On both transitions, it appends ``ZOOM_TOGGLE`` to the ctl FIFO so
      the daemon can resize the window to the last consumer RESIZE.

    The ``@tunl-zoom`` window option acts as a re-entry guard.  Setting
    pane-border-status always runs layout_fix_panes in tmux, even when
    the value does not change, and that fires window-layout-changed
    again.  The script therefore acts only on real transitions of the
    zoom flag.  The guard is seeded to ``0`` here.
    """
    quiet = {'stdout': subprocess.DEVNULL, 'stderr': subprocess.DEVNULL}
    subprocess.run(['tmux', 'set-option', '-w', '-t', main_id, '@tunl-zoom', '0'],
                   **quiet)

    hook_script = os.path.join(sess._session_dir, 'zoom-hook.sh')
    sock = shlex.quote(sess._tmux_sock) if sess._tmux_sock else ''
    ctl = shlex.quote(sess._ctl_fifo)
    # Passes -S only when the script's S variable is non-empty.
    tm = 'tmux ${S:+-S "$S"}'
    script_lines = [
        '#!/bin/sh',
        f'S={sock}',
        f'z=$({tm} display-message -p "#{{window_zoomed_flag}}")',
        f'p=$({tm} display-message -p "#{{@tunl-zoom}}")',
        'if [ "$z" = 1 ] && [ "$p" != 1 ]; then',
        f'  {tm} set -w @tunl-zoom 1',
        f'  {tm} set-option -w pane-border-status off',
        f'  echo ZOOM_TOGGLE >> {ctl}',
        'elif [ "$z" != 1 ] && [ "$p" != 0 ]; then',
        f'  {tm} set -w @tunl-zoom 0',
        f'  {tm} set-option -w pane-border-status top',
        f'  echo ZOOM_TOGGLE >> {ctl}',
        'fi',
    ]
    with open(hook_script, 'w') as out:
        out.write('\n'.join(script_lines) + '\n')
    # Owner-only executable, not 0o755 (B1.9 in the 2026-05-06 audit).
    os.chmod(hook_script, 0o700)
    subprocess.run(
        ['tmux', 'set-hook', '-w', '-t', main_id, 'window-layout-changed',
         f'run-shell {_tmux_sq(hook_script)}'],
        **quiet,
    )


def _install_share_pane_borders(sess, main_id):
    """Show a label row above each pane of *main_id*'s window.

    First, pane-border-status is set to ``top`` for the window.  That
    costs one row per pane, and tmux compensates by shrinking the
    window.  ``resize-window -A`` then grows it back to the size of the
    largest attached client.  Finally the label format itself is
    applied by _apply_pane_border_labels.  It is one window-level
    conditional keyed on #{pane_id}, because older tmux keeps
    pane-border-format per window, so per-pane ``-p`` settings would
    overwrite each other.
    """
    steps = (
        ('set-option', '-w', '-t', main_id, 'pane-border-status', 'top'),
        ('resize-window', '-A', '-t', main_id),
    )
    for step in steps:
        subprocess.run(['tmux', *step],
                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    _apply_pane_border_labels(sess, main_id)


def _apply_pane_border_labels(sess, main_id):
    """Set the window-level pane-border-format for *main_id*'s window.

    The format is a tmux conditional on #{pane_id}:
    - the main pane shows the [SHELL] hint, suffixed with " (window)"
      in attach mode;
    - every other pane (the chat pane) shows the [CHAT] hint.

    Both hints embed ``sess._prefix_label``.  This is why the function
    is separate from _install_share_pane_borders: a PREFIX_CHANGED on
    the ctl FIFO can re-run it mid-session to refresh the labels.
    """
    prefix = sess._prefix_label
    suffix = ' (window)' if sess._attach_mode else ''
    shell_hint = (f'#[bold][SHELL]#[nobold]  {prefix}: Enter=menu  u=tunnel'
                  f'  Q=end session{suffix}')
    chat_hint = ('#[bold][CHAT]#[nobold]  type message + Enter'
                 f'  {prefix}: Enter=menu  u=tunnel  Q=end session')
    is_main = '#{==:#{pane_id},' + _tmux_fmt(main_id) + '}'
    border_format = '#{?' + is_main + ',' + shell_hint + ',' + chat_hint + '}'
    subprocess.run(
        ['tmux', 'set-option', '-w', '-t', main_id,
         'pane-border-format', border_format],
        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
    )


def _configure_share_panes(sess, main_id, chat_id):
    """Set the tmux options the share layout relies on.

    The following options are applied, in order:

    - ``exit-unattached on`` (global).  The server goes away when its
      last client detaches, so closing the terminal ends the session
      promptly.  It is skipped in attach mode and when
      ``sess._daemon_mode`` is set.
    - ``remain-on-exit on`` for the main and chat panes.  A dead pane
      then stays around long enough for the pane-died hook to fire.
    - A custom ``remain-on-exit-format`` on the main pane.  It tells the
      customer that Enter restarts the shell and q ends the session
      (those keys are bound by _install_share_keybindings).
    - ``@linbit-role`` pane labels (``main`` / ``chat``).  They let
      _discover_share_panes find the panes again on reconnect.
    """
    def tmux(*args):
        subprocess.run(['tmux', *args],
                       stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

    embedded = sess._attach_mode or getattr(sess, '_daemon_mode', False)
    if not embedded:
        tmux('set-option', '-g', 'exit-unattached', 'on')

    tmux('set-option', '-p', '-t', main_id, 'remain-on-exit', 'on')
    tmux('set-option', '-p', '-t', chat_id, 'remain-on-exit', 'on')

    dead_banner = ('Shell exited (status #{pane_dead_status}).'
                   '  Enter = restart shell    q = end session')
    tmux('set-option', '-p', '-t', main_id, 'remain-on-exit-format', dead_banner)

    tmux('set-option', '-p', '-t', main_id, '@linbit-role', 'main')
    tmux('set-option', '-p', '-t', chat_id, '@linbit-role', 'chat')


def _install_share_cast_recording(sess, main_id):
    """Pipe the main pane's output to an asciinema v2 cast writer.

    ``_choose_cast_path`` picks a persistent location outside the
    session tempdir, so the file survives cleanup on exit.  The path is
    also written to ``<session_dir>/cast_path`` so the post-session
    banner and close hook can find it.  ``sess._cast_path`` is set to
    the path in use.

    On reconnect the main pane is reused and may still be piped into
    the cast writer started by an earlier daemon.  ``tmux pipe-pane -o``
    is a toggle: when a pipe already exists it *closes* that pipe and
    opens nothing.  Calling it unconditionally would therefore end the
    recording on every reconnect, while advertising a newly chosen path
    that never gets written.  To avoid that, the pane's ``#{pane_pipe}``
    flag is queried first:

    - already piped and a path recorded in ``cast_path``: keep the
      running writer and reuse that path;
    - otherwise: choose a new path, record it and start the writer.

    The cast header's cols x rows come from the same tmux query.  They
    fall back to 80 x 24 if tmux does not return two integers (dead
    pane, racing respawn, etc.).
    """
    marker_path = os.path.join(sess._session_dir, 'cast_path')

    r = subprocess.run(
        ['tmux', 'display-message', '-t', main_id, '-p',
         '#{pane_width} #{pane_height} #{pane_pipe}'],
        stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
        universal_newlines=True)
    parts = r.stdout.split()
    cast_cols, cast_rows = 80, 24
    if len(parts) >= 2:
        try:
            cast_cols, cast_rows = int(parts[0]), int(parts[1])
        except ValueError:
            pass
    already_piped = len(parts) >= 3 and parts[2] == '1'

    if already_piped:
        recorded = None
        try:
            with open(marker_path) as f:
                recorded = f.read().strip() or None
        except OSError:
            pass
        if recorded is not None:
            # The writer from the previous daemon is still recording into
            # this file; touching pipe-pane would close it.
            sess._cast_path = recorded
            return

    cast_path = _choose_cast_path()
    sess._cast_path = cast_path
    try:
        with open(marker_path, 'w') as f:
            f.write(cast_path)
    except OSError:
        pass

    script = os.path.abspath(sys.argv[0])
    pipe_cmd = (f'{shlex.quote(sys.executable)} {shlex.quote(script)}'
                f' --_cast_writer {shlex.quote(cast_path)}'
                f' {cast_cols} {cast_rows}')
    # No -o.  With no existing pipe this opens one, exactly like -o
    # would.  In the rare "piped but path unknown" case it replaces the
    # old pipe with one whose path we know, instead of merely closing it.
    subprocess.run(
        ['tmux', 'pipe-pane', '-t', main_id, pipe_cmd],
        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
    )


def _setup_share_fifos(sess):
    """Create the two share-mode chat FIFOs and record them on sess.

    Returns (outbound_path, inbound_path).  outbound carries PANE_DEAD /
    RESPAWN_LOCAL / customer-typed chat lines to this process; inbound
    carries rendered SCHAT frames to the curses chat pane.  Both are
    0o600 and recreated if stale.
    """
    fifo_path = os.path.join(sess._session_dir, 'chat.fifo')
    try:
        os.unlink(fifo_path)
    except OSError:
        pass
    os.mkfifo(fifo_path, 0o600)
    sess._chat_fifo = fifo_path

    # Inbound FIFO: outer process writes rendered SCHAT lines here; the
    # curses chat pane (--_chat_pane) reads them.  Same permissions.
    chat_in_fifo = os.path.join(sess._session_dir, 'chat.in.fifo')
    try:
        os.unlink(chat_in_fifo)
    except OSError:
        pass
    os.mkfifo(chat_in_fifo, 0o600)
    sess._chat_in_fifo = chat_in_fifo
    return fifo_path, chat_in_fifo


def _launch_in_tmux_share_inner(sess):
    """Build (or re-attach to) the two-pane share layout, then run the daemon.

    The visible layout is the customer's shell (main pane) next to the
    curses chat UI (chat pane).  The daemon pane itself lives in a
    hidden background window.

    - Fresh session: main and chat panes are carved out of the current
      single-pane window.
    - Reconnect: the existing main pane is reused, so the customer's
      shell survives.  The chat pane is respawned, because its FIFO fd is
      stale, and every hotkey is re-bound to the new daemon PID.

    After the layout is ready this calls ``sess.run()``.
    """
    login_shell = os.environ.get('SHELL', '/bin/bash')

    outbound_fifo, _inbound_fifo = _setup_share_fifos(sess)
    _start_share_fifo_reader(sess, outbound_fifo)

    # Report the outer tmux client's size (TERMINAL_SIZE) so the broker
    # can clamp RESIZE to the screen space the customer actually has.
    sess._start_terminal_poller()

    # The chat pane runs our curses chat UI.  It reads SCHAT frames from
    # chat.in.fifo and writes plain-text lines to chat.fifo, where
    # _start_share_fifo_reader wraps them as CUSTOMER_CHAT for SSH.
    chat_argv = (
        sys.executable, os.path.abspath(sys.argv[0]),
        '--_chat_pane', sess.session_id, '--name', sess.user,
    )
    chat_cmdline = ' '.join(map(shlex.quote, chat_argv))

    main_id, chat_id = _discover_or_create_share_panes(login_shell, chat_cmdline)

    # --- shared by fresh and reconnect paths ---

    # Apply TUNL_PREFIX before any label or hint is rendered, so the
    # status bar, pane borders and cheat sheet agree.  tmux may reject
    # the requested key, in which case _apply_tmux_prefix falls back
    # (to 'C-b'); mirror whatever was actually applied onto sess.
    effective_prefix = _apply_tmux_prefix(sess._prefix_key)
    if effective_prefix != sess._prefix_key:
        sess._prefix_key = effective_prefix
        sess._prefix_label = _tmux_key_display(effective_prefix)

    _configure_share_panes(sess, main_id, chat_id)
    _install_share_pane_borders(sess, main_id)
    _install_share_zoom_hook(sess, main_id)
    _install_share_status_bar(sess, main_id)
    _install_share_keybindings(sess, main_id, chat_id, outbound_fifo)

    quiet = {'stdout': subprocess.DEVNULL, 'stderr': subprocess.DEVNULL}

    # Land the customer in their shell.
    subprocess.run(['tmux', 'select-pane', '-t', main_id], **quiet)

    sess._panes = {'main': main_id, 'chat': chat_id}
    sess._fixed_overhead = _share_fixed_overhead(main_id)
    # Status is shown in the chat pane's pinned header; the daemon pane
    # is hidden, so ANSI status output on stdout would reach nobody.
    sess._status_is_stdout = False

    # Main-pane death: the pane-died hook drops a PANE_DEAD: sentinel
    # into chat.fifo, and the FIFO reader forwards PANE_DEAD to the
    # broker.  echo rather than printf '\n' because tmux double-quote
    # parsing eats the \n; -b backgrounds it, 2>/dev/null hides errors.
    pane_died_cmd = f'run-shell -b "echo PANE_DEAD: >> {_tmux_sq(outbound_fifo)} 2>/dev/null"'
    subprocess.run(['tmux', 'set-hook', '-p', '-t', main_id, 'pane-died', pane_died_cmd],
                   **quiet)

    _install_share_cast_recording(sess, main_id)

    # Wipe connection-phase output before the daemon draws its layout.
    sys.stdout.write('\x1b[H\x1b[2J')
    sys.stdout.flush()

    sess.run()


def _share_run_direct(session_id, case_num='', token='', user='',
                      restricted=False, tunnel_port=None):
    """Run the session daemon in this process, without tmux or a TTY.

    This is the fallback when tmux is missing or stdout is not a
    terminal (containers, CI).  There is no pane layout; the daemon
    writes to stdout/stderr.

    If the SSH connection cannot be established, the reason is printed
    (plus a hint when it looks like an auth error), the session is
    cleaned up and the process exits with status 1.
    """
    sess = ShareSession(session_id, case_num=case_num, token=token,
                        user=user, restricted=restricted,
                        tunnel_port=tunnel_port)
    sess.known_hosts_path = setup_known_hosts(
        RELAY_HOST, RELAY_CA_PUBKEY, dest_dir=sess._session_dir)
    sess._setup_key()

    connected, why = sess._connect_ssh()
    if connected:
        sess._ready = True
        sess.run()
        return

    print(f'{PREFIX} ERROR: {why}', file=sys.stderr)
    if _is_auth_error(why):
        print(f'{PREFIX} Session ID not recognized. Check and try again.',
              file=sys.stderr)
    sess.cleanup()
    sys.exit(1)


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def _usage():
    return (
        f"Usage: {os.path.basename(sys.argv[0])} [--case CASE] [--token TOKEN]"
        " [--user USER] [--restricted] [--tunnel] [--attach] [--debug] [SESSION_ID]\n"
        "\n"
        "  SESSION_ID       Four-word session ID provided by LINBIT support\n"
        "                   (e.g. swift-fox-jumps-barrel).  Required.\n"
        "  --case CASE      Support case number or brief context string.\n"
        "  --token TOKEN    Six-digit pre-registration token.\n"
        "  --user USER      Username for reverse-tunnel SSH access (default: auto-detected\n"
        "                   from $USER/$LOGNAME; use this if auto-detection is wrong).\n"
        "  --restricted     Watch-only: support cannot type in your shell or open a\n"
        "                   reverse tunnel.  Chat and status messages still work.\n"
        "  --tunnel         Pre-open a reverse tunnel at session start.\n"
        "                   Use when full SSH access is agreed at the outset.\n"
        "                   The only mode that works without tmux.\n"
        "                   Default (share mode): support requests a tunnel and a\n"
        "                   confirmation popup lets you approve or deny it.\n"
        "  --attach         Create the linbit-tunl window inside the current tmux\n"
        "                   session instead of starting a dedicated tmux server.\n"
        "                   Automatic when re-run from inside the linbit-tunl tmux.\n"
        "  --debug          Preserve session logs in /tmp/linbit-tunl-*/ after exit.\n"
        "                   Without this flag, logs are only preserved on error.\n"
        "\n"
        "tmux is required for all modes except --tunnel.\n"
        "\n"
        "Environment:\n"
        f"  TUNL_RELAY           relay hostname   (default: {RELAY_HOST})\n"
        f"  TUNL_RELAY_PORT      relay SSH port   (default: {RELAY_PORT})\n"
        f"  TUNL_AUTHKEYS_FILE   authorized_keys  (default: {AUTHKEYS_FILE})\n"
    )


def _cast_writer(path, cols, rows):
    """Read pane output from stdin (via tmux pipe-pane) and write asciinema v2.

    This runs as a child of the tmux server process (not the linbit-tunl
    daemon), so it survives daemon restarts and reconnects.
    """
    import json as _json
    t0 = time.monotonic()
    ts = int(time.time())
    # O_EXCL|O_NOFOLLOW: refuse to write into a pre-existing file or
    # follow a symlink at the cast path.  Mode 0o600 owner-only.
    # The cast filename embeds a per-session timestamp + hostname, so
    # collisions are vanishingly rare; on the off chance one happens
    # we exit rather than overwrite or follow a link (B1.6 in the
    # 2026-05-06 audit).
    try:
        fd = os.open(
            path,
            os.O_WRONLY | os.O_CREAT | os.O_EXCL | os.O_NOFOLLOW,
            0o600,
        )
        fh = os.fdopen(fd, 'w', encoding='utf-8')
    except OSError:
        sys.exit(1)
    header = {
        'version':   2,
        'width':     cols,
        'height':    rows,
        'timestamp': ts,
        'env':       {'TERM': 'xterm-256color'},
    }
    fh.write(_json.dumps(header) + '\n')
    fh.flush()
    stdin = sys.stdin.buffer
    try:
        while True:
            chunk = stdin.read1(4096)
            if not chunk:
                break
            elapsed = round(time.monotonic() - t0, 6)
            text = chunk.decode('utf-8', errors='replace')
            fh.write(_json.dumps([elapsed, 'o', text]) + '\n')
            fh.flush()
    except (OSError, KeyboardInterrupt):
        pass
    finally:
        fh.close()


def _chat_pane_main(session_id: str, display_name: str = ''):
    """Run the curses chat UI inside the customer tmux chat pane.

    The outer Python process owns the SSH connection to the relay and
    translates frames:
      inbound :  broker `CHAT <name_hex> <msg_hex>`  ->  write
                 `SCHAT <name_hex> <HH:MM> <msg_hex>\\n` to chat.in.fifo
      outbound:  chat UI sends `CUSTOMER_CHAT <hex>\\n`; this function
                 unwraps it to plain text + newline and writes to
                 chat.fifo (existing _fifo_reader thread wraps it again
                 as CUSTOMER_CHAT for the SSH channel).

    On this leg both directions are handled as v4 frames.  _read_line
    passes only CTRL payloads to the UI.  _send_bytes decodes the UI's
    outgoing frame and translates CUSTOMER_CHAT / CUSTOMER_NICK into
    plain lines for the outer process.

    Args:
        session_id: Session ID.  The session directory is found via
            _session_dir(session_id); it holds chat.fifo, chat.in.fifo,
            ctl.fifo and the trace files.
        display_name: Customer display name.  It is passed to run_chat_ui
            and build_mention_tokens and may be empty.

    Exits the process with status 1 if either chat FIFO cannot be
    opened.  Both FIFO file objects are closed when run_chat_ui returns
    or raises.
    """
    sdir           = _session_dir(session_id)
    chat_fifo      = os.path.join(sdir, 'chat.fifo')       # out: to outer
    chat_in_fifo   = os.path.join(sdir, 'chat.in.fifo')    # in : from outer

    init('customer', sid=session_id,
         path=os.path.join(sdir, 'trace.jsonl'),
         lastgasp_path=os.path.join(sdir, 'lastgasp.jsonl'))
    trace('chat_pane_begin', sid=session_id, name=display_name)

    # Inbound FIFO.  An O_RDONLY open would block until a writer opens
    # chat.in.fifo.  The outer process only opens it lazily, per frame,
    # so the UI could not start before the first message arrived.
    # O_RDWR returns immediately because this process then holds a
    # writer end itself.  (O_RDWR on a FIFO is Linux-defined; POSIX
    # leaves it unspecified, which is acceptable here.)
    try:
        in_fd = os.open(chat_in_fifo, os.O_RDWR)
    except OSError as e:
        trace('chat_pane_open_failed', err=str(e))
        print(f'[tunl chat] could not open inbound FIFO: {e}',
              file=sys.stderr)
        sys.exit(1)
    # Unbuffered: frames are consumed as soon as bytes arrive.
    in_fh = os.fdopen(in_fd, 'rb', buffering=0)

    # Outbound FIFO (plain text, one line per message).  O_RDWR avoids
    # blocking on startup when _fifo_reader in the outer isn't yet
    # opened -- symmetric with chat.in.fifo above.
    try:
        out_fd = os.open(chat_fifo, os.O_RDWR)
    except OSError as e:
        trace('chat_pane_out_open_failed', err=str(e))
        print(f'[tunl chat] could not open outbound FIFO: {e}',
              file=sys.stderr)
        sys.exit(1)
    out_fh = os.fdopen(out_fd, 'wb', buffering=0)

    # The outer process writes v4 frames to chat.in.fifo (see
    # ShareSession._write_chat_in_fifo and _emit_*).  Demux them here
    # so wire_parse sees one CTRL payload at a time.
    _frame_stream = FrameStream(read_some_from_fileobj(in_fh))

    def _read_line():
        """Return the next CTRL payload from chat.in.fifo.

        Returns None on EOF or a wire-protocol error.  Presumably
        run_chat_ui treats None as end of input -- confirm there.
        Frames of any other type are skipped.
        """
        while True:
            try:
                type_code, payload = _frame_stream.read_message()
            except (EOFError, WireProtocolError):
                return None
            if type_code == TYPE_CTRL:
                return payload
            # DATA / HELLO / ERROR are not used on this leg; ignore.

    ctl_fifo_path = os.path.join(sdir, 'ctl.fifo')

    def _send_bytes(data):
        """Translate outbound v4 CTRL frames from the chat UI into
        actions for the outer (producer) process.

        - CUSTOMER_CHAT: unwrap arg, append plain-text line to
          chat.fifo; the outer's _fifo_reader re-wraps it as a
          CUSTOMER_CHAT frame to the broker.
        - CUSTOMER_NICK: append `NICK <name>\\n` to ctl.fifo; the
          outer's _ctl_reader translates that into a CUSTOMER_NICK
          frame to the broker.

        Anything else (non-CTRL frames, other verbs) is written to
        chat.fifo unchanged.  Only the first frame in *data* is decoded.
        NOTE(review): this assumes the UI hands over exactly one frame
        per call -- confirm against run_chat_ui.  OSError, ValueError
        and WireProtocolError are swallowed, so a failed send is
        silently dropped.
        """
        try:
            buf = bytearray(data)
            # Minimal byte source for FrameStream: hands out up to n
            # bytes from the front of buf and consumes them.
            def _src(n, _b=buf):
                chunk = bytes(_b[:n])
                del _b[:len(chunk)]
                return chunk
            tmp = FrameStream(_src)
            type_code, payload = tmp.read_message()
            if type_code != TYPE_CTRL:
                out_fh.write(data)  # fallback: passthrough
                return
            line, args = decode_ctrl(payload)
            verb = (line.split() or [''])[0]
            if verb == 'CUSTOMER_CHAT' and args:
                out_fh.write(args[0] + b'\n')
                return
            if verb == 'CUSTOMER_NICK':
                nick = (args[0] if args else b'').decode(
                    'utf-8', errors='replace').strip()
                # Sanity: ctl.fifo is line-oriented, no whitespace
                # allowed in the argument.  /nick already enforces this.
                # NOTE(review): if ctl.fifo is a FIFO, this open blocks
                # until the outer process holds its read end.
                if nick and not any(c.isspace() for c in nick):
                    with open(ctl_fifo_path, 'a') as fh:
                        fh.write(f'NICK {nick}\n')
                return
            out_fh.write(data)
        except (OSError, ValueError, WireProtocolError):
            pass

    def _session_info_handler(payload, pinned, ephemeral):
        """Map the SESSION_INFO frames synthesised by the outer process
        (see ShareSession._emit_status_info) to the customer's single
        pinned row.

        L: connection state ('● relay: connected 14:30')
        C: mode ('share+tunnel:30042' or 'share')
        R: session id (may be dropped on narrow terminals)

        *payload* is read as a dict ('connection', 'mode', 'header',
        'session_id' keys, all optional).  *ephemeral* is not used here.
        """
        left = payload.get('connection', '') or '(starting)'
        # Header string usually contains mode / session bits; use it as
        # center if present, else fall back to session_id.
        center = payload.get('mode') or payload.get('header') or ''
        right  = payload.get('session_id', '') or ''
        pinned.set_row1(left=left, center=center, right=right)

    def _register_customer_extras(cmds, history):
        """Add the customer-only /brb and /back slash commands.

        Each command writes one verb line (BRB_ON / BRB_OFF) to
        ctl.fifo.  On OSError a local error message dict is returned for
        the UI to show.  *history* is not used here.
        """
        def _ctl_write(verb):
            try:
                with open(ctl_fifo_path, 'a') as fh:
                    fh.write(verb + '\n')
            except OSError as e:
                return {'kind': 'local', 'ts': now_hhmm(),
                        'body': f'/{verb.lower()} failed: {e!r}'}
            return None

        def _brb(args):
            return _ctl_write('BRB_ON')

        def _back(args):
            return _ctl_write('BRB_OFF')

        cmds.register('brb',  _brb,  'be right back -- freeze support keystrokes')
        cmds.register('back', _back, 'restore support keystrokes')

    try:
        run_chat_ui(_read_line, _send_bytes, display_name,
                    wire_out_kind='customer_chat',
                    send_name=False,
                    local_echo_on_send=True,
                    local_echo_who='customer',
                    pinned_rows=1,
                    session_info_handler=_session_info_handler,
                    mention_tokens=build_mention_tokens(
                        'customer', display_name, CUSTOMER_COMPANY),
                    session_id=session_id,
                    # If TUNL_CUSTOMER_COMPANY is set, use the first
                    # significant word as the local-echo label (e.g.
                    # "Acme" instead of "customer").  When neither the
                    # company name nor an explicit /nick is available,
                    # "me" reads better than the literal "customer" --
                    # the customer is reading their own messages.
                    customer_nick=_first_significant_word(CUSTOMER_COMPANY) or 'me',
                    extra_commands=_register_customer_extras)
    finally:
        trace('chat_pane_exit')
        try:
            in_fh.close()
        except OSError:
            pass
        try:
            out_fh.close()
        except OSError:
            pass


def _parse_target(spec, default_port=22):
    """Split a ``--target`` value into ``(host, port)``.

    Accepted forms are ``HOST``, ``HOST:PORT``, ``[IPV6]`` and
    ``[IPV6]:PORT``.  An unbracketed value with more than one colon is
    taken as a bare IPv6 address on *default_port*, because its last
    group cannot be told apart from a port.

    Raises:
        ValueError: with a user-facing message, for an empty host, an
            unterminated ``[``, stray text after ``]``, or a port that
            is not a decimal number in 1..65535.
    """
    port_str = None
    if spec.startswith('['):
        end = spec.find(']')
        if end < 0:
            raise ValueError(f'missing "]" in {spec!r}')
        host, rest = spec[1:end], spec[end + 1:]
        if rest:
            if not rest.startswith(':'):
                raise ValueError(f'unexpected text after "]" in {spec!r}')
            port_str = rest[1:]
    elif spec.count(':') == 1:
        host, port_str = spec.split(':')
    else:
        host = spec
    if not host:
        raise ValueError(f'empty host in {spec!r}')
    if port_str is None:
        return host, default_port
    if not re.fullmatch(r'[0-9]{1,5}', port_str) or not 1 <= int(port_str) <= 65535:
        raise ValueError(f'invalid port {port_str!r} in {spec!r}')
    return host, int(port_str)


def main(argv=None):
    """Command-line entry point.

    A first argument of ``--_inner``, ``--_cast_writer`` or
    ``--_chat_pane`` dispatches to the internal helper entry points
    that tmux launches.  Otherwise the public options are parsed, the
    session ID is validated and the session is started in one of these
    modes:

    - attach (inside an existing tmux);
    - tmux share (with a TTY);
    - detached tmux (no TTY);
    - direct tunnel-only (no usable tmux, ``--tunnel`` given).

    Without usable tmux and without ``--tunnel``, an error is printed
    instead.  Usage errors exit with status 1; ``--help`` and
    ``--version`` exit with status 0.
    """
    global TARGET_HOST, TARGET_PORT

    os.umask(0o077)

    if argv is None:
        argv = sys.argv[1:]

    # Internal dispatch: running inside a tmux window created by outer script.
    if argv and argv[0] == '--_inner':
        if len(argv) < 3:
            print('Internal error: --_inner requires SESSION_ID ERR_PATH',
                  file=sys.stderr)
            sys.exit(1)
        _inner_main(argv[1], argv[2], argv[3:])
        return

    # Internal: asciinema writer started by tmux pipe-pane.
    if argv and argv[0] == '--_cast_writer':
        if len(argv) < 4:
            print('Internal error: --_cast_writer PATH COLS ROWS',
                  file=sys.stderr)
            sys.exit(1)
        try:
            cols, rows = int(argv[2]), int(argv[3])
        except ValueError:
            print(f'--_cast_writer: cols/rows not integers: '
                  f'{argv[2]!r} {argv[3]!r}', file=sys.stderr)
            sys.exit(1)
        _cast_writer(argv[1], cols, rows)
        return

    # Internal: curses chat pane for the customer-side chat window.
    if argv and argv[0] == '--_chat_pane':
        if len(argv) < 2:
            print('Internal error: --_chat_pane requires SESSION_ID',
                  file=sys.stderr)
            sys.exit(1)
        name = ''
        # The token after each --name is its value (last one wins); a
        # trailing --name without a value is ignored.
        rest = iter(argv[2:])
        for tok in rest:
            if tok == '--name':
                name = next(rest, name)
        _chat_pane_main(argv[1], name)
        return

    # Options taking one value; the dict also holds their defaults.
    opt_values = {'--case': '', '--token': '', '--user': '', '--target': None}
    session_id   = None
    tunnel_mode  = False   # default: share mode
    restricted   = False
    attach_mode  = False   # --attach: create window in current tmux
    debug_mode   = False   # --debug: preserve session dir, verbose logging

    i = 0
    while i < len(argv):
        arg = argv[i]
        if arg in opt_values:
            i += 1
            if i >= len(argv):
                print(f"{arg} requires an argument", file=sys.stderr)
                sys.exit(1)
            opt_values[arg] = argv[i]
        elif arg == '--tunnel':
            tunnel_mode = True
        elif arg == '--restricted':
            restricted = True
        elif arg == '--attach':
            attach_mode = True
        elif arg == '--debug':
            debug_mode = True
        elif arg in ('--help', '-h'):
            print(_usage(), end='')
            sys.exit(0)
        elif arg in ('--version', '-V'):
            commit = f' ({GIT_COMMIT})' if GIT_COMMIT != 'dev' else ''
            print(f'{os.path.basename(sys.argv[0])} {VERSION}{commit}')
            sys.exit(0)
        elif arg.startswith('--'):
            print(f"Unknown option: {arg}", file=sys.stderr)
            sys.exit(1)
        else:
            if session_id is not None:
                print(f"Unexpected argument: {arg}", file=sys.stderr)
                sys.exit(1)
            session_id = arg
        i += 1

    case_num = opt_values['--case']
    token    = opt_values['--token']
    user     = opt_values['--user']
    target   = opt_values['--target']

    # Validate --target right away, so a mistyped port gets a clear
    # message instead of an int() traceback after setup has begun.
    target_addr = None
    if target:
        try:
            target_addr = _parse_target(target)
        except ValueError as e:
            print(f"Invalid --target: {e}", file=sys.stderr)
            sys.exit(1)

    if session_id is None:
        print(
            f"{PREFIX} ERROR: No session ID provided.\n"
            f"{PREFIX} Please contact LINBIT support and ask them to create\n"
            f"{PREFIX} a session ID for you, then run:\n"
            f"{PREFIX}   {os.path.basename(sys.argv[0])} <session-id>",
            file=sys.stderr,
        )
        sys.exit(1)
    elif not validate_session_id(session_id):
        print(
            f"{PREFIX} ERROR: Invalid session ID '{session_id}'. "
            "Must be four lowercase words separated by hyphens.",
            file=sys.stderr,
        )
        sys.exit(1)

    if target_addr is not None:
        TARGET_HOST, TARGET_PORT = target_addr

    tunnel_port = pick_port() if tunnel_mode else None

    have_tmux = _tmux_is_available()
    tmux_ver  = _tmux_version() if have_tmux else None
    tmux_ok   = tmux_ver is not None and tmux_ver >= _TMUX_MIN_SHARE
    have_tty  = sys.stdout.isatty()

    # Auto-attach when re-run from inside our own dedicated tmux socket.
    if not attach_mode and _tmux_at_own_socket(session_id):
        attach_mode = True

    if attach_mode:
        if not os.environ.get('TMUX'):
            print(f'{PREFIX} ERROR: --attach requires running inside tmux.',
                  file=sys.stderr)
            sys.exit(1)
        _launch_in_tmux_attach(session_id, case_num=case_num, token=token,
                               user=user, restricted=restricted,
                               tunnel_port=tunnel_port,
                               debug=debug_mode)
    elif tmux_ok and have_tty:
        # Normal path: tmux session with full pane layout.
        _launch_in_tmux_share_outer(session_id, case_num=case_num, token=token,
                                    user=user, restricted=restricted,
                                    tunnel_port=tunnel_port,
                                    debug=debug_mode)
    elif tmux_ok and not have_tty:
        # No TTY (container, systemd, cron): detached tmux session.
        _launch_in_tmux_share_detached(session_id, case_num=case_num,
                                       token=token, user=user,
                                       restricted=restricted,
                                       tunnel_port=tunnel_port,
                                       debug=debug_mode)
    elif tunnel_mode:
        # Tunnel-only without tmux (or without a TTY, or tmux too old):
        # daemon runs directly.  Status goes to stderr; no shared shell
        # or chat pane.
        if not have_tmux:
            print('tmux not found; running in tunnel mode without shared shell.',
                  file=sys.stderr)
        elif not tmux_ok:
            have = '.'.join(str(x) for x in tmux_ver) if tmux_ver else '?'
            need = '.'.join(str(x) for x in _TMUX_MIN_SHARE)
            print(f'tmux {have} is too old (need >= {need}); '
                  'running in tunnel mode without shared shell.',
                  file=sys.stderr)
        else:
            print('No TTY; running in tunnel mode without shared shell.',
                  file=sys.stderr)
        _share_run_direct(session_id, case_num=case_num, token=token,
                          user=user, restricted=False,
                          tunnel_port=tunnel_port)
    else:
        # Share/restricted modes all require tmux >= _TMUX_MIN_SHARE.
        mode_str = 'restricted' if restricted else 'share'
        if not have_tmux:
            reason = 'tmux not found'
        elif not tmux_ok:
            have = '.'.join(str(x) for x in tmux_ver) if tmux_ver else '?'
            need = '.'.join(str(x) for x in _TMUX_MIN_SHARE)
            reason = f'tmux {have} too old, need >= {need}'
        else:
            reason = 'no TTY'
        print(
            f'{PREFIX} ERROR: {mode_str} mode requires tmux ({reason}).',
            file=sys.stderr,
        )
        print(
            f'{PREFIX} Install a newer tmux, or start a tunnel-only session instead:',
            file=sys.stderr,
        )
        print(
            f'{PREFIX}   {os.path.basename(sys.argv[0])} --tunnel {session_id}',
            file=sys.stderr,
        )
        sys.exit(1)


if __name__ == "__main__":
    # Run the CLI only when executed as a script, not when imported.
    main()
