Created
September 27, 2020 20:48
-
-
Save fabian-paul/fb0b15139eb83c3dda61e7caecd84b42 to your computer and use it in GitHub Desktop.
Create a NumPy .npy file in append mode (stream records to disk without knowing the final length in advance)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Usage demo / smoke test for NpyStream: stream a few structured records into
# "test.npy", read the file back with np.load and compare, then memory-map the
# file and sort its records in place.
#
# NOTE(review): NpyStream is not defined in this snippet; it is assumed to be
# in scope already (defined or imported before this code runs).
import numpy as np

dtype = np.dtype([("row", np.int32), ("col", np.int32), ("cov", np.float32)])

# Three random records to write.
a = np.zeros((3,), dtype=dtype)
for i in range(3):
    a[i]["row"] = np.random.randint(2*15)
    a[i]["col"] = np.random.randint(2*15)
    a[i]["cov"] = np.random.randn()

# Append the records one by one; the header is written when the stream closes.
with NpyStream("test.npy") as s:
    s.append(a[0])
    s.append(a[1])
    s.append(a[2])
    s.append(a[0])

# The result must be loadable as an ordinary .npy file with identical content.
x = np.load("test.npy")
assert len(x) == len(s)
assert x[0] == a[0]
assert x[1] == a[1]
assert x[2] == a[2]
assert x[3] == a[0]

# Re-open as a writable memory map and sort the records on disk.
# np.sort() would return a sorted in-memory copy and leave the file untouched,
# so the in-place ndarray.sort() is used instead.
x = np.load("test.npy", mmap_mode="r+")
x.sort(order=["row", "col"])
x.flush()  # make sure the sorted data reaches the file
del x  # plain-Python replacement for the IPython magic `%xdel x`
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import struct
from typing import Optional, Union

import numpy as np
class NpyStream(object):
    """Write a one-dimensional ``.npy`` file incrementally.

    The final number of items is not known while data is being appended, so
    ``open()`` reserves a fixed-size block of blanks at the start of the file,
    ``append()`` writes raw item bytes after it, and ``close()`` seeks back and
    overwrites the reserved block with the real NPY (format version 1.0)
    header. The file is only loadable with ``np.load`` after ``close()``.

    Can be used as a context manager::

        with NpyStream("out.npy") as s:
            s.append(record)

    Parameters
    ----------
    fname:
        Path of the file to create (an existing file is truncated on open).
    dtype:
        Expected dtype of every appended item. If ``None``, the dtype of the
        first appended item is adopted.
    pad_space:
        Number of bytes reserved for the header. Must be a multiple of 64
        between 64 and 65536 (the version 1.0 header length field is a
        16-bit integer). Increase it for dtypes with long descriptions.
    """

    # Magic string followed by the format version (major=1, minor=0).
    _PREAMBLE = b"\x93NUMPY\x01\x00"
    # Preamble (8 bytes) plus the 2-byte little-endian header-length field.
    _PREFIX_SIZE = 10

    def __init__(self, fname: str, dtype: Optional[np.dtype] = None, pad_space: int = 128):
        if pad_space < 64 or pad_space > 65536 or pad_space % 64 != 0:
            raise ValueError(
                f"pad_space must be a multiple of 64 in [64, 65536], got {pad_space}"
            )
        self._fp = None
        self._fname = fname
        # Normalise so that dtype-like arguments (e.g. np.float32) also work.
        self.dtype = None if dtype is None else np.dtype(dtype)
        self._n_items = 0
        self._pad_space = pad_space

    def __len__(self) -> int:
        """Return the number of items appended since the file was opened."""
        return self._n_items

    @staticmethod
    def _ceildiv(x: int, y: int) -> int:
        """Return ``x / y`` rounded up to the next integer."""
        return (x + y - 1) // y

    def make_header(self) -> bytes:
        """Build the NPY header for the current dtype and item count.

        The header always occupies exactly ``pad_space`` bytes, because the
        item data was written starting at that offset.

        Raises
        ------
        ValueError
            If the header does not fit into the reserved pad space.
        """
        # With nothing appended and no dtype given, fall back to float64 so
        # that an empty stream still yields a valid (empty) array file.
        dtype = np.dtype(np.float64) if self.dtype is None else np.dtype(self.dtype)
        descr = np.lib.format.dtype_to_descr(dtype)
        # %r keeps the quotes around plain descriptors such as '<f8'; for
        # structured dtypes (a list) repr and str are the same.
        format_str = (
            "{'descr':%r, 'fortran_order': False, 'shape':(%d,), }" % (descr, len(self))
        ).encode("ascii")
        header_length = self._pad_space - self._PREFIX_SIZE
        # One byte is needed for the terminating newline.
        n_spaces = header_length - len(format_str) - 1
        if n_spaces < 0:
            raise ValueError("Cannot write format string. Not enough pad space was reserved.")
        return (
            self._PREAMBLE
            + struct.pack("<H", header_length)
            + format_str
            + b" " * n_spaces
            + b"\n"
        )

    def write_header(self):
        """Write the header at the current position of the open file."""
        if self._fp is None:
            raise RuntimeError("File is not open. Cannot write header.")
        self._fp.write(self.make_header())

    def open(self):
        """Create/truncate the file and reserve space for the header.

        Returns ``self``. Raises ``RuntimeError`` if the stream is already open.
        """
        if self._fp is not None:
            raise RuntimeError("File is already open. Cannot open twice.")
        fp = open(self._fname, "wb")
        try:
            # Placeholder that close() later overwrites with the real header.
            fp.write(b" " * self._pad_space)
        except Exception:
            fp.close()
            raise
        self._fp = fp
        # The file was truncated, so the item count starts from zero again.
        self._n_items = 0
        return self

    def close(self):
        """Write the final header and close the file (no-op if not open).

        The file handle is released even if writing the header fails.
        """
        if self._fp is None:
            return
        try:
            self._fp.seek(0)
            self.write_header()
        finally:
            self._fp.close()
            self._fp = None

    def append(self, item: Union[np.ndarray, np.generic]):
        """Append a NumPy scalar/record or all elements of an array.

        Arrays of any shape are written in C order and contribute ``item.size``
        items, since the resulting file is always one-dimensional.

        Raises
        ------
        RuntimeError
            If the stream is not open.
        TypeError
            If ``item`` is neither a NumPy array nor a NumPy scalar.
        ValueError
            If the item's dtype differs from the stream's dtype.
        """
        if self._fp is None:
            raise RuntimeError("File is not open. Cannot append.")
        # Validate everything before touching the file, so that a rejected
        # item never leaves stray bytes in the data section.
        if isinstance(item, np.ndarray):
            n_new = item.size
        elif isinstance(item, np.generic):  # includes np.void records
            n_new = 1
        else:
            raise TypeError(f"Unrecognized item type {type(item)}.")
        if self.dtype is None:
            self.dtype = item.dtype
        elif self.dtype != item.dtype:
            raise ValueError(f"dtype mismatch, expected {self.dtype} but got {item.dtype}")
        self._fp.write(item.tobytes())
        self._n_items += n_new

    def __enter__(self):
        """Open the stream and return it."""
        self.open()
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Finalize the header and close the file."""
        self.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment