feat: allow overriding auth and session servers

This commit is contained in:
əlemi 2023-02-16 18:07:21 +01:00
parent 7bc7a14457
commit 2190554f71
Signed by: alemi
GPG key ID: A4895B84D311642C
2 changed files with 39 additions and 24 deletions

View file

@ -26,10 +26,14 @@ class AuthException(Exception):
class AuthInterface: class AuthInterface:
accessToken : str accessToken : str
selectedProfile : GameProfile selectedProfile : GameProfile
session_server_override : str
SESSION_SERVER = "https://sessionserver.mojang.com/session/minecraft" SESSION_SERVER = "https://sessionserver.mojang.com/session/minecraft"
TIMEOUT = aiohttp.ClientTimeout(total=3) TIMEOUT = aiohttp.ClientTimeout(total=3)
def __init__(self):
raise NotImplementedError
async def login(self) -> 'AuthInterface': async def login(self) -> 'AuthInterface':
raise NotImplementedError raise NotImplementedError
@ -45,9 +49,13 @@ class AuthInterface:
def deserialize(self, data:Dict[str, Any]): def deserialize(self, data:Dict[str, Any]):
raise NotImplementedError raise NotImplementedError
@property
def session_server(self) -> str:
return self.session_server_override or self.SESSION_SERVER
async def join(self, server_id) -> dict: async def join(self, server_id) -> dict:
return await self._post( return await self._post(
self.SESSION_SERVER + "/join", self.session_server + "/join",
headers={"content-type":"application/json"}, headers={"content-type":"application/json"},
json={ json={
"serverId": server_id, "serverId": server_id,
@ -56,16 +64,14 @@ class AuthInterface:
} }
) )
@classmethod # TODO more love for server side! async def server_join(self, username:str, serverId:str, ip:Optional[str] = None):
async def server_join(cls, username:str, serverId:str, ip:Optional[str] = None):
params = {"username":username, "serverId":serverId} params = {"username":username, "serverId":serverId}
if ip: if ip:
params["ip"] = ip params["ip"] = ip
return await cls._get(cls.SESSION_SERVER + "/hasJoined", params=params) return await self._get(self.session_server + "/hasJoined", params=params)
@classmethod async def _post(self, endpoint:str, **kwargs) -> Dict[str, Any]:
async def _post(cls, endpoint:str, **kwargs) -> Dict[str, Any]: async with aiohttp.ClientSession(timeout=self.TIMEOUT) as session:
async with aiohttp.ClientSession(timeout=cls.TIMEOUT) as session:
try: try:
async with session.post(endpoint, **kwargs) as res: async with session.post(endpoint, **kwargs) as res:
try: try:
@ -79,9 +85,8 @@ class AuthInterface:
except TimeoutError: except TimeoutError:
raise AuthException(endpoint, 0, {"error": "request timed out"}, kwargs) raise AuthException(endpoint, 0, {"error": "request timed out"}, kwargs)
@classmethod async def _get(self, endpoint:str, **kwargs) -> Dict[str, Any]:
async def _get(cls, endpoint:str, **kwargs) -> Dict[str, Any]: async with aiohttp.ClientSession(timeout=self.TIMEOUT) as session:
async with aiohttp.ClientSession(timeout=cls.TIMEOUT) as session:
try: try:
async with session.get(endpoint, **kwargs) as res: async with session.get(endpoint, **kwargs) as res:
try: try:

View file

