early implementation of Microsoft login

This commit is contained in:
əlemi 2022-02-11 02:06:16 +01:00
parent ee7befc4ab
commit 3e4b10521d
7 changed files with 298 additions and 78 deletions

View file

@ -3,8 +3,6 @@ import json
import logging
from .mc.proto.play.clientbound import PacketChat
from .mc.token import Token
from .dispatcher import ConnectionState
from .client import MinecraftClient
from .server import MinecraftServer
from .util.helpers import parse_chat
@ -14,9 +12,9 @@ if __name__ == "__main__":
if sys.argv[1] == "--server":
host = sys.argv[2] if len(sys.argv) > 2 else "localhost"
port = sys.argv[3] if len(sys.argv) > 3 else 25565
port = int(sys.argv[3]) if len(sys.argv) > 3 else 25565
serv = MinecraftServer(host, port)
serv = MinecraftServer(host, port=port)
serv.run() # will block and start asyncio event loop
else:
@ -35,7 +33,7 @@ if __name__ == "__main__":
client = MinecraftClient(host, port, username=username, password=pwd)
@client.on_packet(PacketChat, ConnectionState.PLAY)
@client.on_packet(PacketChat)
async def print_chat(packet: PacketChat):
msg = parse_chat(packet.message, ansi_color=color)
print(f"[{packet.position}] {msg}")

View file

@ -12,7 +12,7 @@ from typing import Dict, List, Callable, Type, Optional, Tuple, AsyncIterator
from .dispatcher import Dispatcher
from .traits import CallbacksHolder, Runnable
from .mc.packet import Packet
from .mc.token import Token, AuthException
from .mc.auth import AuthInterface, AuthException, MojangToken
from .mc.definitions import Dimension, Difficulty, Gamemode, ConnectionState
from .mc.proto.handshaking.serverbound import PacketSetProtocol
from .mc.proto.play.serverbound import PacketKeepAlive as PacketKeepAliveResponse
@ -44,7 +44,7 @@ class MinecraftClient(CallbacksHolder, Runnable):
username:Optional[str]
password:Optional[str]
token:Optional[Token]
token:Optional[AuthInterface]
dispatcher : Dispatcher
_processing : bool
@ -60,7 +60,7 @@ class MinecraftClient(CallbacksHolder, Runnable):
port:int = 25565,
username:Optional[str] = None,
password:Optional[str] = None,
token:Optional[Token] = None,
token:Optional[AuthInterface] = None,
online_mode:bool = True,
**kwargs
):
@ -129,7 +129,7 @@ class MinecraftClient(CallbacksHolder, Runnable):
if not self.username and not self.password:
raise e # we don't have credentials to make a new token, nothing we can really do here
if self.username and self.password:
self.token = await Token.authenticate(self.username, self.password)
self.token = await MojangToken.login(self.username, self.password)
self._logger.info("Authenticated from credentials")
self._authenticated = True
return

View file

@ -0,0 +1,3 @@
from .interface import AuthException, AuthInterface
from .microsoft import MicrosoftAuthenticator
from .mojang import MojangToken

View file

@ -0,0 +1,67 @@
"""Minecraft authentication interface"""
from typing import Optional, Dict, Any
import aiohttp
from ..definitions import GameProfile
class AuthException(Exception):
endpoint : str
code : int
data : dict
kwargs : dict
def __init__(self, endpoint:str, code:int, data:dict, kwargs:dict):
self.endpoint = endpoint
self.code = code
self.data = data
self.kwargs = kwargs
super().__init__(f"[{self.code}:{self.endpoint}] {self.data} : (**{self.kwargs})")
class AuthInterface:
accessToken : str
selectedProfile : GameProfile
SESSION_SERVER = "https://sessionserver.mojang.com/session/minecraft"
# async def authenticate(self, user:str, pwd:str):
# raise NotImplementedError
async def refresh(self):
raise NotImplementedError
async def join(self, server_id) -> dict:
return await self._post(
self.SESSION_SERVER + "/join",
headers={"content-type":"application/json"},
json={
"serverId": server_id,
"accessToken": self.accessToken,
"selectedProfile": self.selectedProfile.as_dict()
}
)
@classmethod # TODO more love for server side!
async def server_join(cls, username:str, serverId:str, ip:Optional[str] = None):
params = {"username":username, "serverId":serverId}
if ip:
params["ip"] = ip
return await cls._get(cls.SESSION_SERVER + "/hasJoined", params=params)
@classmethod
async def _post(cls, endpoint:str, **kwargs) -> Dict[str, Any]:
async with aiohttp.ClientSession() as sess:
async with sess.post(endpoint, **kwargs) as res:
data = await res.json(content_type=None)
if res.status >= 400:
raise AuthException(endpoint, res.status, data, kwargs)
return data
@classmethod
async def _get(cls, endpoint:str, **kwargs) -> Dict[str, Any]:
async with aiohttp.ClientSession() as sess:
async with sess.get(endpoint, **kwargs) as res:
data = await res.json(content_type=None)
if res.status >= 400:
raise AuthException(endpoint, res.status, data, kwargs)
return data

