alemi
eef8bf3d26
changed project structure: no more useless `mc` folder refactored dispatcher: no longer weird stateful builder pattern improved packets: don't need proto number to be created
458 lines
14 KiB
Python
458 lines
14 KiB
Python
#!/usr/bin/env python
|
|
import os
|
|
import json
|
|
import keyword
|
|
import logging
|
|
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
# TODO de-spaghetti this file sometime!
|
|
|
|
# Translates the direction keys read from the protocol definitions into the
# package names used for the generated modules.
DIR_MAP = {"toClient": "clientbound", "toServer": "serverbound"}

# Module docstring written at the top of every generated file.
PREFACE = """\"\"\"[!] This file is autogenerated\"\"\"\n\n"""

# Import header placed right after PREFACE in every generated packet module.
IMPORTS = """from typing import Tuple, List, Dict, Union, Optional
from ....packet import Packet
from ....types import *
from ....primitives import *\n"""

# NOTE(review): not referenced anywhere in the visible part of this file.
IMPORT_ALL = """__all__ = [\n\t{all}\n]\n"""

# Appended to each direction package's __init__.py; {entries} is the rendered
# registry dict built by RegistryClassWriter.
REGISTRY_ENTRY = """
REGISTRY = {entries}\n"""

# Source template of one generated packet class; every placeholder is filled in
# by PacketClassWriter.compile().
# NOTE(review): indentation inside the template is part of the generated code;
# it is assumed to be tab-based, matching the '\t' used by the formatting helpers.
OBJECT = """
class {name}(Packet):
	__slots__ = {slots}
	{fields}

	def __init__(self, {constructor}):
		super().__init__({constructed})

	_state : int = {state}

	_ids : Dict[int, int] = {ids}
	_definitions : Dict[int, List[Tuple[str, Type]]] = {definitions}
"""

# Source template of the generated `ext.py`, holding the per-protocol-version
# entity metadata and particle type tables.
EXT_FORMATTER = """from ..types import *

class MetadataDefinitions:
	_definitions: dict[int, dict[int, Type]] = {metadata}

class ParticlesDefinitions:
	_definitions: dict[int, dict[int, Type]] = {particles}
"""
|
|
|
|
class Ref:
    """A bare symbol that renders as source text instead of a quoted string.

    The formatting helpers in this file call ``repr()`` on every element they
    emit. Wrapping a piece of text in a ``Ref`` makes it appear verbatim in the
    generated code (``VarInt``), and a ``Ref`` with arguments renders as a call
    expression (``ArrayType(Int, 3, )``).

    Equality, hashing and ordering are all derived from ``repr()`` so that they
    always agree with each other; this matters because ``Ref`` objects are
    stored in sets and sorted.
    """

    name : str
    args : tuple

    def __init__(self, name:str, *args):
        # A falsy name (None or "") is replaced by the placeholder "anon".
        self.name = name or "anon"
        self.args = args

    def __repr__(self) -> str:
        if not self.args:
            return self.name
        # Every argument is followed by ", " (including the last one). The
        # trailing comma is legal in a call and is kept on purpose so that the
        # generated files stay byte-identical.
        rendered = "".join(repr(arg) + ", " for arg in self.args)
        return self.name + "(" + rendered + ")"

    def __str__(self) -> str:
        return repr(self)

    def __eq__(self, other) -> bool:
        # Comparing against a foreign type must not raise AttributeError.
        if not isinstance(other, Ref):
            return NotImplemented
        # Compare the rendered form: symmetric, and consistent with __hash__.
        return repr(self) == repr(other)

    def __hash__(self) -> int:
        return hash(repr(self))

    def __lt__(self, other):
        return repr(self) < repr(other)

    def __le__(self, other):
        return repr(self) <= repr(other)

    def __gt__(self, other):
        return repr(self) > repr(other)

    def __ge__(self, other):
        return repr(self) >= repr(other)
