import struct
import numpy as np
from collections import namedtuple, OrderedDict

# fixed-size binary file header holding the offsets and sizes of the
# directories and data blocks that follow
FileHeader = namedtuple('FileHeader', ['machineCode',
                                       'skip0',
                                       'creationFlags',
                                       'numberOfSubfiles',
                                       'fileCreationDate',
                                       'headerDirectoryOffset',
                                       'subfileDirectoryOffset',
                                       'headerDataOffset',
                                       'firstSubfileDataBlockOffset',
                                       'indexTableOffset',
                                       'headerDirectorySize',
                                       'subfileDirectorySize',
                                       'headerDataSize',
                                       'indexTableSize',
                                       'headerCRC32'])

# decode the bit flags stored in FileHeader.creationFlags
def decode_flags(flags):
    isHeaderDirCompressed = (flags & 128) != 0
    isTTablePresent = (flags & 1) != 0
    isOffsetTablePresent = (flags & 2) != 0
    isSubfileVariableSize = (flags & 64) != 0
    isSubfileMagicNbrPresent = (flags & 8) != 0
    isSubfileSizePresent = (flags & 16) != 0
    isSubfileTValuePresent = (flags & 32) != 0
    isSubfileCRCPresent = (flags & 4) != 0
    # return every local (the flag booleans plus the original flags value) as a dict
    return locals()

# size in bytes of the optional per-subfile preamble (CRC, magic number,
# t-value, size) that precedes each subfile's data
def data_start(decoded):
    data_start = 0
    if decoded['isSubfileCRCPresent']:
        data_start += 8
    if decoded['isSubfileMagicNbrPresent']:
        data_start += 4
    if decoded['isSubfileTValuePresent']:
        data_start += 8
    if decoded['isSubfileSizePresent']:
        data_start += 4
    return data_start

# read a length-prefixed UTF-16 string: a 32-bit character count followed by
# the characters themselves (2 bytes each)
def readString(inS):
    length = struct.unpack('<i', inS.read(4))[0]
    assert length < 1000
    return inS.read(length * 2).decode('utf-16')

# read a data directory: a magic number, an entry count, and for every entry
# its name, compression mode and per-axis metadata
def read_data_directory(inS):
    magic = struct.unpack_from('<i', inS.read(4))[0]
    assert magic == 0x30726940
    directory = OrderedDict({})
    for i in range(struct.unpack_from('<i', inS.read(4))[0]):
        if i > 10000:  # sanity limit against corrupted entry counts
            break
        name = readString(inS)
        ndims, compression = struct.unpack_from('<hh', inS.read(4))
        axes = OrderedDict({})
        for dim in range(ndims):
            axisName = readString(inS)
            axisUnit = readString(inS)
            axisType, axisNpts, axisMinValue, axisMaxValue = struct.unpack('<hidd', inS.read(2 + 4 + 8 + 8))
            axes[axisName] = (axisUnit, axisType, axisNpts, axisMinValue, axisMaxValue)
        directory[name] = ndims, compression, axes
    return directory

# total size in bytes of one subfile's data block, computed from the axis
# point counts and the element size of each entry's data type
def calc_data_size(subfileDirectory):
    total = 0
    for entry_name, (ndims, compression, axes) in subfileDirectory.items():
        assert compression == 0  # compressed entries are not supported
        shape = tuple(axis[2] for axis in axes.values())
        type_number = list(axes.values())[0][1]
        # element size in bytes for every known type code
        types_bytes = {1: 1,    # unsigned byte
                       2: 1,    # bool
                       3: 1,    # char
                       4: 2,    # short
                       5: 4,    # int32
                       6: 4,    # long
                       7: 4,    # float32
                       8: 8,    # double
                       9: 8,    # complex64
                       10: 16,  # complex128
                       50: 0}   # string (occupies no space in the data block)
        total += types_bytes[type_number] * np.prod(shape)
    return total

# read one subfile's data block and return it as a dict of entry name ->
# numpy array (or scalar, for single-element entries)
def readSubfile(index, fileheader, subfileDirectory, inS):
    dataStart = data_start(decode_flags(fileheader.creationFlags))
    subfileDataSize = calc_data_size(subfileDirectory)
    # 504 is the hard-coded offset of the first subfile's data block; each
    # subfile occupies dataStart preamble bytes plus its data
    offset = 504 + index * (dataStart + subfileDataSize)
    inS.seek(offset + dataStart)
    data = {}
    for entry_name, (ndims, compression, axes) in subfileDirectory.items():
        assert compression == 0
        shape = tuple(axis[2] for axis in axes.values())
        type_number = list(axes.values())[0][1]
        # numpy dtype for every known type code; types 3 and 6 originally used
        # np.char and np.long, replaced here with fixed-width dtypes matching
        # the byte sizes assumed in calc_data_size
        types = {1: np.ubyte,
                 2: np.bool_,
                 3: np.int8,
                 4: np.short,
                 5: np.int32,
                 6: np.int32,
                 7: np.float32,
                 8: np.double,
                 9: np.complex64,
                 10: np.complex128,
                 50: str}
        dtype = types[type_number]
        if dtype is str:
            # string entries occupy no space in the data block (see
            # calc_data_size), so nothing is read for them here
            data[entry_name] = None
            continue
        entry_data = np.frombuffer(inS.read(int(np.prod(shape) * dtype().nbytes)), dtype=dtype).reshape(shape)
        if shape == (1,):
            entry_data = entry_data.item(0)
        data[entry_name] = entry_data
    return data

# read the subfiles of a member of an already opened zipfile.ZipFile; the
# member stream only supports forward reads, so wrap it with a seek shim
def read_zip(zipfile, name):
    class PatchedZip:
        def __init__(self, name, zipfile):
            self.name = name
            self.zipfile = zipfile
            self.offset = 0
            self.zipext = zipfile.open(name)

        def read(self, nbytes):
            self.offset += nbytes
            return self.zipext.read(nbytes)

        def seek(self, offset):
            if self.offset > offset:
                # cannot seek backwards in a zip member: reopen and skip forward
                self.zipext.close()
                self.zipext = self.zipfile.open(self.name)
                self.offset = 0
                self.read(offset)
            else:
                # seek forward by reading and discarding the intervening bytes
                self.read(offset - self.offset)
            self.offset = offset

    return read_stream(PatchedZip(name, zipfile))

# generator over all subfiles in an open binary stream
def read_stream(inS):
    inS.seek(0)
    # read past the fixed-width UTF-16 text fields at the start of the file
    fmt = '32s2s64s2s64s2s254s2s'
    [e.decode('utf-16') for e in struct.unpack_from(fmt, inS.read(struct.calcsize(fmt)))]
    # then read the binary file header
    fmt = '<bbiilllllliiiil'
    fh = FileHeader(*struct.unpack_from(fmt, inS.read(struct.calcsize(fmt))))
    # the header directory and the subfile directory are stored back to back
    inS.seek(fh.subfileDirectoryOffset)
    headerDirectory = read_data_directory(inS)
    subfileDirectory = read_data_directory(inS)
    for index in range(fh.numberOfSubfiles):
        yield readSubfile(index, fh, subfileDirectory, inS)

# convenience wrapper: iterate over the subfiles of a file on disk
def read_file(path):
    with open(path, 'rb') as inS:
        yield from read_stream(inS)
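
A minimal usage sketch, assuming a data file named 'measurement.dat' and, for the zip case, an archive 'measurements.zip' that contains it (both file names are hypothetical). read_file and read_zip each yield one dict per subfile, mapping entry names to NumPy arrays or scalars:

import zipfile

# iterate over the subfiles of a file on disk
for subfile in read_file('measurement.dat'):
    for name, value in subfile.items():
        print(name, getattr(value, 'shape', value))

# the same, reading directly out of a zip archive without extracting it
with zipfile.ZipFile('measurements.zip') as zf:
    for subfile in read_zip(zf, 'measurement.dat'):
        print(sorted(subfile.keys()))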