some work on packet flow

This commit is contained in:
əlemi 2021-10-15 01:11:43 +02:00 committed by alemidev
parent 4bd0f96dc7
commit aa92bfe3cd
2 changed files with 59 additions and 18 deletions

View file

@ -2,8 +2,12 @@ import asyncio
from asyncio import Task
from enum import Enum
from typing import Dict
from .dispatcher import Dispatcher
from .mc.mctypes import VarInt
from .mc.packet import Packet
from .mc import proto
class ConnectionState(Enum):
HANDSHAKING = 0
@ -11,12 +15,24 @@ class ConnectionState(Enum):
LOGIN = 2
PLAY = 3
def _registry_from_state(state:ConnectionState) -> Dict[int, Dict[int, Packet]]:
if state == ConnectionState.HANDSHAKING:
return proto.handshaking.clientbound.REGISTRY
if state == ConnectionState.STATUS:
return proto.status.clientbound.REGISTRY
if state == ConnectionState.LOGIN:
return proto.login.clientbound.REGISTRY
if state == ConnectionState.PLAY:
return proto.play.clientbound.REGISTRY
class Client:
host:str
port:int
dispatcher : Dispatcher
proto : int
state : ConnectionState
dispatcher : Dispatcher
_processing : bool
_worker : Task
@ -30,6 +46,7 @@ class Client:
self.host = host
self.port = port
self.proto = 340 # TODO default to max available proto
self.state = ConnectionState.HANDSHAKING
self.dispatcher = Dispatcher(host, port)
self._processing = False
@ -44,27 +61,28 @@ class Client:
async def _logic_worker(self):
while self._processing:
buffer = await self.dispatcher.incoming.get()
if self.state == ConnectionState.HANDSHAKING:
self.handshaking_logic(buffer)
elif self.state == ConnectionState.LOGIN:
self.login_logic(buffer)
elif self.state == ConnectionState.PLAY:
self.play_logic(buffer)
packet_id = VarInt.deserialize(buffer)
cls = PacketRegistry.state(self.state).clientbound.get(packet_id)
packet = cls(buffer)
cls = _registry_from_state(self.state)[self.proto][packet_id]
packet = cls.deserialize(buffer)
# Process packets? switch state, invoke callbacks? Maybe implement Reactors?
if self.state == ConnectionState.HANDSHAKING:
self.handshaking_logic(packet)
elif self.state == ConnectionState.LOGIN:
self.login_logic(packet)
elif self.state == ConnectionState.PLAY:
self.play_logic(packet)
async def handshaking_logic(self, buffer: bytes):
async def handshaking_logic(self, packet:Packet):
pass
async def status_logic(self, buffer: bytes):
async def status_logic(self, packet:Packet):
pass
async def login_logic(self, buffer: bytes):
async def login_logic(self, packet:Packet):
pass
async def play_logic(self, buffer: bytes):
async def play_logic(self, packet:Packet):
pass

View file

@ -2,6 +2,8 @@ import asyncio
from asyncio import StreamReader, StreamWriter, Queue, Task
from enum import Enum
from .mc import proto
class InvalidState(Exception):
pass
@ -17,6 +19,13 @@ async def read_varint(stream: asyncio.StreamReader) -> int:
off += 1
return buf
_STATE_REGS = {
ConnectionStatus.HANDSHAKING : proto.handshaking,
ConnectionStatus.STATUS : proto.status,
ConnectionStatus.LOGIN : proto.login,
ConnectionStatus.PLAY : proto.play,
}
class Dispatcher:
_down : StreamReader
_up : StreamWriter
@ -42,17 +51,34 @@ class Dispatcher:
)
self.connected = True
packet_handshake = proto.handshaking.serverbound.PacketSetProtocol(
self.proto,
protocolVersion=self.proto,
serverHost=self.host,
serverPost=self.port,
nextState=3, # play
)
packet_login = proto.login.serverbound.PacketLoginStart(340, username=self.username)
await self.outgoing.put(packet_handshake)
await self.outgoing.put(packet_login)
self._dispatching = True
self._reader = asyncio.get_event_loop().create_task(self._down_worker())
self._writer = asyncio.get_event_loop().create_task(self._up_worker())
async def _down_worker(self):
while self._dispatching:
length = await read_varint(self._down)
buffer = await self._down.read(length)
# TODO encryption
# TODO compression
await self.incoming.put(packet)
await self.incoming.put(buffer)
async def _up_worker(self):
while self._dispatching:
buffer = await self.outgoing.get()
packet = await self.outgoing.get()
buffer = packet.serialize()
length = len(buffer)
# TODO compression
# TODO encryption
@ -62,9 +88,6 @@ class Dispatcher:
if self.connected:
raise InvalidState("Dispatcher already connected")
await self.connect()
self._dispatching = True
self._reader = asyncio.get_event_loop().create_task(self._down_worker())
self._writer = asyncio.get_event_loop().create_task(self._up_worker())
async def stop(self):
self._dispatching = False