diff --git a/docs/devnotes/feat-v3-scale-offset-cast.md b/docs/devnotes/feat-v3-scale-offset-cast.md new file mode 100644 index 0000000000..a7dcb504fd --- /dev/null +++ b/docs/devnotes/feat-v3-scale-offset-cast.md @@ -0,0 +1,137 @@ +# scale_offset and cast_value codecs + +Source: https://github.com/zarr-developers/zarr-extensions/pull/43 + +## Overview + +Two array-to-array codecs for zarr v3, designed to work together for the +common pattern of storing floating-point data as compressed integers. + +--- + +## scale_offset + +**Type:** array -> array (does NOT change dtype) + +**Encode:** `out = (in - offset) * scale` +**Decode:** `out = (in / scale) + offset` + +### Parameters +- `offset` (optional): scalar subtracted during encoding. Default: 0 (additive identity). + Serialized in JSON using the zarr v3 fill-value encoding for the array's dtype. +- `scale` (optional): scalar multiplied during encoding (after offset subtraction). Default: 1 + (multiplicative identity). Same JSON encoding as offset. + +### Key rules +- Arithmetic MUST use the input array's own data type semantics (no implicit promotion). +- If any intermediate or final value is unrepresentable in that dtype, error. +- If neither scale nor offset is given, `configuration` may be omitted (codec is a no-op). +- Fill value MUST be transformed through the codec (encode direction). +- Only valid for real-number data types (int/uint/float families). + +### JSON +```json +{"name": "scale_offset", "configuration": {"offset": 5, "scale": 0.1}} +``` + +--- + +## cast_value + +**Type:** array -> array (CHANGES dtype) + +**Purpose:** Value-convert (not binary-reinterpret) array elements to a new data type. + +### Parameters +- `data_type` (required): target zarr v3 data type. +- `rounding` (optional): how to round when exact representation is impossible. + Values: `"nearest-even"` (default), `"towards-zero"`, `"towards-positive"`, + `"towards-negative"`, `"nearest-away"`. +- `out_of_range` (optional): what to do when a value is outside the target's range. + Values: `"clamp"`, `"wrap"`. If absent, out-of-range MUST error. + `"wrap"` only valid for integral two's-complement types. +- `scalar_map` (optional): explicit value overrides. + `{"encode": [[input, output], ...], "decode": [[input, output], ...]}`. + Evaluated BEFORE rounding/out_of_range. + +### Casting procedure (same for encode and decode, swapping source/target) +1. Check scalar_map — if input matches a key, use mapped value. +2. Check exact representability — if yes, use directly. +3. Apply rounding and out_of_range rules. +4. If none apply, MUST error. + +### Special values +- NaN propagates between IEEE 754 types unless scalar_map overrides. +- Signed zero preserved between IEEE 754 types. +- If target doesn't support NaN/infinity and input has them, MUST error + unless scalar_map provides a mapping. + +### Fill value +- MUST be cast using same semantics as elements. +- Implementations SHOULD validate fill value survives round-trip at metadata + construction time. + +### JSON +```json +{ + "name": "cast_value", + "configuration": { + "data_type": "uint8", + "rounding": "nearest-even", + "out_of_range": "clamp", + "scalar_map": { + "encode": [["NaN", 0], ["+Infinity", 0], ["-Infinity", 0]], + "decode": [[0, "NaN"]] + } + } +} +``` + +--- + +## Typical combined usage + +```json +{ + "data_type": "float64", + "fill_value": "NaN", + "codecs": [ + {"name": "scale_offset", "configuration": {"offset": -10, "scale": 0.1}}, + {"name": "cast_value", "configuration": { + "data_type": "uint8", + "rounding": "nearest-even", + "scalar_map": {"encode": [["NaN", 0]], "decode": [[0, "NaN"]]} + }}, + "bytes" + ] +} +``` + +--- + +## Implementation notes for zarr-python + +### scale_offset +- Subclass `ArrayArrayCodec`. +- `resolve_metadata`: transform fill_value via `(fill - offset) * scale`, keep dtype. +- `_encode_single`: `(array - offset) * scale` using numpy with same dtype. +- `_decode_single`: `(array / scale) + offset` using numpy with same dtype. +- `is_fixed_size = True`. + +### cast_value +- Subclass `ArrayArrayCodec`. +- `resolve_metadata`: change dtype to target dtype, cast fill_value. +- `_encode_single`: cast array from input dtype to target dtype. +- `_decode_single`: cast array from target dtype back to input dtype. +- Needs the input dtype stored (from `evolve_from_array_spec` or `resolve_metadata`). +- `is_fixed_size = True` (for fixed-size types). +- Initial implementation: support `rounding` and `out_of_range` for common cases. + `scalar_map` adds complexity but is needed for NaN handling. + +### Key design decisions from PR review +1. Encode = `(in - offset) * scale` (subtract, not add) — matches HDF5 and numcodecs. +2. No implicit precision promotion — arithmetic stays in the input dtype. +3. `out_of_range` defaults to error (not clamp). +4. `scalar_map` was added specifically to handle NaN-to-integer mappings. +5. Fill value must round-trip exactly through the codec chain. +6. Name uses underscore: `scale_offset`, `cast_value`. diff --git a/src/zarr/codecs/__init__.py b/src/zarr/codecs/__init__.py index 4c621290e7..04b31d0d5f 100644 --- a/src/zarr/codecs/__init__.py +++ b/src/zarr/codecs/__init__.py @@ -2,6 +2,7 @@ from zarr.codecs.blosc import BloscCname, BloscCodec, BloscShuffle from zarr.codecs.bytes import BytesCodec, Endian +from zarr.codecs.cast_value import CastValue from zarr.codecs.crc32c_ import Crc32cCodec from zarr.codecs.gzip import GzipCodec from zarr.codecs.numcodecs import ( @@ -27,6 +28,7 @@ Zlib, Zstd, ) +from zarr.codecs.scale_offset import ScaleOffset from zarr.codecs.sharding import ShardingCodec, ShardingCodecIndexLocation from zarr.codecs.transpose import TransposeCodec from zarr.codecs.vlen_utf8 import VLenBytesCodec, VLenUTF8Codec @@ -38,9 +40,11 @@ "BloscCodec", "BloscShuffle", "BytesCodec", + "CastValue", "Crc32cCodec", "Endian", "GzipCodec", + "ScaleOffset", "ShardingCodec", "ShardingCodecIndexLocation", "TransposeCodec", @@ -61,6 +65,8 @@ register_codec("vlen-utf8", VLenUTF8Codec) register_codec("vlen-bytes", VLenBytesCodec) register_codec("transpose", TransposeCodec) +register_codec("scale_offset", ScaleOffset) +register_codec("cast_value", CastValue) # Register all the codecs formerly contained in numcodecs.zarr3 diff --git a/src/zarr/codecs/cast_value.py b/src/zarr/codecs/cast_value.py new file mode 100644 index 0000000000..d5eb0c5122 --- /dev/null +++ b/src/zarr/codecs/cast_value.py @@ -0,0 +1,387 @@ +from __future__ import annotations + +from dataclasses import dataclass, replace +from typing import TYPE_CHECKING, Literal, NotRequired, TypedDict + +import numpy as np + +from zarr.abc.codec import ArrayArrayCodec +from zarr.core.common import JSON, parse_named_configuration +from zarr.core.dtype import get_data_type_from_json + +if TYPE_CHECKING: + from typing import Self + + from zarr.core.array_spec import ArraySpec + from zarr.core.buffer import NDBuffer + from zarr.core.chunk_grids import ChunkGrid + from zarr.core.dtype.wrapper import TBaseDType, TBaseScalar, ZDType + +RoundingMode = Literal[ + "nearest-even", + "towards-zero", + "towards-positive", + "towards-negative", + "nearest-away", +] + +OutOfRangeMode = Literal["clamp", "wrap"] + + +class ScalarMapJSON(TypedDict): + encode: NotRequired[tuple[tuple[object, object]]] + decode: NotRequired[tuple[tuple[object, object]]] + + +# Pre-parsed scalar map entry: (source_float, target_float, source_is_nan) +_MapEntry = tuple[float, float, bool] + + +def _special_float(s: str) -> float: + """Convert special float string representations to float values.""" + if s == "NaN": + return float("nan") + if s in ("+Infinity", "Infinity"): + return float("inf") + if s == "-Infinity": + return float("-inf") + return float(s) + + +def _parse_map_entries(mapping: dict[str, str]) -> list[_MapEntry]: + """Pre-parse a scalar map dict into a list of (src, tgt, src_is_nan) tuples.""" + entries: list[_MapEntry] = [] + for src_str, tgt_str in mapping.items(): + src = _special_float(src_str) + tgt = _special_float(tgt_str) + entries.append((src, tgt, np.isnan(src))) + return entries + + +def _apply_scalar_map(work: np.ndarray, entries: list[_MapEntry]) -> None: + """Apply scalar map entries in-place. Single pass per entry.""" + for src, tgt, src_is_nan in entries: + if src_is_nan: + mask = np.isnan(work) + else: + mask = work == src + work[mask] = tgt + + +def _round_inplace(arr: np.ndarray, mode: RoundingMode) -> np.ndarray: + """Round array, returning result (may or may not be a new array). + + For nearest-away, requires 3 numpy ops. All others are a single op. + """ + if mode == "nearest-even": + return np.rint(arr) + elif mode == "towards-zero": + return np.trunc(arr) + elif mode == "towards-positive": + return np.ceil(arr) + elif mode == "towards-negative": + return np.floor(arr) + elif mode == "nearest-away": + return np.sign(arr) * np.floor(np.abs(arr) + 0.5) + raise ValueError(f"Unknown rounding mode: {mode}") + + +def _cast_array( + arr: np.ndarray, + target_dtype: np.dtype, + rounding: RoundingMode, + out_of_range: OutOfRangeMode | None, + scalar_map_entries: list[_MapEntry] | None, +) -> np.ndarray: + """Cast an array to target_dtype with rounding, out-of-range, and scalar_map handling. + + Optimized to minimize allocations and passes over the data. + For the simple case (no scalar_map, no rounding needed, no out-of-range), + this is essentially just ``arr.astype(target_dtype)``. + """ + src_is_int = np.issubdtype(arr.dtype, np.integer) + src_is_float = np.issubdtype(arr.dtype, np.floating) + tgt_is_int = np.issubdtype(target_dtype, np.integer) + tgt_is_float = np.issubdtype(target_dtype, np.floating) + + # Fast path: float→float with no scalar_map — single astype + if src_is_float and tgt_is_float and not scalar_map_entries: + return arr.astype(target_dtype) + + # Fast path: int→float with no scalar_map — single astype + if src_is_int and tgt_is_float and not scalar_map_entries: + return arr.astype(target_dtype) + + # Fast path: int→int with no scalar_map — check range then astype + if src_is_int and tgt_is_int and not scalar_map_entries: + # Check if source range could exceed target range + if arr.dtype.itemsize > target_dtype.itemsize or arr.dtype != target_dtype: + info = np.iinfo(target_dtype) + lo, hi = int(info.min), int(info.max) + arr_min, arr_max = int(arr.min()), int(arr.max()) + if arr_min >= lo and arr_max <= hi: + return arr.astype(target_dtype) + if out_of_range == "clamp": + return np.clip(arr, lo, hi).astype(target_dtype) + elif out_of_range == "wrap": + range_size = hi - lo + 1 + return ((arr.astype(np.int64) - lo) % range_size + lo).astype(target_dtype) + else: + oor_vals = arr[(arr < lo) | (arr > hi)] + raise ValueError( + f"Values out of range for {target_dtype} (valid range: [{lo}, {hi}]), " + f"got values in [{arr_min}, {arr_max}]. " + f"Out-of-range values: {oor_vals.ravel()!r}. " + f"Set out_of_range='clamp' or out_of_range='wrap' to handle this." + ) + return arr.astype(target_dtype) + + # float→int: needs rounding, range check, possibly scalar_map + if src_is_float and tgt_is_int: + # Work in float64 for the arithmetic + if arr.dtype != np.float64: + work = arr.astype(np.float64) + else: + work = arr.copy() + + if scalar_map_entries: + _apply_scalar_map(work, scalar_map_entries) + + # Check for unmapped NaN/Inf + bad = np.isnan(work) | np.isinf(work) + if bad.any(): + raise ValueError("Cannot cast NaN or Infinity to integer type without scalar_map") + + work = _round_inplace(work, rounding) + + info = np.iinfo(target_dtype) + lo, hi = float(info.min), float(info.max) + if out_of_range == "clamp": + np.clip(work, lo, hi, out=work) + elif out_of_range == "wrap": + range_size = int(info.max) - int(info.min) + 1 + oor = (work < lo) | (work > hi) + if oor.any(): + work[oor] = (work[oor].astype(np.int64) - int(info.min)) % range_size + int( + info.min + ) + elif (work.min() < lo) or (work.max() > hi): + oor_vals = work[(work < lo) | (work > hi)] + raise ValueError( + f"Values out of range for {target_dtype} (valid range: [{lo}, {hi}]), " + f"got values in [{work.min()}, {work.max()}]. " + f"Out-of-range values: {oor_vals.ravel()!r}. " + f"Set out_of_range='clamp' or out_of_range='wrap' to handle this." + ) + + return work.astype(target_dtype) + + # int→float with scalar_map + if src_is_int and tgt_is_float and scalar_map_entries: + work = arr.astype(np.float64) + _apply_scalar_map(work, scalar_map_entries) + return work.astype(target_dtype) + + # float→float with scalar_map + if src_is_float and tgt_is_float and scalar_map_entries: + work = arr.copy() + _apply_scalar_map(work, scalar_map_entries) + return work.astype(target_dtype) + + # int→int with scalar_map + if src_is_int and tgt_is_int and scalar_map_entries: + work = arr.astype(np.int64) + _apply_scalar_map(work, scalar_map_entries) + info = np.iinfo(target_dtype) + lo, hi = int(info.min), int(info.max) + w_min, w_max = int(work.min()), int(work.max()) + if w_min < lo or w_max > hi: + if out_of_range == "clamp": + np.clip(work, lo, hi, out=work) + elif out_of_range == "wrap": + range_size = hi - lo + 1 + oor = (work < lo) | (work > hi) + work[oor] = (work[oor] - lo) % range_size + lo + else: + oor_vals = work[(work < lo) | (work > hi)] + raise ValueError( + f"Values out of range for {target_dtype} (valid range: [{lo}, {hi}]), " + f"got values in [{w_min}, {w_max}]. " + f"Out-of-range values: {oor_vals.ravel()!r}. " + f"Set out_of_range='clamp' or out_of_range='wrap' to handle this." + ) + return work.astype(target_dtype) + + # Fallback + return arr.astype(target_dtype) + + +def _parse_scalar_map( + data: ScalarMapJSON | None, +) -> tuple[list[_MapEntry] | None, list[_MapEntry] | None]: + """Parse scalar_map JSON into pre-parsed encode and decode entry lists. + + Returns (encode_entries, decode_entries). Either may be None. + """ + if data is None: + return None, None + encode_raw: dict[str, str] = {} + decode_raw: dict[str, str] = {} + for src, tgt in data.get("encode", []): + encode_raw[str(src)] = str(tgt) + for src, tgt in data.get("decode", []): + decode_raw[str(src)] = str(tgt) + return ( + _parse_map_entries(encode_raw) if encode_raw else None, + _parse_map_entries(decode_raw) if decode_raw else None, + ) + + +@dataclass(frozen=True) +class CastValue(ArrayArrayCodec): + """Cast-value array-to-array codec. + + Value-converts array elements to a new data type during encoding, + and back to the original data type during decoding. + + Parameters + ---------- + data_type : str + Target zarr v3 data type name (e.g. "uint8", "float32"). + rounding : RoundingMode + How to round when exact representation is impossible. Default is "nearest-even". + out_of_range : OutOfRangeMode or None + What to do when a value is outside the target's range. + None means error. "clamp" clips to range. "wrap" uses modular arithmetic + (only valid for integer types). + scalar_map : dict or None + Explicit value overrides as JSON: {"encode": [[src, tgt], ...], "decode": [[src, tgt], ...]}. + """ + + is_fixed_size = True + + data_type: str + rounding: RoundingMode + out_of_range: OutOfRangeMode | None + scalar_map: ScalarMapJSON | None + + def __init__( + self, + *, + data_type: str, + rounding: RoundingMode = "nearest-even", + out_of_range: OutOfRangeMode | None = None, + scalar_map: ScalarMapJSON | None = None, + ) -> None: + object.__setattr__(self, "data_type", data_type) + object.__setattr__(self, "rounding", rounding) + object.__setattr__(self, "out_of_range", out_of_range) + object.__setattr__(self, "scalar_map", scalar_map) + + @classmethod + def from_dict(cls, data: dict[str, JSON]) -> Self: + _, configuration_parsed = parse_named_configuration( + data, "cast_value", require_configuration=True + ) + return cls(**configuration_parsed) # type: ignore[arg-type] + + def to_dict(self) -> dict[str, JSON]: + config: dict[str, JSON] = {"data_type": self.data_type} + if self.rounding != "nearest-even": + config["rounding"] = self.rounding + if self.out_of_range is not None: + config["out_of_range"] = self.out_of_range + if self.scalar_map is not None: + config["scalar_map"] = self.scalar_map + return {"name": "cast_value", "configuration": config} + + def _target_zdtype(self) -> ZDType[TBaseDType, TBaseScalar]: + return get_data_type_from_json(self.data_type, zarr_format=3) + + def validate( + self, + *, + shape: tuple[int, ...], + dtype: ZDType[TBaseDType, TBaseScalar], + chunk_grid: ChunkGrid, + ) -> None: + source_native = dtype.to_native_dtype() + target_native = self._target_zdtype().to_native_dtype() + for label, dt in [("source", source_native), ("target", target_native)]: + if not np.issubdtype(dt, np.integer) and not np.issubdtype(dt, np.floating): + raise ValueError( + f"cast_value codec only supports integer and floating-point data types. " + f"Got {label} dtype {dt}." + ) + if self.out_of_range == "wrap": + if not np.issubdtype(target_native, np.integer): + raise ValueError("out_of_range='wrap' is only valid for integer target types.") + + def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec: + target_zdtype = self._target_zdtype() + target_native = target_zdtype.to_native_dtype() + source_native = chunk_spec.dtype.to_native_dtype() + + fill = chunk_spec.fill_value + fill_arr = np.array([fill], dtype=source_native) + + encode_entries, _ = _parse_scalar_map(self.scalar_map) + + new_fill_arr = _cast_array( + fill_arr, target_native, self.rounding, self.out_of_range, encode_entries + ) + new_fill = target_native.type(new_fill_arr[0]) + + return replace(chunk_spec, dtype=target_zdtype, fill_value=new_fill) + + def _encode_sync( + self, + chunk_array: NDBuffer, + _chunk_spec: ArraySpec, + ) -> NDBuffer | None: + arr = chunk_array.as_ndarray_like() + target_native = self._target_zdtype().to_native_dtype() + + encode_entries, _ = _parse_scalar_map(self.scalar_map) + + result = _cast_array( + np.asarray(arr), target_native, self.rounding, self.out_of_range, encode_entries + ) + return chunk_array.__class__.from_ndarray_like(result) + + async def _encode_single( + self, + chunk_array: NDBuffer, + chunk_spec: ArraySpec, + ) -> NDBuffer | None: + return self._encode_sync(chunk_array, chunk_spec) + + def _decode_sync( + self, + chunk_array: NDBuffer, + chunk_spec: ArraySpec, + ) -> NDBuffer: + arr = chunk_array.as_ndarray_like() + target_native = chunk_spec.dtype.to_native_dtype() + + _, decode_entries = _parse_scalar_map(self.scalar_map) + + result = _cast_array( + np.asarray(arr), target_native, self.rounding, self.out_of_range, decode_entries + ) + return chunk_array.__class__.from_ndarray_like(result) + + async def _decode_single( + self, + chunk_array: NDBuffer, + chunk_spec: ArraySpec, + ) -> NDBuffer: + return self._decode_sync(chunk_array, chunk_spec) + + def compute_encoded_size(self, input_byte_length: int, chunk_spec: ArraySpec) -> int: + source_itemsize = chunk_spec.dtype.to_native_dtype().itemsize + target_itemsize = self._target_zdtype().to_native_dtype().itemsize + if source_itemsize == 0: + return 0 + num_elements = input_byte_length // source_itemsize + return num_elements * target_itemsize diff --git a/src/zarr/codecs/scale_offset.py b/src/zarr/codecs/scale_offset.py new file mode 100644 index 0000000000..55862137ae --- /dev/null +++ b/src/zarr/codecs/scale_offset.py @@ -0,0 +1,130 @@ +from __future__ import annotations + +from dataclasses import dataclass, field, replace +from typing import TYPE_CHECKING, Literal, NotRequired + +import numpy as np +from typing_extensions import TypedDict + +from zarr.abc.codec import ArrayArrayCodec +from zarr.core.common import JSON, NamedConfig + +if TYPE_CHECKING: + from typing import Self + + from zarr.core.array_spec import ArraySpec + from zarr.core.buffer import NDBuffer + from zarr.core.chunk_grids import ChunkGrid + from zarr.core.dtype.wrapper import TBaseDType, TBaseScalar, ZDType + + +class ScaleOffsetConfig(TypedDict, closed=True): # type: ignore[call-arg] + scale: NotRequired[JSON] + offset: NotRequired[JSON] + + +ScaleOffsetName = Literal["scale_offset"] + + +class ScaleOffsetJSON(NamedConfig[ScaleOffsetName, ScaleOffsetConfig]): + """The JSON form(s) of the `scale_offset` codec""" + + +@dataclass(kw_only=True, frozen=True) +class ScaleOffset(ArrayArrayCodec): + """Scale-offset array-to-array codec. + + Encodes values by subtracting an offset and multiplying by a scale factor. + Decodes by dividing by the scale and adding the offset. + + All arithmetic uses the input array's data type semantics. + + Parameters + ---------- + offset : float + Value subtracted during encoding. Default is 0. + scale : float + Value multiplied during encoding (after offset subtraction). Default is 1. + """ + + is_fixed_size: bool = field(default=True, init=False) + + offset: float = 0 + scale: float = 1 + + @classmethod + def from_dict(cls, data: ScaleOffsetJSON) -> Self: # type: ignore[override] + scale: float = data.get("configuration", {}).get("scale", 1) # type: ignore[assignment] + offset: float = data.get("configuration", {}).get("offset", 0) # type: ignore[assignment] + return cls(scale=scale, offset=offset) + + def to_dict(self) -> ScaleOffsetJSON: # type: ignore[override] + if self.offset == 0 and self.scale == 1: + return {"name": "scale_offset"} + config: ScaleOffsetConfig = {} # + if self.offset != 0: + config["offset"] = self.offset + if self.scale != 1: + config["scale"] = self.scale + return {"name": "scale_offset", "configuration": config} + + def validate( + self, + *, + shape: tuple[int, ...], + dtype: ZDType[TBaseDType, TBaseScalar], + chunk_grid: ChunkGrid, + ) -> None: + native = dtype.to_native_dtype() + if not np.issubdtype(native, np.integer) and not np.issubdtype(native, np.floating): + raise ValueError( + f"scale_offset codec only supports integer and floating-point data types. " + f"Got {dtype}." + ) + + def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec: + """ + Define the effect of this codec on the spec for an array. The only change is to update + the output fill value by applying the scale + offset transformation. + """ + native_dtype = chunk_spec.dtype.to_native_dtype() + fill = chunk_spec.fill_value + new_fill = ( + np.dtype(native_dtype).type(fill) - native_dtype.type(self.offset) + ) * native_dtype.type(self.scale) + return replace(chunk_spec, fill_value=new_fill) + + def _encode_sync( + self, + chunk_array: NDBuffer, + _chunk_spec: ArraySpec, + ) -> NDBuffer | None: + arr = chunk_array.as_ndarray_like() + result = (arr - arr.dtype.type(self.offset)) * arr.dtype.type(self.scale) + return chunk_array.__class__.from_ndarray_like(result) + + async def _encode_single( + self, + chunk_data: NDBuffer, + chunk_spec: ArraySpec, + ) -> NDBuffer | None: + return self._encode_sync(chunk_data, chunk_spec) + + def _decode_sync( + self, + chunk_array: NDBuffer, + _chunk_spec: ArraySpec, + ) -> NDBuffer: + arr = chunk_array.as_ndarray_like() + result = (arr / arr.dtype.type(self.scale)) + arr.dtype.type(self.offset) + return chunk_array.__class__.from_ndarray_like(result) + + async def _decode_single( + self, + chunk_data: NDBuffer, + chunk_spec: ArraySpec, + ) -> NDBuffer: + return self._decode_sync(chunk_data, chunk_spec) + + def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int: + return input_byte_length diff --git a/src/zarr/core/chunk_grids/rectilinear.py b/src/zarr/core/chunk_grids/rectilinear.py new file mode 100644 index 0000000000..3985613244 --- /dev/null +++ b/src/zarr/core/chunk_grids/rectilinear.py @@ -0,0 +1,802 @@ +from __future__ import annotations + +import bisect +import itertools +import operator +from collections.abc import Sequence +from dataclasses import dataclass +from functools import cached_property, reduce +from typing import TYPE_CHECKING, Literal, Self, TypedDict + +import numpy as np +import numpy.typing as npt +from zarr.core.chunk_grids.common import ChunkEdgeLength, ChunkGrid + +from zarr.core.common import ( + JSON, + ShapeLike, + parse_named_configuration, + parse_shapelike, +) + +if TYPE_CHECKING: + from collections.abc import Iterator + + +class RectilinearChunkGridConfigurationDict(TypedDict): + """TypedDict for rectilinear chunk grid configuration""" + + kind: Literal["inline"] + chunk_shapes: Sequence[Sequence[ChunkEdgeLength]] + + +def _parse_chunk_shapes( + data: Sequence[Sequence[ChunkEdgeLength]], +) -> tuple[tuple[int, ...], ...]: + """ + Parse and expand chunk_shapes from metadata. + + Parameters + ---------- + data : Sequence[Sequence[ChunkEdgeLength]] + The chunk_shapes specification from metadata + + Returns + ------- + tuple[tuple[int, ...], ...] + Tuple of expanded chunk edge lengths for each axis + """ + # Runtime validation - strings are sequences but we don't want them + # Type annotation is for static typing, this validates actual JSON data + if isinstance(data, str) or not isinstance(data, Sequence): # type: ignore[redundant-expr,unreachable] + raise TypeError(f"chunk_shapes must be a sequence, got {type(data)}") + + result = [] + for i, axis_spec in enumerate(data): + # Runtime validation for each axis spec + if isinstance(axis_spec, str) or not isinstance(axis_spec, Sequence): # type: ignore[redundant-expr,unreachable] + raise TypeError(f"chunk_shapes[{i}] must be a sequence, got {type(axis_spec)}") + expanded = _expand_run_length_encoding(axis_spec) + result.append(expanded) + + return tuple(result) + + +def _expand_run_length_encoding(spec: Sequence[ChunkEdgeLength]) -> tuple[int, ...]: + """ + Expand a chunk edge length specification into a tuple of integers. + + The specification can contain: + - integers: representing explicit edge lengths + - tuples [value, count]: representing run-length encoded sequences + + Parameters + ---------- + spec : Sequence[ChunkEdgeLength] + The chunk edge length specification for one axis + + Returns + ------- + tuple[int, ...] + Expanded sequence of chunk edge lengths + + Examples + -------- + >>> _expand_run_length_encoding([2, 3]) + (2, 3) + >>> _expand_run_length_encoding([[2, 3]]) + (2, 2, 2) + >>> _expand_run_length_encoding([1, [2, 1], 3]) + (1, 2, 3) + >>> _expand_run_length_encoding([[1, 3], 3]) + (1, 1, 1, 3) + """ + result: list[int] = [] + for item in spec: + if isinstance(item, int): + # Explicit edge length + result.append(item) + elif isinstance(item, list | tuple): + # Run-length encoded: [value, count] + if len(item) != 2: + raise TypeError( + f"Run-length encoded items must be [int, int], got list of length {len(item)}" + ) + value, count = item + # Runtime validation of JSON data + if not isinstance(value, int) or not isinstance(count, int): # type: ignore[redundant-expr] + raise TypeError( + f"Run-length encoded items must be [int, int], got [{type(value).__name__}, {type(count).__name__}]" + ) + if count < 0: + raise ValueError(f"Run-length count must be non-negative, got {count}") + result.extend([value] * count) + else: + raise TypeError( + f"Chunk edge length must be int or [int, int] for run-length encoding, " + f"got {type(item)}" + ) + return tuple(result) + + +def _compress_run_length_encoding(chunks: tuple[int, ...]) -> list[int | list[int]]: + """ + Compress a sequence of chunk sizes to RLE format where beneficial. + + This function automatically detects runs of identical values and compresses them + using the [value, count] format. Single values or short runs are kept as-is. + + Parameters + ---------- + chunks : tuple[int, ...] + Sequence of chunk sizes along one dimension + + Returns + ------- + list[int | list[int]] + Compressed representation using RLE where beneficial + + Examples + -------- + >>> _compress_run_length_encoding((10, 10, 10, 10, 10, 10)) + [[10, 6]] + >>> _compress_run_length_encoding((10, 20, 30)) + [10, 20, 30] + >>> _compress_run_length_encoding((10, 10, 10, 20, 20, 30)) + [[10, 3], [20, 2], 30] + >>> _compress_run_length_encoding((5, 5, 10, 10, 10, 10, 15)) + [[5, 2], [10, 4], 15] + """ + if not chunks: + return [] + + result: list[int | list[int]] = [] + current_value = chunks[0] + current_count = 1 + + for value in chunks[1:]: + if value == current_value: + current_count += 1 + else: + # Decide whether to use RLE or explicit value + # Use RLE if count >= 3 to save space (tradeoff: [v,c] vs v,v,v) + if current_count >= 3: + result.append([current_value, current_count]) + elif current_count == 2: + # For count=2, RLE doesn't save space, but use it for consistency + result.append([current_value, current_count]) + else: + result.append(current_value) + + current_value = value + current_count = 1 + + # Handle the last run + if current_count >= 3 or current_count == 2: + result.append([current_value, current_count]) + else: + result.append(current_value) + + return result + + +def _normalize_rectilinear_chunks( + chunks: Sequence[Sequence[int | Sequence[int]]], shape: tuple[int, ...] +) -> tuple[tuple[int, ...], ...]: + """ + Normalize and validate variable chunks for RectilinearChunkGrid. + + Supports both explicit chunk sizes and run-length encoding (RLE). + RLE format: [[value, count]] expands to 'count' repetitions of 'value'. + + Parameters + ---------- + chunks : Sequence[Sequence[int | Sequence[int]]] + Nested sequence where each element is a sequence of chunk sizes along that dimension. + Each chunk size can be: + - An integer: explicit chunk size + - A sequence [value, count]: RLE format (expands to 'count' chunks of size 'value') + shape : tuple[int, ...] + The shape of the array. + + Returns + ------- + tuple[tuple[int, ...], ...] + Normalized chunk shapes as tuple of tuples. + + Raises + ------ + ValueError + If chunks don't match shape or sum incorrectly. + TypeError + If chunk specification format is invalid. + + Examples + -------- + >>> _normalize_rectilinear_chunks([[10, 20, 30], [25, 25]], (60, 50)) + ((10, 20, 30), (25, 25)) + >>> _normalize_rectilinear_chunks([[[10, 6]], [[10, 5]]], (60, 50)) + ((10, 10, 10, 10, 10, 10), (10, 10, 10, 10, 10)) + """ + # Expand RLE for each dimension + try: + chunk_shapes = tuple( + _expand_run_length_encoding(dim) # type: ignore[arg-type] + for dim in chunks + ) + except (TypeError, ValueError) as e: + raise TypeError( + f"Invalid variable chunks: {chunks}. Expected nested sequence of integers " + f"or RLE format [[value, count]]." + ) from e + + # Validate dimensionality + if len(chunk_shapes) != len(shape): + raise ValueError( + f"Variable chunks dimensionality ({len(chunk_shapes)}) " + f"must match array shape dimensionality ({len(shape)})" + ) + + # Validate that chunks sum to shape for each dimension + for i, (dim_chunks, dim_size) in enumerate(zip(chunk_shapes, shape, strict=False)): + chunk_sum = sum(dim_chunks) + if chunk_sum < dim_size: + raise ValueError( + f"Variable chunks along dimension {i} sum to {chunk_sum} " + f"but array shape is {dim_size}. " + f"Chunks must sum to be greater than or equal to the shape." + ) + if sum(dim_chunks[:-1]) >= dim_size: + raise ValueError( + f"Dimension {i} has more chunks than needed. " + f"The last chunk(s) would contain no valid data. " + f"Remove the extra chunk(s) or increase the array shape." + ) + + return chunk_shapes + + +@dataclass(frozen=True) +class RectilinearChunkGrid(ChunkGrid): + """ + A rectilinear chunk grid where chunk sizes vary along each axis. + + .. warning:: + This is an experimental feature and may change in future releases. + Expected to stabilize in Zarr version 3.3. + + Attributes + ---------- + chunk_shapes : tuple[tuple[int, ...], ...] + For each axis, a tuple of chunk edge lengths along that axis. + The sum of edge lengths must be >= the array shape along that axis. + """ + + _array_shape: tuple[int, ...] + chunk_shapes: tuple[tuple[int, ...], ...] + + def __init__(self, *, chunk_shapes: Sequence[Sequence[int]], array_shape: ShapeLike) -> None: + """ + Initialize a RectilinearChunkGrid. + + Parameters + ---------- + chunk_shapes : Sequence[Sequence[int]] + For each axis, a sequence of chunk edge lengths. + array_shape : ShapeLike + The shape of the array this chunk grid is bound to. + """ + array_shape_parsed = parse_shapelike(array_shape) + + # Convert to nested tuples and validate + parsed_shapes: list[tuple[int, ...]] = [] + for i, axis_chunks in enumerate(chunk_shapes): + if not isinstance(axis_chunks, Sequence): + raise TypeError(f"chunk_shapes[{i}] must be a sequence, got {type(axis_chunks)}") + # Validate all are positive integers + axis_tuple = tuple(axis_chunks) + for j, size in enumerate(axis_tuple): + if not isinstance(size, int): + raise TypeError( + f"chunk_shapes[{i}][{j}] must be an int, got {type(size).__name__}" + ) + if size <= 0: + raise ValueError(f"chunk_shapes[{i}][{j}] must be positive, got {size}") + parsed_shapes.append(axis_tuple) + + chunk_shapes_parsed = tuple(parsed_shapes) + object.__setattr__(self, "chunk_shapes", chunk_shapes_parsed) + object.__setattr__(self, "_array_shape", array_shape_parsed) + + # Validate array_shape is compatible with chunk_shapes + self._validate_array_shape() + + @property + def array_shape(self) -> tuple[int, ...]: + return self._array_shape + + @classmethod + def from_dict( # type: ignore[override] + cls, data: dict[str, JSON], *, array_shape: ShapeLike + ) -> Self: + """ + Parse a RectilinearChunkGrid from metadata dict. + + Parameters + ---------- + data : dict[str, JSON] + Metadata dictionary with 'name' and 'configuration' keys + array_shape : ShapeLike + The shape of the array this chunk grid is bound to. + + Returns + ------- + Self + A RectilinearChunkGrid instance + """ + _, configuration = parse_named_configuration(data, "rectilinear") + + if not isinstance(configuration, dict): + raise TypeError(f"configuration must be a dict, got {type(configuration)}") + + # Validate kind field + kind = configuration.get("kind") + if kind != "inline": + raise ValueError(f"Only 'inline' kind is supported, got {kind!r}") + + # Parse chunk_shapes with run-length encoding support + chunk_shapes_raw = configuration.get("chunk_shapes") + if chunk_shapes_raw is None: + raise ValueError("configuration must contain 'chunk_shapes'") + + # Type ignore: JSON data validated at runtime by _parse_chunk_shapes + chunk_shapes_expanded = _parse_chunk_shapes(chunk_shapes_raw) # type: ignore[arg-type] + + return cls(chunk_shapes=chunk_shapes_expanded, array_shape=array_shape) + + def to_dict(self) -> dict[str, JSON]: + """ + Convert to metadata dict format with automatic RLE compression. + + This method automatically compresses chunk shapes using run-length encoding + where beneficial (runs of 2 or more identical values). This reduces metadata + size for arrays with many uniform chunks. + + Returns + ------- + dict[str, JSON] + Metadata dictionary with 'name' and 'configuration' keys + + Examples + -------- + >>> grid = RectilinearChunkGrid(chunk_shapes=[[10, 10, 10, 10, 10, 10], [5, 5, 5, 5, 5]], array_shape=(60, 25)) + >>> grid.to_dict()['configuration']['chunk_shapes'] + [[[10, 6]], [[5, 5]]] + """ + # Compress each dimension using RLE where beneficial + chunk_shapes_compressed = [ + _compress_run_length_encoding(axis_chunks) for axis_chunks in self.chunk_shapes + ] + + return { + "name": "rectilinear", + "configuration": { + "kind": "inline", + "chunk_shapes": chunk_shapes_compressed, + }, + } + + def update_shape(self, new_shape: tuple[int, ...]) -> Self: + """ + Update the RectilinearChunkGrid to accommodate a new array shape. + + When resizing an array, this method adjusts the chunk grid to match the new shape. + For dimensions that grow, a new chunk is added with size equal to the size difference. + For dimensions that shrink, chunks are truncated or removed to fit the new shape. + + Parameters + ---------- + new_shape : tuple[int, ...] + The new shape of the array. Must have the same number of dimensions as the + chunk grid. + + Returns + ------- + Self + A new RectilinearChunkGrid instance with updated chunk shapes and array_shape + + Raises + ------ + ValueError + If the number of dimensions in new_shape doesn't match the number of dimensions + in the chunk grid + + Examples + -------- + >>> grid = RectilinearChunkGrid(chunk_shapes=[[10, 20], [15, 15]], array_shape=(30, 30)) + >>> grid.update_shape((50, 40)) # Grow both dimensions + RectilinearChunkGrid(_array_shape=(50, 40), chunk_shapes=((10, 20, 20), (15, 15, 10))) + + >>> grid = RectilinearChunkGrid(chunk_shapes=[[10, 20, 30], [25, 25]], array_shape=(60, 50)) + >>> grid.update_shape((25, 30)) # Shrink first dimension + RectilinearChunkGrid(_array_shape=(25, 30), chunk_shapes=((10, 20), (25, 25))) + + Notes + ----- + This method is automatically called when an array is resized. The chunk size + strategy for growing dimensions adds a single new chunk with size equal to + the growth amount. This may not be optimal for all use cases, and users may + want to manually adjust chunk shapes after resizing. + """ + + if len(new_shape) != len(self.chunk_shapes): + raise ValueError( + f"new_shape has {len(new_shape)} dimensions but " + f"chunk_shapes has {len(self.chunk_shapes)} dimensions" + ) + + new_chunk_shapes: list[tuple[int, ...]] = [] + for dim in range(len(new_shape)): + old_dim_length = sum(self.chunk_shapes[dim]) + new_dim_chunks: tuple[int, ...] + if new_shape[dim] == old_dim_length: + new_dim_chunks = self.chunk_shapes[dim] # no changes + + elif new_shape[dim] > old_dim_length: + new_dim_chunks = (*self.chunk_shapes[dim], new_shape[dim] - old_dim_length) + else: + # drop chunk sizes that are not inside the shape anymore + total = 0 + i = 0 + for c in self.chunk_shapes[dim]: + i += 1 + total += c + if total >= new_shape[dim]: + break + # keep the last chunk (it may be too long) + new_dim_chunks = self.chunk_shapes[dim][:i] + + new_chunk_shapes.append(new_dim_chunks) + + return type(self)(chunk_shapes=tuple(new_chunk_shapes), array_shape=new_shape) + + def all_chunk_coords(self) -> Iterator[tuple[int, ...]]: + """ + Generate all chunk coordinates. + + Yields + ------ + tuple[int, ...] + Chunk coordinates + """ + nchunks_per_axis = [len(axis_chunks) for axis_chunks in self.chunk_shapes] + return itertools.product(*(range(n) for n in nchunks_per_axis)) + + def get_nchunks(self) -> int: + """ + Get the total number of chunks. + + Returns + ------- + int + Total number of chunks + """ + return reduce(operator.mul, (len(axis_chunks) for axis_chunks in self.chunk_shapes), 1) + + def _validate_array_shape(self) -> None: + """ + Validate that array_shape is compatible with chunk_shapes. + + Raises + ------ + ValueError + If array_shape is incompatible with chunk_shapes + """ + if len(self._array_shape) != len(self.chunk_shapes): + raise ValueError( + f"array_shape has {len(self._array_shape)} dimensions but " + f"chunk_shapes has {len(self.chunk_shapes)} dimensions" + ) + + for axis, (arr_size, axis_chunks) in enumerate( + zip(self._array_shape, self.chunk_shapes, strict=False) + ): + chunk_sum = sum(axis_chunks) + if chunk_sum < arr_size: + raise ValueError( + f"Sum of chunk sizes along axis {axis} is {chunk_sum} " + f"but array shape is {arr_size}. This is invalid for the " + "RectilinearChunkGrid." + ) + + @cached_property + def _cumulative_sizes(self) -> tuple[tuple[int, ...], ...]: + """ + Compute cumulative sizes for each axis. + + Returns a tuple of tuples where each inner tuple contains cumulative + chunk sizes for an axis. Used for efficient chunk boundary calculations. + + Returns + ------- + tuple[tuple[int, ...], ...] + Cumulative sizes for each axis + + Examples + -------- + For chunk_shapes = [[2, 3, 1], [4, 2]]: + Returns ((0, 2, 5, 6), (0, 4, 6)) + """ + result = [] + for axis_chunks in self.chunk_shapes: + cumsum = [0] + for size in axis_chunks: + cumsum.append(cumsum[-1] + size) + result.append(tuple(cumsum)) + return tuple(result) + + def get_chunk_start(self, chunk_coord: tuple[int, ...]) -> tuple[int, ...]: + """ + Get the starting position (offset) of a chunk in the array. + + Parameters + ---------- + chunk_coord : tuple[int, ...] + Chunk coordinates (indices into the chunk grid) + + Returns + ------- + tuple[int, ...] + Starting index of the chunk in the array + + Raises + ------ + IndexError + If chunk_coord is out of bounds + + Examples + -------- + >>> grid = RectilinearChunkGrid(chunk_shapes=[[2, 2, 2], [3, 3]], array_shape=(6, 6)) + >>> grid.get_chunk_start((0, 0)) + (0, 0) + >>> grid.get_chunk_start((1, 1)) + (2, 3) + """ + # Validate chunk coordinates are in bounds + for axis, (coord, axis_chunks) in enumerate( + zip(chunk_coord, self.chunk_shapes, strict=False) + ): + if not (0 <= coord < len(axis_chunks)): + raise IndexError( + f"chunk_coord[{axis}] = {coord} is out of bounds [0, {len(axis_chunks)})" + ) + + # Use cumulative sizes to get start position + return tuple(self._cumulative_sizes[axis][coord] for axis, coord in enumerate(chunk_coord)) + + def get_chunk_shape(self, chunk_coord: tuple[int, ...]) -> tuple[int, ...]: + """ + Get the shape of a specific chunk. + + Parameters + ---------- + chunk_coord : tuple[int, ...] + Chunk coordinates (indices into the chunk grid) + + Returns + ------- + tuple[int, ...] + Shape of the chunk + + Raises + ------ + IndexError + If chunk_coord is out of bounds + + Examples + -------- + >>> grid = RectilinearChunkGrid(chunk_shapes=[[2, 3, 1], [4, 2]], array_shape=(6, 6)) + >>> grid.get_chunk_shape((0, 0)) + (2, 4) + >>> grid.get_chunk_shape((1, 0)) + (3, 4) + """ + # Validate chunk coordinates are in bounds + for axis, (coord, axis_chunks) in enumerate( + zip(chunk_coord, self.chunk_shapes, strict=False) + ): + if not (0 <= coord < len(axis_chunks)): + raise IndexError( + f"chunk_coord[{axis}] = {coord} is out of bounds [0, {len(axis_chunks)})" + ) + + # Get shape directly from chunk_shapes + return tuple( + axis_chunks[coord] + for axis_chunks, coord in zip(self.chunk_shapes, chunk_coord, strict=False) + ) + + def get_chunk_slice(self, chunk_coord: tuple[int, ...]) -> tuple[slice, ...]: + """ + Get the slice for indexing into an array for a specific chunk. + + Parameters + ---------- + chunk_coord : tuple[int, ...] + Chunk coordinates (indices into the chunk grid) + + Returns + ------- + tuple[slice, ...] + Slice tuple for indexing the array + + Raises + ------ + IndexError + If chunk_coord is out of bounds + + Examples + -------- + >>> grid = RectilinearChunkGrid(chunk_shapes=[[2, 2, 2], [3, 3]], array_shape=(6, 6)) + >>> grid.get_chunk_slice((0, 0)) + (slice(0, 2, None), slice(0, 3, None)) + >>> grid.get_chunk_slice((1, 1)) + (slice(2, 4, None), slice(3, 6, None)) + """ + start = self.get_chunk_start(chunk_coord) + shape = self.get_chunk_shape(chunk_coord) + + return tuple(slice(s, s + length) for s, length in zip(start, shape, strict=False)) + + def get_chunk_grid_shape(self) -> tuple[int, ...]: + """ + Get the shape of the chunk grid (number of chunks per axis). + + Returns + ------- + tuple[int, ...] + Number of chunks along each axis + + Examples + -------- + >>> grid = RectilinearChunkGrid(chunk_shapes=[[2, 2, 2], [3, 3]], array_shape=(6, 6)) + >>> grid.get_chunk_grid_shape() + (3, 2) + """ + return tuple(len(axis_chunks) for axis_chunks in self.chunk_shapes) + + def array_index_to_chunk_coord(self, array_index: tuple[int, ...]) -> tuple[int, ...]: + """ + Find which chunk contains a given array index. + + Parameters + ---------- + array_index : tuple[int, ...] + Index into the array + + Returns + ------- + tuple[int, ...] + Chunk coordinates containing the array index + + Raises + ------ + IndexError + If array_index is out of bounds + + Examples + -------- + >>> grid = RectilinearChunkGrid(chunk_shapes=[[2, 3, 1], [4, 2]], array_shape=(6, 6)) + >>> grid.array_index_to_chunk_coord((0, 0)) + (0, 0) + >>> grid.array_index_to_chunk_coord((2, 0)) + (1, 0) + >>> grid.array_index_to_chunk_coord((5, 5)) + (2, 1) + """ + # Validate array index is in bounds + for axis, (idx, size) in enumerate(zip(array_index, self._array_shape, strict=False)): + if not (0 <= idx < size): + raise IndexError(f"array_index[{axis}] = {idx} is out of bounds [0, {size})") + + # Use binary search in cumulative sizes to find chunk coordinate + result = [] + for axis, idx in enumerate(array_index): + cumsum = self._cumulative_sizes[axis] + # bisect_right gives us the chunk index + 1, so subtract 1 + chunk_idx = bisect.bisect_right(cumsum, idx) - 1 + result.append(chunk_idx) + + return tuple(result) + + def array_indices_to_chunk_dim( + self, dim: int, indices: npt.NDArray[np.intp] + ) -> npt.NDArray[np.intp]: + """ + Vectorized mapping of array indices to chunk coordinates along one dimension. + + For RectilinearChunkGrid, uses np.searchsorted on cumulative sizes. + """ + cumsum = np.asarray(self._cumulative_sizes[dim]) + return np.searchsorted(cumsum, indices, side="right").astype(np.intp) - 1 + + def chunks_in_selection(self, selection: tuple[slice, ...]) -> Iterator[tuple[int, ...]]: + """ + Get all chunks that intersect with a given selection. + + Parameters + ---------- + selection : tuple[slice, ...] + Selection (slices) into the array + + Yields + ------ + tuple[int, ...] + Chunk coordinates that intersect with the selection + + Raises + ------ + ValueError + If selection is invalid + + Examples + -------- + >>> grid = RectilinearChunkGrid(chunk_shapes=[[2, 2, 2], [3, 3]], array_shape=(6, 6)) + >>> selection = (slice(1, 5), slice(2, 5)) + >>> list(grid.chunks_in_selection(selection)) + [(0, 0), (0, 1), (1, 0), (1, 1), (2, 0), (2, 1)] + """ + # Normalize slices and find chunk ranges for each axis + chunk_ranges = [] + for axis, (sel, size) in enumerate(zip(selection, self._array_shape, strict=False)): + if not isinstance(sel, slice): + raise TypeError(f"selection[{axis}] must be a slice, got {type(sel)}") + + # Normalize slice with array size + start, stop, step = sel.indices(size) + + if step != 1: + raise ValueError(f"selection[{axis}] has step={step}, only step=1 is supported") + + if start >= stop: + # Empty selection + return + + # Find first and last chunk that intersect with [start, stop) + start_chunk = self.array_index_to_chunk_coord( + tuple(start if i == axis else 0 for i in range(len(self._array_shape))) + )[axis] + + # stop-1 is the last index we need + end_chunk = self.array_index_to_chunk_coord( + tuple(stop - 1 if i == axis else 0 for i in range(len(self._array_shape))) + )[axis] + + chunk_ranges.append(range(start_chunk, end_chunk + 1)) + + # Generate all combinations of chunk coordinates + yield from itertools.product(*chunk_ranges) + + def chunks_per_dim(self, dim: int) -> int: + """ + Get the number of chunks along a specific dimension. + + Parameters + ---------- + dim : int + Dimension index + + Returns + ------- + int + Number of chunks along the dimension + + Examples + -------- + >>> grid = RectilinearChunkGrid(chunk_shapes=[[10, 20], [5, 5, 5]], array_shape=(30, 15)) + >>> grid.chunks_per_dim(0) # 2 chunks along axis 0 + 2 + >>> grid.chunks_per_dim(1) # 3 chunks along axis 1 + 3 + """ + return len(self.chunk_shapes[dim]) diff --git a/tests/test_codecs/test_cast_value.py b/tests/test_codecs/test_cast_value.py new file mode 100644 index 0000000000..16b7bd3859 --- /dev/null +++ b/tests/test_codecs/test_cast_value.py @@ -0,0 +1,440 @@ +"""Tests for the cast_value codec.""" + +from __future__ import annotations + +import numpy as np +import pytest + +import zarr +from zarr.codecs.bytes import BytesCodec +from zarr.codecs.cast_value import CastValue +from zarr.codecs.scale_offset import ScaleOffset +from zarr.storage import MemoryStore + + +class TestCastValueCodec: + """Tests for the cast_value codec.""" + + def test_float64_to_float32(self) -> None: + """Cast float64 to float32 and back.""" + store = MemoryStore() + data = np.array([1.0, 2.0, 3.0], dtype="float64") + arr = zarr.create( + store=store, + shape=data.shape, + dtype=data.dtype, + chunks=(3,), + codecs=[CastValue(data_type="float32"), BytesCodec()], + ) + arr[:] = data + result = arr[:] + np.testing.assert_allclose(result, data) + + def test_float64_to_int32_towards_zero(self) -> None: + """Cast float64 to int32 with towards-zero rounding.""" + store = MemoryStore() + data = np.array([1.7, -1.7, 2.3, -2.3], dtype="float64") + arr = zarr.create( + store=store, + shape=data.shape, + dtype=data.dtype, + chunks=(4,), + codecs=[CastValue(data_type="int32", rounding="towards-zero"), BytesCodec()], + ) + arr[:] = data + result = arr[:] + # After encoding to int32 with towards-zero: [1, -1, 2, -2] + # After decoding back to float64: [1.0, -1.0, 2.0, -2.0] + np.testing.assert_array_equal(result, [1.0, -1.0, 2.0, -2.0]) + + def test_float64_to_uint8_clamp(self) -> None: + """Cast float64 to uint8 with clamping out-of-range values.""" + store = MemoryStore() + data = np.array([0.0, 128.0, 300.0, -10.0], dtype="float64") + arr = zarr.create( + store=store, + shape=data.shape, + dtype=data.dtype, + chunks=(4,), + codecs=[ + CastValue(data_type="uint8", rounding="nearest-even", out_of_range="clamp"), + BytesCodec(), + ], + ) + arr[:] = data + result = arr[:] + np.testing.assert_array_equal(result, [0.0, 128.0, 255.0, 0.0]) + + def test_float64_to_int8_wrap(self) -> None: + """Cast float64 to int8 with wrapping for out-of-range values.""" + store = MemoryStore() + data = np.array([200.0], dtype="float64") + arr = zarr.create( + store=store, + shape=data.shape, + dtype=data.dtype, + chunks=(1,), + codecs=[ + CastValue(data_type="int8", rounding="nearest-even", out_of_range="wrap"), + BytesCodec(), + ], + ) + arr[:] = data + result = arr[:] + # 200 wraps in int8 range [-128, 127]: (200 - (-128)) % 256 + (-128) = 328 % 256 - 128 = 72 - 128 = -56 + # Decode: -56 cast back to float64 = -56.0 + np.testing.assert_array_equal(result, [-56.0]) + + def test_nan_to_integer_without_scalar_map_errors(self) -> None: + """NaN cast to integer without scalar_map should raise.""" + store = MemoryStore() + data = np.array([float("nan")], dtype="float64") + arr = zarr.create( + store=store, + shape=data.shape, + dtype=data.dtype, + chunks=(1,), + codecs=[CastValue(data_type="uint8", out_of_range="clamp"), BytesCodec()], + ) + with pytest.raises(ValueError, match="Cannot cast NaN"): + arr[:] = data + + def test_nan_scalar_map(self) -> None: + """NaN should be mapped via scalar_map when provided.""" + store = MemoryStore() + data = np.array([1.0, float("nan"), 3.0], dtype="float64") + arr = zarr.create( + store=store, + shape=data.shape, + dtype=data.dtype, + chunks=(3,), + codecs=[ + CastValue( + data_type="uint8", + out_of_range="clamp", + scalar_map={ + "encode": [["NaN", 0]], + "decode": [[0, "NaN"]], + }, + ), + BytesCodec(), + ], + ) + arr[:] = data + result = arr[:] + assert result[0] == 1.0 # 1.0 survives round-trip + assert np.isnan(result[1]) # NaN -> 0 -> NaN via scalar_map + assert result[2] == 3.0 + + def test_rounding_nearest_even(self) -> None: + """nearest-even rounding: 0.5 rounds to 0, 1.5 rounds to 2.""" + store = MemoryStore() + data = np.array([0.5, 1.5, 2.5, 3.5], dtype="float64") + arr = zarr.create( + store=store, + shape=data.shape, + dtype=data.dtype, + chunks=(4,), + codecs=[ + CastValue(data_type="int32", rounding="nearest-even"), + BytesCodec(), + ], + ) + arr[:] = data + result = arr[:] + np.testing.assert_array_equal(result, [0.0, 2.0, 2.0, 4.0]) + + def test_rounding_towards_positive(self) -> None: + """towards-positive rounds up (ceil).""" + store = MemoryStore() + data = np.array([1.1, -1.1, 1.9, -1.9], dtype="float64") + arr = zarr.create( + store=store, + shape=data.shape, + dtype=data.dtype, + chunks=(4,), + codecs=[ + CastValue(data_type="int32", rounding="towards-positive"), + BytesCodec(), + ], + ) + arr[:] = data + result = arr[:] + np.testing.assert_array_equal(result, [2.0, -1.0, 2.0, -1.0]) + + def test_rounding_towards_negative(self) -> None: + """towards-negative rounds down (floor).""" + store = MemoryStore() + data = np.array([1.1, -1.1, 1.9, -1.9], dtype="float64") + arr = zarr.create( + store=store, + shape=data.shape, + dtype=data.dtype, + chunks=(4,), + codecs=[ + CastValue(data_type="int32", rounding="towards-negative"), + BytesCodec(), + ], + ) + arr[:] = data + result = arr[:] + np.testing.assert_array_equal(result, [1.0, -2.0, 1.0, -2.0]) + + def test_rounding_nearest_away(self) -> None: + """nearest-away rounds 0.5 away from zero.""" + store = MemoryStore() + data = np.array([0.5, 1.5, -0.5, -1.5], dtype="float64") + arr = zarr.create( + store=store, + shape=data.shape, + dtype=data.dtype, + chunks=(4,), + codecs=[ + CastValue(data_type="int32", rounding="nearest-away"), + BytesCodec(), + ], + ) + arr[:] = data + result = arr[:] + np.testing.assert_array_equal(result, [1.0, 2.0, -1.0, -2.0]) + + def test_out_of_range_errors_by_default(self) -> None: + """Without out_of_range, values outside target range should error.""" + store = MemoryStore() + data = np.array([300.0], dtype="float64") + arr = zarr.create( + store=store, + shape=data.shape, + dtype=data.dtype, + chunks=(1,), + codecs=[CastValue(data_type="uint8"), BytesCodec()], + ) + with pytest.raises(ValueError, match="out of range"): + arr[:] = data + + def test_wrap_only_valid_for_integers(self) -> None: + """wrap should be rejected for float target types.""" + with pytest.raises(ValueError, match="only valid for integer"): + zarr.create( + store=MemoryStore(), + shape=(5,), + dtype="float64", + chunks=(5,), + codecs=[ + CastValue(data_type="float32", out_of_range="wrap"), + BytesCodec(), + ], + ) + + def test_validate_rejects_complex_source(self) -> None: + """Validate should reject complex source dtype.""" + with pytest.raises(ValueError, match="only supports integer and floating-point"): + zarr.create( + store=MemoryStore(), + shape=(5,), + dtype="complex128", + chunks=(5,), + codecs=[CastValue(data_type="float64"), BytesCodec()], + ) + + def test_int32_to_int16_clamp(self) -> None: + """Integer-to-integer cast with clamping.""" + store = MemoryStore() + data = np.array([0, 100, 40000, -40000], dtype="int32") + arr = zarr.create( + store=store, + shape=data.shape, + dtype=data.dtype, + chunks=(4,), + codecs=[ + CastValue(data_type="int16", out_of_range="clamp"), + BytesCodec(), + ], + ) + arr[:] = data + result = arr[:] + np.testing.assert_array_equal(result, [0, 100, 32767, -32768]) + + def test_to_dict(self) -> None: + """Serialization to dict.""" + codec = CastValue( + data_type="uint8", + rounding="towards-zero", + out_of_range="clamp", + scalar_map={"encode": [["NaN", 0]], "decode": [[0, "NaN"]]}, + ) + d = codec.to_dict() + assert d["name"] == "cast_value" + assert d["configuration"]["data_type"] == "uint8" + assert d["configuration"]["rounding"] == "towards-zero" + assert d["configuration"]["out_of_range"] == "clamp" + assert d["configuration"]["scalar_map"] == { + "encode": [["NaN", 0]], + "decode": [[0, "NaN"]], + } + + def test_to_dict_minimal(self) -> None: + """Only required fields in dict when defaults are used.""" + codec = CastValue(data_type="float32") + d = codec.to_dict() + assert d == {"name": "cast_value", "configuration": {"data_type": "float32"}} + + def test_from_dict(self) -> None: + """Deserialization from dict.""" + codec = CastValue.from_dict( + { + "name": "cast_value", + "configuration": { + "data_type": "uint8", + "rounding": "towards-zero", + "out_of_range": "clamp", + }, + } + ) + assert codec.data_type == "uint8" + assert codec.rounding == "towards-zero" + assert codec.out_of_range == "clamp" + + def test_roundtrip_json(self) -> None: + """to_dict -> from_dict should preserve all parameters.""" + original = CastValue( + data_type="int16", + rounding="towards-negative", + out_of_range="clamp", + scalar_map={"encode": [["NaN", 0]]}, + ) + restored = CastValue.from_dict(original.to_dict()) + assert restored.data_type == original.data_type + assert restored.rounding == original.rounding + assert restored.out_of_range == original.out_of_range + assert restored.scalar_map == original.scalar_map + + def test_fill_value_cast(self) -> None: + """Fill value should be cast to the target dtype.""" + store = MemoryStore() + arr = zarr.create( + store=store, + shape=(5,), + dtype="float64", + chunks=(5,), + fill_value=42.0, + codecs=[CastValue(data_type="int32"), BytesCodec()], + ) + result = arr[:] + np.testing.assert_array_equal(result, np.full(5, 42.0)) + + def test_computed_encoded_size(self) -> None: + """Encoded size should reflect the target dtype's item size.""" + codec = CastValue(data_type="uint8") + from zarr.core.array_spec import ArrayConfig, ArraySpec + from zarr.core.buffer.cpu import buffer_prototype + from zarr.core.dtype import parse_dtype + + spec = ArraySpec( + shape=(10,), + dtype=parse_dtype("float64", zarr_format=3), + fill_value=0.0, + config=ArrayConfig.from_dict({}), + prototype=buffer_prototype, + ) + # 10 float64 elements = 80 bytes input, 10 uint8 elements = 10 bytes output + assert codec.compute_encoded_size(80, spec) == 10 + + +class TestScaleOffsetAndCastValueCombined: + """Tests for the combined scale_offset + cast_value codec pipeline.""" + + def test_float64_to_uint8_roundtrip(self) -> None: + """Typical usage: float64 -> scale_offset -> cast_value(uint8) -> bytes.""" + store = MemoryStore() + # Data in range [0, 25.5] maps to [0, 255] with scale=10 + data = np.array([0.0, 1.0, 2.5, 10.0, 25.5], dtype="float64") + arr = zarr.create( + store=store, + shape=data.shape, + dtype=data.dtype, + chunks=(5,), + codecs=[ + ScaleOffset(offset=0, scale=10), + CastValue(data_type="uint8", out_of_range="clamp"), + BytesCodec(), + ], + ) + arr[:] = data + result = arr[:] + np.testing.assert_allclose(result, data, atol=0.1) + + def test_temperature_storage_pattern(self) -> None: + """Realistic pattern: store temperature data as uint8. + + Temperature range: -10°C to 45°C + Encode: (temp - (-10)) * (255/55) = (temp + 10) * 4.636... + Use offset=-10, scale=255/55 + """ + store = MemoryStore() + offset = -10.0 + scale = 255.0 / 55.0 + data = np.array([-10.0, 0.0, 20.0, 37.5, 45.0], dtype="float64") + arr = zarr.create( + store=store, + shape=data.shape, + dtype=data.dtype, + chunks=(5,), + codecs=[ + ScaleOffset(offset=offset, scale=scale), + CastValue(data_type="uint8", out_of_range="clamp"), + BytesCodec(), + ], + ) + arr[:] = data + result = arr[:] + # Precision limited by uint8 quantization (~0.22°C step) + np.testing.assert_allclose(result, data, atol=0.25) + + def test_nan_handling_pipeline(self) -> None: + """NaN values should be handled via scalar_map in cast_value.""" + store = MemoryStore() + data = np.array([1.0, float("nan"), 3.0], dtype="float64") + arr = zarr.create( + store=store, + shape=data.shape, + dtype=data.dtype, + chunks=(3,), + fill_value=float("nan"), + codecs=[ + ScaleOffset(offset=0, scale=1), + CastValue( + data_type="uint8", + out_of_range="clamp", + scalar_map={ + "encode": [["NaN", 0]], + "decode": [[0, "NaN"]], + }, + ), + BytesCodec(), + ], + ) + arr[:] = data + result = arr[:] + assert result[0] == 1.0 + assert np.isnan(result[1]) + assert result[2] == 3.0 + + def test_metadata_persistence(self) -> None: + """Array metadata should be correctly persisted and reloaded.""" + store = MemoryStore() + zarr.create( + store=store, + shape=(10,), + dtype="float64", + chunks=(10,), + codecs=[ + ScaleOffset(offset=5, scale=0.5), + CastValue(data_type="int16", out_of_range="clamp"), + BytesCodec(), + ], + ) + # Reopen from same store + arr2 = zarr.open_array(store, mode="r") + assert arr2.dtype == np.dtype("float64") + assert arr2.shape == (10,) diff --git a/tests/test_codecs/test_scale_offset.py b/tests/test_codecs/test_scale_offset.py new file mode 100644 index 0000000000..ad131c6493 --- /dev/null +++ b/tests/test_codecs/test_scale_offset.py @@ -0,0 +1,166 @@ +"""Tests for the scale_offset codec.""" + +from __future__ import annotations + +import numpy as np +import pytest + +import zarr +from zarr.codecs.bytes import BytesCodec +from zarr.codecs.scale_offset import ScaleOffset +from zarr.storage import MemoryStore + + +class TestScaleOffsetCodec: + """Tests for the scale_offset codec.""" + + def test_identity(self) -> None: + """Default parameters (offset=0, scale=1) should be a no-op.""" + store = MemoryStore() + data = np.arange(20, dtype="float64").reshape(4, 5) + arr = zarr.create( + store=store, + shape=data.shape, + dtype=data.dtype, + chunks=(4, 5), + codecs=[ScaleOffset(), BytesCodec()], + ) + arr[:] = data + np.testing.assert_array_equal(arr[:], data) + + def test_encode_decode_float64(self) -> None: + """Encode/decode round-trip with float64 data.""" + store = MemoryStore() + data = np.array([10.0, 20.0, 30.0, 40.0, 50.0], dtype="float64") + arr = zarr.create( + store=store, + shape=data.shape, + dtype=data.dtype, + chunks=(5,), + codecs=[ScaleOffset(offset=10, scale=0.1), BytesCodec()], + ) + arr[:] = data + result = arr[:] + np.testing.assert_allclose(result, data, rtol=1e-10) + + def test_encode_decode_float32(self) -> None: + """Round-trip with float32 data.""" + store = MemoryStore() + data = np.array([1.0, 2.0, 3.0], dtype="float32") + arr = zarr.create( + store=store, + shape=data.shape, + dtype=data.dtype, + chunks=(3,), + codecs=[ScaleOffset(offset=1, scale=2), BytesCodec()], + ) + arr[:] = data + result = arr[:] + np.testing.assert_allclose(result, data, rtol=1e-6) + + def test_encode_decode_integer(self) -> None: + """Round-trip with integer data (uses integer arithmetic semantics).""" + store = MemoryStore() + data = np.array([10, 20, 30, 40, 50], dtype="int32") + arr = zarr.create( + store=store, + shape=data.shape, + dtype=data.dtype, + chunks=(5,), + codecs=[ScaleOffset(offset=10, scale=1), BytesCodec()], + ) + arr[:] = data + result = arr[:] + np.testing.assert_array_equal(result, data) + + def test_offset_only(self) -> None: + """Test with only offset (scale=1).""" + store = MemoryStore() + data = np.array([100.0, 200.0, 300.0], dtype="float64") + arr = zarr.create( + store=store, + shape=data.shape, + dtype=data.dtype, + chunks=(3,), + codecs=[ScaleOffset(offset=100), BytesCodec()], + ) + arr[:] = data + np.testing.assert_allclose(arr[:], data) + + def test_scale_only(self) -> None: + """Test with only scale (offset=0).""" + store = MemoryStore() + data = np.array([1.0, 2.0, 3.0], dtype="float64") + arr = zarr.create( + store=store, + shape=data.shape, + dtype=data.dtype, + chunks=(3,), + codecs=[ScaleOffset(scale=10), BytesCodec()], + ) + arr[:] = data + np.testing.assert_allclose(arr[:], data) + + def test_fill_value_transformed(self) -> None: + """Fill value should be transformed through the codec.""" + store = MemoryStore() + arr = zarr.create( + store=store, + shape=(5,), + dtype="float64", + chunks=(5,), + fill_value=100.0, + codecs=[ScaleOffset(offset=10, scale=2), BytesCodec()], + ) + # Without writing, reading should return the fill value + result = arr[:] + np.testing.assert_allclose(result, np.full(5, 100.0)) + + def test_validate_rejects_complex(self) -> None: + """Validate should reject complex dtypes.""" + with pytest.raises(ValueError, match="only supports integer and floating-point"): + zarr.create( + store=MemoryStore(), + shape=(5,), + dtype="complex128", + chunks=(5,), + codecs=[ScaleOffset(offset=1, scale=2), BytesCodec()], + ) + + def test_to_dict_no_config(self) -> None: + """Default codec should serialize without configuration.""" + codec = ScaleOffset() + assert codec.to_dict() == {"name": "scale_offset"} + + def test_to_dict_with_config(self) -> None: + """Non-default codec should include configuration.""" + codec = ScaleOffset(offset=5, scale=0.1) + d = codec.to_dict() + assert d == {"name": "scale_offset", "configuration": {"offset": 5, "scale": 0.1}} + + def test_to_dict_offset_only(self) -> None: + """Only offset in config when scale is default.""" + codec = ScaleOffset(offset=5) + d = codec.to_dict() + assert d == {"name": "scale_offset", "configuration": {"offset": 5}} + + def test_from_dict_no_config(self) -> None: + """Parse codec from JSON with no configuration.""" + codec = ScaleOffset.from_dict({"name": "scale_offset"}) + assert codec.offset == 0 + assert codec.scale == 1 + + def test_from_dict_with_config(self) -> None: + """Parse codec from JSON with configuration.""" + codec = ScaleOffset.from_dict( + {"name": "scale_offset", "configuration": {"offset": 5, "scale": 0.1}} + ) + assert codec.offset == 5 + assert codec.scale == 0.1 + + def test_roundtrip_json(self) -> None: + """to_dict -> from_dict should preserve parameters.""" + original = ScaleOffset(offset=3.14, scale=2.71) + restored = ScaleOffset.from_dict(original.to_dict()) + assert restored.offset == original.offset + assert restored.scale == original.scale