aiocraft/compiler/proto.py
alemi 63da8ddcd5
fix: generate metadata and particles, fix switch
now entity metadata and particle data are loaded from minecraft-data and
thus should just work ™️ for all proto versions

also fixed an ugly bug in switch type about string 'true' and 'false'.
"fixed" is kind of an overstatement, more like "awfully patched hoping
it's an exception and i don't need to parse numbers too" ...
2023-11-01 22:08:52 +01:00

458 lines
14 KiB
Python

#!/usr/bin/env python
import os
import json
import keyword
import logging
from pathlib import Path
from typing import Any
# TODO de-spaghetti this file sometime!
DIR_MAP = {"toClient": "clientbound", "toServer": "serverbound"}
PREFACE = """\"\"\"[!] This file is autogenerated\"\"\"\n\n"""
IMPORTS = """from typing import Tuple, List, Dict, Union, Optional
from ....packet import Packet
from ....definitions import *
from ....types import *\n"""
IMPORT_ALL = """__all__ = [\n\t{all}\n]\n"""
REGISTRY_ENTRY = """
REGISTRY = {entries}\n"""
OBJECT = """
class {name}(Packet):
__slots__ = {slots}
{fields}
def __init__(self, proto:int,{constructor}):
super().__init__(proto,{constructed})
_state : int = {state}
_ids : Dict[int, int] = {ids}
_definitions : Dict[int, List[Tuple[str, Type]]] = {definitions}
"""
EXT_FORMATTER = """from ..types import *
class MetadataDefinitions:
_definitions: dict[int, dict[int, Type]] = {metadata}
class ParticlesDefinitions:
_definitions: dict[int, dict[int, Type]] = {particles}
"""
class Ref:
name : str
args : tuple
def __eq__(self, other) -> bool:
if self.args:
return self.name == other.name and self.args == other.args
return self.name == other.name
def __hash__(self) -> int:
return hash(repr(self))
def __ge__(self, other):
return repr(self) >= repr(other)
def __gt__(self, other):
return repr(self) > repr(other)
def __init__(self, name:str, *args):
self.name = name or "anon"
self.args = args
def __str__(self) -> str:
return repr(self)
def __repr__(self) -> str:
if self.args:
out = self.name + "("
for arg in self.args:
out += repr(arg) + ", "
out += ")"
return out
return self.name
TYPE_MAP = {
"varint": Ref('VarInt'),
"varlong": Ref('VarLong'),
"u8": Ref('Byte'),
"i8": Ref('Byte'),
"u16": Ref('UnsignedShort'),
"u32": Ref('UnsignedInt'),
"u64": Ref('UnsignedLong'),
"i16": Ref('Short'),
"i32": Ref('Int'),
"i64": Ref('Long'),
"f32": Ref('Float'),
"f64": Ref('Double'),
"bool": Ref('Boolean'),
"UUID": Ref('UUID'),
"string": Ref('String'),
"nbt": Ref('NBTTag'),
"optionalNbt": Ref('OptionalType(NBTTag)'),
"slot": Ref('Slot'),
"position": Ref('Position'),
"entityMetadataItem": Ref('EntityMetadataItem'),
"entityMetadata": Ref('EntityMetadata'),
"restBuffer": Ref('TrailingData'),
"void": Ref('Void'),
}
HINT_MAP = {
"varint": 'int',
"varlong": 'int',
"u8": 'int',
"i8": 'int',
"u16": 'int',
"u32": 'int',
"u64": 'int',
"i16": 'int',
"i32": 'int',
"i64": 'int',
"f32": 'float',
"f64": 'float',
"bool": 'bool',
"UUID": 'str',
"string": 'str',
"nbt": 'dict',
"optionalNbt": 'Optional[dict]',
"slot": 'Item',
"position": 'tuple',
"entityMetadata": 'dict',
"restBuffer": 'bytes',
"void": 'None',
"tags": 'list', # TODO this has been added in 1.13
"bitfield": 'int', # TODO this can be hinted better
}
def _format_line(i, depth:int=0) -> str:
nl = ('\n' if depth > 0 else " ")
tab = '\t' * depth
return nl + tab + \
f",{nl}{tab}".join(f"{repr(e)}" for e in i) + \
nl + ('\t' * (depth-1))
def format_dict(d:dict, depth:int=1) -> str:
return "{" + _format_line((Ref(f"{k} : {v}") for k,v in sorted(d.items())), depth) + "}"
def format_list(l:list, depth:int=0) -> str:
return "[" + _format_line(l, depth) + "]"
def format_tuple(l:tuple | list, depth:int=0) -> str:
return "(" + _format_line(l, depth) + ")"
def mctype(slot_type:Any) -> Ref:
if isinstance(slot_type, str) and slot_type in TYPE_MAP:
return TYPE_MAP[slot_type]
if isinstance(slot_type, list):
t = slot_type[0]
v = slot_type[1]
if t == "buffer": # Array of bytes
if "countType" in v and v["countType"] == "integer":
return Ref('IntegerByteArray')
return Ref('ByteArray')
elif t == "array": # Generic array
return Ref('ArrayType',
mctype(v["type"]),
(v["count"] if "count" in v else mctype(v["countType"]) if "countType" in v else Ref('VarInt'))
)
elif t == "container": # Struct
return Ref('StructType', Ref(", ".join(format_tuple((p["name"], mctype(p["type"]))) for p in v if "name" in p))) # some fields are anonymous???
elif t == "option": # Optional
return Ref('OptionalType', mctype(v))
elif t == "switch": # Union
return Ref('SwitchType',
v["compareTo"].split('/')[-1],
Ref(format_dict({int(k) if k.isnumeric() else repr(k):mctype(x) for k,x in v["fields"].items()}, depth=0)),
mctype(v["default"]) if "default" in v and v['default'] != 'void' else None,
)
elif t == "bitfield":
# TODO can be made better...
size = 0
for field in v:
size += field["size"]
if size <= 32:
return Ref('Int')
return Ref('Long')
# elif t == "mapper": # ????
# return TrailingData
else:
logging.error("Encountered unknown composite data type : %s", t)
return Ref('TrailingData')
def mchint(slot_type:Any) -> Ref:
if isinstance(slot_type, str) and slot_type in HINT_MAP:
return Ref(HINT_MAP[slot_type])
if isinstance(slot_type, list):
t = slot_type[0]
if t == "buffer": # Array of bytes
return Ref('bytes')
elif t == "array": # Generic array
return Ref('list')
elif t == "container": # Struct TODO make an object with type hints
return Ref('dict')
elif t == "option": # Optional
return Ref('tuple')
elif t == "switch": # Union
possibilities = set()
for val in slot_type[1]['fields'].values():
possibilities.add(mchint(val))
if 'default' in slot_type[1]:
possibilities.add(mchint(slot_type[1]['default']))
else:
possibilities.add(Ref('None'))
return Ref(f'Union[{", ".join(str(s) for s in sorted(possibilities))}]')
elif t == "bitfield":
return Ref('int')
# elif t == "mapper": # ????
# return TrailingData
logging.error("Unknown type %s, using 'bytes' as hint", str(slot_type))
return Ref('bytes')
def pytype(t:list) -> str:
vals = set(str(x) for x in t)
if len(vals) <= 1:
return next(iter(vals))
return 'Union[' + ','.join(x for x in sorted(vals)) + ']'
def snake_to_camel(name:str) -> str:
return "".join(x.capitalize() for x in name.split("_"))
# def parse_slot(slot: dict) -> str:
# name = slot["name"] if "name" in slot else "anon"
# if keyword.iskeyword(name):
# name = "is_" + name
# t = mctype(slot["type"] if "type" in slot else "restBuffer")
# return f"(\"{name}\", {t.__name__})"
#
# def parse_field(slot: dict) -> str:
# name = slot["name"] if "name" in slot else "anon"
# if keyword.iskeyword(name):
# name = "is_" + name
# t = mctype(slot["type"] if "type" in slot else "restBuffer")
# return f"{name} : {t._pytype.__name__}"
class PacketClassWriter:
name : str
attrs : set[str]
types : dict[str, set[Ref]]
hints : dict[str, set[Ref]]
ids : dict[int, int]
definitions : dict[int, list[tuple[str, Ref]]]
state : int
def __init__(self, pkt:dict, state:int):
self.name = pkt["name"]
self.state = state
self.attrs = set()
self.ids = {}
self.types = {}
self.hints = {}
self.definitions = {}
for v, defn in pkt["definitions"].items():
self.ids[v] = defn["id"]
self.definitions[v] = []
for field in defn["slots"]:
if "name" not in field:
logging.error("Skipping anonymous field %s", str(field))
continue
field_name : str = field["name"] if not keyword.iskeyword(field["name"]) else "is_" + field["name"]
self.attrs.add(field_name)
self.definitions[v].append((field_name, mctype(field["type"])))
if field_name not in self.types:
self.types[field_name] = set()
self.types[field_name].add(mctype(field["type"]))
if field_name not in self.hints:
self.hints[field_name] = set()
self.hints[field_name].add(mchint(field["type"]))
def compile(self) -> str:
return PREFACE + \
IMPORTS + \
OBJECT.format(
name=self.name,
ids=format_dict(self.ids, depth=2),
definitions=format_dict({ k : Ref(format_list([Ref(format_tuple(x)) for x in v])) for k,v in self.definitions.items() }, depth=2),
slots=format_tuple(["id"] + sorted(self.attrs), depth=0), # TODO jank fix when no slots
fields="\n\t" + "\n\t".join(f"{a} : {pytype(sorted(self.hints[a]))}" for a in sorted(self.attrs)),
state=self.state,
constructor=_format_line([Ref(f"{field}:{pytype(sorted(self.hints[field]))}=None") for field in sorted(self.attrs)] + [Ref("**kwargs")], depth=2),
constructed=_format_line((Ref(f"{field}={field}") for field in sorted(self.attrs)), depth=3),
)
class RegistryClassWriter:
registry : dict
def __init__(self, registry:dict):
self.registry = registry
def compile(self) -> str:
return REGISTRY_ENTRY.format(
entries=format_dict({
v : format_dict(
self.registry[v], depth=0
) for v in self.registry
})
)
# entries='{\n\t' + ",\n\t".join((
# str(v) + " : { " + ", ".join(
# f"{pid}:{clazz}" for (pid, clazz) in self.registry[v].items()
# ) + ' }' ) for v in self.registry.keys()
# ) + '\n}'
# )
def _make_module(path:Path, contents:dict):
os.mkdir(path)
imports = ""
for key, value in contents.items():
imports += f"from .{key} import {value}\n"
with open(path / "__init__.py", "w") as f:
f.write(PREFACE + imports)
def compile():
import shutil
import zipfile
from urllib.request import urlretrieve
base_path = Path(os.getcwd())
mc_path = base_path / 'aiocraft' / 'mc'
# Retrieve proto definitions from PrismarineJS/minecraft-data
urlretrieve("https://github.com/PrismarineJS/minecraft-data/zipball/master", mc_path / "minecraft-data.zip")
with zipfile.ZipFile(mc_path / 'minecraft-data.zip', 'r') as f:
f.extractall(mc_path)
# First folder starting with PrismarineJS
folder_name = next(folder for folder in os.listdir(mc_path) if folder.startswith("PrismarineJS"))
if os.path.isdir(mc_path / 'proto'):
shutil.rmtree(mc_path / 'proto')
PACKETS = {
"handshaking": {
"clientbound": {},
"serverbound": {},
},
"status": {
"clientbound": {},
"serverbound": {},
},
"login": {
"clientbound": {},
"serverbound": {},
},
"play": {
"clientbound": {},
"serverbound": {},
}
}
METADATA = {}
PARTICLES = {}
all_versions = os.listdir(mc_path / f'{folder_name}/data/pc/')
all_versions.remove("common")
all_proto_numbers = []
# _make_module(mc_path / 'proto', { f"v{v.replace('.', '_').replace('-', '_')}":"*" for v in all_versions })
for v in all_versions:
if v == "0.30c":
continue # Proto just too antique!
if not os.path.isfile(mc_path / f'{folder_name}/data/pc/{v}/protocol.json') \
or not os.path.isfile(mc_path / f'{folder_name}/data/pc/{v}/version.json'):
continue
with open(mc_path / f'{folder_name}/data/pc/{v}/version.json') as f:
proto_version = json.load(f)['version']
if proto_version < 47 or proto_version > 1000:
continue # avoid versions before 1.8
all_proto_numbers.append(proto_version)
with open(mc_path / f'{folder_name}/data/pc/{v}/protocol.json') as f:
data = json.load(f)
METADATA[proto_version] = {}
for meta_id, meta_type in data["types"]["entityMetadataItem"][1]["fields"].items():
METADATA[proto_version][int(meta_id)] = mctype(meta_type)
if "particleData" in data["types"]:
PARTICLES[proto_version] = {}
for p_id, p_type in data["types"]["particleData"][1]["fields"].items():
PARTICLES[proto_version][int(p_id)] = mctype(p_type)
# Build data structure containing all packets with all their definitions for different versions
for state in ("handshaking", "status", "login", "play"):
for _direction in ("toClient", "toServer"):
direction = DIR_MAP[_direction]
try:
buf = data[state][_direction]["types"]["packet"][1][0]["type"][1]["mappings"]
except KeyError:
logging.exception("Exception building %s|%s|%s definitions", v, state, direction)
print("Exception building %s|%s|%s definitions" % (v, state, direction))
# _make_module(mc_path / f"proto/{version}/{state}/{direction}", {})
continue
registry = { f"packet_{value}" : int(key, 16) for (key, value) in buf.items() }
for p_name in data[state][_direction]["types"].keys():
if p_name == "packet":
continue # it's the registry entry
if p_name not in registry:
logging.warning("Trying to make definitions for packet '%s'", p_name)
continue # wtf!
packet = data[state][_direction]["types"][p_name]
pid = registry[p_name]
class_name = snake_to_camel(p_name)
if p_name not in PACKETS[state][direction]:
PACKETS[state][direction][p_name] = {
"name" : class_name,
"definitions" : {},
}
PACKETS[state][direction][p_name]["definitions"][proto_version] = {
"id": pid,
"slots" : packet[1],
}
_STATE_MAP = {"handshaking": 0, "status":1, "login":2, "play":3}
_make_module(mc_path / 'proto', { k:"*" for k in PACKETS.keys() })
with open(mc_path / 'proto' / 'ext.py', 'w') as f:
f.write(
EXT_FORMATTER.format(
metadata = format_dict(METADATA, depth=2),
particles = format_dict(PARTICLES, depth=2),
)
)
for state in PACKETS.keys():
_make_module(mc_path / f"proto/{state}", { k:"*" for k in PACKETS[state].keys() })
for direction in PACKETS[state].keys():
registry = {}
_make_module(mc_path / f"proto/{state}/{direction}", { k:snake_to_camel(k) for k in PACKETS[state][direction].keys() })
for packet in PACKETS[state][direction].keys():
pkt = PACKETS[state][direction][packet]
for v, defn in pkt["definitions"].items():
if v not in registry:
registry[v] = {}
registry[v][defn['id']] = snake_to_camel(packet)
with open(mc_path / f"proto/{state}/{direction}/{packet}.py", "w") as f:
f.write(PacketClassWriter(pkt, _STATE_MAP[state]).compile())
with open(mc_path / f"proto/{state}/{direction}/__init__.py", "a") as f:
f.write(RegistryClassWriter(registry).compile())
if __name__ == "__main__":
compile()