implemented 1.12.2 format, sorta

still broken but it's progress!
This commit is contained in:
əlemi 2022-01-15 01:57:40 +01:00
parent b0b0e2dcfa
commit 0ef56df704

View file

@ -1,17 +1,19 @@
import io
import math
import logging
from typing import Dict, Tuple, Any
import numpy as np
from aiocraft.mc.types import VarInt, Short, UnsignedByte, Type
from aiocraft.mc.types import VarInt, Short, UnsignedByte, Type, Context
class BitStream:
data : bytes
cursor : int
size : int
def __init__(self, data:bytes, size:int=-1):
def __init__(self, data:bytes, size:int):
self.data = data
self.cursor = 0
self.size = size if size > 0 else len(self.data) * 8
@ -21,95 +23,134 @@ class BitStream:
def read(self, size:int) -> int:
if len(self) < size:
raise ValueError("Not enough bits")
aligned_size = (size//8)+1
raise ValueError(f"Not enough bits ({len(self)} left, {size} requested)")
start_byte = (self.cursor//8)
end_byte = math.ceil((self.cursor + size) / 8) + 1
buf = int.from_bytes(
self.data[self.cursor:aligned_size],
self.data[start_byte:end_byte],
byteorder='little', signed=False
)
cut_right = 8 - ((self.cursor + size) % 8)
self.cursor += size
delta = aligned_size - size
return ( buf << delta ) >> delta
return ( buf >> cut_right ) & ( 0xFF >> (8 - (size%8)))
class PalettedContainer(Type):
pytype : type
threshold : int
maxsize : int
size : int
def __init__(self, threshold:int, maxsize:int):
def __init__(self, threshold:int, size:int):
self.threshold = threshold
self.maxsize = maxsize
self.size = size
def write(self, data, buffer:io.BytesIO, ctx:object=None):
def write(self, data, buffer:io.BytesIO, ctx:Context):
raise NotImplementedError
def read(self, buffer:io.BytesIO, ctx:object=None):
bits = UnsignedByte.read(buffer, ctx=ctx)
def read(self, buffer:io.BytesIO, ctx:Context):
bits = max(UnsignedByte.read(buffer, ctx=ctx), 4)
if bits > 13:
raise ValueError("Bits Per Bit too high : %d", bits)
palette = np.empty((0,), dtype='int32')
palette_len = VarInt.read(buffer, ctx=ctx)
if bits == 0:
value = VarInt.read(buffer, ctx=ctx)
value = palette_len
elif bits < self.threshold:
palette_len = VarInt.read(buffer, ctx=ctx)
palette = np.zeros((palette_len,), dtype='int32')
for i in range(palette_len):
palette[i] = VarInt.read(buffer)
palette[i] = VarInt.read(buffer, ctx=ctx)
size = VarInt.read(buffer, ctx=ctx)
stream = BitStream(buffer.read(size * 8))
section = np.zeros((self.maxsize,), dtype='int32')
stream = BitStream(buffer.read(size * 8), size*8*8) # a Long is 64 bits long
section = np.zeros((self.size, self.size, self.size), dtype='int32')
index = 0
while index < self.maxsize and len(stream) > 0:
val = stream.read(bits) if bits > 0 else value
section[index] = palette[val] if bits < self.threshold and bits > 0 else val
index+=1
for y in range(self.size):
for z in range(self.size):
for x in range(self.size):
val = stream.read(bits) if bits > 0 else value
if bits < self.threshold and bits > 0 and val > len(palette):
logging.warning("Expected index %d in palette of size %d", val, len(palette))
section[x][y][z] = palette[val] if bits < self.threshold and bits > 0 and val < len(palette) else val
return section
BiomeContainer = PalettedContainer(4, 64)
BlockStateContainer = PalettedContainer(9, 4096)
BiomeContainer = PalettedContainer(4, 4)
BlockStateContainer = PalettedContainer(9, 16)
class ChunkSectionType(Type):
class NewChunkSectionType(Type):
pytype : type
def write(self, data, buffer:io.BytesIO, ctx:object=None):
def write(self, data, buffer:io.BytesIO, ctx:Context):
raise NotImplementedError
def read(self, buffer:io.BytesIO, ctx:object=None):
block_count = Short.read(buffer)
block_states = BlockStateContainer.read(buffer)
biomes = BiomeContainer.read(buffer)
def read(self, buffer:io.BytesIO, ctx:Context):
block_count = Short.read(buffer, ctx=ctx)
block_states = BlockStateContainer.read(buffer, ctx=ctx)
biomes = BiomeContainer.read(buffer, ctx=ctx)
return (
block_count,
block_states.reshape((16, 16, 16)),
biomes.reshape((4, 4, 4))
block_states,
biomes
)
ChunkSection = ChunkSectionType()
class OldChunkSectionType(Type):
pytype : type
def write(self, data, buffer:io.BytesIO, ctx:Context):
raise NotImplementedError
def read(self, buffer:io.BytesIO, ctx:Context):
section = BlockStateContainer.read(buffer, ctx=ctx)
block_light = np.empty((16, 16, 16), dtype='int32')
block_light_buffer = BitStream(buffer.read(2048), 2048*8)
for y in range(16):
for z in range(16):
for x in range(16):
block_light[x][y][z] = block_light_buffer.read(4)
sky_light = np.empty((16, 16, 16), dtype='int32')
if ctx.overworld:
sky_light_buffer = BitStream(buffer.read(2048), 2048*8)
for y in range(16):
for z in range(16):
for x in range(16):
sky_light[x][y][z] = sky_light_buffer.read(4)
return (
section,
block_light,
sky_light
)
ChunkSection = OldChunkSectionType()
class Chunk(Type):
x : int
z : int
bitmask : int
ground_up_continuous : bool
blocks : np.ndarray
biomes : np.ndarray
block_count : int
block_light : np.ndarray
sky_light : np.ndarray
biomes: bytes
def __init__(self, x:int, z:int, bitmask:int):
def __init__(self, x:int, z:int, bitmask:int, ground_up_continuous:bool):
self.x = x
self.z = z
self.bitmask = bitmask
self.blocks = np.zeros((16, 256, 16))
self.biomes = np.zeros((4, 64, 4))
self.block_count = 0
self.block_light = np.zeros((16, 256, 16))
self.sky_light = np.zeros((16, 256, 16))
self.ground_up_continuous = ground_up_continuous
def __getitem__(self, item:Any):
return self.blocks[item]
def read(self, buffer:io.BytesIO, ctx:object=None):
def read(self, buffer:io.BytesIO, ctx:Context):
for i in range(16):
if (self.bitmask >> i) & 1:
block_count, block_states, biomes = ChunkSection.read(buffer)
self.block_count += block_count
self.blocks[:, i*16 : (i+1)*16, :] = block_states
self.biomes[:, i*4 : (i+1)*4, :] = biomes
section, block_light, sky_light = ChunkSection.read(buffer, ctx=ctx)
self.blocks[:, i*16 : (i+1)*16, :] = section
self.block_light[:, i*16 : (i+1)*16, :] = block_light
self.sky_light[:, i*16 : (i+1)*16, :] = sky_light
if self.ground_up_continuous:
self.biomes = buffer.read(256)
return self
class World:
@ -121,11 +162,11 @@ class World:
def __getitem__(self, item:Tuple[int, int, int]):
return self.get(*item)
def get(self, x:int, y:int, z:int):
def get(self, x:int, y:int, z:int) -> int:
coord = (x//16, z//16)
if coord not in self.chunks:
raise KeyError(f"Chunk {coord} not loaded")
return self.chunks[coord][x%16, y, z%16]
return self.chunks[coord][int(x%16), int(y), int(z%16)]
def put(self, chunk:Chunk, x:int, z:int):
self.chunks[(x,z)] = chunk