Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions mkdocs/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,27 @@ The environment variable picked up by Iceberg starts with `PYICEBERG_` and then

For example, `PYICEBERG_CATALOG__DEFAULT__S3__ACCESS_KEY_ID`, sets `s3.access-key-id` on the `default` catalog.

## Manifest Caching

PyIceberg caches `ManifestFile` objects locally and uses an LRU policy to bound the cache size. By default, up to `128`
`ManifestFile` objects are retained.

You can tune the `manifest-cache-size` configuration in `.pyiceberg.yaml`:

```yaml
manifest-cache-size: 256
```

You can also set it with the `PYICEBERG_MANIFEST_CACHE_SIZE` environment variable:

```sh
export PYICEBERG_MANIFEST_CACHE_SIZE=256
```

The memory used by this cache depends on the size and number of distinct manifests your workload touches. Lower the value
if you want a tighter memory bound, or call `clear_manifest_cache()` to proactively release cached manifest metadata in
long-lived processes. Setting `manifest-cache-size` or `PYICEBERG_MANIFEST_CACHE_SIZE` to `0` (or any negative value) disables manifest caching entirely.

## Tables

Iceberg tables support table properties to configure table behavior.
Expand Down
23 changes: 21 additions & 2 deletions pyiceberg/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import math
import threading
from abc import ABC, abstractmethod
from collections.abc import Iterator
from collections.abc import Iterator, MutableMapping
from copy import copy
from enum import Enum
from types import TracebackType
Expand Down Expand Up @@ -51,6 +51,7 @@
StringType,
StructType,
)
from pyiceberg.utils.config import Config

UNASSIGNED_SEQ = -1
DEFAULT_BLOCK_SIZE = 67108864 # 64 * 1024 * 1024
Expand Down Expand Up @@ -894,10 +895,25 @@ def __hash__(self) -> int:
# Global cache for ManifestFile objects, keyed by manifest_path.
# This deduplicates ManifestFile objects across manifest lists, which commonly
# share manifests after append operations.
_manifest_cache: LRUCache[str, ManifestFile] = LRUCache(maxsize=128)
_DEFAULT_MANIFEST_CACHE_SIZE = 128
# User override from `.pyiceberg.yaml` or the PYICEBERG_MANIFEST_CACHE_SIZE
# environment variable; None when the option is unset. Config.get_int raises
# ValueError for non-integer values, failing at import time.
_configured_manifest_cache_size = Config().get_int("manifest-cache-size")
# Negative configured values are clamped to 0 (caching disabled); an unset
# option falls back to the default size.
_manifest_cache_size = (
    max(_configured_manifest_cache_size, 0) if _configured_manifest_cache_size is not None else _DEFAULT_MANIFEST_CACHE_SIZE
)

# Lock for thread-safe cache access
_manifest_cache_lock = threading.RLock()
# LRU-bounded cache when a positive size is configured; a plain dict placeholder
# when disabled (left empty because _manifests short-circuits when the size is 0).
_manifest_cache: MutableMapping[str, ManifestFile] = LRUCache(maxsize=_manifest_cache_size) if _manifest_cache_size > 0 else {}


def clear_manifest_cache() -> None:
    """Drop every cached ManifestFile object.

    Intended for long-lived or memory-sensitive processes that want to
    release cached manifest metadata between bursts of table reads.
    """
    _manifest_cache_lock.acquire()
    try:
        _manifest_cache.clear()
    finally:
        _manifest_cache_lock.release()


def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]:
Expand Down Expand Up @@ -927,6 +943,9 @@ def _manifests(io: FileIO, manifest_list: str) -> tuple[ManifestFile, ...]:
file = io.new_input(manifest_list)
manifest_files = list(read_manifest_list(file))

if _manifest_cache_size == 0:
return tuple(manifest_files)

result = []
with _manifest_cache_lock:
for manifest_file in manifest_files:
Expand Down
16 changes: 9 additions & 7 deletions tests/benchmark/test_memory_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@
import pyarrow as pa
import pytest

from pyiceberg import manifest as manifest_module
from pyiceberg.catalog.memory import InMemoryCatalog
from pyiceberg.manifest import _manifest_cache
from pyiceberg.manifest import clear_manifest_cache