|
|
|
|
# Primitive type names, as they appear in the protocol definitions, mapped to
# the Ref that mctype() emits for them in generated code.
# NOTE(review): the referenced names (VarInt, Byte, ...) are presumably provided
# by the star-imports in the generated file header -- confirm.
# NOTE(review): "u8" and "i8" both map to Byte -- confirm this is intended.
TYPE_MAP = {
    "varint": Ref('VarInt'),
    "varlong": Ref('VarLong'),
    "u8": Ref('Byte'),
    "i8": Ref('Byte'),
    "u16": Ref('UnsignedShort'),
    "u32": Ref('UnsignedInt'),
    "u64": Ref('UnsignedLong'),
    "i16": Ref('Short'),
    "i32": Ref('Int'),
    "i64": Ref('Long'),
    "f32": Ref('Float'),
    "f64": Ref('Double'),
    "bool": Ref('Boolean'),
    "UUID": Ref('UUID'),
    "string": Ref('String'),
    "nbt": Ref('NBTTag'),
    "optionalNbt": Ref('OptionalType(NBTTag)'),
    "slot": Ref('Slot'),
    "position": Ref('Position'),
    "entityMetadataItem": Ref('EntityMetadataItem'),
    "entityMetadata": Ref('EntityMetadata'),
    "restBuffer": Ref('TrailingData'),
    "void": Ref('Void'),
}
|
|
|
|
# Primitive type names mapped to the Python annotation text that mchint() emits
# for them in generated code.
# The key set differs from TYPE_MAP: "entityMetadataItem" is missing here (so
# mchint() logs an error and falls back to 'bytes' for it), while "tags" and
# "bitfield" exist only here (so mctype() falls back to TrailingData when they
# appear as plain strings).
HINT_MAP = {
    "varint": 'int',
    "varlong": 'int',
    "u8": 'int',
    "i8": 'int',
    "u16": 'int',
    "u32": 'int',
    "u64": 'int',
    "i16": 'int',
    "i32": 'int',
    "i64": 'int',
    "f32": 'float',
    "f64": 'float',
    "bool": 'bool',
    "UUID": 'str',
    "string": 'str',
    "nbt": 'dict',
    "optionalNbt": 'Optional[dict]',
    "slot": 'Item',
    "position": 'tuple',
    "entityMetadata": 'dict',
    "restBuffer": 'bytes',
    "void": 'None',
    "tags": 'list', # TODO this has been added in 1.13
    "bitfield": 'int', # TODO this can be hinted better
}
|
|
|
|
def _format_line(i, depth:int=0) -> str:
|
|
nl = ('\n' if depth > 0 else " ")
|
|
tab = '\t' * depth
|
|
return nl + tab + \
|
|
f",{nl}{tab}".join(f"{repr(e)}" for e in i) + \
|
|
nl + ('\t' * (depth-1))
|
|
|
|
def format_dict(d:dict, depth:int=1) -> str:
    """Render a dict as source text, with its items sorted by key.

    Keys and values are interpolated with ``str()`` into ``key : value``
    entries and wrapped in ``Ref`` so they are emitted unquoted.
    """
    entries = [Ref(f"{key} : {value}") for key, value in sorted(d.items())]
    body = _format_line(entries, depth)
    return "{" + body + "}"
|
|
|
|
def format_list(l:list, depth:int=0) -> str:
    """Render a sequence as list-literal source text (elements via ``repr()``)."""
    body = _format_line(l, depth)
    return f"[{body}]"
|
|
|
|
def format_tuple(l:tuple | list, depth:int=0) -> str:
    """Render a sequence as parenthesised source text (elements via ``repr()``)."""
    body = _format_line(l, depth)
    return f"({body})"
