"""Reader for a little-endian binary container made of fixed-size subfiles.

Layout, as far as this module relies on it: a 422-byte text preamble, a
packed ``FileHeader``, two "data directories" (header and subfile) that
describe named, typed, N-dimensional entries, and ``numberOfSubfiles`` data
blocks that all share the layout given by the subfile directory.
"""
import struct
from collections import OrderedDict, namedtuple
from datetime import datetime

import numpy as np

# Field order mirrors _HEADER_FORMAT below (15 values).
FileHeader = namedtuple('FileHeader', [
    'machineCode',
    'skip0',
    'creationFlags',
    'numberOfSubfiles',
    'fileCreationDate',
    'headerDirectoryOffset',
    'subfileDirectoryOffset',
    'headerDataOffset',
    'firstSubfileDataBlockOffset',
    'indexTableOffset',
    'headerDirectorySize',
    'subfileDirectorySize',
    'headerDataSize',
    'indexTableSize',
    'headerCRC32',
])

# Eight fixed-width byte fields that precede the packed header (422 bytes).
_PREAMBLE_FORMAT = '32s2s64s2s64s2s254s2s'
_HEADER_FORMAT = '<bbiilllllliiiil'
_AXIS_FORMAT = '<hidd'  # axis type, number of points, min value, max value

_DIRECTORY_MAGIC = 0x30726940
_MAX_STRING_LENGTH = 1000       # exclusive upper bound, in UTF-16 code units
_MAX_DIRECTORY_ENTRIES = 10001  # entries with index 0..10000 are parsed

# NOTE(review): FileHeader carries firstSubfileDataBlockOffset, yet the data
# offset has always been hard-coded here -- confirm which one is authoritative.
_FIRST_SUBFILE_OFFSET = 504

_STRING_TYPE = 50

# Bit masks of FileHeader.creationFlags, keyed by the name decode_flags reports.
_FLAG_MASKS = (
    ('isHeaderDirCompressed', 128),
    ('isTTablePresent', 1),
    ('isOffsetTablePresent', 2),
    ('isSubfileVariableSize', 64),
    ('isSubfileMagicNbrPresent', 8),
    ('isSubfileSizePresent', 16),
    ('isSubfileTValuePresent', 32),
    ('isSubfileCRCPresent', 4),
)

# Bytes each optional per-subfile prefix field contributes before the data.
_SUBFILE_PREFIX_SIZES = (
    ('isSubfileCRCPresent', 8),
    ('isSubfileMagicNbrPresent', 4),
    ('isSubfileTValuePresent', 8),
    ('isSubfileSizePresent', 4),
)

# Bytes per element for every axis type number.
_TYPE_SIZES = {1: 1, 2: 1, 3: 1, 4: 2, 5: 4, 6: 4, 7: 4, 8: 8, 9: 8, 10: 16,
               _STRING_TYPE: 0}

# Explicit little-endian dtypes whose item sizes agree with _TYPE_SIZES, so the
# subfile stride computed by calc_data_size matches what readSubfile consumes.
# NOTE(review): type 3 ("char") is read as single bytes and type 6 ("long") as
# signed 32-bit because the size table gives them 1 and 4 bytes -- confirm the
# signedness/encoding against the format specification.
_TYPE_DTYPES = {
    1: np.dtype('u1'),
    2: np.dtype('?'),
    3: np.dtype('S1'),
    4: np.dtype('<i2'),
    5: np.dtype('<i4'),
    6: np.dtype('<i4'),
    7: np.dtype('<f4'),
    8: np.dtype('<f8'),
    9: np.dtype('<c8'),
    10: np.dtype('<c16'),
}


def _check(condition, message):
    """Raise AssertionError(message) unless *condition* holds.

    Raised explicitly instead of via ``assert`` so the validation survives
    ``python -O`` while keeping the exception type existing callers see.
    """
    if not condition:
        raise AssertionError(message)


def _entry_layout(axes):
    """Return ``(shape, type_number)`` for one directory entry.

    The shape is the point count of every axis, in order; the element type is
    taken from the first axis only. Raises IndexError when *axes* is empty.
    """
    axis_list = list(axes.values())
    type_number = axis_list[0][1]
    shape = tuple(axis[2] for axis in axis_list)
    return shape, type_number


def _element_count(shape):
    """Return the number of elements in *shape* as a plain Python int."""
    return int(np.prod(shape, dtype=np.int64))


def decode_flags(flags):
    """Split the ``creationFlags`` bit field into named booleans.

    Returns a dict with one ``is...`` key per known flag plus the raw value
    under ``'flags'`` (kept because earlier versions exposed it as well).
    """
    decoded = {'flags': flags}
    for name, mask in _FLAG_MASKS:
        decoded[name] = (flags & mask) != 0
    return decoded


def data_start(decoded):
    """Return the byte length of the optional prefix preceding subfile data.

    *decoded* is the mapping produced by ``decode_flags``; every enabled
    prefix field (CRC, magic number, T value, size) adds its width.
    """
    return sum(size for key, size in _SUBFILE_PREFIX_SIZES if decoded[key])


def readString(inS):
    """Read a length-prefixed UTF-16 string from the binary stream *inS*.

    The prefix is a little-endian int32 counting 2-byte code units.

    Raises:
        AssertionError: if the length is negative or not below 1000. A
            negative length would otherwise make ``read`` consume the whole
            remainder of the stream.
    """
    (length,) = struct.unpack('<i', inS.read(4))
    _check(0 <= length < _MAX_STRING_LENGTH,
           'implausible string length %d' % length)
    # NOTE(review): 'utf-16' sniffs a BOM and otherwise uses native byte
    # order; the rest of the format is little-endian -- confirm 'utf-16-le'.
    return inS.read(length * 2).decode('utf-16')