def generate_test_dataframe() -> pa.Table:
Expand Down Expand Up @@ -64,7 +65,7 @@ def memory_catalog(tmp_path_factory: pytest.TempPathFactory) -> InMemoryCatalog:
@pytest.fixture(autouse=True)
def clear_caches() -> None:
"""Clear caches before each test."""
_manifest_cache.clear()
clear_manifest_cache()
gc.collect()


Expand Down Expand Up @@ -95,7 +96,7 @@ def test_manifest_cache_memory_growth(memory_catalog: InMemoryCatalog) -> None:
# Sample memory at intervals
if (i + 1) % 10 == 0:
current, _ = tracemalloc.get_traced_memory()
cache_size = len(_manifest_cache)
cache_size = len(manifest_module._manifest_cache)

memory_samples.append((i + 1, current, cache_size))
print(f" Iteration {i + 1}: Memory={current / 1024:.1f} KB, Cache entries={cache_size}")
Expand Down Expand Up @@ -150,13 +151,13 @@ def test_memory_after_gc_with_cache_cleared(memory_catalog: InMemoryCatalog) ->

gc.collect()
before_clear_memory, _ = tracemalloc.get_traced_memory()
cache_size_before = len(_manifest_cache)
cache_size_before = len(manifest_module._manifest_cache)
print(f" Memory before clear: {before_clear_memory / 1024:.1f} KB")
print(f" Cache size: {cache_size_before}")

# Phase 2: Clear cache and GC
print("\nPhase 2: Clearing cache and running GC...")
_manifest_cache.clear()
clear_manifest_cache()
gc.collect()
gc.collect() # Multiple GC passes for thorough cleanup

Expand Down Expand Up @@ -192,6 +193,7 @@ def test_manifest_cache_deduplication_efficiency() -> None:
ManifestEntry,
ManifestEntryStatus,
_manifests,
clear_manifest_cache,
write_manifest,
write_manifest_list,
)
Expand Down Expand Up @@ -245,7 +247,7 @@ def test_manifest_cache_deduplication_efficiency() -> None:
num_lists = 10
print(f"Creating {num_lists} manifest lists with overlapping manifests...")

_manifest_cache.clear()
clear_manifest_cache()

for i in range(num_lists):
list_path = f"{tmp_dir}/manifest-list_{i}.avro"
Expand All @@ -265,7 +267,7 @@ def test_manifest_cache_deduplication_efficiency() -> None:
_manifests(io, list_path)

# Analyze cache efficiency
cache_entries = len(_manifest_cache)
cache_entries = len(manifest_module._manifest_cache)
# List i contains manifests 0..i, so only the first num_lists manifests are actually used
manifests_actually_used = num_lists

Expand Down
186 changes: 177 additions & 9 deletions tests/utils/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
# specific language governing permissions and limitations
# under the License.
# pylint: disable=redefined-outer-name,arguments-renamed,fixme
import importlib
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any

import fastavro
import pytest

import pyiceberg.manifest as manifest_module
from pyiceberg.avro.codecs import AvroCompressionCodec
from pyiceberg.io import load_file_io
from pyiceberg.io.pyarrow import PyArrowFileIO
Expand All @@ -32,8 +36,8 @@
ManifestEntryStatus,
ManifestFile,
PartitionFieldSummary,
_manifest_cache,
_manifests,
clear_manifest_cache,
read_manifest_list,
write_manifest,
write_manifest_list,
Expand All @@ -46,9 +50,8 @@


@pytest.fixture(autouse=True)
def clear_global_manifests_cache() -> None:
# Clear the global cache before each test
_manifest_cache.clear()
def reset_global_manifests_cache() -> None:
clear_manifest_cache()


def _verify_metadata_with_fastavro(avro_file: str, expected_metadata: dict[str, str]) -> None:
Expand Down Expand Up @@ -804,9 +807,9 @@ def test_manifest_cache_deduplicates_manifest_files() -> None:

# Verify cache size - should only have 3 unique ManifestFile objects
# instead of 1 + 2 + 3 = 6 objects as with the old approach
assert len(_manifest_cache) == 3, (
f"Cache should contain exactly 3 unique ManifestFile objects, but has {len(_manifest_cache)}"
)
cache = manifest_module._manifest_cache
assert cache is not None, "Manifest cache should be enabled for this test"
assert len(cache) == 3, f"Cache should contain exactly 3 unique ManifestFile objects, but has {len(cache)}"