View file

@ -0,0 +1,177 @@
import uuid
from urllib.parse import urlencode
from typing import Dict, Optional, Any
import aiohttp
from ..definitions import GameProfile
from .interface import AuthInterface
class InvalidStateError(Exception):
pass
class MicrosoftAuthenticator(AuthInterface):
client_id : str
client_secret : str
redirect_uri : str
ms_token : Optional[str]
ms_refresh_token : Optional[str]
xbl_token : Optional[str]
xsts_token : Optional[str]
userhash : Optional[str]
mcstore : dict
accessToken : str
selectedProfile : GameProfile
def __init__(self,
client_id:str,
client_secret:str,
redirect_uri:str="http://localhost"
):
self.client_id = client_id
self.client_secret = client_secret
self.redirect_uri = redirect_uri
self.ms_token = None
self.ms_refresh_token = None
self.xbl_token = None
self.xsts_token = None
self.userhash = None
self.mcstore = {}
self.accessToken = ''
self.selectedProfile = GameProfile(id='', name='')
def url(self, state:str=""):
"""Builds MS OAuth url for the user to login"""
return (
f"https://login.live.com/oauth20_authorize.srf?" +
f"client_id={self.client_id}" +
f"&response_type=code" +
f"&redirect_uri={self.redirect_uri}" +
f"&scope=XboxLive.signin%20offline_access" +
f"&state={state}"
)
# TODO implement auth directly from credentials
async def authenticate(self, code:str, state:str=""): # TODO nicer way to get code?
await self._ms_auth(code)
await self._xbl_auth()
await self._xsts_auth()
await self._mc_auth()
await self.fetch_mcstore()
await self.fetch_profile()
async def refresh(self):
if not self.ms_refresh_token:
raise InvalidStateError("Missing MS refresh token")
await self._ms_auth()
await self._xbl_auth()
await self._xsts_auth()
await self._mc_auth()
async def _ms_auth(self, code:str=""):
"""Authorize Microsoft account"""
payload = {
"client_id": self.client_id,
"client_secret": self.client_secret,
"redirect_uri": self.redirect_uri,
"grant_type": "authorization_code",
}
if code:
payload['code'] = code
elif self.ms_refresh_token:
payload['refresh_token'] = self.ms_refresh_token
else:
raise InvalidStateError("Missing auth code and refresh token")
auth_response = await self._post(
"https://login.live.com/oauth20_token.srf",
headers={ "Content-Type": "application/x-www-form-urlencoded" },
data=urlencode(payload)
)
self.ms_token = auth_response['access_token']
self.ms_refresh_token = auth_response['refresh_token']
# maybe store expire_in and other stuff too? TODO
async def _xbl_auth(self):
"""Authorize with XBox Live"""
if not self.ms_token:
raise InvalidStateError("Missing MS access token")
auth_response = await self._post(
"https://user.auth.xboxlive.com/user/authenticate",
headers={
"Content-Type": "application/json",
"Accept": "application/json"
},
json={
"Properties": {
"AuthMethod": "RPS",
"SiteName": "user.auth.xboxlive.com",
"RpsTicket": f"d={self.ms_token}"
},
"RelyingParty": "http://auth.xboxlive.com",
"TokenType": "JWT"
}
)
self.userhash = auth_response["DisplayClaims"]["xui"][0]["uhs"]
self.xbl_token = auth_response["Token"]
async def _xsts_auth(self):
"""Authenticate with XBox Security Tokens"""
if not self.xbl_token:
raise InvalidStateError("Missing XBL Token")
auth_response = await self._post(
"https://xsts.auth.xboxlive.com/xsts/authorize",
headers={
"Content-Type": "application/json",
"Accept": "application/json"
},
json={
"Properties": {
"SandboxId": "RETAIL",
"UserTokens": [
self.xbl_token
]
},
"RelyingParty": "rp://api.minecraftservices.com/",
"TokenType": "JWT"
}
)
self.xsts_token = auth_response["Token"]
if self.userhash != auth_response["DisplayClaims"]["xui"][0]["uhs"]:
raise InvalidStateError("userhash differs from XBL and XSTS")
async def _mc_auth(self):
"""Authenticate with Minecraft"""
if not self.userhash:
raise InvalidStateError("Missing userhash")
if not self.xsts_token:
raise InvalidStateError("Missing XSTS Token")
auth_response = await self._post(
"https://api.minecraftservices.com/authentication/login_with_xbox",
headers={
"Content-Type": "application/json",
"Accept": "application/json"
},
json={
"identityToken": f"XBL3.0 x={self.userhash};{self.xsts_token}"
},
)
self.access_token = auth_response['access_token']
async def fetch_mcstore(self):
"""Get the store information"""
self.mcstore = await self._get(
"https://api.minecraftservices.com/entitlements/mcstore",
headers={ "Authorization": f"Bearer {self.access_token}" },
)
async def fetch_profile(self):
"""Get player profile"""
p = await self._get(
"https://api.minecraftservices.com/minecraft/profile",
headers={ "Authorization": f"Bearer {self.access_token}" },
)
self.profile = GameProfile(id=p['id'], name=p['name'])

