513 lines
20 KiB
Python
513 lines
20 KiB
Python
"""
|
|
Rocket.Chat Platform Adapter for Hermes Agent.
|
|
|
|
Plugin-based gateway adapter connecting to a self-hosted Rocket.Chat server.
|
|
Uses WebSocket/DDP for real-time incoming messages + presence,
|
|
and REST API for sending replies.
|
|
|
|
Zero external dependencies — stdlib only.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import os
|
|
import queue
|
|
import socket
|
|
import ssl
|
|
import struct
|
|
import threading
|
|
import time
|
|
import urllib.request
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
try:
|
|
from gateway.platforms.base import (
|
|
BasePlatformAdapter,
|
|
SendResult,
|
|
MessageEvent,
|
|
MessageType,
|
|
)
|
|
from gateway.session import SessionSource
|
|
from gateway.config import PlatformConfig, Platform
|
|
except ImportError:
|
|
# Fallback for standalone test / development outside gateway
|
|
BasePlatformAdapter = object
|
|
SendResult = dict
|
|
MessageEvent = dict
|
|
MessageType = object
|
|
SessionSource = object
|
|
PlatformConfig = object
|
|
Platform = str
|
|
|
|
|
|
def _get_config_str(config: PlatformConfig, key: str, env_var: str, default: str = "") -> str:
|
|
extra = getattr(config, "extra", {}) or {}
|
|
return os.getenv(env_var) or extra.get(key, default)
|
|
|
|
|
|
class RocketChatREST:
|
|
def __init__(self, base_url: str):
|
|
self.base_url = base_url.rstrip("/")
|
|
self.auth_token: Optional[str] = None
|
|
self.user_id: Optional[str] = None
|
|
|
|
def _request(self, method: str, endpoint: str, headers: Optional[dict] = None, data: Optional[dict] = None):
|
|
url = f"{self.base_url}{endpoint}"
|
|
req_headers = dict(headers) if headers else {}
|
|
body = json.dumps(data).encode("utf-8") if data else None
|
|
if body:
|
|
req_headers["Content-Type"] = "application/json"
|
|
req = urllib.request.Request(url, data=body, headers=req_headers, method=method)
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=30) as resp:
|
|
return json.loads(resp.read().decode("utf-8"))
|
|
except Exception as e:
|
|
logger.debug("REST %s %s failed: %s", method, endpoint, e)
|
|
return None
|
|
|
|
def login(self, user: str, password: str) -> bool:
|
|
r = self._request("POST", "/api/v1/login", data={"user": user, "password": password})
|
|
if r and r.get("status") == "success":
|
|
self.auth_token = r["data"]["authToken"]
|
|
self.user_id = r["data"]["userId"]
|
|
return True
|
|
return False
|
|
|
|
def get_rooms(self) -> List[dict]:
|
|
if not self.auth_token:
|
|
return []
|
|
r = self._request("GET", "/api/v1/rooms.get", headers={
|
|
"X-Auth-Token": self.auth_token,
|
|
"X-User-Id": self.user_id,
|
|
})
|
|
if not r or not r.get("success"):
|
|
return []
|
|
rooms = []
|
|
for rm in r.get("update", []):
|
|
t = rm.get("t", "c")
|
|
rid = rm["_id"]
|
|
if t == "d":
|
|
usernames = rm.get("usernames", [])
|
|
other = [u for u in usernames if u != os.getenv("ROCKETCHAT_USER", "")]
|
|
name = "DM mit " + (", ".join(other) if other else "unbekannt")
|
|
else:
|
|
name = rm.get("name", "unnamed")
|
|
rooms.append({"id": rid, "name": name, "type": t})
|
|
return rooms
|
|
|
|
def post_message(self, room_id: str, text: str, attachments: Optional[List[dict]] = None) -> bool:
|
|
if not self.auth_token:
|
|
return False
|
|
payload = {"roomId": room_id, "text": text}
|
|
if attachments:
|
|
payload["attachments"] = attachments
|
|
r = self._request("POST", "/api/v1/chat.postMessage", headers={
|
|
"X-Auth-Token": self.auth_token,
|
|
"X-User-Id": self.user_id,
|
|
"Content-Type": "application/json",
|
|
}, data=payload)
|
|
return bool(r and r.get("success"))
|
|
|
|
|
|
class DDPClient:
|
|
PING_INTERVAL = 8.0
|
|
RECONNECT_BACKOFFS = [5, 10, 20, 30, 60, 120]
|
|
|
|
def __init__(self, host: str, on_message_cb=None, on_connected_cb=None):
|
|
self.host = host
|
|
self._on_message = on_message_cb
|
|
self._on_connected = on_connected_cb
|
|
self.sock: Optional[socket.socket] = None
|
|
self.connected = False
|
|
self._running = False
|
|
self._reader_thread: Optional[threading.Thread] = None
|
|
self._write_lock = threading.Lock()
|
|
self._msg_queue: queue.Queue = queue.Queue()
|
|
self._next_sub_id = 1
|
|
self._sub_rooms: Dict[str, str] = {}
|
|
self._session: Optional[str] = None
|
|
self._reconnect_attempt = 0
|
|
|
|
def _create_socket(self) -> socket.socket:
|
|
ctx = ssl.create_default_context()
|
|
raw = socket.create_connection((self.host, 443), timeout=10)
|
|
return ctx.wrap_socket(raw, server_hostname=self.host)
|
|
|
|
def _handshake(self, s: socket.socket) -> bool:
|
|
import base64
|
|
key = base64.b64encode(os.urandom(16)).decode()
|
|
hs = (
|
|
f"GET /websocket HTTP/1.1\r\n"
|
|
f"Host: {self.host}\r\n"
|
|
f"Upgrade: websocket\r\n"
|
|
f"Connection: Upgrade\r\n"
|
|
f"Sec-WebSocket-Key: {key}\r\n"
|
|
f"Sec-WebSocket-Version: 13\r\n"
|
|
f"Origin: https://{self.host}\r\n\r\n"
|
|
)
|
|
s.sendall(hs.encode())
|
|
resp = b""
|
|
while b"\r\n\r\n" not in resp:
|
|
chunk = s.recv(1024)
|
|
if not chunk:
|
|
break
|
|
resp += chunk
|
|
return b"101 Switching Protocols" in resp
|
|
|
|
def _send_frame(self, data: bytes, opcode: int = 0x01, mask: bool = True) -> None:
|
|
if not self.sock:
|
|
return
|
|
length = len(data)
|
|
if length < 126:
|
|
header = struct.pack("!BB", 0x80 | opcode, (0x80 if mask else 0x00) | length)
|
|
elif length < 65536:
|
|
header = struct.pack("!BBH", 0x80 | opcode, (0x80 if mask else 0x00) | 126, length)
|
|
else:
|
|
header = struct.pack("!BBQ", 0x80 | opcode, (0x80 if mask else 0x00) | 127, length)
|
|
frame = header
|
|
if mask:
|
|
masking = os.urandom(4)
|
|
masked = bytes(b ^ masking[i % 4] for i, b in enumerate(data))
|
|
frame += masking + masked
|
|
else:
|
|
frame += data
|
|
with self._write_lock:
|
|
self.sock.sendall(frame)
|
|
|
|
def _send_json(self, obj: dict) -> None:
|
|
self._send_frame(json.dumps(obj).encode("utf-8"), opcode=0x01, mask=True)
|
|
|
|
def _send_raw_pong(self, payload: bytes) -> None:
|
|
self._send_frame(payload, opcode=0x0A, mask=True)
|
|
|
|
def _reader_loop(self):
|
|
import select
|
|
self.sock.settimeout(1.0)
|
|
last_ping = time.time()
|
|
while self._running:
|
|
try:
|
|
if time.time() - last_ping >= self.PING_INTERVAL:
|
|
try:
|
|
self._send_json({"msg": "ping"})
|
|
last_ping = time.time()
|
|
except Exception:
|
|
break
|
|
readable, _, _ = select.select([self.sock], [], [], 1.0)
|
|
if not readable:
|
|
continue
|
|
hdr = self.sock.recv(2)
|
|
if not hdr:
|
|
break
|
|
fin = (hdr[0] & 0x80) != 0
|
|
opcode = hdr[0] & 0x0F
|
|
masked = (hdr[1] & 0x80) != 0
|
|
length = hdr[1] & 0x7F
|
|
if length == 126:
|
|
length = struct.unpack("!H", self.sock.recv(2))[0]
|
|
elif length == 127:
|
|
length = struct.unpack("!Q", self.sock.recv(8))[0]
|
|
payload = b""
|
|
remaining = length
|
|
while remaining > 0:
|
|
chunk = self.sock.recv(min(4096, remaining))
|
|
if not chunk:
|
|
break
|
|
payload += chunk
|
|
remaining -= len(chunk)
|
|
if len(payload) < length:
|
|
break
|
|
if masked:
|
|
mask = payload[:4]
|
|
payload = bytes(b ^ mask[i % 4] for i, b in enumerate(payload[4:]))
|
|
if opcode == 0x09:
|
|
self._send_raw_pong(payload)
|
|
elif opcode == 0x01 and payload:
|
|
try:
|
|
data = json.loads(payload.decode("utf-8"))
|
|
msg_type = data.get("msg", "")
|
|
if msg_type == "connected":
|
|
self.connected = True
|
|
self._session = data.get("session")
|
|
if self._on_connected:
|
|
self._on_connected()
|
|
elif msg_type == "ping":
|
|
self._send_json({"msg": "pong"})
|
|
elif msg_type == "changed" and data.get("collection") == "stream-room-messages":
|
|
fields = data.get("fields", {})
|
|
args = fields.get("args", [])
|
|
if args and isinstance(args, list):
|
|
self._msg_queue.put(args[0])
|
|
except Exception:
|
|
pass
|
|
elif opcode == 0x08:
|
|
break
|
|
except Exception:
|
|
break
|
|
self.connected = False
|
|
logger.debug("DDP reader thread exited")
|
|
|
|
def connect_and_login(self, token: str) -> bool:
|
|
try:
|
|
self.sock = self._create_socket()
|
|
if not self._handshake(self.sock):
|
|
return False
|
|
self._running = True
|
|
self._reader_thread = threading.Thread(target=self._reader_loop, daemon=True)
|
|
self._reader_thread.start()
|
|
time.sleep(0.3)
|
|
self._send_json({"msg": "connect", "version": "1", "support": ["1", "pre2", "pre1"]})
|
|
for _ in range(50):
|
|
if self.connected:
|
|
break
|
|
time.sleep(0.1)
|
|
if not self.connected:
|
|
self.disconnect()
|
|
return False
|
|
time.sleep(0.2)
|
|
self._send_json({"msg": "method", "method": "login",
|
|
"params": [{"resume": token}], "id": "ddplogin"})
|
|
time.sleep(0.5)
|
|
return True
|
|
except Exception as e:
|
|
logger.error("DDP connect failed: %s", e)
|
|
self.disconnect()
|
|
return False
|
|
|
|
def subscribe_room(self, room_id: str) -> Optional[str]:
|
|
if not self.connected:
|
|
return None
|
|
sub_id = f"sub{self._next_sub_id}"
|
|
self._next_sub_id += 1
|
|
# CRITICAL: Rocket.Chat V5+ requires {"token": "true"} as second param,
|
|
# otherwise subscribe succeeds but NO messages ever stream.
|
|
self._send_json({
|
|
"msg": "sub",
|
|
"name": "stream-room-messages",
|
|
"params": [room_id, {"token": "true"}],
|
|
"id": sub_id
|
|
})
|
|
self._sub_rooms[room_id] = sub_id
|
|
return sub_id
|
|
|
|
def set_status(self, status: str) -> bool:
|
|
if not self.connected:
|
|
return False
|
|
method = f"UserPresence:{status}"
|
|
self._send_json({"msg": "method", "method": method, "params": [], "id": f"st{int(time.time()*1000)}"})
|
|
return True
|
|
|
|
def disconnect(self):
|
|
self._running = False
|
|
if self.sock:
|
|
try:
|
|
self.sock.close()
|
|
except Exception:
|
|
pass
|
|
self.sock = None
|
|
self.connected = False
|
|
self._sub_rooms.clear()
|
|
|
|
def reconnect_and_restore(self, token: str) -> bool:
|
|
self.disconnect()
|
|
time.sleep(1)
|
|
old_subs = list(self._sub_rooms.keys())
|
|
self._sub_rooms.clear()
|
|
if self.connect_and_login(token):
|
|
for rid in old_subs:
|
|
self.subscribe_room(rid)
|
|
return True
|
|
return False
|
|
|
|
|
|
class RocketChatAdapter(BasePlatformAdapter):
|
|
def __init__(self, config: PlatformConfig):
|
|
super().__init__(config=config, platform=Platform("rocketchat"))
|
|
self.base_url = _get_config_str(config, "base_url", "ROCKETCHAT_BASE_URL")
|
|
self.user = _get_config_str(config, "user", "ROCKETCHAT_USER")
|
|
self.password = _get_config_str(config, "password", "ROCKETCHAT_PASSWORD")
|
|
self.rest = RocketChatREST(self.base_url)
|
|
self.ddp: Optional[DDPClient] = None
|
|
self._auth_token: Optional[str] = None
|
|
self._user_id: Optional[str] = None
|
|
self._rooms: List[dict] = []
|
|
self._subscribed_rooms: set = set()
|
|
self._listen_task: Optional[asyncio.Task] = None
|
|
self._status_timer: Optional[threading.Timer] = None
|
|
self._last_activity = 0.0
|
|
self._shutdown_event = threading.Event()
|
|
self._room_info: Dict[str, dict] = {}
|
|
|
|
async def connect(self) -> bool:
|
|
if not self.base_url or not self.user or not self.password:
|
|
logger.error("[rocketchat] Missing configuration")
|
|
return False
|
|
if not self.rest.login(self.user, self.password):
|
|
logger.error("[rocketchat] REST login failed")
|
|
return False
|
|
self._auth_token = self.rest.auth_token
|
|
self._user_id = self.rest.user_id
|
|
host = self.base_url.replace("https://", "").replace("http://", "").split("/")[0]
|
|
self.ddp = DDPClient(
|
|
host=host,
|
|
on_connected_cb=lambda: self.ddp.set_status("away") if self.ddp else None,
|
|
)
|
|
if not self.ddp.connect_and_login(self._auth_token):
|
|
logger.error("[rocketchat] DDP connect/login failed")
|
|
return False
|
|
self._rooms = self.rest.get_rooms()
|
|
for room in self._rooms:
|
|
sub_id = self.ddp.subscribe_room(room["id"])
|
|
if sub_id:
|
|
self._subscribed_rooms.add(room["id"])
|
|
self._room_info[room["id"]] = {"name": room["name"], "type": room["type"]}
|
|
logger.info("[rocketchat] Subscribed to %s", room["name"])
|
|
self._mark_connected()
|
|
self._listen_task = asyncio.create_task(self._listen_loop())
|
|
self._last_activity = time.time()
|
|
self._schedule_idle_timer()
|
|
logger.info("[rocketchat] Connected to %s", self.base_url)
|
|
return True
|
|
|
|
async def disconnect(self) -> None:
|
|
self._mark_disconnected()
|
|
self._shutdown_event.set()
|
|
if self._status_timer:
|
|
self._status_timer.cancel()
|
|
if self._listen_task and not self._listen_task.done():
|
|
self._listen_task.cancel()
|
|
try:
|
|
await self._listen_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
if self.ddp:
|
|
self.ddp.disconnect()
|
|
self.ddp = None
|
|
logger.info("[rocketchat] Disconnected")
|
|
|
|
async def send(self, chat_id: str, content: str, reply_to: Optional[str] = None,
|
|
metadata: Optional[Dict[str, Any]] = None) -> SendResult:
|
|
if not self.rest.auth_token:
|
|
return SendResult(success=False, error="Not authenticated")
|
|
|
|
# Handle interaction templates via metadata
|
|
attachments = None
|
|
if metadata and metadata.get("template"):
|
|
from . import _TEMPLATE_MAP, _build_custom_buttons
|
|
template_name = metadata.get("template")
|
|
question_id = metadata.get("question_id", "")
|
|
if template_name == "custom" and metadata.get("custom_buttons"):
|
|
buttons = _build_custom_buttons(metadata["custom_buttons"], prefix=question_id)
|
|
attachments = [{
|
|
"color": "#1d74f5",
|
|
"title": metadata.get("button_title", "Wähle eine Option:"),
|
|
"actions": buttons,
|
|
}]
|
|
elif template_name in _TEMPLATE_MAP:
|
|
tmpl = _TEMPLATE_MAP[template_name]
|
|
attachment = dict(tmpl.get("attachment", {}))
|
|
raw_buttons = tmpl.get("buttons", [])
|
|
buttons = []
|
|
for btn in raw_buttons:
|
|
msg_payload = btn.get("msg", "")
|
|
if question_id:
|
|
msg_payload = f"{question_id}::{msg_payload}"
|
|
buttons.append({
|
|
"type": "button",
|
|
"text": btn.get("text", "?"),
|
|
"msg": msg_payload,
|
|
"msg_in_chat_window": True,
|
|
"style": btn.get("style", "default"),
|
|
})
|
|
attachment["actions"] = buttons
|
|
attachments = [attachment]
|
|
|
|
success = self.rest.post_message(chat_id, content, attachments=attachments)
|
|
if success:
|
|
return SendResult(success=True, message_id=str(int(time.time() * 1000)))
|
|
return SendResult(success=False, error="chat.postMessage failed")
|
|
|
|
async def send_typing(self, chat_id: str, metadata=None) -> None:
|
|
pass
|
|
|
|
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
|
|
info = self._room_info.get(chat_id, {})
|
|
return {"name": info.get("name", chat_id), "type": "group" if info.get("type") in ("c", "p") else "dm"}
|
|
|
|
async def _listen_loop(self):
|
|
loop = asyncio.get_running_loop()
|
|
while self._running and not self._shutdown_event.is_set():
|
|
try:
|
|
msg_obj = await loop.run_in_executor(None, self._ddp_queue_get, 1.0)
|
|
if msg_obj is None:
|
|
if self.ddp and not self.ddp.connected:
|
|
logger.warning("[rocketchat] DDP disconnected, reconnecting...")
|
|
if self._auth_token and self.ddp.reconnect_and_restore(self._auth_token):
|
|
logger.info("[rocketchat] Reconnected")
|
|
else:
|
|
await asyncio.sleep(5)
|
|
continue
|
|
await self._process_incoming(msg_obj)
|
|
except asyncio.CancelledError:
|
|
break
|
|
except Exception as e:
|
|
logger.error("[rocketchat] Listener error: %s", e)
|
|
await asyncio.sleep(1)
|
|
|
|
def _ddp_queue_get(self, timeout: float):
|
|
if not self.ddp:
|
|
return None
|
|
try:
|
|
return self.ddp._msg_queue.get(timeout=timeout)
|
|
except queue.Empty:
|
|
return None
|
|
|
|
async def _process_incoming(self, msg_obj: dict):
|
|
rid = msg_obj.get("rid")
|
|
msg_text = msg_obj.get("msg", "")
|
|
msg_id = msg_obj.get("_id", "")
|
|
user_info = msg_obj.get("u", {})
|
|
sender_id = user_info.get("_id", "")
|
|
sender_name = user_info.get("username", "unknown")
|
|
if sender_id == self._user_id:
|
|
return
|
|
if not rid or not msg_text:
|
|
return
|
|
self._last_activity = time.time()
|
|
self._go_online()
|
|
room_name = self._room_info.get(rid, {}).get("name", rid)
|
|
source = self.build_source(
|
|
chat_id=rid,
|
|
chat_name=room_name,
|
|
chat_type="group" if self._room_info.get(rid, {}).get("type") in ("c", "p") else "dm",
|
|
user_id=sender_id,
|
|
user_name=sender_name,
|
|
)
|
|
event = MessageEvent(
|
|
text=msg_text,
|
|
message_type=MessageType.TEXT,
|
|
source=source,
|
|
raw_message=msg_obj,
|
|
message_id=msg_id,
|
|
)
|
|
await self.handle_message(event)
|
|
|
|
def _go_online(self):
|
|
if self.ddp and self.ddp.connected:
|
|
self.ddp.set_status("online")
|
|
self._schedule_idle_timer()
|
|
|
|
def _go_idle(self):
|
|
idle = time.time() - self._last_activity
|
|
if idle >= 900 and self.ddp and self.ddp.connected:
|
|
self.ddp.set_status("away")
|
|
|
|
def _schedule_idle_timer(self):
|
|
if self._status_timer:
|
|
self._status_timer.cancel()
|
|
self._status_timer = threading.Timer(900, self._go_idle)
|
|
self._status_timer.daemon = True
|
|
self._status_timer.start()
|