|
|
|
|
def mctype(slot_type:Any) -> Ref:
    """Translate a type specification from the protocol JSON into a ``Ref``.

    A string found in ``TYPE_MAP`` maps directly. A list is treated as a
    composite ``[kind, spec]`` and handled per kind (buffer, array, container,
    option, switch, bitfield). Anything else yields ``Ref('TrailingData')``;
    only unknown composite kinds are logged.
    """
    if isinstance(slot_type, str) and slot_type in TYPE_MAP:
        return TYPE_MAP[slot_type]

    if not isinstance(slot_type, list):
        return Ref('TrailingData')

    kind = slot_type[0]
    spec = slot_type[1]

    if kind == "buffer": # Array of bytes
        integer_prefixed = "countType" in spec and spec["countType"] == "integer"
        return Ref('IntegerByteArray') if integer_prefixed else Ref('ByteArray')

    if kind == "array": # Generic array
        element = mctype(spec["type"])
        # A fixed "count" wins over a "countType"; VarInt is the default prefix.
        if "count" in spec:
            length = spec["count"]
        elif "countType" in spec:
            length = mctype(spec["countType"])
        else:
            length = Ref('VarInt')
        return Ref('ArrayType', element, length)

    if kind == "container": # Struct
        # some fields are anonymous??? -- those are left out of the struct
        members = [
            format_tuple((part["name"], mctype(part["type"])))
            for part in spec
            if "name" in part
        ]
        return Ref('StructType', Ref(", ".join(members)))

    if kind == "option": # Optional
        return Ref('OptionalType', mctype(spec))

    if kind == "switch": # Union
        discriminator = spec["compareTo"].split('/')[-1]
        branches = {}
        for key, branch in spec["fields"].items():
            # Numeric keys become ints, everything else a quoted literal.
            label = int(key) if key.isnumeric() else repr(key)
            branches[label] = mctype(branch)
        rendered = Ref(format_dict(branches, depth=0))
        if "default" in spec and spec['default'] != 'void':
            fallback = mctype(spec["default"])
        else:
            fallback = None
        return Ref('SwitchType', discriminator, rendered, fallback)

    if kind == "bitfield":
        # TODO can be made better...
        total_bits = sum(field["size"] for field in spec)
        return Ref('Int') if total_bits <= 32 else Ref('Long')

    # elif kind == "mapper": # ????
    #     return TrailingData

    logging.error("Encountered unknown composite data type : %s", kind)
    return Ref('TrailingData')
|
|
|
|
def mchint(slot_type:Any) -> Ref:
    """Return the Python annotation, as a ``Ref``, for a protocol type spec.

    Strings are looked up in ``HINT_MAP``; composite ``[kind, spec]`` lists map
    to a coarse container hint, except ``switch`` which produces a ``Union`` of
    its branches' hints. Unknown input is logged and hinted as ``bytes``.
    """
    if isinstance(slot_type, str) and slot_type in HINT_MAP:
        return Ref(HINT_MAP[slot_type])

    if isinstance(slot_type, list):
        kind = slot_type[0]

        # Composite kinds whose hint does not depend on their contents.
        fixed_hints = (
            ("buffer", 'bytes'), # Array of bytes
            ("array", 'list'), # Generic array
            ("container", 'dict'), # Struct TODO make an object with type hints
            ("option", 'tuple'), # Optional
            ("bitfield", 'int'),
        )
        for tag, hint in fixed_hints:
            if kind == tag:
                return Ref(hint)

        if kind == "switch": # Union
            spec = slot_type[1]
            options = {mchint(branch) for branch in spec['fields'].values()}
            if 'default' in spec:
                options.add(mchint(spec['default']))
            else:
                options.add(Ref('None'))
            joined = ", ".join(str(option) for option in sorted(options))
            return Ref(f'Union[{joined}]')

        # elif kind == "mapper": # ????
        #     return TrailingData

    logging.error("Unknown type %s, using 'bytes' as hint", str(slot_type))
    return Ref('bytes')
|
|
|
|
def pytype(t:list) -> str:
    """Collapse a list of type hints into a single annotation string.

    Every element is converted with ``str()`` and duplicates are dropped. One
    distinct hint is returned as-is; several are returned as
    ``Union[a,b,...]`` in sorted order (no space after the commas).

    Raises:
        ValueError: if ``t`` is empty. The previous ``next(iter(...))`` leaked
            a ``StopIteration``, which turns into an opaque ``RuntimeError``
            when this function is called from inside a generator expression.
    """
    distinct = {str(hint) for hint in t}
    if not distinct:
        raise ValueError("pytype() needs at least one type hint")
    if len(distinct) == 1:
        return next(iter(distinct))
    return 'Union[' + ','.join(sorted(distinct)) + ']'
