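"""Reader for Bomem binary data files.

A file contains a fixed-size text header, a binary file header, header and
subfile directories describing the stored entries, and one data block per
subfile.  read_file() reads from disk and read_zip() from a member of a zip
archive; both yield one dict of numpy arrays per subfile.
"""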
import struct
import numpy as np
from collections import namedtuple, OrderedDict

FileHeader = namedtuple('FileHeader', ['machineCode',
                                       'skip0',
                                       'creationFlags',
                                       'numberOfSubfiles',
                                       'fileCreationDate',
                                       'headerDirectoryOffset',
                                       'subfileDirectoryOffset',
                                       'headerDataOffset',
                                       'firstSubfileDataBlockOffset',
                                       'indexTableOffset',
                                       'headerDirectorySize',
                                       'subfileDirectorySize',
                                       'headerDataSize',
                                       'indexTableSize',
                                       'headerCRC32'])
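# The field order above follows the '<bbiilllllliiiil' layout unpacked in read_stream below.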


def decode_flags(flags):
    """Decode the creationFlags bit field into named booleans."""
    isTTablePresent          = (flags & 1) != 0
    isOffsetTablePresent     = (flags & 2) != 0
    isSubfileCRCPresent      = (flags & 4) != 0
    isSubfileMagicNbrPresent = (flags & 8) != 0
    isSubfileSizePresent     = (flags & 16) != 0
    isSubfileTValuePresent   = (flags & 32) != 0
    isSubfileVariableSize    = (flags & 64) != 0
    isHeaderDirCompressed    = (flags & 128) != 0
    # locals() also includes `flags`; callers look up only the is* keys.
    return locals()


def data_start(decoded):
    """Size in bytes of the optional per-subfile fields (CRC, magic number,
    T value, size) that precede each subfile's data block."""
    offset = 0
    if decoded['isSubfileCRCPresent']:
        offset += 8
    if decoded['isSubfileMagicNbrPresent']:
        offset += 4
    if decoded['isSubfileTValuePresent']:
        offset += 8
    if decoded['isSubfileSizePresent']:
        offset += 4
    return offset


def readString(inS):
    """Read a 32-bit length-prefixed UTF-16 string (length in code units)."""
    length = struct.unpack('<i', inS.read(4))[0]
    assert length < 1000  # sanity check against corrupt files
    return inS.read(length * 2).decode('utf-16')


def read_data_directory(inS):
    """Read a directory of named entries, each described by its per-axis metadata."""
    magic = struct.unpack_from('<i', inS.read(4))[0]
    assert magic == 0x30726940
    directory = OrderedDict()
    for i in range(struct.unpack_from('<i', inS.read(4))[0]):
        if i > 10000:  # sanity limit on the entry count
            break
        name = readString(inS)
        ndims, compression = struct.unpack_from('<hh', inS.read(4))
        axes = OrderedDict()
        for dim in range(ndims):
            axisName = readString(inS)
            axisUnit = readString(inS)
            axisType, axisNpts, axisMinValue, axisMaxValue = struct.unpack('<hidd', inS.read(2 + 4 + 8 + 8))
            axes[axisName] = (axisUnit, axisType, axisNpts, axisMinValue, axisMaxValue)
        directory[name] = ndims, compression, axes
    return directory


def calc_data_size(subfileDirectory):
    """Total size in bytes of one subfile's (uncompressed) data block."""
    # Size in bytes of each axis data type number.
    types_bytes = {1: 1, 2: 1, 3: 1, 4: 2, 5: 4, 6: 4, 7: 4,
                   8: 8, 9: 8, 10: 16, 50: 0}
    total = 0
    for entry_name, (ndims, compression, axes) in subfileDirectory.items():
        assert compression == 0  # compressed entries are not supported
        shape = tuple(axis[2] for axis in axes.values())
        type_number = list(axes.values())[0][1]
        total += types_bytes[type_number] * np.prod(shape)
    return total


def readSubfile(index, fileheader, subfileDirectory, inS):
    """Read the data block of subfile `index` into a dict of numpy arrays/scalars."""
    dataStart = data_start(decode_flags(fileheader.creationFlags))
    subfileDataSize = calc_data_size(subfileDirectory)
    # 504 is the hard-coded offset of the first subfile data block.
    offset = 504 + index * (dataStart + subfileDataSize)
    inS.seek(offset + dataStart)
    data = {}
    # numpy dtype for each axis data type number; types 3 and 6 are assumed
    # to be signed integers with the sizes listed in calc_data_size.
    types = {1: np.ubyte,
             2: np.bool_,
             3: np.int8,
             4: np.short,
             5: np.int32,
             6: np.int32,
             7: np.float32,
             8: np.double,
             9: np.complex64,
             10: np.complex128,
             50: str,  # string entries are not handled below
             }
    for entry_name, (ndims, compression, axes) in subfileDirectory.items():
        assert compression == 0
        shape = tuple(axis[2] for axis in axes.values())
        type_number = list(axes.values())[0][1]
        dtype = types[type_number]
        nbytes = int(np.prod(shape)) * np.dtype(dtype).itemsize
        entry_data = np.frombuffer(inS.read(nbytes), dtype=dtype).reshape(shape)
        if shape == (1,):
            entry_data = entry_data.item(0)
        data[entry_name] = entry_data
    return data

def read_zip(zipfile, name):
    """Yield subfile data dicts from the member `name` of an open zipfile.ZipFile."""

    class PatchedZip:
        """Adapt a forward-only ZipExtFile to the read/seek interface used above."""
        def __init__(self, name, zipfile):
            self.name = name
            self.zipfile = zipfile
            self.offset = 0
            self.zipext = zipfile.open(name)

        def read(self, nbytes):
            self.offset += nbytes
            return self.zipext.read(nbytes)

        def seek(self, offset):
            if self.offset > offset:
                # The member is not rewound in place: reopen it and skip forward.
                self.zipext.close()
                self.zipext = self.zipfile.open(self.name)
                self.offset = 0
                self.read(offset)
            else:
                # Skip forward by reading and discarding bytes.
                self.read(offset - self.offset)

    return read_stream(PatchedZip(name, zipfile))

def read_stream(inS):
    """Yield one dict of data per subfile read from the stream inS."""
    inS.seek(0)
    # Fixed-size UTF-16 text header; the decoded strings are not used.
    fmt = '32s2s64s2s64s2s254s2s'
    [e.decode('utf-16') for e in struct.unpack_from(fmt, inS.read(struct.calcsize(fmt)))]
    fmt = '<bbiilllllliiiil'
    fh = FileHeader(*struct.unpack_from(fmt, inS.read(struct.calcsize(fmt))))
    inS.seek(fh.subfileDirectoryOffset)
    headerDirectory = read_data_directory(inS)
    subfileDirectory = read_data_directory(inS)
    for index in range(fh.numberOfSubfiles):
        yield readSubfile(index, fh, subfileDirectory, inS)

def read_file(path):
    """Open the file at `path` and yield one dict of data per subfile."""
    with open(path, 'rb') as inS:
        yield from read_stream(inS)
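

# Minimal usage sketch: print the entry names of every subfile in a file
# passed on the command line.
if __name__ == '__main__':
    import sys

    for i, subfile in enumerate(read_file(sys.argv[1])):
        print('subfile {}: {}'.format(i, ', '.join(subfile.keys())))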