View file

@ -8,30 +8,11 @@ from typing import Optional
import aiohttp
class AuthException(Exception):
action : str
type : str
message : str
def __init__(self, action:str, data:dict):
self.type = data["error"] if data and "error" in data else "Unknown"
self.message = data["errorMessage"] if data and "errorMessage" in data else "Token or credentials invalid"
self.action = action.rsplit('/',1)[1]
super().__init__(f"[{self.action}] {self.type} : {self.message}")
from .interface import AuthInterface
from ..definitions import GameProfile
@dataclass
class GameProfile:
id : str
name : str
def as_dict(self):
return {
"id": self.id,
"name": self.name
}
@dataclass
class Token:
class MojangToken(AuthInterface):
username : str
accessToken : str
clientToken : str
@ -40,7 +21,6 @@ class Token:
AGENT_NAME = "Minecraft"
AGENT_VERSION = 1
AUTH_SERVER = "https://authserver.mojang.com"
SESSION_SERVER = "https://sessionserver.mojang.com/session/minecraft"
CONTENT_TYPE = "application/json"
HEADERS = {"content-type": CONTENT_TYPE}
@ -83,7 +63,7 @@ class Token:
)
@classmethod
async def authenticate(cls, username, password, invalidate=False):
async def login(cls, username, password, invalidate=False):
payload = {
"agent": {
"name": cls.AGENT_NAME,
@ -107,16 +87,24 @@ class Token:
@classmethod
async def sign_out(cls, username:str, password:str) -> dict:
return await cls._post(cls.AUTH_SERVER + "/signout", {
return await cls._post(
cls.AUTH_SERVER + "/signout",
headers=cls.HEADERS,
json={
"username": username,
"password": password
})
}
)
async def refresh(self, requestUser:bool = False) -> dict:
res = await self._post(self.AUTH_SERVER + "/refresh", {
res = await self._post(
self.AUTH_SERVER + "/refresh",
headers=self.HEADERS,
json={
"accessToken": self.accessToken,
"clientToken": self.clientToken
})
}
)
self.accessToken = res["accessToken"]
self.clientToken = res["clientToken"]
@ -131,42 +119,18 @@ class Token:
payload = { "accessToken": self.accessToken }
if clientToken:
payload["clientToken"] = self.clientToken
return await self._post(self.AUTH_SERVER + "/validate", payload)
return await self._post(
self.AUTH_SERVER + "/validate",
headers=self.HEADERS,
json=payload,
)
async def invalidate(self) -> dict:
return await self._post(self.AUTH_SERVER + "/invalidate", {
return await self._post(
self.AUTH_SERVER + "/invalidate",
headers=self.HEADERS,
json= {
"accessToken": self.accessToken,
"clientToken": self.clientToken
})
async def join(self, server_id) -> dict:
return await self._post(self.SESSION_SERVER + "/join", {
"serverId": server_id,
"accessToken": self.accessToken,
"selectedProfile": self.selectedProfile.as_dict()
})
@classmethod
async def server_join(cls, username:str, serverId:str, ip:Optional[str] = None):
params = {"username":username, "serverId":serverId}
if ip:
params["ip"] = ip
return await cls._get(cls.SESSION_SERVER + "/hasJoined", params)
@classmethod
async def _post(cls, endpoint:str, data:dict) -> dict:
async with aiohttp.ClientSession() as sess:
async with sess.post(endpoint, headers=cls.HEADERS, json=data) as res:
data = await res.json(content_type=None)
if res.status >= 400:
raise AuthException(endpoint, data)
return data
@classmethod
async def _get(cls, endpoint:str, data:dict) -> dict:
async with aiohttp.ClientSession() as sess:
async with sess.get(endpoint, headers=cls.HEADERS, params=data) as res:
data = await res.json(content_type=None)
if res.status >= 400:
raise AuthException(endpoint, data)
return data
}
)

View file

@ -27,6 +27,17 @@ class Gamemode(Enum):
ADVENTURE = 2
SPECTATOR = 3
@dataclass
class GameProfile:
id : str
name : str
def as_dict(self):
return {
"id": self.id,
"name": self.name
}
class EnchantmentType(Enum):
protection = 0
fire_protection = 1