reimplemented mojang authenticator

This commit is contained in:
əlemi 2022-04-06 21:26:49 +02:00
parent d8059d246d
commit a75f0ff9e4
No known key found for this signature in database
GPG key ID: BBCBFE5D7244634E
4 changed files with 43 additions and 37 deletions

View file

@ -1,3 +1,3 @@
from .interface import AuthException, AuthInterface
from .microsoft import MicrosoftAuthenticator
from .mojang import MojangToken
from .mojang import MojangAuthenticator

View file

@ -1,10 +1,13 @@
"""Minecraft authentication interface"""
import logging
from typing import Optional, Dict, Any
import aiohttp
from ..definitions import GameProfile
logger = logging.getLogger(__file__)
class AuthException(Exception):
endpoint : str
code : int
@ -25,10 +28,19 @@ class AuthInterface:
SESSION_SERVER = "https://sessionserver.mojang.com/session/minecraft"
TIMEOUT = aiohttp.ClientTimeout(total=3)
# async def authenticate(self, user:str, pwd:str):
# raise NotImplementedError
async def login(self, *args) -> 'AuthInterface':
raise NotImplementedError
async def refresh(self):
async def refresh(self) -> 'AuthInterface':
raise NotImplementedError
async def validate(self) -> 'AuthInterface':
raise NotImplementedError
def serialize(self) -> Dict[str, Any]:
raise NotImplementedError
def deserialize(self, data:Dict[str, Any]) -> 'AuthInterface':
raise NotImplementedError
async def join(self, server_id) -> dict:
@ -51,18 +63,20 @@ class AuthInterface:
@classmethod
async def _post(cls, endpoint:str, **kwargs) -> Dict[str, Any]:
async with aiohttp.ClientSession(timeout=cls.TIMEOUT) as sess:
async with sess.post(endpoint, **kwargs) as res:
async with aiohttp.ClientSession(timeout=cls.TIMEOUT) as session:
async with session.post(endpoint, **kwargs) as res:
data = await res.json(content_type=None)
logger.debug("POST /%s [%s] : %s", endpoint, str(kwargs), str(data))
if res.status >= 400:
raise AuthException(endpoint, res.status, data, kwargs)
return data
@classmethod
async def _get(cls, endpoint:str, **kwargs) -> Dict[str, Any]:
async with aiohttp.ClientSession(timeout=cls.TIMEOUT) as sess:
async with sess.get(endpoint, **kwargs) as res:
async with aiohttp.ClientSession(timeout=cls.TIMEOUT) as session:
async with session.get(endpoint, **kwargs) as res:
data = await res.json(content_type=None)
logger.debug("GET /%s [%s] : %s", endpoint, str(kwargs), str(data))
if res.status >= 400:
raise AuthException(endpoint, res.status, data, kwargs)
return data

View file

@ -9,7 +9,7 @@ from yarl import URL
import aiohttp
from ..definitions import GameProfile
from .interface import AuthInterface
from .interface import AuthInterface, AuthException
class InvalidStateError(Exception):
pass

View file

@ -12,7 +12,7 @@ from .interface import AuthInterface
from ..definitions import GameProfile
@dataclass
class MojangToken(AuthInterface):
class MojangAuthenticator(AuthInterface):
username : str
accessToken : str
clientToken : str
@ -24,6 +24,9 @@ class MojangToken(AuthInterface):
CONTENT_TYPE = "application/json"
HEADERS = {"content-type": CONTENT_TYPE}
def __init__(self):
pass
def __equals__(self, other) -> bool:
if not isinstance(other, self.__class__):
return False
@ -40,7 +43,7 @@ class MojangToken(AuthInterface):
def __str__(self) -> str:
return repr(self)
def as_dict(self):
def serialize(self):
return {
"username":self.username,
"accessToken":self.accessToken,
@ -48,26 +51,17 @@ class MojangToken(AuthInterface):
"selectedProfile": self.selectedProfile.as_dict(),
}
@classmethod
def from_file(cls, fname:str):
with open(fname) as f:
return cls.from_dict(json.load(f))
def deserialize(self, data:dict):
self.username=data["username"] if "username" in data else data["selectedProfile"]["name"],
self.accessToken=data["accessToken"],
self.clientToken=data["clientToken"],
self.selectedProfile=GameProfile(**data["selectedProfile"])
@classmethod
def from_dict(cls, data:dict):
return cls(
username=data["username"] if "username" in data else data["selectedProfile"]["name"],
accessToken=data["accessToken"],
clientToken=data["clientToken"],
selectedProfile=GameProfile(**data["selectedProfile"]),
)
@classmethod
async def login(cls, username, password, invalidate=False):
async def login(self, username:str, password:str, invalidate=False) -> AuthInterface:
payload = {
"agent": {
"name": cls.AGENT_NAME,
"version": cls.AGENT_VERSION
"name": self.AGENT_NAME,
"version": self.AGENT_VERSION
},
"username": username,
"password": password
@ -76,14 +70,14 @@ class MojangToken(AuthInterface):
if not invalidate:
payload["clientToken"] = uuid.uuid4().hex
res = await cls._post(cls.AUTH_SERVER + "/authenticate", payload)
res = await self._post(self.AUTH_SERVER + "/authenticate", payload)
return cls(
username=username,
accessToken=res["accessToken"],
clientToken=res["clientToken"],
selectedProfile=GameProfile(**res["selectedProfile"])
)
self.username=username,
self.accessToken=res["accessToken"],
self.clientToken=res["clientToken"],
self.selectedProfile=GameProfile(**res["selectedProfile"])
return self
@classmethod
async def sign_out(cls, username:str, password:str) -> dict:
@ -113,8 +107,6 @@ class MojangToken(AuthInterface):
if "user" in res:
self.username = res["user"]["username"]
return res
async def validate(self, clientToken:bool = True) -> dict:
payload = { "accessToken": self.accessToken }
if clientToken: