""" Rocket.Chat Platform Adapter for Hermes Agent. Plugin-based gateway adapter connecting to a self-hosted Rocket.Chat server. Uses WebSocket/DDP for real-time incoming messages + presence, and REST API for sending replies. Zero external dependencies — stdlib only. """ import asyncio import json import logging import os import queue import socket import ssl import struct import threading import time import urllib.request from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) try: from gateway.platforms.base import ( BasePlatformAdapter, SendResult, MessageEvent, MessageType, ) from gateway.session import SessionSource from gateway.config import PlatformConfig, Platform except ImportError: # Fallback for standalone test / development outside gateway BasePlatformAdapter = object SendResult = dict MessageEvent = dict MessageType = object SessionSource = object PlatformConfig = object Platform = str def _get_config_str(config: PlatformConfig, key: str, env_var: str, default: str = "") -> str: extra = getattr(config, "extra", {}) or {} return os.getenv(env_var) or extra.get(key, default) class RocketChatREST: def __init__(self, base_url: str): self.base_url = base_url.rstrip("/") self.auth_token: Optional[str] = None self.user_id: Optional[str] = None def _request(self, method: str, endpoint: str, headers: Optional[dict] = None, data: Optional[dict] = None): url = f"{self.base_url}{endpoint}" req_headers = dict(headers) if headers else {} body = json.dumps(data).encode("utf-8") if data else None if body: req_headers["Content-Type"] = "application/json" req = urllib.request.Request(url, data=body, headers=req_headers, method=method) try: with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode("utf-8")) except Exception as e: logger.debug("REST %s %s failed: %s", method, endpoint, e) return None def login(self, user: str, password: str) -> bool: r = self._request("POST", "/api/v1/login", data={"user": user, "password": password}) if r and r.get("status") == "success": self.auth_token = r["data"]["authToken"] self.user_id = r["data"]["userId"] return True return False def get_rooms(self) -> List[dict]: if not self.auth_token: return [] r = self._request("GET", "/api/v1/rooms.get", headers={ "X-Auth-Token": self.auth_token, "X-User-Id": self.user_id, }) if not r or not r.get("success"): return [] rooms = [] for rm in r.get("update", []): t = rm.get("t", "c") rid = rm["_id"] if t == "d": usernames = rm.get("usernames", []) other = [u for u in usernames if u != os.getenv("ROCKETCHAT_USER", "")] name = "DM mit " + (", ".join(other) if other else "unbekannt") else: name = rm.get("name", "unnamed") rooms.append({"id": rid, "name": name, "type": t}) return rooms def post_message(self, room_id: str, text: str, attachments: Optional[List[dict]] = None) -> bool: if not self.auth_token: return False payload = {"roomId": room_id, "text": text} if attachments: payload["attachments"] = attachments r = self._request("POST", "/api/v1/chat.postMessage", headers={ "X-Auth-Token": self.auth_token, "X-User-Id": self.user_id, "Content-Type": "application/json", }, data=payload) return bool(r and r.get("success")) class DDPClient: PING_INTERVAL = 8.0 RECONNECT_BACKOFFS = [5, 10, 20, 30, 60, 120] def __init__(self, host: str, on_message_cb=None, on_connected_cb=None): self.host = host self._on_message = on_message_cb self._on_connected = on_connected_cb self.sock: Optional[socket.socket] = None self.connected = False self._running = False self._reader_thread: Optional[threading.Thread] = None self._write_lock = threading.Lock() self._msg_queue: queue.Queue = queue.Queue() self._next_sub_id = 1 self._sub_rooms: Dict[str, str] = {} self._session: Optional[str] = None self._reconnect_attempt = 0 def _create_socket(self) -> socket.socket: ctx = ssl.create_default_context() raw = socket.create_connection((self.host, 443), timeout=10) return ctx.wrap_socket(raw, server_hostname=self.host) def _handshake(self, s: socket.socket) -> bool: import base64 key = base64.b64encode(os.urandom(16)).decode() hs = ( f"GET /websocket HTTP/1.1\r\n" f"Host: {self.host}\r\n" f"Upgrade: websocket\r\n" f"Connection: Upgrade\r\n" f"Sec-WebSocket-Key: {key}\r\n" f"Sec-WebSocket-Version: 13\r\n" f"Origin: https://{self.host}\r\n\r\n" ) s.sendall(hs.encode()) resp = b"" while b"\r\n\r\n" not in resp: chunk = s.recv(1024) if not chunk: break resp += chunk return b"101 Switching Protocols" in resp def _send_frame(self, data: bytes, opcode: int = 0x01, mask: bool = True) -> None: if not self.sock: return length = len(data) if length < 126: header = struct.pack("!BB", 0x80 | opcode, (0x80 if mask else 0x00) | length) elif length < 65536: header = struct.pack("!BBH", 0x80 | opcode, (0x80 if mask else 0x00) | 126, length) else: header = struct.pack("!BBQ", 0x80 | opcode, (0x80 if mask else 0x00) | 127, length) frame = header if mask: masking = os.urandom(4) masked = bytes(b ^ masking[i % 4] for i, b in enumerate(data)) frame += masking + masked else: frame += data with self._write_lock: self.sock.sendall(frame) def _send_json(self, obj: dict) -> None: self._send_frame(json.dumps(obj).encode("utf-8"), opcode=0x01, mask=True) def _send_raw_pong(self, payload: bytes) -> None: self._send_frame(payload, opcode=0x0A, mask=True) def _reader_loop(self): import select self.sock.settimeout(1.0) last_ping = time.time() while self._running: try: if time.time() - last_ping >= self.PING_INTERVAL: try: self._send_json({"msg": "ping"}) last_ping = time.time() except Exception: break readable, _, _ = select.select([self.sock], [], [], 1.0) if not readable: continue hdr = self.sock.recv(2) if not hdr: break fin = (hdr[0] & 0x80) != 0 opcode = hdr[0] & 0x0F masked = (hdr[1] & 0x80) != 0 length = hdr[1] & 0x7F if length == 126: length = struct.unpack("!H", self.sock.recv(2))[0] elif length == 127: length = struct.unpack("!Q", self.sock.recv(8))[0] payload = b"" remaining = length while remaining > 0: chunk = self.sock.recv(min(4096, remaining)) if not chunk: break payload += chunk remaining -= len(chunk) if len(payload) < length: break if masked: mask = payload[:4] payload = bytes(b ^ mask[i % 4] for i, b in enumerate(payload[4:])) if opcode == 0x09: self._send_raw_pong(payload) elif opcode == 0x01 and payload: try: data = json.loads(payload.decode("utf-8")) msg_type = data.get("msg", "") if msg_type == "connected": self.connected = True self._session = data.get("session") if self._on_connected: self._on_connected() elif msg_type == "ping": self._send_json({"msg": "pong"}) elif msg_type == "changed" and data.get("collection") == "stream-room-messages": fields = data.get("fields", {}) args = fields.get("args", []) if args and isinstance(args, list): self._msg_queue.put(args[0]) except Exception: pass elif opcode == 0x08: break except Exception: break self.connected = False logger.debug("DDP reader thread exited") def connect_and_login(self, token: str) -> bool: try: self.sock = self._create_socket() if not self._handshake(self.sock): return False self._running = True self._reader_thread = threading.Thread(target=self._reader_loop, daemon=True) self._reader_thread.start() time.sleep(0.3) self._send_json({"msg": "connect", "version": "1", "support": ["1", "pre2", "pre1"]}) for _ in range(50): if self.connected: break time.sleep(0.1) if not self.connected: self.disconnect() return False time.sleep(0.2) self._send_json({"msg": "method", "method": "login", "params": [{"resume": token}], "id": "ddplogin"}) time.sleep(0.5) return True except Exception as e: logger.error("DDP connect failed: %s", e) self.disconnect() return False def subscribe_room(self, room_id: str) -> Optional[str]: if not self.connected: return None sub_id = f"sub{self._next_sub_id}" self._next_sub_id += 1 # CRITICAL: Rocket.Chat V5+ requires {"token": "true"} as second param, # otherwise subscribe succeeds but NO messages ever stream. self._send_json({ "msg": "sub", "name": "stream-room-messages", "params": [room_id, {"token": "true"}], "id": sub_id }) self._sub_rooms[room_id] = sub_id return sub_id def set_status(self, status: str) -> bool: if not self.connected: return False method = f"UserPresence:{status}" self._send_json({"msg": "method", "method": method, "params": [], "id": f"st{int(time.time()*1000)}"}) return True def disconnect(self): self._running = False if self.sock: try: self.sock.close() except Exception: pass self.sock = None self.connected = False self._sub_rooms.clear() def reconnect_and_restore(self, token: str) -> bool: self.disconnect() time.sleep(1) old_subs = list(self._sub_rooms.keys()) self._sub_rooms.clear() if self.connect_and_login(token): for rid in old_subs: self.subscribe_room(rid) return True return False class RocketChatAdapter(BasePlatformAdapter): def __init__(self, config: PlatformConfig): super().__init__(config=config, platform=Platform("rocketchat")) self.base_url = _get_config_str(config, "base_url", "ROCKETCHAT_BASE_URL") self.user = _get_config_str(config, "user", "ROCKETCHAT_USER") self.password = _get_config_str(config, "password", "ROCKETCHAT_PASSWORD") self.rest = RocketChatREST(self.base_url) self.ddp: Optional[DDPClient] = None self._auth_token: Optional[str] = None self._user_id: Optional[str] = None self._rooms: List[dict] = [] self._subscribed_rooms: set = set() self._listen_task: Optional[asyncio.Task] = None self._status_timer: Optional[threading.Timer] = None self._last_activity = 0.0 self._shutdown_event = threading.Event() self._room_info: Dict[str, dict] = {} async def connect(self) -> bool: if not self.base_url or not self.user or not self.password: logger.error("[rocketchat] Missing configuration") return False if not self.rest.login(self.user, self.password): logger.error("[rocketchat] REST login failed") return False self._auth_token = self.rest.auth_token self._user_id = self.rest.user_id host = self.base_url.replace("https://", "").replace("http://", "").split("/")[0] self.ddp = DDPClient( host=host, on_connected_cb=lambda: self.ddp.set_status("away") if self.ddp else None, ) if not self.ddp.connect_and_login(self._auth_token): logger.error("[rocketchat] DDP connect/login failed") return False self._rooms = self.rest.get_rooms() for room in self._rooms: sub_id = self.ddp.subscribe_room(room["id"]) if sub_id: self._subscribed_rooms.add(room["id"]) self._room_info[room["id"]] = {"name": room["name"], "type": room["type"]} logger.info("[rocketchat] Subscribed to %s", room["name"]) self._mark_connected() self._listen_task = asyncio.create_task(self._listen_loop()) self._last_activity = time.time() self._schedule_idle_timer() logger.info("[rocketchat] Connected to %s", self.base_url) return True async def disconnect(self) -> None: self._mark_disconnected() self._shutdown_event.set() if self._status_timer: self._status_timer.cancel() if self._listen_task and not self._listen_task.done(): self._listen_task.cancel() try: await self._listen_task except asyncio.CancelledError: pass if self.ddp: self.ddp.disconnect() self.ddp = None logger.info("[rocketchat] Disconnected") async def send(self, chat_id: str, content: str, reply_to: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> SendResult: if not self.rest.auth_token: return SendResult(success=False, error="Not authenticated") # Handle interaction templates via metadata attachments = None if metadata and metadata.get("template"): from . import _TEMPLATE_MAP, _build_custom_buttons template_name = metadata.get("template") question_id = metadata.get("question_id", "") if template_name == "custom" and metadata.get("custom_buttons"): buttons = _build_custom_buttons(metadata["custom_buttons"], prefix=question_id) attachments = [{ "color": "#1d74f5", "title": metadata.get("button_title", "Wähle eine Option:"), "actions": buttons, }] elif template_name in _TEMPLATE_MAP: tmpl = _TEMPLATE_MAP[template_name] attachment = dict(tmpl.get("attachment", {})) raw_buttons = tmpl.get("buttons", []) buttons = [] for btn in raw_buttons: msg_payload = btn.get("msg", "") if question_id: msg_payload = f"{question_id}::{msg_payload}" buttons.append({ "type": "button", "text": btn.get("text", "?"), "msg": msg_payload, "msg_in_chat_window": True, "style": btn.get("style", "default"), }) attachment["actions"] = buttons attachments = [attachment] success = self.rest.post_message(chat_id, content, attachments=attachments) if success: return SendResult(success=True, message_id=str(int(time.time() * 1000))) return SendResult(success=False, error="chat.postMessage failed") async def send_typing(self, chat_id: str, metadata=None) -> None: pass async def get_chat_info(self, chat_id: str) -> Dict[str, Any]: info = self._room_info.get(chat_id, {}) return {"name": info.get("name", chat_id), "type": "group" if info.get("type") in ("c", "p") else "dm"} async def _listen_loop(self): loop = asyncio.get_running_loop() while self._running and not self._shutdown_event.is_set(): try: msg_obj = await loop.run_in_executor(None, self._ddp_queue_get, 1.0) if msg_obj is None: if self.ddp and not self.ddp.connected: logger.warning("[rocketchat] DDP disconnected, reconnecting...") if self._auth_token and self.ddp.reconnect_and_restore(self._auth_token): logger.info("[rocketchat] Reconnected") else: await asyncio.sleep(5) continue await self._process_incoming(msg_obj) except asyncio.CancelledError: break except Exception as e: logger.error("[rocketchat] Listener error: %s", e) await asyncio.sleep(1) def _ddp_queue_get(self, timeout: float): if not self.ddp: return None try: return self.ddp._msg_queue.get(timeout=timeout) except queue.Empty: return None async def _process_incoming(self, msg_obj: dict): rid = msg_obj.get("rid") msg_text = msg_obj.get("msg", "") msg_id = msg_obj.get("_id", "") user_info = msg_obj.get("u", {}) sender_id = user_info.get("_id", "") sender_name = user_info.get("username", "unknown") if sender_id == self._user_id: return if not rid or not msg_text: return self._last_activity = time.time() self._go_online() room_name = self._room_info.get(rid, {}).get("name", rid) source = self.build_source( chat_id=rid, chat_name=room_name, chat_type="group" if self._room_info.get(rid, {}).get("type") in ("c", "p") else "dm", user_id=sender_id, user_name=sender_name, ) event = MessageEvent( text=msg_text, message_type=MessageType.TEXT, source=source, raw_message=msg_obj, message_id=msg_id, ) await self.handle_message(event) def _go_online(self): if self.ddp and self.ddp.connected: self.ddp.set_status("online") self._schedule_idle_timer() def _go_idle(self): idle = time.time() - self._last_activity if idle >= 900 and self.ddp and self.ddp.connected: self.ddp.set_status("away") def _schedule_idle_timer(self): if self._status_timer: self._status_timer.cancel() self._status_timer = threading.Timer(900, self._go_idle) self._status_timer.daemon = True self._status_timer.start()