diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e6058f8 --- /dev/null +++ b/__init__.py @@ -0,0 +1,123 @@ +""" +Rocket.Chat Platform Plugin for Hermes Agent. + +Registers a gateway platform adapter that connects to a self-hosted +Rocket.Chat server via DDP/WebSocket (real-time incoming messages + presence) +and REST API (outgoing replies). +""" + +import logging +import os + +logger = logging.getLogger(__name__) + +# ── Interaction Templates (source of truth — zero yaml dependency) ── +_TEMPLATE_MAP = { + "yes_no": { + "label": "Ja/Nein", + "description": "Binary confirmation", + "attachment": {"color": "#1d74f5", "title": "Bitte bestätigen:"}, + "buttons": [ + {"text": "✅ Ja", "msg": "ja", "style": "primary"}, + {"text": "❌ Nein", "msg": "nein", "style": "danger"}, + ], + }, + "confirm_cancel": { + "label": "Bestätigen/Abbrechen", + "description": "Safe action with abort", + "attachment": {"color": "#1d74f5", "title": "Aktion bestätigen:"}, + "buttons": [ + {"text": "✅ Bestätigen", "msg": "bestätigen", "style": "primary"}, + {"text": "🚫 Abbrechen", "msg": "abbrechen", "style": "danger"}, + ], + }, + "ok": { + "label": "OK", + "description": "Single acknowledgement", + "attachment": {"color": "#1d74f5", "title": "Hinweis:"}, + "buttons": [ + {"text": "👍 OK", "msg": "ok", "style": "primary"}, + ], + }, + "multi_choice_3": { + "label": "3 Optionen", + "description": "Three-option selection", + "attachment": {"color": "#1d74f5", "title": "Wähle eine Option:"}, + "buttons": [ + {"text": "1._Links", "msg": "links", "style": "default"}, + {"text": "2._Rechts", "msg": "rechts", "style": "default"}, + {"text": "3._Zwei_Runden", "msg": "zwei_runden", "style": "default"}, + ], + }, +} + + +def _build_custom_buttons(labels: list, prefix: str = "") -> list: + """Build Rocket.Chat action buttons from plain label strings. + + Rocket.Chat UI truncates button text after ~11-12 chars. + Use short labels (emoji+text without spaces, or numbered refs). + """ + buttons = [] + for idx, label in enumerate(labels): + # Sanitize label for button text (keep short) + display = label.strip() + # Payload may carry question_id prefix for correlation + msg_payload = display + if prefix: + msg_payload = f"{prefix}::{msg_payload}" + buttons.append({ + "type": "button", + "text": display, + "msg": msg_payload, + "msg_in_chat_window": True, + "style": "default", + }) + return buttons + + +def check_requirements(): + """Rocket.Chat adapter uses only stdlib (urllib, socket, ssl, threading).""" + return True + + +def validate_config(config) -> bool: + """Check whether the platform is properly configured.""" + extra = getattr(config, "extra", {}) or {} + base_url = os.getenv("ROCKETCHAT_BASE_URL") or extra.get("base_url", "") + user = os.getenv("ROCKETCHAT_USER") or extra.get("user", "") + password = os.getenv("ROCKETCHAT_PASSWORD") or extra.get("password", "") + return bool(base_url and user and password) + + +def is_connected(config) -> bool: + """Check whether Rocket.Chat is configured (env or config.yaml).""" + return validate_config(config) + + +def register(ctx): + """Plugin entry point — called by the Hermes plugin system at startup.""" + from .adapter import RocketChatAdapter # lazy import avoids circular deps + + ctx.register_platform( + name="rocketchat", + label="Rocket.Chat", + adapter_factory=lambda cfg: RocketChatAdapter(cfg), + check_fn=check_requirements, + validate_config=validate_config, + is_connected=is_connected, + required_env=["ROCKETCHAT_BASE_URL", "ROCKETCHAT_USER", "ROCKETCHAT_PASSWORD"], + install_hint="No extra packages needed (stdlib only)", + allowed_users_env="ROCKETCHAT_ALLOWED_USERS", + allow_all_env="ROCKETCHAT_ALLOW_ALL_USERS", + max_message_length=4000, + emoji="🚀", + pii_safe=False, + allow_update_command=True, + platform_hint=( + "You are chatting via Rocket.Chat. You support markdown formatting: " + "**bold**, *italic*, `inline code`, ```code blocks```, and [links](url). " + "Messages can be up to ~4000 characters. You have a green online dot " + "via DDP presence. Use professional but friendly tone." + ), + ) diff --git a/adapter.py b/adapter.py new file mode 100644 index 0000000..d719936 --- /dev/null +++ b/adapter.py @@ -0,0 +1,512 @@ +""" +Rocket.Chat Platform Adapter for Hermes Agent. + +Plugin-based gateway adapter connecting to a self-hosted Rocket.Chat server. +Uses WebSocket/DDP for real-time incoming messages + presence, +and REST API for sending replies. + +Zero external dependencies — stdlib only. +""" + +import asyncio +import json +import logging +import os +import queue +import socket +import ssl +import struct +import threading +import time +import urllib.request +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + +try: + from gateway.platforms.base import ( + BasePlatformAdapter, + SendResult, + MessageEvent, + MessageType, + ) + from gateway.session import SessionSource + from gateway.config import PlatformConfig, Platform +except ImportError: + # Fallback for standalone test / development outside gateway + BasePlatformAdapter = object + SendResult = dict + MessageEvent = dict + MessageType = object + SessionSource = object + PlatformConfig = object + Platform = str + + +def _get_config_str(config: PlatformConfig, key: str, env_var: str, default: str = "") -> str: + extra = getattr(config, "extra", {}) or {} + return os.getenv(env_var) or extra.get(key, default) + + +class RocketChatREST: + def __init__(self, base_url: str): + self.base_url = base_url.rstrip("/") + self.auth_token: Optional[str] = None + self.user_id: Optional[str] = None + + def _request(self, method: str, endpoint: str, headers: Optional[dict] = None, data: Optional[dict] = None): + url = f"{self.base_url}{endpoint}" + req_headers = dict(headers) if headers else {} + body = json.dumps(data).encode("utf-8") if data else None + if body: + req_headers["Content-Type"] = "application/json" + req = urllib.request.Request(url, data=body, headers=req_headers, method=method) + try: + with urllib.request.urlopen(req, timeout=30) as resp: + return json.loads(resp.read().decode("utf-8")) + except Exception as e: + logger.debug("REST %s %s failed: %s", method, endpoint, e) + return None + + def login(self, user: str, password: str) -> bool: + r = self._request("POST", "/api/v1/login", data={"user": user, "password": password}) + if r and r.get("status") == "success": + self.auth_token = r["data"]["authToken"] + self.user_id = r["data"]["userId"] + return True + return False + + def get_rooms(self) -> List[dict]: + if not self.auth_token: + return [] + r = self._request("GET", "/api/v1/rooms.get", headers={ + "X-Auth-Token": self.auth_token, + "X-User-Id": self.user_id, + }) + if not r or not r.get("success"): + return [] + rooms = [] + for rm in r.get("update", []): + t = rm.get("t", "c") + rid = rm["_id"] + if t == "d": + usernames = rm.get("usernames", []) + other = [u for u in usernames if u != os.getenv("ROCKETCHAT_USER", "")] + name = "DM mit " + (", ".join(other) if other else "unbekannt") + else: + name = rm.get("name", "unnamed") + rooms.append({"id": rid, "name": name, "type": t}) + return rooms + + def post_message(self, room_id: str, text: str, attachments: Optional[List[dict]] = None) -> bool: + if not self.auth_token: + return False + payload = {"roomId": room_id, "text": text} + if attachments: + payload["attachments"] = attachments + r = self._request("POST", "/api/v1/chat.postMessage", headers={ + "X-Auth-Token": self.auth_token, + "X-User-Id": self.user_id, + "Content-Type": "application/json", + }, data=payload) + return bool(r and r.get("success")) + + +class DDPClient: + PING_INTERVAL = 8.0 + RECONNECT_BACKOFFS = [5, 10, 20, 30, 60, 120] + + def __init__(self, host: str, on_message_cb=None, on_connected_cb=None): + self.host = host + self._on_message = on_message_cb + self._on_connected = on_connected_cb + self.sock: Optional[socket.socket] = None + self.connected = False + self._running = False + self._reader_thread: Optional[threading.Thread] = None + self._write_lock = threading.Lock() + self._msg_queue: queue.Queue = queue.Queue() + self._next_sub_id = 1 + self._sub_rooms: Dict[str, str] = {} + self._session: Optional[str] = None + self._reconnect_attempt = 0 + + def _create_socket(self) -> socket.socket: + ctx = ssl.create_default_context() + raw = socket.create_connection((self.host, 443), timeout=10) + return ctx.wrap_socket(raw, server_hostname=self.host) + + def _handshake(self, s: socket.socket) -> bool: + import base64 + key = base64.b64encode(os.urandom(16)).decode() + hs = ( + f"GET /websocket HTTP/1.1\r\n" + f"Host: {self.host}\r\n" + f"Upgrade: websocket\r\n" + f"Connection: Upgrade\r\n" + f"Sec-WebSocket-Key: {key}\r\n" + f"Sec-WebSocket-Version: 13\r\n" + f"Origin: https://{self.host}\r\n\r\n" + ) + s.sendall(hs.encode()) + resp = b"" + while b"\r\n\r\n" not in resp: + chunk = s.recv(1024) + if not chunk: + break + resp += chunk + return b"101 Switching Protocols" in resp + + def _send_frame(self, data: bytes, opcode: int = 0x01, mask: bool = True) -> None: + if not self.sock: + return + length = len(data) + if length < 126: + header = struct.pack("!BB", 0x80 | opcode, (0x80 if mask else 0x00) | length) + elif length < 65536: + header = struct.pack("!BBH", 0x80 | opcode, (0x80 if mask else 0x00) | 126, length) + else: + header = struct.pack("!BBQ", 0x80 | opcode, (0x80 if mask else 0x00) | 127, length) + frame = header + if mask: + masking = os.urandom(4) + masked = bytes(b ^ masking[i % 4] for i, b in enumerate(data)) + frame += masking + masked + else: + frame += data + with self._write_lock: + self.sock.sendall(frame) + + def _send_json(self, obj: dict) -> None: + self._send_frame(json.dumps(obj).encode("utf-8"), opcode=0x01, mask=True) + + def _send_raw_pong(self, payload: bytes) -> None: + self._send_frame(payload, opcode=0x0A, mask=True) + + def _reader_loop(self): + import select + self.sock.settimeout(1.0) + last_ping = time.time() + while self._running: + try: + if time.time() - last_ping >= self.PING_INTERVAL: + try: + self._send_json({"msg": "ping"}) + last_ping = time.time() + except Exception: + break + readable, _, _ = select.select([self.sock], [], [], 1.0) + if not readable: + continue + hdr = self.sock.recv(2) + if not hdr: + break + fin = (hdr[0] & 0x80) != 0 + opcode = hdr[0] & 0x0F + masked = (hdr[1] & 0x80) != 0 + length = hdr[1] & 0x7F + if length == 126: + length = struct.unpack("!H", self.sock.recv(2))[0] + elif length == 127: + length = struct.unpack("!Q", self.sock.recv(8))[0] + payload = b"" + remaining = length + while remaining > 0: + chunk = self.sock.recv(min(4096, remaining)) + if not chunk: + break + payload += chunk + remaining -= len(chunk) + if len(payload) < length: + break + if masked: + mask = payload[:4] + payload = bytes(b ^ mask[i % 4] for i, b in enumerate(payload[4:])) + if opcode == 0x09: + self._send_raw_pong(payload) + elif opcode == 0x01 and payload: + try: + data = json.loads(payload.decode("utf-8")) + msg_type = data.get("msg", "") + if msg_type == "connected": + self.connected = True + self._session = data.get("session") + if self._on_connected: + self._on_connected() + elif msg_type == "ping": + self._send_json({"msg": "pong"}) + elif msg_type == "changed" and data.get("collection") == "stream-room-messages": + fields = data.get("fields", {}) + args = fields.get("args", []) + if args and isinstance(args, list): + self._msg_queue.put(args[0]) + except Exception: + pass + elif opcode == 0x08: + break + except Exception: + break + self.connected = False + logger.debug("DDP reader thread exited") + + def connect_and_login(self, token: str) -> bool: + try: + self.sock = self._create_socket() + if not self._handshake(self.sock): + return False + self._running = True + self._reader_thread = threading.Thread(target=self._reader_loop, daemon=True) + self._reader_thread.start() + time.sleep(0.3) + self._send_json({"msg": "connect", "version": "1", "support": ["1", "pre2", "pre1"]}) + for _ in range(50): + if self.connected: + break + time.sleep(0.1) + if not self.connected: + self.disconnect() + return False + time.sleep(0.2) + self._send_json({"msg": "method", "method": "login", + "params": [{"resume": token}], "id": "ddplogin"}) + time.sleep(0.5) + return True + except Exception as e: + logger.error("DDP connect failed: %s", e) + self.disconnect() + return False + + def subscribe_room(self, room_id: str) -> Optional[str]: + if not self.connected: + return None + sub_id = f"sub{self._next_sub_id}" + self._next_sub_id += 1 + # CRITICAL: Rocket.Chat V5+ requires {"token": "true"} as second param, + # otherwise subscribe succeeds but NO messages ever stream. + self._send_json({ + "msg": "sub", + "name": "stream-room-messages", + "params": [room_id, {"token": "true"}], + "id": sub_id + }) + self._sub_rooms[room_id] = sub_id + return sub_id + + def set_status(self, status: str) -> bool: + if not self.connected: + return False + method = f"UserPresence:{status}" + self._send_json({"msg": "method", "method": method, "params": [], "id": f"st{int(time.time()*1000)}"}) + return True + + def disconnect(self): + self._running = False + if self.sock: + try: + self.sock.close() + except Exception: + pass + self.sock = None + self.connected = False + self._sub_rooms.clear() + + def reconnect_and_restore(self, token: str) -> bool: + self.disconnect() + time.sleep(1) + old_subs = list(self._sub_rooms.keys()) + self._sub_rooms.clear() + if self.connect_and_login(token): + for rid in old_subs: + self.subscribe_room(rid) + return True + return False + + +class RocketChatAdapter(BasePlatformAdapter): + def __init__(self, config: PlatformConfig): + super().__init__(config=config, platform=Platform("rocketchat")) + self.base_url = _get_config_str(config, "base_url", "ROCKETCHAT_BASE_URL") + self.user = _get_config_str(config, "user", "ROCKETCHAT_USER") + self.password = _get_config_str(config, "password", "ROCKETCHAT_PASSWORD") + self.rest = RocketChatREST(self.base_url) + self.ddp: Optional[DDPClient] = None + self._auth_token: Optional[str] = None + self._user_id: Optional[str] = None + self._rooms: List[dict] = [] + self._subscribed_rooms: set = set() + self._listen_task: Optional[asyncio.Task] = None + self._status_timer: Optional[threading.Timer] = None + self._last_activity = 0.0 + self._shutdown_event = threading.Event() + self._room_info: Dict[str, dict] = {} + + async def connect(self) -> bool: + if not self.base_url or not self.user or not self.password: + logger.error("[rocketchat] Missing configuration") + return False + if not self.rest.login(self.user, self.password): + logger.error("[rocketchat] REST login failed") + return False + self._auth_token = self.rest.auth_token + self._user_id = self.rest.user_id + host = self.base_url.replace("https://", "").replace("http://", "").split("/")[0] + self.ddp = DDPClient( + host=host, + on_connected_cb=lambda: self.ddp.set_status("away") if self.ddp else None, + ) + if not self.ddp.connect_and_login(self._auth_token): + logger.error("[rocketchat] DDP connect/login failed") + return False + self._rooms = self.rest.get_rooms() + for room in self._rooms: + sub_id = self.ddp.subscribe_room(room["id"]) + if sub_id: + self._subscribed_rooms.add(room["id"]) + self._room_info[room["id"]] = {"name": room["name"], "type": room["type"]} + logger.info("[rocketchat] Subscribed to %s", room["name"]) + self._mark_connected() + self._listen_task = asyncio.create_task(self._listen_loop()) + self._last_activity = time.time() + self._schedule_idle_timer() + logger.info("[rocketchat] Connected to %s", self.base_url) + return True + + async def disconnect(self) -> None: + self._mark_disconnected() + self._shutdown_event.set() + if self._status_timer: + self._status_timer.cancel() + if self._listen_task and not self._listen_task.done(): + self._listen_task.cancel() + try: + await self._listen_task + except asyncio.CancelledError: + pass + if self.ddp: + self.ddp.disconnect() + self.ddp = None + logger.info("[rocketchat] Disconnected") + + async def send(self, chat_id: str, content: str, reply_to: Optional[str] = None, + metadata: Optional[Dict[str, Any]] = None) -> SendResult: + if not self.rest.auth_token: + return SendResult(success=False, error="Not authenticated") + + # Handle interaction templates via metadata + attachments = None + if metadata and metadata.get("template"): + from . import _TEMPLATE_MAP, _build_custom_buttons + template_name = metadata.get("template") + question_id = metadata.get("question_id", "") + if template_name == "custom" and metadata.get("custom_buttons"): + buttons = _build_custom_buttons(metadata["custom_buttons"], prefix=question_id) + attachments = [{ + "color": "#1d74f5", + "title": metadata.get("button_title", "Wähle eine Option:"), + "actions": buttons, + }] + elif template_name in _TEMPLATE_MAP: + tmpl = _TEMPLATE_MAP[template_name] + attachment = dict(tmpl.get("attachment", {})) + raw_buttons = tmpl.get("buttons", []) + buttons = [] + for btn in raw_buttons: + msg_payload = btn.get("msg", "") + if question_id: + msg_payload = f"{question_id}::{msg_payload}" + buttons.append({ + "type": "button", + "text": btn.get("text", "?"), + "msg": msg_payload, + "msg_in_chat_window": True, + "style": btn.get("style", "default"), + }) + attachment["actions"] = buttons + attachments = [attachment] + + success = self.rest.post_message(chat_id, content, attachments=attachments) + if success: + return SendResult(success=True, message_id=str(int(time.time() * 1000))) + return SendResult(success=False, error="chat.postMessage failed") + + async def send_typing(self, chat_id: str, metadata=None) -> None: + pass + + async def get_chat_info(self, chat_id: str) -> Dict[str, Any]: + info = self._room_info.get(chat_id, {}) + return {"name": info.get("name", chat_id), "type": "group" if info.get("type") in ("c", "p") else "dm"} + + async def _listen_loop(self): + loop = asyncio.get_running_loop() + while self._running and not self._shutdown_event.is_set(): + try: + msg_obj = await loop.run_in_executor(None, self._ddp_queue_get, 1.0) + if msg_obj is None: + if self.ddp and not self.ddp.connected: + logger.warning("[rocketchat] DDP disconnected, reconnecting...") + if self._auth_token and self.ddp.reconnect_and_restore(self._auth_token): + logger.info("[rocketchat] Reconnected") + else: + await asyncio.sleep(5) + continue + await self._process_incoming(msg_obj) + except asyncio.CancelledError: + break + except Exception as e: + logger.error("[rocketchat] Listener error: %s", e) + await asyncio.sleep(1) + + def _ddp_queue_get(self, timeout: float): + if not self.ddp: + return None + try: + return self.ddp._msg_queue.get(timeout=timeout) + except queue.Empty: + return None + + async def _process_incoming(self, msg_obj: dict): + rid = msg_obj.get("rid") + msg_text = msg_obj.get("msg", "") + msg_id = msg_obj.get("_id", "") + user_info = msg_obj.get("u", {}) + sender_id = user_info.get("_id", "") + sender_name = user_info.get("username", "unknown") + if sender_id == self._user_id: + return + if not rid or not msg_text: + return + self._last_activity = time.time() + self._go_online() + room_name = self._room_info.get(rid, {}).get("name", rid) + source = self.build_source( + chat_id=rid, + chat_name=room_name, + chat_type="group" if self._room_info.get(rid, {}).get("type") in ("c", "p") else "dm", + user_id=sender_id, + user_name=sender_name, + ) + event = MessageEvent( + text=msg_text, + message_type=MessageType.TEXT, + source=source, + raw_message=msg_obj, + message_id=msg_id, + ) + await self.handle_message(event) + + def _go_online(self): + if self.ddp and self.ddp.connected: + self.ddp.set_status("online") + self._schedule_idle_timer() + + def _go_idle(self): + idle = time.time() - self._last_activity + if idle >= 900 and self.ddp and self.ddp.connected: + self.ddp.set_status("away") + + def _schedule_idle_timer(self): + if self._status_timer: + self._status_timer.cancel() + self._status_timer = threading.Timer(900, self._go_idle) + self._status_timer.daemon = True + self._status_timer.start() diff --git a/plugin.yaml b/plugin.yaml new file mode 100644 index 0000000..91e2eef --- /dev/null +++ b/plugin.yaml @@ -0,0 +1,22 @@ +name: rocketchat-platform +kind: platform +version: 1.0.0 +description: > + Rocket.Chat gateway adapter for Hermes Agent. + Connects to a self-hosted Rocket.Chat server via WebSocket/DDP for + real-time message streaming and presence, and REST API for sending replies. + Supports channels, private groups, and direct messages. +author: Hermes Community +requires_env: + - ROCKETCHAT_BASE_URL + - ROCKETCHAT_USER + - ROCKETCHAT_PASSWORD + - name: ROCKETCHAT_ROOM_ID + description: "Default room/channel ID to subscribe (optional if auto-discovery enabled)" + required: false + - name: ROCKETCHAT_ALLOW_ALL_USERS + description: "Set to 'true' to allow all users (default: false = allowlist)" + required: false + - name: ROCKETCHAT_ALLOWED_USERS + description: "Comma-separated list of allowed Rocket.Chat usernames" + required: false