@ -1,23 +1,23 @@
"""Minecraft identity utilities.""" """Minecraft identity utilities."""
import json import json
import uuid import uuid
import logging
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional, Dict, Any from typing import Optional, Dict, Any
import aiohttp
from .interface import AuthInterface, AuthException from .interface import AuthInterface, AuthException
from ..definitions import GameProfile from ..definitions import GameProfile
@dataclass @dataclass
class MojangAuthenticator(AuthInterface): class MojangAuthenticator(AuthInterface):
#selectedProfile : GameProfile
#accessToken : str
#session_server_override : str
username : str username : str
password : Optional[str] password : Optional[str]
accessToken : str
clientToken : str clientToken : str
selectedProfile : GameProfile auth_server_override : str
AGENT_NAME = "Minecraft" AGENT_NAME = "Minecraft"
AGENT_VERSION = 1 AGENT_VERSION = 1
@ -25,12 +25,19 @@ class MojangAuthenticator(AuthInterface):
CONTENT_TYPE = "application/json" CONTENT_TYPE = "application/json"
HEADERS = {"content-type": CONTENT_TYPE} HEADERS = {"content-type": CONTENT_TYPE}
def __init__(self, username:str="", password:Optional[str]=None): def __init__(
self, username:str="",
password:Optional[str]=None,
session_server_override:Optional[str]=None,
auth_server_override:Optional[str]=None,
):
self.username = username self.username = username
self.password = password self.password = password
self.accessToken = "" self.accessToken = ""
self.clientToken = "" self.clientToken = ""
self.selectedProfile = GameProfile("", username) self.selectedProfile = GameProfile("", username)
self.session_server_override = session_server_override
self.auth_server_override = auth_server_override
def __equals__(self, other) -> bool: def __equals__(self, other) -> bool:
if not isinstance(other, self.__class__): if not isinstance(other, self.__class__):
@ -48,6 +55,10 @@ class MojangAuthenticator(AuthInterface):
def __str__(self) -> str: def __str__(self) -> str:
return repr(self) return repr(self)
@property
def auth_server(self) -> str:
return self.auth_server_override or self.AUTH_SERVER
def serialize(self) -> Dict[str, Any]: def serialize(self) -> Dict[str, Any]:
return { return {
"username":self.username, "username":self.username,
@ -75,7 +86,7 @@ class MojangAuthenticator(AuthInterface):
payload["clientToken"] = uuid.uuid4().hex # don't include this to invalidate all other sessions payload["clientToken"] = uuid.uuid4().hex # don't include this to invalidate all other sessions
res = await self._post(self.AUTH_SERVER + "/authenticate", json=payload) res = await self._post(self.auth_server + "/authenticate", json=payload)
self.accessToken=res["accessToken"] self.accessToken=res["accessToken"]
self.clientToken=res["clientToken"] self.clientToken=res["clientToken"]
@ -83,11 +94,10 @@ class MojangAuthenticator(AuthInterface):
return self return self
@classmethod async def sign_out(self, username:str, password:str) -> dict:
async def sign_out(cls, username:str, password:str) -> dict: return await self._post(
return await cls._post( self.auth_server + "/signout",
cls.AUTH_SERVER + "/signout", headers=self.HEADERS,
headers=cls.HEADERS,
json={ json={
"username": username, "username": username,
"password": password "password": password
@ -98,7 +108,7 @@ class MojangAuthenticator(AuthInterface):
if not self.accessToken or not self.clientToken: if not self.accessToken or not self.clientToken:
raise AuthException("/refresh", 0, {"message":"No access token or client token"}, {}) raise AuthException("/refresh", 0, {"message":"No access token or client token"}, {})
res = await self._post( res = await self._post(
self.AUTH_SERVER + "/refresh", self.auth_server + "/refresh",
headers=self.HEADERS, headers=self.HEADERS,
json={ json={
"accessToken": self.accessToken, "accessToken": self.accessToken,
@ -122,7 +132,7 @@ class MojangAuthenticator(AuthInterface):
if clientToken: if clientToken:
payload["clientToken"] = self.clientToken payload["clientToken"] = self.clientToken
await self._post( await self._post(
self.AUTH_SERVER + "/validate", self.auth_server + "/validate",
headers=self.HEADERS, headers=self.HEADERS,
json=payload, json=payload,
) )
@ -130,7 +140,7 @@ class MojangAuthenticator(AuthInterface):
async def invalidate(self) -> AuthInterface: async def invalidate(self) -> AuthInterface:
await self._post( await self._post(
self.AUTH_SERVER + "/invalidate", self.auth_server + "/invalidate",
headers=self.HEADERS, headers=self.HEADERS,
json= { json= {
"accessToken": self.accessToken, "accessToken": self.accessToken,