|
|
|
|
def snake_to_camel(name:str) -> str:
    """Convert ``snake_case`` to ``CamelCase`` with a leading capital.

    Each underscore-separated chunk goes through ``str.capitalize``, so any
    upper-case letters after the first character of a chunk are lowered.
    """
    chunks = name.split("_")
    return "".join(map(str.capitalize, chunks))
|
|
|
|
# def parse_slot(slot: dict) -> str:
|
|
# name = slot["name"] if "name" in slot else "anon"
|
|
# if keyword.iskeyword(name):
|
|
# name = "is_" + name
|
|
# t = mctype(slot["type"] if "type" in slot else "restBuffer")
|
|
# return f"(\"{name}\", {t.__name__})"
|
|
#
|
|
# def parse_field(slot: dict) -> str:
|
|
# name = slot["name"] if "name" in slot else "anon"
|
|
# if keyword.iskeyword(name):
|
|
# name = "is_" + name
|
|
# t = mctype(slot["type"] if "type" in slot else "restBuffer")
|
|
# return f"{name} : {t._pytype.__name__}"
|
|
|
|
class PacketClassWriter:
    """Collects one packet's per-version layouts and renders its class source.

    ``pkt`` is expected to look like
    ``{"name": str, "definitions": {version: {"id": int, "slots": [field, ...]}}}``
    where each field is a dict with a ``"name"`` and a ``"type"``.
    """

    name : str                                       # name of the generated class
    attrs : set[str]                                 # every field name seen in any version
    types : dict[str, set[Ref]]                      # field name -> serializer types seen
    hints : dict[str, set[Ref]]                      # field name -> annotation hints seen
    ids : dict[int, int]                             # version -> packet id
    definitions : dict[int, list[tuple[str, Ref]]]   # version -> ordered (field, type) pairs
    state : int                                      # value emitted as `_state`

    def __init__(self, pkt:dict, state:int):
        self.name = pkt["name"]
        self.state = state
        self.attrs = set()
        self.ids = {}
        self.types = {}
        self.hints = {}
        self.definitions = {}
        for v, defn in pkt["definitions"].items():
            self.ids[v] = defn["id"]
            layout = self.definitions[v] = []
            for field in defn["slots"]:
                if "name" not in field:
                    logging.error("Skipping anonymous field %s", str(field))
                    continue
                field_name : str = field["name"]
                # Python keywords cannot be attribute names in generated code.
                if keyword.iskeyword(field_name):
                    field_name = "is_" + field_name
                # Resolve the type once and reuse it: a second mctype() call did
                # the same work again and duplicated any error it logged.
                field_type = mctype(field["type"])
                self.attrs.add(field_name)
                layout.append((field_name, field_type))
                self.types.setdefault(field_name, set()).add(field_type)
                self.hints.setdefault(field_name, set()).add(mchint(field["type"]))

    def compile(self) -> str:
        """Return the full source text of the generated packet module."""
        attrs = sorted(self.attrs)
        # One annotation per attribute, shared by the field list and the constructor.
        annotations = {attr: pytype(sorted(self.hints[attr])) for attr in attrs}
        definitions = {
            version: Ref(format_list([Ref(format_tuple(slot)) for slot in layout]))
            for version, layout in self.definitions.items()
        }
        constructor = [Ref(f"{attr}:{annotations[attr]} | None = None") for attr in attrs]
        constructor.append(Ref("**kwargs"))
        return PREFACE + \
            IMPORTS + \
            OBJECT.format(
                name=self.name,
                ids=format_dict(self.ids, depth=2),
                definitions=format_dict(definitions, depth=2),
                slots=format_tuple(["id"] + attrs, depth=0), # TODO jank fix when no slots
                fields="\n\t" + "\n\t".join(f"{attr} : {annotations[attr]}" for attr in attrs),
                state=self.state,
                constructor=_format_line(constructor, depth=2),
                constructed=_format_line((Ref(f"{attr}={attr}") for attr in attrs), depth=3),
            )