def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None:
Expand Down Expand Up @@ -879,9 +882,11 @@ def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None:
# With the new approach, we should have exactly N objects

# Verify cache has exactly N unique entries
assert len(_manifest_cache) == num_manifests, (
cache = manifest_module._manifest_cache
assert cache is not None, "Manifest cache should be enabled for this test"
assert len(cache) == num_manifests, (
f"Cache should contain exactly {num_manifests} ManifestFile objects, "
f"but has {len(_manifest_cache)}. "
f"but has {len(cache)}. "
f"Old approach would have {num_manifests * (num_manifests + 1) // 2} objects."
)

Expand Down Expand Up @@ -932,3 +937,166 @@ def test_manifest_writer_tell(format_version: TableVersion) -> None:
after_entry_bytes = writer.tell()

assert after_entry_bytes > initial_bytes, "Bytes should increase after adding entry"


def _create_test_manifest_list(module: Any, io: PyArrowFileIO, tmp_dir: str, name: str, snapshot_id: int) -> str:
    """Write a single-manifest manifest list into *tmp_dir* and return its path."""
    table_schema = Schema(NestedField(field_id=1, name="id", field_type=IntegerType(), required=True))

    # First write one manifest containing a single ADDED data file.
    manifest_path = f"{tmp_dir}/manifest-{name}.avro"
    with module.write_manifest(
        format_version=2,
        spec=UNPARTITIONED_PARTITION_SPEC,
        schema=table_schema,
        output_file=io.new_output(manifest_path),
        snapshot_id=snapshot_id,
        avro_compression="zstandard",
    ) as writer:
        entry = module.ManifestEntry.from_args(
            status=module.ManifestEntryStatus.ADDED,
            snapshot_id=snapshot_id,
            data_file=module.DataFile.from_args(
                content=module.DataFileContent.DATA,
                file_path=f"{tmp_dir}/data-{name}.parquet",
                file_format=module.FileFormat.PARQUET,
                partition=Record(),
                record_count=100,
                file_size_in_bytes=1000,
            ),
        )
        writer.add_entry(entry)
    manifest_file = writer.to_manifest_file()

    # Then wrap that manifest in a manifest list keyed by the snapshot id.
    list_path = f"{tmp_dir}/manifest-list-{name}.avro"
    parent_id = snapshot_id - 1 if snapshot_id > 1 else None
    with module.write_manifest_list(
        format_version=2,
        output_file=io.new_output(list_path),
        snapshot_id=snapshot_id,
        parent_snapshot_id=parent_id,
        sequence_number=snapshot_id,
        avro_compression="zstandard",
    ) as list_writer:
        list_writer.add_manifests([manifest_file])

    return list_path


def test_clear_manifest_cache() -> None:
    """clear_manifest_cache() empties the cache without disabling it."""
    file_io = PyArrowFileIO()

    with TemporaryDirectory() as tmp_dir:
        manifest_list = _create_test_manifest_list(manifest_module, file_io, tmp_dir, name="clear", snapshot_id=1)

        # Reading the manifest list populates the global cache.
        _manifests(file_io, manifest_list)

        populated = manifest_module._manifest_cache
        assert populated is not None, "Cache should be enabled"
        assert len(populated) > 0, "Cache should have entries after reading manifests"

        clear_manifest_cache()

        # Clearing drops the entries but leaves the cache object in place.
        emptied = manifest_module._manifest_cache
        assert emptied is not None, "Cache should still be enabled after clear"
        assert len(emptied) == 0, "Cache should be empty after clear"


@pytest.mark.parametrize("cache_size", ["0", "-1"])
def test_manifest_cache_can_be_disabled_with_non_positive_size(monkeypatch: pytest.MonkeyPatch, cache_size: str) -> None:
    """Non-positive manifest-cache-size values must turn caching off."""
    monkeypatch.setenv("PYICEBERG_MANIFEST_CACHE_SIZE", cache_size)
    importlib.reload(manifest_module)

    try:
        # Negative values are clamped to zero; a zero-size cache stays empty.
        assert manifest_module._manifest_cache_size == 0
        assert len(manifest_module._manifest_cache) == 0

        file_io = PyArrowFileIO()

        with TemporaryDirectory() as tmp_dir:
            manifest_list = _create_test_manifest_list(manifest_module, file_io, tmp_dir, name="disabled", snapshot_id=1)

            first_read = manifest_module._manifests(file_io, manifest_list)
            second_read = manifest_module._manifests(file_io, manifest_list)

            # Nothing was cached, so repeated reads yield distinct objects.
            assert len(manifest_module._manifest_cache) == 0
            assert first_read[0] is not second_read[0]
    finally:
        # Restore the module to its default (unconfigured) state for other tests.
        monkeypatch.delenv("PYICEBERG_MANIFEST_CACHE_SIZE", raising=False)
        importlib.reload(manifest_module)


def test_manifest_cache_respects_positive_env_size(monkeypatch: pytest.MonkeyPatch) -> None:
    """A positive manifest-cache-size yields an LRU cache bounded to that size."""
    monkeypatch.setenv("PYICEBERG_MANIFEST_CACHE_SIZE", "1")
    importlib.reload(manifest_module)

    try:
        assert manifest_module._manifest_cache_size == 1

        file_io = PyArrowFileIO()

        with TemporaryDirectory() as tmp_dir:
            list_one = _create_test_manifest_list(manifest_module, file_io, tmp_dir, name="first", snapshot_id=1)
            list_two = _create_test_manifest_list(manifest_module, file_io, tmp_dir, name="second", snapshot_id=2)

            # Re-reading the same list is served from the cache (same object).
            first_read = manifest_module._manifests(file_io, list_one)
            second_read = manifest_module._manifests(file_io, list_one)

            assert first_read[0] is second_read[0]
            assert len(manifest_module._manifest_cache) == 1

            # A second list evicts the first entry rather than growing past the bound.
            manifest_module._manifests(file_io, list_two)

            assert len(manifest_module._manifest_cache) == 1
    finally:
        # Restore the module to its default (unconfigured) state for other tests.
        monkeypatch.delenv("PYICEBERG_MANIFEST_CACHE_SIZE", raising=False)
        importlib.reload(manifest_module)


def test_manifest_cache_reads_size_from_configuration_file(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
    """The manifest-cache-size option is honored when set via .pyiceberg.yaml."""
    config_home = tmp_path / "config"
    config_home.mkdir()
    (config_home / ".pyiceberg.yaml").write_text("manifest-cache-size: 2\n", encoding="utf-8")

    # Make sure the YAML value is not shadowed by the environment variable.
    monkeypatch.delenv("PYICEBERG_MANIFEST_CACHE_SIZE", raising=False)
    monkeypatch.setenv("PYICEBERG_HOME", str(config_home))
    importlib.reload(manifest_module)

    try:
        assert manifest_module._manifest_cache_size == 2

        file_io = PyArrowFileIO()

        with TemporaryDirectory() as tmp_dir:
            list_paths = [
                _create_test_manifest_list(manifest_module, file_io, tmp_dir, name=label, snapshot_id=snapshot)
                for snapshot, label in enumerate(("first", "second", "third"), start=1)
            ]
            for manifest_list in list_paths:
                manifest_module._manifests(file_io, manifest_list)

            # Three distinct lists were read, but the LRU bound keeps only two.
            assert len(manifest_module._manifest_cache) == 2
    finally:
        # Restore the module to its default (unconfigured) state for other tests.
        monkeypatch.delenv("PYICEBERG_HOME", raising=False)
        importlib.reload(manifest_module)


def test_invalid_manifest_cache_size_raises_value_error(monkeypatch: pytest.MonkeyPatch) -> None:
    """Test that invalid manifest-cache-size values raise a helpful error."""
    monkeypatch.setenv("PYICEBERG_MANIFEST_CACHE_SIZE", "not-an-int")

    try:
        # The size is parsed at import time, so reloading the module triggers the error.
        with pytest.raises(ValueError, match="manifest-cache-size should be an integer or left unset"):
            importlib.reload(manifest_module)
    finally:
        # Remove the bad value and reload again so the module is usable by later tests.
        monkeypatch.delenv("PYICEBERG_MANIFEST_CACHE_SIZE", raising=False)
        importlib.reload(manifest_module)
Loading