Skip to content

Instantly share code, notes, and snippets.

@fabian-paul
Created September 27, 2020 20:48
Show Gist options
  • Select an option

  • Save fabian-paul/fb0b15139eb83c3dda61e7caecd84b42 to your computer and use it in GitHub Desktop.

Select an option

Save fabian-paul/fb0b15139eb83c3dda61e7caecd84b42 to your computer and use it in GitHub Desktop.
Create a .npy file in append mode
# Demo / smoke test for NpyStream: stream a few structured records into
# "test.npy", read them back, then sort the file in place through a memory map.
# NOTE: NpyStream has to be defined before this code runs (e.g. in an earlier
# notebook cell or by executing the class definition first).
dtype = np.dtype([("row", np.int32), ("col", np.int32), ("cov", np.float32)])
a = np.zeros((3,), dtype=dtype)
for i in range(3):
    a[i]["row"] = np.random.randint(2*15)
    a[i]["col"] = np.random.randint(2*15)
    a[i]["cov"] = np.random.randn()

# The header is only written when the stream is closed, so the file must not
# be loaded before the with-block has exited.
with NpyStream("test.npy") as s:
    s.append(a[0])
    s.append(a[1])
    s.append(a[2])
    s.append(a[0])

x = np.load("test.npy")
assert len(x) == len(s)
assert x[0] == a[0]
assert x[1] == a[1]
assert x[2] == a[2]
assert x[3] == a[0]

# Sort the records on disk. np.sort() would return a sorted *copy* and leave
# the memory-mapped file untouched, so the in-place ndarray.sort() is used.
x = np.load("test.npy", mmap_mode="r+")
x.sort(order=["row", "col"])
x.flush()  # push the sorted records to disk
del x  # drop the reference so the memory map can be released
import struct
from typing import Optional, Union

import numpy as np
class NpyStream(object):
def __init__(self, fname: str, dtype: Optional[np.dtype]=None):
self._fp = None
self._fname = fname
self.dtype = dtype
self._n_items = 0
def __len__(self) -> int:
return self._n_items
@staticmethod
def _ceildiv(x: int, y: int) -> int:
return (x + y - 1) // y
def make_header(self) -> str:
descr_str = str(np.lib.format.dtype_to_descr(self.dtype))
format_str = ("{'descr':%s, 'fortran_order': False, 'shape':(%d,), }" % (descr_str, len(self))).encode("ascii")
total_length = NpyStream._ceildiv(len(format_str) + 10 , 64) * 64
if total_length > self._pad_space:
raise ValueError("Cannot write format string. Not enough pad space was reserved.")
header_length = total_length - 10
n_spaces = header_length - len(format_str) - 1
header = b"\x93NUMPY\x01\x00" + struct.pack('<H', header_length) + format_str + b" "*n_spaces + b"\n"
assert len(header) == total_length
return header
def write_header(self):
self._fp.write(self.make_header())
def open(self):
if self._fp is not None:
raise RuntimeError("File is already open. Cannot open twice.")
self._fp = open(self._fname, "wb")
self._fp.write(struct.pack('128s', b' '*128))
self._pad_space = 128
return self
def close(self):
if self._fp is not None:
self._fp.seek(0)
self.write_header()
self._fp.close()
self._fp = None
def append(self, item: Union[np.ndarray, np.void]):
if self.dtype is None:
self.dtype = item.dtype
else:
if self.dtype != item.dtype:
raise ValueError(f"dtype mismatch, expected {self.dtype} but got {item.dtype}")
self._fp.write(item.tobytes())
if isinstance(item, np.void):
self._n_items += 1
elif isinstance(item, np.ndarray):
self._n_items += item.shape[0]
else:
raise TypeError(f"Unrecognized item type {type(item)}.")
def __enter__(self):
self.open()
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
self.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment