aioappsrv/appservice/appservice.py

222 lines
6.2 KiB
Python

import json
import logging
import urllib.parse
import uuid
from typing import List, Union
import bottle
import urllib3
import matrix
from misc import dict_cls, except_deleted, log_except, request
class AppService(bottle.Bottle):
def __init__(self, config: dict, http: urllib3.PoolManager) -> None:
super(AppService, self).__init__()
self.as_token = config["as_token"]
self.hs_token = config["hs_token"]
self.base_url = config["homeserver"]
self.server_name = config["server_name"]
self.user_id = f"@{config['user_id']}:{self.server_name}"
self.http = http
self.logger = logging.getLogger("appservice")
# Map events to functions.
self.mapping = {
"m.room.member": "on_member",
"m.room.message": "on_message",
"m.room.redaction": "on_redaction",
}
# Add route for bottle.
self.route(
"/transactions/<transaction>",
callback=self.receive_event,
method="PUT",
)
def handle_event(self, event: dict) -> None:
event_type = event.get("type")
if event_type == "m.room.member" or event_type == "m.room.message":
obj = self.get_event_object(event)
elif event_type == "m.room.redaction":
obj = event
else:
self.logger.info(f"Unknown event type: {event_type}")
return
func = getattr(self, self.mapping[event_type], None)
if not func:
self.logger.warning(
f"Function '{func}' not defined, ignoring event."
)
return
# We don't catch exceptions here as the homeserver will re-send us
# the event in case of a failure.
func(obj)
@log_except
def receive_event(self, transaction: str) -> dict:
"""
Verify the homeserver's token and handle events.
"""
hs_token = bottle.request.query.getone("access_token")
if not hs_token:
bottle.response.status = 401
return {"errcode": "APPSERVICE_UNAUTHORIZED"}
if hs_token != self.hs_token:
bottle.response.status = 403
return {"errcode": "APPSERVICE_FORBIDDEN"}
events = bottle.request.json.get("events")
for event in events:
self.handle_event(event)
return {}
def mxc_url(self, mxc: str) -> str:
try:
homeserver, media_id = mxc.replace("mxc://", "").split("/")
converted = (
f"https://{self.server_name}/_matrix/media/r0/download/"
f"{homeserver}/{media_id}"
)
except ValueError:
converted = ""
return converted
def get_event_object(self, event: dict) -> matrix.Event:
# TODO use caching and invalidate old cache on member events.
event["author"] = dict_cls(
self.get_profile(event["sender"]), matrix.User
)
return matrix.Event(event)
def join_room(self, room_id: str, mxid: str = "") -> None:
self.send(
"POST",
f"/join/{room_id}",
params={"user_id": mxid} if mxid else {},
)
def redact(self, event_id: str, room_id: str, mxid: str = "") -> None:
self.send(
"PUT",
f"/rooms/{room_id}/redact/{event_id}/{uuid.uuid4()}",
params={"user_id": mxid} if mxid else {},
)
def get_profile(self, mxid: str) -> dict:
resp = except_deleted(self.send)("GET", f"/profile/{mxid}")
# No profile exists for the user.
if not resp:
return {}
avatar_url = resp.get("avatar_url")
if avatar_url:
avatar_url = self.mxc_url(avatar_url)
return {
"avatar_url": avatar_url,
"displayname": resp.get("displayname"),
}
def get_members(self, room_id: str) -> List[str]:
resp = self.send(
"GET",
f"/rooms/{room_id}/members",
params={"membership": "join", "not_membership": "leave"},
)
return [
content["sender"]
for content in resp["chunk"]
if content["content"]["membership"] == "join"
]
def get_room_id(self, alias: str) -> str:
resp = self.send("GET", f"/directory/room/{urllib.parse.quote(alias)}")
# TODO cache ?
return resp["room_id"]
def upload(self, url: str) -> str:
"""
Upload a file to the homeserver and get the MXC url.
"""
resp = self.http.request("GET", url)
resp = self.send(
"POST",
content=resp.data,
content_type=resp.headers.get("Content-Type"),
params={"filename": f"{uuid.uuid4()}"},
endpoint="/_matrix/media/r0/upload",
)
return resp["content_uri"]
def send_message(
self,
room_id: str,
content: dict,
mxid: str = "",
) -> str:
resp = self.send(
"PUT",
f"/rooms/{room_id}/send/m.room.message/{uuid.uuid4()}",
content,
{"user_id": mxid} if mxid else {},
)
return resp["event_id"]
def send_typing(
self, room_id: str, mxid: str = "", timeout: int = 8000
) -> None:
self.send(
"PUT",
f"/rooms/{room_id}/typing/{mxid}",
{"typing": True, "timeout": timeout},
{"user_id": mxid} if mxid else {},
)
def send_invite(self, room_id: str, mxid: str) -> None:
self.send("POST", f"/rooms/{room_id}/invite", {"user_id": mxid})
@request
def send(
self,
method: str,
path: str = "",
content: Union[bytes, dict] = {},
params: dict = {},
content_type: str = "application/json",
endpoint: str = "/_matrix/client/r0",
) -> dict:
params["access_token"] = self.as_token
headers = {"Content-Type": content_type}
content = json.dumps(content) if isinstance(content, dict) else content
endpoint = (
f"{self.base_url}{endpoint}{path}?"
f"{urllib.parse.urlencode(params)}"
)
return self.http.request(
method, endpoint, body=content, headers=headers
)