|
|
|
|
class RegistryClassWriter:
    """Renders the ``REGISTRY`` assignment for one state/direction package."""

    registry : dict  # version -> {packet id -> class name}

    def __init__(self, registry:dict):
        self.registry = registry

    def compile(self) -> str:
        """Return the ``REGISTRY = {...}`` source text for the stored registry."""
        per_version = {}
        for version in self.registry:
            # Inner tables are rendered on a single line (depth 0).
            per_version[version] = format_dict(self.registry[version], depth=0)
        return REGISTRY_ENTRY.format(entries=format_dict(per_version))
|
|
|
|
# entries='{\n\t' + ",\n\t".join((
|
|
# str(v) + " : { " + ", ".join(
|
|
# f"{pid}:{clazz}" for (pid, clazz) in self.registry[v].items()
|
|
# ) + ' }' ) for v in self.registry.keys()
|
|
# ) + '\n}'
|
|
# )
|
|
|
|
def _make_module(path:Path, contents:dict):
    """Create the package directory ``path`` and write its ``__init__.py``.

    ``contents`` maps a submodule name to the text placed after ``import``;
    each entry becomes one ``from .<key> import <value>`` line. The directory
    must not exist yet, since ``os.mkdir`` is used.
    """
    os.mkdir(path)
    import_lines = [
        f"from .{module} import {symbol}\n"
        for module, symbol in contents.items()
    ]
    with open(path / "__init__.py", "w") as init_file:
        init_file.write(PREFACE + "".join(import_lines))
