#!/usr/bin/env python import os import json import keyword import logging from pathlib import Path from typing import List, Dict, Union from aiocraft.mc.mctypes import * DIR_MAP = {"toClient": "clientbound", "toServer": "serverbound"} PREFACE = """\"\"\"[!] This file is autogenerated\"\"\"\n\n""" IMPORTS = """from typing import Tuple, Dict from ....packet import Packet from ....mctypes import *\n""" IMPORT_ALL = """__all__ = [\n\t{all}\n]\n""" OBJECT = """ class {name}(Packet): {fields} _ids : Dict[int, int] = {ids} _slots : Dict[int, Tuple[Tuple[str, Type]]] = {slots} """ TYPE_MAP = { "varint": VarInt, "u8": UnsignedShort, "u16": UnsignedInt, "u32": UnsignedLong, "i16": Short, "i32": Int, "i64": Long, "f32": Float, "f64": Double, "bool": Boolean, "UUID": UUID, "string": String, "nbt": NBTTag, "slot": Slot, "position": Position, "entityMetadataItem": EntityMetadataItem, "entityMetadata": EntityMetadata, } def mctype(name:str) -> Type: if not isinstance(name, str): return String # should return TrailingByteArray but still haven't implemented it (: if name in TYPE_MAP: return TYPE_MAP[name] return String # should return TrailingByteArray but still haven't implemented it (: def snake_to_camel(name:str) -> str: return "".join(x.capitalize() for x in name.split("_")) def parse_slot(slot: dict) -> str: name = slot["name"] if "name" in slot else "anon" if keyword.iskeyword(name): name = "is_" + name t = mctype(slot["type"] if "type" in slot else "restBuffer") return f"(\"{name}\", {t.__name__})" def parse_field(slot: dict) -> str: name = slot["name"] if "name" in slot else "anon" if keyword.iskeyword(name): name = "is_" + name t = mctype(slot["type"] if "type" in slot else "restBuffer") return f"{name} : {t._pytype.__name__}" class PacketClassWriter: title : str ids : str slots : str fields: str def __init__(self, title:str, ids:str, slots:str, fields:str): self.title = title self.ids = ids self.slots = slots self.fields = fields def compile(self) -> str: return PREFACE + \ IMPORTS + \ OBJECT.format( name=self.title, ids=self.ids, slots=self.slots, fields=self.fields, ) def _make_module(path:Path, contents:dict): os.mkdir(path) imports = "" for key in contents: imports += f"from .{key} import {contents[key]}\n" with open(path / "__init__.py", "w") as f: f.write(PREFACE + imports) def compile(): import shutil import zipfile from urllib.request import urlretrieve base_path = Path(os.getcwd()) mc_path = base_path / 'aiocraft' / 'mc' # Retrieve proto definitions from PrismarineJS/minecraft-data urlretrieve("https://github.com/PrismarineJS/minecraft-data/zipball/master", mc_path / "minecraft-data.zip") with zipfile.ZipFile(mc_path / 'minecraft-data.zip', 'r') as f: f.extractall(mc_path) # First folder starting with PrismarineJS folder_name = next(folder for folder in os.listdir(mc_path) if folder.startswith("PrismarineJS")) shutil.rmtree(mc_path / 'proto') PACKETS = { "handshaking": { "clientbound": {}, "serverbound": {}, }, "status": { "clientbound": {}, "serverbound": {}, }, "login": { "clientbound": {}, "serverbound": {}, }, "play": { "clientbound": {}, "serverbound": {}, } } # TODO load all versions! all_versions = os.listdir(mc_path / f'{folder_name}/data/pc/') all_versions.remove("common") # _make_module(mc_path / 'proto', { f"v{v.replace('.', '_').replace('-', '_')}":"*" for v in all_versions }) for v in all_versions: if v == "0.30c": continue # Proto just too antique! if not os.path.isfile(mc_path / f'{folder_name}/data/pc/{v}/protocol.json') \ or not os.path.isfile(mc_path / f'{folder_name}/data/pc/{v}/version.json'): continue with open(mc_path / f'{folder_name}/data/pc/{v}/version.json') as f: proto_version = json.load(f)['version'] with open(mc_path / f'{folder_name}/data/pc/{v}/protocol.json') as f: data = json.load(f) for state in ("handshaking", "status", "login", "play"): for _direction in ("toClient", "toServer"): direction = DIR_MAP[_direction] try: buf = data[state][_direction]["types"]["packet"][1][0]["type"][1]["mappings"] except KeyError: logging.exception("Exception building %s|%s|%s definitions", v, state, direction) print("Exception building %s|%s|%s definitions" % (v, state, direction)) # _make_module(mc_path / f"proto/{version}/{state}/{direction}", {}) continue registry = { f"packet_{value}" : int(key, 16) for (key, value) in buf.items() } for p_name in data[state][_direction]["types"].keys(): if p_name == "packet": continue # it's the registry entry if p_name not in registry: logging.warning("Trying to make definitions for packet '%s'", p_name) continue # wtf! packet = data[state][_direction]["types"][p_name] pid = registry[p_name] class_name = snake_to_camel(p_name) if p_name not in PACKETS[state][direction]: PACKETS[state][direction][p_name] = { "name" : class_name, "definitions" : {}, } PACKETS[state][direction][p_name]["definitions"][proto_version] = { "id": pid, "slots" : packet[1], } _make_module(mc_path / 'proto', { k:"*" for k in PACKETS.keys() }) for state in PACKETS.keys(): _make_module(mc_path / f"proto/{state}", { k:"*" for k in PACKETS[state].keys() }) for direction in PACKETS[state].keys(): _make_module(mc_path / f"proto/{state}/{direction}", { k:snake_to_camel(k) for k in PACKETS[state][direction].keys() }) for packet in PACKETS[state][direction].keys(): pkt = PACKETS[state][direction][packet] slots = [] fields = set() ids = [] for v in sorted(PACKETS[state][direction][packet]["definitions"].keys()): defn = pkt["definitions"][v] ids.append(f"{v} : 0x{defn['id']:02X}") v_slots = [] v_fields = [] for slot in defn["slots"]: v_slots.append(parse_slot(slot)) fields.add(parse_field(slot)) slots.append(f"{v} : ( {','.join(v_slots)} ),") with open(mc_path / f"proto/{state}/{direction}/{packet}.py", "w") as f: f.write( PacketClassWriter( pkt["name"], '{\n\t\t' + ',\n\t\t'.join(ids) + '\n\t}\n', '{\n\t\t' + '\n\t\t'.join(slots) + '\n\t}\n', '\n\t'.join(fields), ).compile() )