def read_data_directory(inS):
    """Parse one data directory at the current position of *inS*.

    Returns an OrderedDict mapping entry name to
    ``(ndims, compression, axes)``, where ``axes`` is an OrderedDict mapping
    axis name to ``(unit, type, npts, min_value, max_value)``.

    Raises:
        AssertionError: if the directory does not start with the magic number.
    """
    (magic,) = struct.unpack('<i', inS.read(4))
    _check(magic == _DIRECTORY_MAGIC,
           'bad data directory magic 0x%08x' % (magic & 0xFFFFFFFF))
    (entry_count,) = struct.unpack('<i', inS.read(4))

    axis_size = struct.calcsize(_AXIS_FORMAT)
    directory = OrderedDict()
    # Cap the loop so a corrupt count cannot make us parse forever.
    for _ in range(min(entry_count, _MAX_DIRECTORY_ENTRIES)):
        name = readString(inS)
        ndims, compression = struct.unpack('<hh', inS.read(4))
        axes = OrderedDict()
        for _ in range(ndims):
            axis_name = readString(inS)
            axis_unit = readString(inS)
            axis_type, npts, min_value, max_value = struct.unpack(
                _AXIS_FORMAT, inS.read(axis_size))
            axes[axis_name] = (axis_unit, axis_type, npts,
                               min_value, max_value)
        directory[name] = (ndims, compression, axes)
    return directory


def calc_data_size(subfileDirectory):
    """Return the byte size (a plain int) of one subfile's data payload.

    Raises:
        AssertionError: if any entry is compressed.
        KeyError: if an entry uses an unknown type number.
    """
    total = 0
    for _ndims, compression, axes in subfileDirectory.values():
        _check(compression == 0, 'compressed entries are not supported')
        shape, type_number = _entry_layout(axes)
        total += _TYPE_SIZES[type_number] * _element_count(shape)
    return total


def readSubfile(index, fileheader, subfileDirectory, inS):
    """Read subfile number *index* and return ``{entry name: data}``.

    Each entry becomes a writable NumPy array of the entry's shape; entries of
    shape ``(1,)`` are unwrapped to a Python scalar.

    Raises:
        AssertionError: if any entry is compressed.
        KeyError: if an entry uses an unknown type number.
        NotImplementedError: for string-typed entries (type 50), which have
            no fixed-size representation in the data block.
    """
    dataStart = data_start(decode_flags(fileheader.creationFlags))
    stride = dataStart + calc_data_size(subfileDirectory)
    inS.seek(_FIRST_SUBFILE_OFFSET + index * stride + dataStart)

    data = {}
    for entry_name, (_ndims, compression, axes) in subfileDirectory.items():
        _check(compression == 0, 'compressed entries are not supported')
        shape, type_number = _entry_layout(axes)
        if type_number == _STRING_TYPE:
            raise NotImplementedError(
                'string-typed entry %r cannot be read' % (entry_name,))
        dtype = _TYPE_DTYPES[type_number]
        raw = inS.read(_element_count(shape) * dtype.itemsize)
        # frombuffer yields a read-only view of `raw`; copy so callers get an
        # independent, writable array.
        entry_data = np.frombuffer(raw, dtype=dtype).reshape(shape).copy()
        if shape == (1,):
            entry_data = entry_data.item(0)
        data[entry_name] = entry_data
    return data


class _PatchedZip:
    """Give a zip archive member the ``read``/``seek`` subset the reader uses.

    Seeking is emulated by reading forward; seeking backwards reopens the
    member and reads forward from its start.
    """

    def __init__(self, name, zipfile):
        self.name = name
        self.zipfile = zipfile
        self.offset = 0
        self.zipext = zipfile.open(name)

    def read(self, nbytes):
        chunk = self.zipext.read(nbytes)
        # Track what was actually consumed so a short read at end-of-file
        # does not desynchronise the emulated position.
        self.offset += len(chunk)
        return chunk

    def seek(self, offset):
        if offset < self.offset:
            self.zipext.close()
            self.zipext = self.zipfile.open(self.name)
            self.offset = 0
        self.read(offset - self.offset)


def read_zip(zipfile, name):
    """Yield the subfiles of member *name* of the open archive *zipfile*."""
    return read_stream(_PatchedZip(name, zipfile))


def read_stream(inS):
    """Yield one ``{entry name: data}`` dict per subfile found in *inS*.

    *inS* must provide ``read(nbytes)`` and absolute ``seek(offset)``.
    """
    inS.seek(0)
    # The preamble is skipped, not interpreted: nothing below depends on it.
    inS.read(struct.calcsize(_PREAMBLE_FORMAT))
    fh = FileHeader(*struct.unpack(
        _HEADER_FORMAT, inS.read(struct.calcsize(_HEADER_FORMAT))))

    # NOTE(review): the stream is positioned at subfileDirectoryOffset and the
    # header directory is then read first -- confirm this matches the format.
    inS.seek(fh.subfileDirectoryOffset)
    read_data_directory(inS)  # header directory: parsed only to get past it
    subfileDirectory = read_data_directory(inS)

    for index in range(fh.numberOfSubfiles):
        yield readSubfile(index, fh, subfileDirectory, inS)


def read_file(path):
    """Yield the subfiles stored in the file at *path*.

    The file stays open while the generator is being consumed.
    """
    with open(path, 'rb') as inS:
        yield from read_stream(inS)