feat: initial plugin release — adapter, init, plugin.yaml

This commit is contained in:
Bernd (Hermes Agent)
2026-05-05 06:05:47 +02:00
parent e7ffe0fc84
commit 0e31bf55e5
3 changed files with 657 additions and 0 deletions
+512
View File
@@ -0,0 +1,512 @@
"""
Rocket.Chat Platform Adapter for Hermes Agent.
Plugin-based gateway adapter connecting to a self-hosted Rocket.Chat server.
Uses WebSocket/DDP for real-time incoming messages + presence,
and REST API for sending replies.
Zero external dependencies — stdlib only.
"""
import asyncio
import json
import logging
import os
import queue
import socket
import ssl
import struct
import threading
import time
import urllib.request
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
try:
from gateway.platforms.base import (
BasePlatformAdapter,
SendResult,
MessageEvent,
MessageType,
)
from gateway.session import SessionSource
from gateway.config import PlatformConfig, Platform
except ImportError:
# Fallback for standalone test / development outside gateway
BasePlatformAdapter = object
SendResult = dict
MessageEvent = dict
MessageType = object
SessionSource = object
PlatformConfig = object
Platform = str
def _get_config_str(config: PlatformConfig, key: str, env_var: str, default: str = "") -> str:
extra = getattr(config, "extra", {}) or {}
return os.getenv(env_var) or extra.get(key, default)
class RocketChatREST:
def __init__(self, base_url: str):
self.base_url = base_url.rstrip("/")
self.auth_token: Optional[str] = None
self.user_id: Optional[str] = None
def _request(self, method: str, endpoint: str, headers: Optional[dict] = None, data: Optional[dict] = None):
url = f"{self.base_url}{endpoint}"
req_headers = dict(headers) if headers else {}
body = json.dumps(data).encode("utf-8") if data else None
if body:
req_headers["Content-Type"] = "application/json"
req = urllib.request.Request(url, data=body, headers=req_headers, method=method)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode("utf-8"))
except Exception as e:
logger.debug("REST %s %s failed: %s", method, endpoint, e)
return None
def login(self, user: str, password: str) -> bool:
r = self._request("POST", "/api/v1/login", data={"user": user, "password": password})
if r and r.get("status") == "success":
self.auth_token = r["data"]["authToken"]
self.user_id = r["data"]["userId"]
return True
return False
def get_rooms(self) -> List[dict]:
if not self.auth_token:
return []
r = self._request("GET", "/api/v1/rooms.get", headers={
"X-Auth-Token": self.auth_token,
"X-User-Id": self.user_id,
})
if not r or not r.get("success"):
return []
rooms = []
for rm in r.get("update", []):
t = rm.get("t", "c")
rid = rm["_id"]
if t == "d":
usernames = rm.get("usernames", [])
other = [u for u in usernames if u != os.getenv("ROCKETCHAT_USER", "")]
name = "DM mit " + (", ".join(other) if other else "unbekannt")
else:
name = rm.get("name", "unnamed")
rooms.append({"id": rid, "name": name, "type": t})
return rooms
def post_message(self, room_id: str, text: str, attachments: Optional[List[dict]] = None) -> bool:
if not self.auth_token:
return False
payload = {"roomId": room_id, "text": text}
if attachments:
payload["attachments"] = attachments
r = self._request("POST", "/api/v1/chat.postMessage", headers={
"X-Auth-Token": self.auth_token,
"X-User-Id": self.user_id,
"Content-Type": "application/json",
}, data=payload)
return bool(r and r.get("success"))
class DDPClient:
PING_INTERVAL = 8.0
RECONNECT_BACKOFFS = [5, 10, 20, 30, 60, 120]
def __init__(self, host: str, on_message_cb=None, on_connected_cb=None):
self.host = host
self._on_message = on_message_cb
self._on_connected = on_connected_cb
self.sock: Optional[socket.socket] = None
self.connected = False
self._running = False
self._reader_thread: Optional[threading.Thread] = None
self._write_lock = threading.Lock()
self._msg_queue: queue.Queue = queue.Queue()
self._next_sub_id = 1
self._sub_rooms: Dict[str, str] = {}
self._session: Optional[str] = None
self._reconnect_attempt = 0
def _create_socket(self) -> socket.socket:
ctx = ssl.create_default_context()
raw = socket.create_connection((self.host, 443), timeout=10)
return ctx.wrap_socket(raw, server_hostname=self.host)
def _handshake(self, s: socket.socket) -> bool:
import base64
key = base64.b64encode(os.urandom(16)).decode()
hs = (
f"GET /websocket HTTP/1.1\r\n"
f"Host: {self.host}\r\n"
f"Upgrade: websocket\r\n"
f"Connection: Upgrade\r\n"
f"Sec-WebSocket-Key: {key}\r\n"
f"Sec-WebSocket-Version: 13\r\n"
f"Origin: https://{self.host}\r\n\r\n"
)
s.sendall(hs.encode())
resp = b""
while b"\r\n\r\n" not in resp:
chunk = s.recv(1024)
if not chunk:
break
resp += chunk
return b"101 Switching Protocols" in resp
def _send_frame(self, data: bytes, opcode: int = 0x01, mask: bool = True) -> None:
if not self.sock:
return
length = len(data)
if length < 126:
header = struct.pack("!BB", 0x80 | opcode, (0x80 if mask else 0x00) | length)
elif length < 65536:
header = struct.pack("!BBH", 0x80 | opcode, (0x80 if mask else 0x00) | 126, length)
else:
header = struct.pack("!BBQ", 0x80 | opcode, (0x80 if mask else 0x00) | 127, length)
frame = header
if mask:
masking = os.urandom(4)
masked = bytes(b ^ masking[i % 4] for i, b in enumerate(data))
frame += masking + masked
else:
frame += data
with self._write_lock:
self.sock.sendall(frame)
def _send_json(self, obj: dict) -> None:
self._send_frame(json.dumps(obj).encode("utf-8"), opcode=0x01, mask=True)
def _send_raw_pong(self, payload: bytes) -> None:
self._send_frame(payload, opcode=0x0A, mask=True)
def _reader_loop(self):
import select
self.sock.settimeout(1.0)
last_ping = time.time()
while self._running:
try:
if time.time() - last_ping >= self.PING_INTERVAL:
try:
self._send_json({"msg": "ping"})
last_ping = time.time()
except Exception:
break
readable, _, _ = select.select([self.sock], [], [], 1.0)
if not readable:
continue
hdr = self.sock.recv(2)
if not hdr:
break
fin = (hdr[0] & 0x80) != 0
opcode = hdr[0] & 0x0F
masked = (hdr[1] & 0x80) != 0
length = hdr[1] & 0x7F
if length == 126:
length = struct.unpack("!H", self.sock.recv(2))[0]
elif length == 127:
length = struct.unpack("!Q", self.sock.recv(8))[0]
payload = b""
remaining = length
while remaining > 0:
chunk = self.sock.recv(min(4096, remaining))
if not chunk:
break
payload += chunk
remaining -= len(chunk)
if len(payload) < length:
break
if masked:
mask = payload[:4]
payload = bytes(b ^ mask[i % 4] for i, b in enumerate(payload[4:]))
if opcode == 0x09:
self._send_raw_pong(payload)
elif opcode == 0x01 and payload:
try:
data = json.loads(payload.decode("utf-8"))
msg_type = data.get("msg", "")
if msg_type == "connected":
self.connected = True
self._session = data.get("session")
if self._on_connected:
self._on_connected()
elif msg_type == "ping":
self._send_json({"msg": "pong"})
elif msg_type == "changed" and data.get("collection") == "stream-room-messages":
fields = data.get("fields", {})
args = fields.get("args", [])
if args and isinstance(args, list):
self._msg_queue.put(args[0])
except Exception:
pass
elif opcode == 0x08:
break
except Exception:
break
self.connected = False
logger.debug("DDP reader thread exited")
def connect_and_login(self, token: str) -> bool:
try:
self.sock = self._create_socket()
if not self._handshake(self.sock):
return False
self._running = True
self._reader_thread = threading.Thread(target=self._reader_loop, daemon=True)
self._reader_thread.start()
time.sleep(0.3)
self._send_json({"msg": "connect", "version": "1", "support": ["1", "pre2", "pre1"]})
for _ in range(50):
if self.connected:
break
time.sleep(0.1)
if not self.connected:
self.disconnect()
return False
time.sleep(0.2)
self._send_json({"msg": "method", "method": "login",
"params": [{"resume": token}], "id": "ddplogin"})
time.sleep(0.5)
return True
except Exception as e:
logger.error("DDP connect failed: %s", e)
self.disconnect()
return False
def subscribe_room(self, room_id: str) -> Optional[str]:
if not self.connected:
return None
sub_id = f"sub{self._next_sub_id}"
self._next_sub_id += 1
# CRITICAL: Rocket.Chat V5+ requires {"token": "true"} as second param,
# otherwise subscribe succeeds but NO messages ever stream.
self._send_json({
"msg": "sub",
"name": "stream-room-messages",
"params": [room_id, {"token": "true"}],
"id": sub_id
})
self._sub_rooms[room_id] = sub_id
return sub_id
def set_status(self, status: str) -> bool:
if not self.connected:
return False
method = f"UserPresence:{status}"
self._send_json({"msg": "method", "method": method, "params": [], "id": f"st{int(time.time()*1000)}"})
return True
def disconnect(self):
self._running = False
if self.sock:
try:
self.sock.close()
except Exception:
pass
self.sock = None
self.connected = False
self._sub_rooms.clear()
def reconnect_and_restore(self, token: str) -> bool:
self.disconnect()
time.sleep(1)
old_subs = list(self._sub_rooms.keys())
self._sub_rooms.clear()
if self.connect_and_login(token):
for rid in old_subs:
self.subscribe_room(rid)
return True
return False
class RocketChatAdapter(BasePlatformAdapter):
def __init__(self, config: PlatformConfig):
super().__init__(config=config, platform=Platform("rocketchat"))
self.base_url = _get_config_str(config, "base_url", "ROCKETCHAT_BASE_URL")
self.user = _get_config_str(config, "user", "ROCKETCHAT_USER")
self.password = _get_config_str(config, "password", "ROCKETCHAT_PASSWORD")
self.rest = RocketChatREST(self.base_url)
self.ddp: Optional[DDPClient] = None
self._auth_token: Optional[str] = None
self._user_id: Optional[str] = None
self._rooms: List[dict] = []
self._subscribed_rooms: set = set()
self._listen_task: Optional[asyncio.Task] = None
self._status_timer: Optional[threading.Timer] = None
self._last_activity = 0.0
self._shutdown_event = threading.Event()
self._room_info: Dict[str, dict] = {}
async def connect(self) -> bool:
if not self.base_url or not self.user or not self.password:
logger.error("[rocketchat] Missing configuration")
return False
if not self.rest.login(self.user, self.password):
logger.error("[rocketchat] REST login failed")
return False
self._auth_token = self.rest.auth_token
self._user_id = self.rest.user_id
host = self.base_url.replace("https://", "").replace("http://", "").split("/")[0]
self.ddp = DDPClient(
host=host,
on_connected_cb=lambda: self.ddp.set_status("away") if self.ddp else None,
)
if not self.ddp.connect_and_login(self._auth_token):
logger.error("[rocketchat] DDP connect/login failed")
return False
self._rooms = self.rest.get_rooms()
for room in self._rooms:
sub_id = self.ddp.subscribe_room(room["id"])
if sub_id:
self._subscribed_rooms.add(room["id"])
self._room_info[room["id"]] = {"name": room["name"], "type": room["type"]}
logger.info("[rocketchat] Subscribed to %s", room["name"])
self._mark_connected()
self._listen_task = asyncio.create_task(self._listen_loop())
self._last_activity = time.time()
self._schedule_idle_timer()
logger.info("[rocketchat] Connected to %s", self.base_url)
return True
async def disconnect(self) -> None:
self._mark_disconnected()
self._shutdown_event.set()
if self._status_timer:
self._status_timer.cancel()
if self._listen_task and not self._listen_task.done():
self._listen_task.cancel()
try:
await self._listen_task
except asyncio.CancelledError:
pass
if self.ddp:
self.ddp.disconnect()
self.ddp = None
logger.info("[rocketchat] Disconnected")
async def send(self, chat_id: str, content: str, reply_to: Optional[str] = None,
metadata: Optional[Dict[str, Any]] = None) -> SendResult:
if not self.rest.auth_token:
return SendResult(success=False, error="Not authenticated")
# Handle interaction templates via metadata
attachments = None
if metadata and metadata.get("template"):
from . import _TEMPLATE_MAP, _build_custom_buttons
template_name = metadata.get("template")
question_id = metadata.get("question_id", "")
if template_name == "custom" and metadata.get("custom_buttons"):
buttons = _build_custom_buttons(metadata["custom_buttons"], prefix=question_id)
attachments = [{
"color": "#1d74f5",
"title": metadata.get("button_title", "Wähle eine Option:"),
"actions": buttons,
}]
elif template_name in _TEMPLATE_MAP:
tmpl = _TEMPLATE_MAP[template_name]
attachment = dict(tmpl.get("attachment", {}))
raw_buttons = tmpl.get("buttons", [])
buttons = []
for btn in raw_buttons:
msg_payload = btn.get("msg", "")
if question_id:
msg_payload = f"{question_id}::{msg_payload}"
buttons.append({
"type": "button",
"text": btn.get("text", "?"),
"msg": msg_payload,
"msg_in_chat_window": True,
"style": btn.get("style", "default"),
})
attachment["actions"] = buttons
attachments = [attachment]
success = self.rest.post_message(chat_id, content, attachments=attachments)
if success:
return SendResult(success=True, message_id=str(int(time.time() * 1000)))
return SendResult(success=False, error="chat.postMessage failed")
async def send_typing(self, chat_id: str, metadata=None) -> None:
pass
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
info = self._room_info.get(chat_id, {})
return {"name": info.get("name", chat_id), "type": "group" if info.get("type") in ("c", "p") else "dm"}
async def _listen_loop(self):
loop = asyncio.get_running_loop()
while self._running and not self._shutdown_event.is_set():
try:
msg_obj = await loop.run_in_executor(None, self._ddp_queue_get, 1.0)
if msg_obj is None:
if self.ddp and not self.ddp.connected:
logger.warning("[rocketchat] DDP disconnected, reconnecting...")
if self._auth_token and self.ddp.reconnect_and_restore(self._auth_token):
logger.info("[rocketchat] Reconnected")
else:
await asyncio.sleep(5)
continue
await self._process_incoming(msg_obj)
except asyncio.CancelledError:
break
except Exception as e:
logger.error("[rocketchat] Listener error: %s", e)
await asyncio.sleep(1)
def _ddp_queue_get(self, timeout: float):
if not self.ddp:
return None
try:
return self.ddp._msg_queue.get(timeout=timeout)
except queue.Empty:
return None
async def _process_incoming(self, msg_obj: dict):
rid = msg_obj.get("rid")
msg_text = msg_obj.get("msg", "")
msg_id = msg_obj.get("_id", "")
user_info = msg_obj.get("u", {})
sender_id = user_info.get("_id", "")
sender_name = user_info.get("username", "unknown")
if sender_id == self._user_id:
return
if not rid or not msg_text:
return
self._last_activity = time.time()
self._go_online()
room_name = self._room_info.get(rid, {}).get("name", rid)
source = self.build_source(
chat_id=rid,
chat_name=room_name,
chat_type="group" if self._room_info.get(rid, {}).get("type") in ("c", "p") else "dm",
user_id=sender_id,
user_name=sender_name,
)
event = MessageEvent(
text=msg_text,
message_type=MessageType.TEXT,
source=source,
raw_message=msg_obj,
message_id=msg_id,
)
await self.handle_message(event)
def _go_online(self):
if self.ddp and self.ddp.connected:
self.ddp.set_status("online")
self._schedule_idle_timer()
def _go_idle(self):
idle = time.time() - self._last_activity
if idle >= 900 and self.ddp and self.ddp.connected:
self.ddp.set_status("away")
def _schedule_idle_timer(self):
if self._status_timer:
self._status_timer.cancel()
self._status_timer = threading.Timer(900, self._go_idle)
self._status_timer.daemon = True
self._status_timer.start()