|
|
|
|
def compile():
    """Download minecraft-data and regenerate the `aiocraft/proto` package.

    Steps, all relative to `<cwd>/aiocraft`:
    1. download and extract the PrismarineJS/minecraft-data archive;
    2. delete any existing `proto` directory;
    3. read `version.json` / `protocol.json` of every PC version with a
       protocol number between 47 and 761 and gather packet, entity metadata
       and particle definitions keyed by protocol number;
    4. write `proto/ext.py`, one module per packet, and a REGISTRY appended to
       each direction package's `__init__.py`.

    NOTE(review): this function shadows the builtin `compile` in this module.
    """
    import shutil
    import zipfile
    from urllib.request import urlretrieve

    base_path = Path(os.getcwd())
    mc_path = base_path / 'aiocraft'

    # Retrieve proto definitions from PrismarineJS/minecraft-data
    urlretrieve("https://github.com/PrismarineJS/minecraft-data/zipball/master", mc_path / "minecraft-data.zip")

    with zipfile.ZipFile(mc_path / 'minecraft-data.zip', 'r') as f:
        f.extractall(mc_path)

    # First folder starting with PrismarineJS
    # NOTE(review): os.listdir order is arbitrary, so an extraction left over
    # from an earlier run could be picked instead of the fresh one; next() also
    # raises StopIteration when nothing matches.
    folder_name = next(folder for folder in os.listdir(mc_path) if folder.startswith("PrismarineJS"))

    # Always regenerate from scratch.
    if os.path.isdir(mc_path / 'proto'):
        shutil.rmtree(mc_path / 'proto')

    # state -> direction -> packet name -> {"name": class name, "definitions": {proto: {...}}}
    PACKETS = {
        "handshaking": {
            "clientbound": {},
            "serverbound": {},
        },
        "status": {
            "clientbound": {},
            "serverbound": {},
        },
        "login": {
            "clientbound": {},
            "serverbound": {},
        },
        "play": {
            "clientbound": {},
            "serverbound": {},
        }
    }

    # proto number -> {numeric id -> type Ref}
    METADATA = {}
    PARTICLES = {}

    all_versions = os.listdir(mc_path / f'{folder_name}/data/pc/')
    all_versions.remove("common")
    # NOTE(review): only ever appended to within this function.
    all_proto_numbers = []
    # _make_module(mc_path / 'proto', { f"v{v.replace('.', '_').replace('-', '_')}":"*" for v in all_versions })

    for v in all_versions:
        if v == "0.30c":
            continue # Proto just too antique!
        # Skip version folders that lack either of the two files read below.
        if not os.path.isfile(mc_path / f'{folder_name}/data/pc/{v}/protocol.json') \
            or not os.path.isfile(mc_path / f'{folder_name}/data/pc/{v}/version.json'):
            continue
        with open(mc_path / f'{folder_name}/data/pc/{v}/version.json') as f:
            proto_version = json.load(f)['version']

        if proto_version < 47 or proto_version > 761:
            continue # avoid versions before 1.8 and past 1.19

        all_proto_numbers.append(proto_version)

        with open(mc_path / f'{folder_name}/data/pc/{v}/protocol.json') as f:
            data = json.load(f)

        # Everything below is keyed by protocol number, so two version folders
        # reporting the same number overwrite each other's entries.
        METADATA[proto_version] = {}
        for meta_id, meta_type in data["types"]["entityMetadataItem"][1]["fields"].items():
            METADATA[proto_version][int(meta_id)] = mctype(meta_type)

        if "particleData" in data["types"]:
            PARTICLES[proto_version] = {}
            for p_id, p_type in data["types"]["particleData"][1]["fields"].items():
                PARTICLES[proto_version][int(p_id)] = mctype(p_type)

        # Build data structure containing all packets with all their definitions for different versions
        for state in ("handshaking", "status", "login", "play"):
            for _direction in ("toClient", "toServer"):
                direction = DIR_MAP[_direction]
                try:
                    # Table of hexadecimal packet id -> packet name.
                    buf = data[state][_direction]["types"]["packet"][1][0]["type"][1]["mappings"]
                except KeyError:
                    logging.exception("Exception building %s|%s|%s definitions", v, state, direction)
                    print("Exception building %s|%s|%s definitions" % (v, state, direction))
                    # _make_module(mc_path / f"proto/{version}/{state}/{direction}", {})
                    continue
                # Inverted table: "packet_<name>" -> numeric id.
                registry = { f"packet_{value}" : int(key, 16) for (key, value) in buf.items() }
                for p_name in data[state][_direction]["types"].keys():
                    if p_name == "packet":
                        continue # it's the registry entry
                    if p_name not in registry:
                        logging.warning("Trying to make definitions for packet '%s'", p_name)
                        continue # wtf!
                    packet = data[state][_direction]["types"][p_name]
                    pid = registry[p_name]
                    class_name = snake_to_camel(p_name)
                    if p_name not in PACKETS[state][direction]:
                        PACKETS[state][direction][p_name] = {
                            "name" : class_name,
                            "definitions" : {},
                        }
                    PACKETS[state][direction][p_name]["definitions"][proto_version] = {
                        "id": pid,
                        "slots" : packet[1],
                    }

    # Integer written as `_state` into each generated packet class.
    _STATE_MAP = {"handshaking": 0, "status":1, "login":2, "play":3}

    _make_module(mc_path / 'proto', { k:"*" for k in PACKETS.keys() })

    with open(mc_path / 'proto' / 'ext.py', 'w') as f:
        f.write(
            EXT_FORMATTER.format(
                metadata = format_dict(METADATA, depth=2),
                particles = format_dict(PARTICLES, depth=2),
            )
        )

    for state in PACKETS.keys():
        _make_module(mc_path / f"proto/{state}", { k:"*" for k in PACKETS[state].keys() })
        for direction in PACKETS[state].keys():
            # proto number -> {packet id -> class name} for this state/direction
            registry = {}

            _make_module(mc_path / f"proto/{state}/{direction}", { k:snake_to_camel(k) for k in PACKETS[state][direction].keys() })
            for packet in PACKETS[state][direction].keys():
                pkt = PACKETS[state][direction][packet]

                for v, defn in pkt["definitions"].items():
                    if v not in registry:
                        registry[v] = {}
                    registry[v][defn['id']] = snake_to_camel(packet)

                with open(mc_path / f"proto/{state}/{direction}/{packet}.py", "w") as f:
                    f.write(PacketClassWriter(pkt, _STATE_MAP[state]).compile())

            # Opened in append mode: the import lines written by _make_module stay in place.
            with open(mc_path / f"proto/{state}/{direction}/__init__.py", "a") as f:
                f.write(RegistryClassWriter(registry).compile())
|
|
|
|
# Run the generator only when this file is executed as a script.
if __name__ == "__main__":
    compile()
|
|
|
|
|
|
|
|
|
|
|