From 0677f8eda483ec8be840e3931296911161bc66a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaros=C5=82aw=20Cellary?= Date: Thu, 12 Feb 2026 11:02:03 +0100 Subject: [PATCH 1/8] Added lock disable --- pyiceberg/catalog/hive.py | 191 ++++++++++++++++++++----------------- tests/catalog/test_hive.py | 67 +++++++++++++ 2 files changed, 171 insertions(+), 87 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 1bec186ca8..6a3292c15f 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -127,6 +127,9 @@ HIVE_KERBEROS_SERVICE_NAME = "hive.kerberos-service-name" HIVE_KERBEROS_SERVICE_NAME_DEFAULT = "hive" +LOCK_ENABLED = "lock-enabled" +DEFAULT_LOCK_ENABLED = True + LOCK_CHECK_MIN_WAIT_TIME = "lock-check-min-wait-time" LOCK_CHECK_MAX_WAIT_TIME = "lock-check-max-wait-time" LOCK_CHECK_RETRIES = "lock-check-retries" @@ -301,6 +304,7 @@ def __init__(self, name: str, **properties: str): super().__init__(name, **properties) self._client = self._create_hive_client(properties) + self._lock_enabled = property_as_bool(properties, LOCK_ENABLED, DEFAULT_LOCK_ENABLED) self._lock_check_min_wait_time = property_as_float(properties, LOCK_CHECK_MIN_WAIT_TIME, DEFAULT_LOCK_CHECK_MIN_WAIT_TIME) self._lock_check_max_wait_time = property_as_float(properties, LOCK_CHECK_MAX_WAIT_TIME, DEFAULT_LOCK_CHECK_MAX_WAIT_TIME) self._lock_check_retries = property_as_float( @@ -499,6 +503,91 @@ def _do_wait_for_lock() -> LockResponse: return _do_wait_for_lock() + def _do_commit( + self, open_client: Client, table_identifier: Identifier, database_name: str, table_name: str, + requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...], + ) -> CommitTableResponse: + """Perform the actual commit logic (get table, update, write metadata, alter/create in HMS). + + This method contains the core commit logic, separated from locking concerns. 
+ """ + hive_table: HiveTable | None + current_table: Table | None + try: + hive_table = self._get_hive_table(open_client, database_name, table_name) + current_table = self._convert_hive_into_iceberg(hive_table) + except NoSuchTableError: + hive_table = None + current_table = None + + updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates) + if current_table and updated_staged_table.metadata == current_table.metadata: + # no changes, do nothing + return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) + self._write_metadata( + metadata=updated_staged_table.metadata, + io=updated_staged_table.io, + metadata_path=updated_staged_table.metadata_location, + ) + + if hive_table and current_table: + # Table exists, update it. + + # Note on table properties: + # - Iceberg table properties are stored in both HMS and Iceberg metadata JSON. + # - Updates are reflected in both locations + # - Existing HMS table properties (set by external systems like Hive/Spark) are preserved. 
+ # + # While it is possible to modify HMS table properties through this API, it is not recommended: + # - Mixing HMS-specific properties in Iceberg metadata can cause confusion + # - New/updated HMS table properties will also be stored in Iceberg metadata (even though it is HMS-specific) + # - HMS-native properties (set outside Iceberg) cannot be deleted since they are not visible to Iceberg + # (However, if you first SET an HMS property via Iceberg, it becomes tracked in Iceberg metadata, + # and can then be deleted via Iceberg - which removes it from both Iceberg metadata and HMS) + new_iceberg_properties = _construct_parameters( + metadata_location=updated_staged_table.metadata_location, + previous_metadata_location=current_table.metadata_location, + metadata_properties=updated_staged_table.properties, + ) + # Detect properties that were removed from Iceberg metadata + deleted_iceberg_properties = current_table.properties.keys() - updated_staged_table.properties.keys() + + # Merge: preserve HMS-native properties, remove deleted Iceberg properties, apply new Iceberg properties + existing_hms_parameters = dict(hive_table.parameters or {}) + for key in deleted_iceberg_properties: + existing_hms_parameters.pop(key, None) + existing_hms_parameters.update(new_iceberg_properties) + hive_table.parameters = existing_hms_parameters + + # Update hive's schema and properties + hive_table.sd = _construct_hive_storage_descriptor( + updated_staged_table.schema(), + updated_staged_table.location(), + property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT), + ) + open_client.alter_table_with_environment_context( + dbname=database_name, + tbl_name=table_name, + new_tbl=hive_table, + environment_context=EnvironmentContext(properties={DO_NOT_UPDATE_STATS: DO_NOT_UPDATE_STATS_DEFAULT}), + ) + else: + # Table does not exist, create it. 
+ hive_table = self._convert_iceberg_into_hive( + StagedTable( + identifier=(database_name, table_name), + metadata=updated_staged_table.metadata, + metadata_location=updated_staged_table.metadata_location, + io=updated_staged_table.io, + catalog=self, + ) + ) + self._create_hive_table(open_client, hive_table) + + return CommitTableResponse( + metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location + ) + def commit_table( self, table: Table, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...] ) -> CommitTableResponse: @@ -521,95 +610,23 @@ def commit_table( # commit to hive # https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232 with self._client as open_client: - lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name)) + if self._lock_enabled: + lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name)) - try: - if lock.state != LockState.ACQUIRED: - if lock.state == LockState.WAITING: - self._wait_for_lock(database_name, table_name, lock.lockid, open_client) - else: - raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") - - hive_table: HiveTable | None - current_table: Table | None try: - hive_table = self._get_hive_table(open_client, database_name, table_name) - current_table = self._convert_hive_into_iceberg(hive_table) - except NoSuchTableError: - hive_table = None - current_table = None - - updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates) - if current_table and updated_staged_table.metadata == current_table.metadata: - # no changes, do nothing - return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) - self._write_metadata( - metadata=updated_staged_table.metadata, - io=updated_staged_table.io, - 
metadata_path=updated_staged_table.metadata_location, - ) - - if hive_table and current_table: - # Table exists, update it. - - # Note on table properties: - # - Iceberg table properties are stored in both HMS and Iceberg metadata JSON. - # - Updates are reflected in both locations - # - Existing HMS table properties (set by external systems like Hive/Spark) are preserved. - # - # While it is possible to modify HMS table properties through this API, it is not recommended: - # - Mixing HMS-specific properties in Iceberg metadata can cause confusion - # - New/updated HMS table properties will also be stored in Iceberg metadata (even though it is HMS-specific) - # - HMS-native properties (set outside Iceberg) cannot be deleted since they are not visible to Iceberg - # (However, if you first SET an HMS property via Iceberg, it becomes tracked in Iceberg metadata, - # and can then be deleted via Iceberg - which removes it from both Iceberg metadata and HMS) - new_iceberg_properties = _construct_parameters( - metadata_location=updated_staged_table.metadata_location, - previous_metadata_location=current_table.metadata_location, - metadata_properties=updated_staged_table.properties, - ) - # Detect properties that were removed from Iceberg metadata - deleted_iceberg_properties = current_table.properties.keys() - updated_staged_table.properties.keys() - - # Merge: preserve HMS-native properties, remove deleted Iceberg properties, apply new Iceberg properties - existing_hms_parameters = dict(hive_table.parameters or {}) - for key in deleted_iceberg_properties: - existing_hms_parameters.pop(key, None) - existing_hms_parameters.update(new_iceberg_properties) - hive_table.parameters = existing_hms_parameters - - # Update hive's schema and properties - hive_table.sd = _construct_hive_storage_descriptor( - updated_staged_table.schema(), - updated_staged_table.location(), - property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT), - ) - 
open_client.alter_table_with_environment_context( - dbname=database_name, - tbl_name=table_name, - new_tbl=hive_table, - environment_context=EnvironmentContext(properties={DO_NOT_UPDATE_STATS: DO_NOT_UPDATE_STATS_DEFAULT}), - ) - else: - # Table does not exist, create it. - hive_table = self._convert_iceberg_into_hive( - StagedTable( - identifier=(database_name, table_name), - metadata=updated_staged_table.metadata, - metadata_location=updated_staged_table.metadata_location, - io=updated_staged_table.io, - catalog=self, - ) - ) - self._create_hive_table(open_client, hive_table) - except WaitingForLockException as e: - raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") from e - finally: - open_client.unlock(UnlockRequest(lockid=lock.lockid)) - - return CommitTableResponse( - metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location - ) + if lock.state != LockState.ACQUIRED: + if lock.state == LockState.WAITING: + self._wait_for_lock(database_name, table_name, lock.lockid, open_client) + else: + raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") + + return self._do_commit(open_client, table_identifier, database_name, table_name, requirements, updates) + except WaitingForLockException as e: + raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") from e + finally: + open_client.unlock(UnlockRequest(lockid=lock.lockid)) + else: + return self._do_commit(open_client, table_identifier, database_name, table_name, requirements, updates) def load_table(self, identifier: str | Identifier) -> Table: """Load the table's metadata and return the table instance. 
diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py index 88b653e44f..29c1ccf943 100644 --- a/tests/catalog/test_hive.py +++ b/tests/catalog/test_hive.py @@ -51,6 +51,7 @@ LOCK_CHECK_MAX_WAIT_TIME, LOCK_CHECK_MIN_WAIT_TIME, LOCK_CHECK_RETRIES, + LOCK_ENABLED, HiveCatalog, _construct_hive_storage_descriptor, _HiveClient, @@ -1407,3 +1408,69 @@ def test_create_hive_client_with_kerberos_using_context_manager( # closing and re-opening work as expected. with client as open_client: assert open_client._iprot.trans.isOpen() + + +def test_lock_enabled_defaults_to_true() -> None: + """Verify that lock-enabled defaults to True for backward compatibility.""" + prop = {"uri": HIVE_METASTORE_FAKE_URL} + catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop) + assert catalog._lock_enabled is True + + +def test_lock_enabled_can_be_disabled() -> None: + """Verify that lock-enabled can be set to false.""" + prop = {"uri": HIVE_METASTORE_FAKE_URL, LOCK_ENABLED: "false"} + catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop) + assert catalog._lock_enabled is False + + +def test_commit_table_skips_locking_when_lock_disabled() -> None: + """When lock-enabled is false, commit_table must not call lock, check_lock, or unlock.""" + prop = {"uri": HIVE_METASTORE_FAKE_URL, LOCK_ENABLED: "false"} + catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop) + catalog._client = MagicMock() + + mock_table = MagicMock() + mock_table.name.return_value = ("default", "my_table") + + mock_do_commit = MagicMock() + mock_do_commit.return_value = MagicMock() + + with patch.object(catalog, "_do_commit", mock_do_commit): + catalog.commit_table(mock_table, requirements=(), updates=()) + + # The core commit logic should still be called + mock_do_commit.assert_called_once() + + # But no locking operations should have been performed + catalog._client.__enter__().lock.assert_not_called() + catalog._client.__enter__().check_lock.assert_not_called() + catalog._client.__enter__().unlock.assert_not_called() + + +def 
test_commit_table_uses_locking_when_lock_enabled() -> None: + """When lock-enabled is true (default), commit_table must call lock and unlock.""" + lockid = 99999 + prop = {"uri": HIVE_METASTORE_FAKE_URL} + catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop) + + mock_client = MagicMock() + mock_client.__enter__ = MagicMock(return_value=mock_client) + mock_client.__exit__ = MagicMock(return_value=False) + mock_client.lock.return_value = LockResponse(lockid=lockid, state=LockState.ACQUIRED) + catalog._client = mock_client + + mock_table = MagicMock() + mock_table.name.return_value = ("default", "my_table") + + mock_do_commit = MagicMock() + mock_do_commit.return_value = MagicMock() + + with patch.object(catalog, "_do_commit", mock_do_commit): + catalog.commit_table(mock_table, requirements=(), updates=()) + + # Locking operations should have been performed + mock_client.lock.assert_called_once() + mock_client.unlock.assert_called_once() + # The core commit logic should still be called + mock_do_commit.assert_called_once() From 3f183688b26a0af2bc74187f9234a22f2c752ac7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaros=C5=82aw=20Cellary?= Date: Thu, 5 Mar 2026 09:54:28 +0100 Subject: [PATCH 2/8] ruff format --- pyiceberg/catalog/hive.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 6a3292c15f..aee1a0d01e 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -504,8 +504,13 @@ def _do_wait_for_lock() -> LockResponse: return _do_wait_for_lock() def _do_commit( - self, open_client: Client, table_identifier: Identifier, database_name: str, table_name: str, - requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...], + self, + open_client: Client, + table_identifier: Identifier, + database_name: str, + table_name: str, + requirements: tuple[TableRequirement, ...], + updates: tuple[TableUpdate, ...], ) -> CommitTableResponse: """Perform the actual commit 
logic (get table, update, write metadata, alter/create in HMS). From 14ed05622a34ddd429339d08d17fa5d7afc37333 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaros=C5=82aw=20Cellary?= Date: Fri, 6 Mar 2026 12:27:05 +0100 Subject: [PATCH 3/8] Refactored locking --- pyiceberg/catalog/hive.py | 20 ++++++++--- pyiceberg/table/__init__.py | 15 ++++---- tests/catalog/test_hive.py | 71 +++++++++++++++++++++++++++---------- 3 files changed, 75 insertions(+), 31 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index aee1a0d01e..aaaec2e7f3 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -127,9 +127,6 @@ HIVE_KERBEROS_SERVICE_NAME = "hive.kerberos-service-name" HIVE_KERBEROS_SERVICE_NAME_DEFAULT = "hive" -LOCK_ENABLED = "lock-enabled" -DEFAULT_LOCK_ENABLED = True - LOCK_CHECK_MIN_WAIT_TIME = "lock-check-min-wait-time" LOCK_CHECK_MAX_WAIT_TIME = "lock-check-max-wait-time" LOCK_CHECK_RETRIES = "lock-check-retries" @@ -304,7 +301,6 @@ def __init__(self, name: str, **properties: str): super().__init__(name, **properties) self._client = self._create_hive_client(properties) - self._lock_enabled = property_as_bool(properties, LOCK_ENABLED, DEFAULT_LOCK_ENABLED) self._lock_check_min_wait_time = property_as_float(properties, LOCK_CHECK_MIN_WAIT_TIME, DEFAULT_LOCK_CHECK_MIN_WAIT_TIME) self._lock_check_max_wait_time = property_as_float(properties, LOCK_CHECK_MAX_WAIT_TIME, DEFAULT_LOCK_CHECK_MAX_WAIT_TIME) self._lock_check_retries = property_as_float( @@ -593,6 +589,19 @@ def _do_commit( metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location ) + @staticmethod + def _hive_lock_enabled(table_properties: Properties, catalog_properties: Properties) -> bool: + """Determine whether HMS locking is enabled for a commit. + + Matches the Java implementation in HiveTableOperations: checks the table property first, + then falls back to catalog properties, then defaults to True. 
+ """ + if TableProperties.HIVE_LOCK_ENABLED in table_properties: + return property_as_bool( + table_properties, TableProperties.HIVE_LOCK_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT + ) + return property_as_bool(catalog_properties, TableProperties.HIVE_LOCK_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT) + def commit_table( self, table: Table, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...] ) -> CommitTableResponse: @@ -612,10 +621,11 @@ def commit_table( """ table_identifier = table.name() database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError) + lock_enabled = self._hive_lock_enabled(table.properties, self.properties) # commit to hive # https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232 with self._client as open_client: - if self._lock_enabled: + if lock_enabled: lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name)) try: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index cc0d9ff341..4733f4a170 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -247,6 +247,9 @@ class TableProperties: MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep" MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1 + HIVE_LOCK_ENABLED = "engine.hive.lock-enabled" + HIVE_LOCK_ENABLED_DEFAULT = True + class Transaction: _table: Table @@ -2002,13 +2005,11 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], Residu # The lambda created here is run in multiple threads. # So we avoid creating _EvaluatorExpression methods bound to a single # shared instance across multiple threads. 
- return lambda datafile: ( - residual_evaluator_of( - spec=spec, - expr=self.row_filter, - case_sensitive=self.case_sensitive, - schema=self.table_metadata.schema(), - ) + return lambda datafile: residual_evaluator_of( + spec=spec, + expr=self.row_filter, + case_sensitive=self.case_sensitive, + schema=self.table_metadata.schema(), ) @staticmethod diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py index 29c1ccf943..83e25aea8e 100644 --- a/tests/catalog/test_hive.py +++ b/tests/catalog/test_hive.py @@ -51,7 +51,6 @@ LOCK_CHECK_MAX_WAIT_TIME, LOCK_CHECK_MIN_WAIT_TIME, LOCK_CHECK_RETRIES, - LOCK_ENABLED, HiveCatalog, _construct_hive_storage_descriptor, _HiveClient, @@ -66,6 +65,7 @@ ) from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema +from pyiceberg.table import TableProperties from pyiceberg.table.metadata import TableMetadataUtil, TableMetadataV1, TableMetadataV2 from pyiceberg.table.refs import SnapshotRef, SnapshotRefType from pyiceberg.table.snapshots import ( @@ -1410,28 +1410,65 @@ def test_create_hive_client_with_kerberos_using_context_manager( assert open_client._iprot.trans.isOpen() -def test_lock_enabled_defaults_to_true() -> None: - """Verify that lock-enabled defaults to True for backward compatibility.""" +def test_hive_lock_enabled_defaults_to_true() -> None: + """Without any lock property set, locking should be enabled (backward compatible).""" + assert HiveCatalog._hive_lock_enabled(table_properties={}, catalog_properties={}) is True + + +def test_hive_lock_enabled_table_property_disables_lock() -> None: + """Table property engine.hive.lock-enabled=false disables locking.""" + table_props = {TableProperties.HIVE_LOCK_ENABLED: "false"} + assert HiveCatalog._hive_lock_enabled(table_properties=table_props, catalog_properties={}) is False + + +def test_hive_lock_enabled_catalog_property_disables_lock() -> None: + """Catalog property engine.hive.lock-enabled=false disables locking when 
table doesn't set it.""" + catalog_props = {TableProperties.HIVE_LOCK_ENABLED: "false"} + assert HiveCatalog._hive_lock_enabled(table_properties={}, catalog_properties=catalog_props) is False + + +def test_hive_lock_enabled_table_property_overrides_catalog() -> None: + """Table property takes precedence over catalog property.""" + table_props = {TableProperties.HIVE_LOCK_ENABLED: "true"} + catalog_props = {TableProperties.HIVE_LOCK_ENABLED: "false"} + assert HiveCatalog._hive_lock_enabled(table_properties=table_props, catalog_properties=catalog_props) is True + + table_props = {TableProperties.HIVE_LOCK_ENABLED: "false"} + catalog_props = {TableProperties.HIVE_LOCK_ENABLED: "true"} + assert HiveCatalog._hive_lock_enabled(table_properties=table_props, catalog_properties=catalog_props) is False + + +def test_commit_table_skips_locking_when_table_property_disables_it() -> None: + """When table property engine.hive.lock-enabled=false, commit_table must not lock/unlock.""" prop = {"uri": HIVE_METASTORE_FAKE_URL} catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop) - assert catalog._lock_enabled is True + catalog._client = MagicMock() + + mock_table = MagicMock() + mock_table.name.return_value = ("default", "my_table") + mock_table.properties = {TableProperties.HIVE_LOCK_ENABLED: "false"} + mock_do_commit = MagicMock() + mock_do_commit.return_value = MagicMock() -def test_lock_enabled_can_be_disabled() -> None: - """Verify that lock-enabled can be set to false.""" - prop = {"uri": HIVE_METASTORE_FAKE_URL, LOCK_ENABLED: "false"} - catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop) - assert catalog._lock_enabled is False + with patch.object(catalog, "_do_commit", mock_do_commit): + catalog.commit_table(mock_table, requirements=(), updates=()) + + mock_do_commit.assert_called_once() + catalog._client.__enter__().lock.assert_not_called() + catalog._client.__enter__().check_lock.assert_not_called() + catalog._client.__enter__().unlock.assert_not_called() -def 
test_commit_table_skips_locking_when_lock_disabled() -> None: - """When lock-enabled is false, commit_table must not call lock, check_lock, or unlock.""" - prop = {"uri": HIVE_METASTORE_FAKE_URL, LOCK_ENABLED: "false"} +def test_commit_table_skips_locking_when_catalog_property_disables_it() -> None: + """When catalog property engine.hive.lock-enabled=false, commit_table must not lock/unlock.""" + prop = {"uri": HIVE_METASTORE_FAKE_URL, TableProperties.HIVE_LOCK_ENABLED: "false"} catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop) catalog._client = MagicMock() mock_table = MagicMock() mock_table.name.return_value = ("default", "my_table") + mock_table.properties = {} mock_do_commit = MagicMock() mock_do_commit.return_value = MagicMock() @@ -1439,17 +1476,14 @@ def test_commit_table_skips_locking_when_lock_disabled() -> None: with patch.object(catalog, "_do_commit", mock_do_commit): catalog.commit_table(mock_table, requirements=(), updates=()) - # The core commit logic should still be called mock_do_commit.assert_called_once() - - # But no locking operations should have been performed catalog._client.__enter__().lock.assert_not_called() catalog._client.__enter__().check_lock.assert_not_called() catalog._client.__enter__().unlock.assert_not_called() -def test_commit_table_uses_locking_when_lock_enabled() -> None: - """When lock-enabled is true (default), commit_table must call lock and unlock.""" +def test_commit_table_uses_locking_by_default() -> None: + """When no lock property is set, commit_table must acquire and release a lock.""" lockid = 99999 prop = {"uri": HIVE_METASTORE_FAKE_URL} catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop) @@ -1462,6 +1496,7 @@ def test_commit_table_uses_locking_when_lock_enabled() -> None: mock_table = MagicMock() mock_table.name.return_value = ("default", "my_table") + mock_table.properties = {} mock_do_commit = MagicMock() mock_do_commit.return_value = MagicMock() @@ -1469,8 +1504,6 @@ def 
test_commit_table_uses_locking_when_lock_enabled() -> None: with patch.object(catalog, "_do_commit", mock_do_commit): catalog.commit_table(mock_table, requirements=(), updates=()) - # Locking operations should have been performed mock_client.lock.assert_called_once() mock_client.unlock.assert_called_once() - # The core commit logic should still be called mock_do_commit.assert_called_once() From 4c58d42df2b8d93a0e60089e80cc564c2e425b8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaros=C5=82aw=20Cellary?= Date: Fri, 6 Mar 2026 12:32:18 +0100 Subject: [PATCH 4/8] Reverted unnecessary change --- pyiceberg/table/__init__.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 4733f4a170..eb044ed434 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -2005,11 +2005,13 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], Residu # The lambda created here is run in multiple threads. # So we avoid creating _EvaluatorExpression methods bound to a single # shared instance across multiple threads. 
- return lambda datafile: residual_evaluator_of( - spec=spec, - expr=self.row_filter, - case_sensitive=self.case_sensitive, - schema=self.table_metadata.schema(), + return lambda datafile: ( + residual_evaluator_of( + spec=spec, + expr=self.row_filter, + case_sensitive=self.case_sensitive, + schema=self.table_metadata.schema(), + ) ) @staticmethod From 3b09ecd43adecf96218c4437e1bc5bda914f310e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaros=C5=82aw=20Cellary?= Date: Mon, 9 Mar 2026 10:36:55 +0100 Subject: [PATCH 5/8] Addressed PR review - added expected_parameter_key --- pyiceberg/catalog/hive.py | 127 +++++++++++++++++++++---------- tests/catalog/test_hive.py | 96 ++++++++++++++++++++++++++++ 2 files changed, 168 insertions(+), 55 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index aaaec2e7f3..e18f73a0bc 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -136,6 +136,9 @@ DO_NOT_UPDATE_STATS = "DO_NOT_UPDATE_STATS" DO_NOT_UPDATE_STATS_DEFAULT = "true" +NO_LOCK_EXPECTED_KEY = "expected_parameter_key" +NO_LOCK_EXPECTED_VALUE = "expected_parameter_value" + logger = logging.getLogger(__name__) @@ -499,6 +502,66 @@ def _do_wait_for_lock() -> LockResponse: return _do_wait_for_lock() + @staticmethod + def _hive_lock_enabled(table_properties: Properties, catalog_properties: Properties) -> bool: + """Determine whether HMS locking is enabled for a commit. + + Matches the Java implementation in HiveTableOperations: checks the table property first, + then falls back to catalog properties, then defaults to True. 
+ """ + if TableProperties.HIVE_LOCK_ENABLED in table_properties: + return property_as_bool( + table_properties, TableProperties.HIVE_LOCK_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT + ) + return property_as_bool(catalog_properties, TableProperties.HIVE_LOCK_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT) + + def commit_table( + self, table: Table, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...] + ) -> CommitTableResponse: + """Commit updates to a table. + + Args: + table (Table): The table to be updated. + requirements: (Tuple[TableRequirement, ...]): Table requirements. + updates: (Tuple[TableUpdate, ...]): Table updates. + + Returns: + CommitTableResponse: The updated metadata. + + Raises: + NoSuchTableError: If a table with the given identifier does not exist. + CommitFailedException: Requirement not met, or a conflict with a concurrent commit. + """ + table_identifier = table.name() + database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError) + lock_enabled = self._hive_lock_enabled(table.properties, self.properties) + # commit to hive + # https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232 + with self._client as open_client: + if lock_enabled: + lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name)) + + try: + if lock.state != LockState.ACQUIRED: + if lock.state == LockState.WAITING: + self._wait_for_lock(database_name, table_name, lock.lockid, open_client) + else: + raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") + + return self._do_commit( + open_client, table_identifier, database_name, table_name, requirements, updates, + lock_enabled=True, + ) + except WaitingForLockException as e: + raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") from e + finally: + 
open_client.unlock(UnlockRequest(lockid=lock.lockid)) + else: + return self._do_commit( + open_client, table_identifier, database_name, table_name, requirements, updates, + lock_enabled=False, + ) + def _do_commit( self, open_client: Client, @@ -507,10 +570,13 @@ def _do_commit( table_name: str, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...], + lock_enabled: bool = True, ) -> CommitTableResponse: """Perform the actual commit logic (get table, update, write metadata, alter/create in HMS). This method contains the core commit logic, separated from locking concerns. + When lock_enabled is False, an optimistic concurrency check via the HMS EnvironmentContext + is used instead (requires HIVE-26882 on the server). """ hive_table: HiveTable | None current_table: Table | None @@ -566,11 +632,16 @@ def _do_commit( updated_staged_table.location(), property_as_bool(self.properties, HIVE2_COMPATIBLE, HIVE2_COMPATIBLE_DEFAULT), ) + env_context_properties: dict[str, str] = {DO_NOT_UPDATE_STATS: DO_NOT_UPDATE_STATS_DEFAULT} + if not lock_enabled: + env_context_properties[NO_LOCK_EXPECTED_KEY] = PROP_METADATA_LOCATION + env_context_properties[NO_LOCK_EXPECTED_VALUE] = current_table.metadata_location + open_client.alter_table_with_environment_context( dbname=database_name, tbl_name=table_name, new_tbl=hive_table, - environment_context=EnvironmentContext(properties={DO_NOT_UPDATE_STATS: DO_NOT_UPDATE_STATS_DEFAULT}), + environment_context=EnvironmentContext(properties=env_context_properties), ) else: # Table does not exist, create it. @@ -589,60 +660,6 @@ def _do_commit( metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location ) - @staticmethod - def _hive_lock_enabled(table_properties: Properties, catalog_properties: Properties) -> bool: - """Determine whether HMS locking is enabled for a commit. 
- - Matches the Java implementation in HiveTableOperations: checks the table property first, - then falls back to catalog properties, then defaults to True. - """ - if TableProperties.HIVE_LOCK_ENABLED in table_properties: - return property_as_bool( - table_properties, TableProperties.HIVE_LOCK_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT - ) - return property_as_bool(catalog_properties, TableProperties.HIVE_LOCK_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT) - - def commit_table( - self, table: Table, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...] - ) -> CommitTableResponse: - """Commit updates to a table. - - Args: - table (Table): The table to be updated. - requirements: (Tuple[TableRequirement, ...]): Table requirements. - updates: (Tuple[TableUpdate, ...]): Table updates. - - Returns: - CommitTableResponse: The updated metadata. - - Raises: - NoSuchTableError: If a table with the given identifier does not exist. - CommitFailedException: Requirement not met, or a conflict with a concurrent commit. 
- """ - table_identifier = table.name() - database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError) - lock_enabled = self._hive_lock_enabled(table.properties, self.properties) - # commit to hive - # https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232 - with self._client as open_client: - if lock_enabled: - lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name)) - - try: - if lock.state != LockState.ACQUIRED: - if lock.state == LockState.WAITING: - self._wait_for_lock(database_name, table_name, lock.lockid, open_client) - else: - raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") - - return self._do_commit(open_client, table_identifier, database_name, table_name, requirements, updates) - except WaitingForLockException as e: - raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") from e - finally: - open_client.unlock(UnlockRequest(lockid=lock.lockid)) - else: - return self._do_commit(open_client, table_identifier, database_name, table_name, requirements, updates) - def load_table(self, identifier: str | Identifier) -> Table: """Load the table's metadata and return the table instance. 
diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py index 83e25aea8e..05689b0415 100644 --- a/tests/catalog/test_hive.py +++ b/tests/catalog/test_hive.py @@ -51,6 +51,9 @@ LOCK_CHECK_MAX_WAIT_TIME, LOCK_CHECK_MIN_WAIT_TIME, LOCK_CHECK_RETRIES, + NO_LOCK_EXPECTED_KEY, + NO_LOCK_EXPECTED_VALUE, + PROP_METADATA_LOCATION, HiveCatalog, _construct_hive_storage_descriptor, _HiveClient, @@ -1455,6 +1458,8 @@ def test_commit_table_skips_locking_when_table_property_disables_it() -> None: catalog.commit_table(mock_table, requirements=(), updates=()) mock_do_commit.assert_called_once() + _, kwargs = mock_do_commit.call_args + assert kwargs["lock_enabled"] is False catalog._client.__enter__().lock.assert_not_called() catalog._client.__enter__().check_lock.assert_not_called() catalog._client.__enter__().unlock.assert_not_called() @@ -1507,3 +1512,94 @@ def test_commit_table_uses_locking_by_default() -> None: mock_client.lock.assert_called_once() mock_client.unlock.assert_called_once() mock_do_commit.assert_called_once() + _, kwargs = mock_do_commit.call_args + assert kwargs["lock_enabled"] is True + + +def test_do_commit_env_context_includes_expected_params_when_lock_disabled() -> None: + """When lock_enabled=False, alter_table_with_environment_context must include + expected_parameter_key and expected_parameter_value for optimistic concurrency.""" + prop = {"uri": HIVE_METASTORE_FAKE_URL} + catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop) + + current_metadata_location = "s3://bucket/db/table/metadata/v1.metadata.json" + + mock_client = MagicMock() + mock_hive_table = MagicMock() + mock_hive_table.parameters = { + "table_type": "ICEBERG", + "metadata_location": current_metadata_location, + } + mock_client.get_table.return_value = mock_hive_table + + with ( + patch.object(catalog, "_convert_hive_into_iceberg") as mock_convert, + patch.object(catalog, "_update_and_stage_table") as mock_stage, + patch.object(catalog, "_write_metadata"), + patch.object(catalog, 
"_convert_iceberg_into_hive"), + patch("pyiceberg.catalog.hive.CommitTableResponse"), + ): + mock_current_table = MagicMock() + mock_current_table.metadata_location = current_metadata_location + mock_current_table.metadata = MagicMock() + mock_current_table.properties = {} + mock_convert.return_value = mock_current_table + + mock_staged = MagicMock() + mock_staged.metadata = MagicMock() + mock_staged.properties = {} + mock_stage.return_value = mock_staged + + catalog._do_commit( + mock_client, ("default", "my_table"), "default", "my_table", + requirements=(), updates=(), lock_enabled=False, + ) + + mock_client.alter_table_with_environment_context.assert_called_once() + env_ctx = mock_client.alter_table_with_environment_context.call_args[1]["environment_context"] + assert env_ctx.properties[NO_LOCK_EXPECTED_KEY] == PROP_METADATA_LOCATION + assert env_ctx.properties[NO_LOCK_EXPECTED_VALUE] == current_metadata_location + assert env_ctx.properties[DO_NOT_UPDATE_STATS] == DO_NOT_UPDATE_STATS_DEFAULT + + +def test_do_commit_env_context_excludes_expected_params_when_lock_enabled() -> None: + """When lock_enabled=True (default), alter_table_with_environment_context must NOT include + expected_parameter_key or expected_parameter_value.""" + prop = {"uri": HIVE_METASTORE_FAKE_URL} + catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop) + + mock_client = MagicMock() + mock_hive_table = MagicMock() + mock_hive_table.parameters = { + "table_type": "ICEBERG", + "metadata_location": "s3://bucket/db/table/metadata/v1.metadata.json", + } + mock_client.get_table.return_value = mock_hive_table + + with ( + patch.object(catalog, "_convert_hive_into_iceberg") as mock_convert, + patch.object(catalog, "_update_and_stage_table") as mock_stage, + patch.object(catalog, "_write_metadata"), + patch.object(catalog, "_convert_iceberg_into_hive"), + patch("pyiceberg.catalog.hive.CommitTableResponse"), + ): + mock_current_table = MagicMock() + mock_current_table.metadata = MagicMock() + 
mock_current_table.properties = {} + mock_convert.return_value = mock_current_table + + mock_staged = MagicMock() + mock_staged.metadata = MagicMock() + mock_staged.properties = {} + mock_stage.return_value = mock_staged + + catalog._do_commit( + mock_client, ("default", "my_table"), "default", "my_table", + requirements=(), updates=(), lock_enabled=True, + ) + + mock_client.alter_table_with_environment_context.assert_called_once() + env_ctx = mock_client.alter_table_with_environment_context.call_args[1]["environment_context"] + assert NO_LOCK_EXPECTED_KEY not in env_ctx.properties + assert NO_LOCK_EXPECTED_VALUE not in env_ctx.properties + assert env_ctx.properties[DO_NOT_UPDATE_STATS] == DO_NOT_UPDATE_STATS_DEFAULT From b955e940da5d42d0527cbc2ee9d472c42ded9d11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaros=C5=82aw=20Cellary?= Date: Mon, 9 Mar 2026 10:47:21 +0100 Subject: [PATCH 6/8] Style fixes --- pyiceberg/catalog/hive.py | 12 +++++++----- pyiceberg/table/__init__.py | 15 +++++---------- tests/catalog/test_hive.py | 36 +++++++++++++++++++++++------------- 3 files changed, 35 insertions(+), 28 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index e18f73a0bc..780abdf523 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -133,12 +133,16 @@ DEFAULT_LOCK_CHECK_MIN_WAIT_TIME = 0.1 # 100 milliseconds DEFAULT_LOCK_CHECK_MAX_WAIT_TIME = 60 # 1 min DEFAULT_LOCK_CHECK_RETRIES = 4 + DO_NOT_UPDATE_STATS = "DO_NOT_UPDATE_STATS" DO_NOT_UPDATE_STATS_DEFAULT = "true" NO_LOCK_EXPECTED_KEY = "expected_parameter_key" NO_LOCK_EXPECTED_VALUE = "expected_parameter_value" +HIVE_LOCK_ENABLED = "engine.hive.lock-enabled" +HIVE_LOCK_ENABLED_DEFAULT = True + logger = logging.getLogger(__name__) @@ -509,11 +513,9 @@ def _hive_lock_enabled(table_properties: Properties, catalog_properties: Propert Matches the Java implementation in HiveTableOperations: checks the table property first, then falls back to catalog properties, then defaults 
to True. """ - if TableProperties.HIVE_LOCK_ENABLED in table_properties: - return property_as_bool( - table_properties, TableProperties.HIVE_LOCK_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT - ) - return property_as_bool(catalog_properties, TableProperties.HIVE_LOCK_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT) + if HIVE_LOCK_ENABLED in table_properties: + return property_as_bool(table_properties, HIVE_LOCK_ENABLED, HIVE_LOCK_ENABLED_DEFAULT) + return property_as_bool(catalog_properties, HIVE_LOCK_ENABLED, HIVE_LOCK_ENABLED_DEFAULT) def commit_table( self, table: Table, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...] diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index eb044ed434..c527f3c44c 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -247,9 +247,6 @@ class TableProperties: MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep" MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1 - HIVE_LOCK_ENABLED = "engine.hive.lock-enabled" - HIVE_LOCK_ENABLED_DEFAULT = True - class Transaction: _table: Table @@ -2005,13 +2002,11 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], Residu # The lambda created here is run in multiple threads. # So we avoid creating _EvaluatorExpression methods bound to a single # shared instance across multiple threads. 
- return lambda datafile: ( - residual_evaluator_of( - spec=spec, - expr=self.row_filter, - case_sensitive=self.case_sensitive, - schema=self.table_metadata.schema(), - ) + return lambda datafile: residual_evaluator_of( + spec=spec, + expr=self.row_filter, + case_sensitive=self.case_sensitive, + schema=self.table_metadata.schema(), ) @staticmethod diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py index 05689b0415..2984868cf2 100644 --- a/tests/catalog/test_hive.py +++ b/tests/catalog/test_hive.py @@ -48,6 +48,7 @@ DO_NOT_UPDATE_STATS_DEFAULT, HIVE_KERBEROS_AUTH, HIVE_KERBEROS_SERVICE_NAME, + HIVE_LOCK_ENABLED, LOCK_CHECK_MAX_WAIT_TIME, LOCK_CHECK_MIN_WAIT_TIME, LOCK_CHECK_RETRIES, @@ -68,7 +69,6 @@ ) from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema -from pyiceberg.table import TableProperties from pyiceberg.table.metadata import TableMetadataUtil, TableMetadataV1, TableMetadataV2 from pyiceberg.table.refs import SnapshotRef, SnapshotRefType from pyiceberg.table.snapshots import ( @@ -1420,24 +1420,24 @@ def test_hive_lock_enabled_defaults_to_true() -> None: def test_hive_lock_enabled_table_property_disables_lock() -> None: """Table property engine.hive.lock-enabled=false disables locking.""" - table_props = {TableProperties.HIVE_LOCK_ENABLED: "false"} + table_props = {HIVE_LOCK_ENABLED: "false"} assert HiveCatalog._hive_lock_enabled(table_properties=table_props, catalog_properties={}) is False def test_hive_lock_enabled_catalog_property_disables_lock() -> None: """Catalog property engine.hive.lock-enabled=false disables locking when table doesn't set it.""" - catalog_props = {TableProperties.HIVE_LOCK_ENABLED: "false"} + catalog_props = {HIVE_LOCK_ENABLED: "false"} assert HiveCatalog._hive_lock_enabled(table_properties={}, catalog_properties=catalog_props) is False def test_hive_lock_enabled_table_property_overrides_catalog() -> None: """Table property takes precedence over catalog property.""" 
- table_props = {TableProperties.HIVE_LOCK_ENABLED: "true"} - catalog_props = {TableProperties.HIVE_LOCK_ENABLED: "false"} + table_props = {HIVE_LOCK_ENABLED: "true"} + catalog_props = {HIVE_LOCK_ENABLED: "false"} assert HiveCatalog._hive_lock_enabled(table_properties=table_props, catalog_properties=catalog_props) is True - table_props = {TableProperties.HIVE_LOCK_ENABLED: "false"} - catalog_props = {TableProperties.HIVE_LOCK_ENABLED: "true"} + table_props = {HIVE_LOCK_ENABLED: "false"} + catalog_props = {HIVE_LOCK_ENABLED: "true"} assert HiveCatalog._hive_lock_enabled(table_properties=table_props, catalog_properties=catalog_props) is False @@ -1449,7 +1449,7 @@ def test_commit_table_skips_locking_when_table_property_disables_it() -> None: mock_table = MagicMock() mock_table.name.return_value = ("default", "my_table") - mock_table.properties = {TableProperties.HIVE_LOCK_ENABLED: "false"} + mock_table.properties = {HIVE_LOCK_ENABLED: "false"} mock_do_commit = MagicMock() mock_do_commit.return_value = MagicMock() @@ -1467,7 +1467,7 @@ def test_commit_table_skips_locking_when_table_property_disables_it() -> None: def test_commit_table_skips_locking_when_catalog_property_disables_it() -> None: """When catalog property engine.hive.lock-enabled=false, commit_table must not lock/unlock.""" - prop = {"uri": HIVE_METASTORE_FAKE_URL, TableProperties.HIVE_LOCK_ENABLED: "false"} + prop = {"uri": HIVE_METASTORE_FAKE_URL, HIVE_LOCK_ENABLED: "false"} catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop) catalog._client = MagicMock() @@ -1551,8 +1551,13 @@ def test_do_commit_env_context_includes_expected_params_when_lock_disabled() -> mock_stage.return_value = mock_staged catalog._do_commit( - mock_client, ("default", "my_table"), "default", "my_table", - requirements=(), updates=(), lock_enabled=False, + mock_client, + ("default", "my_table"), + "default", + "my_table", + requirements=(), + updates=(), + lock_enabled=False, ) 
mock_client.alter_table_with_environment_context.assert_called_once() @@ -1594,8 +1599,13 @@ def test_do_commit_env_context_excludes_expected_params_when_lock_enabled() -> N mock_stage.return_value = mock_staged catalog._do_commit( - mock_client, ("default", "my_table"), "default", "my_table", - requirements=(), updates=(), lock_enabled=True, + mock_client, + ("default", "my_table"), + "default", + "my_table", + requirements=(), + updates=(), + lock_enabled=True, ) mock_client.alter_table_with_environment_context.assert_called_once() From 77121d98ac948be56a2f0421c30950ec30be5437 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaros=C5=82aw=20Cellary?= Date: Mon, 9 Mar 2026 10:51:19 +0100 Subject: [PATCH 7/8] fix --- pyiceberg/table/__init__.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index c527f3c44c..cc0d9ff341 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -2002,11 +2002,13 @@ def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], Residu # The lambda created here is run in multiple threads. # So we avoid creating _EvaluatorExpression methods bound to a single # shared instance across multiple threads. 
- return lambda datafile: residual_evaluator_of( - spec=spec, - expr=self.row_filter, - case_sensitive=self.case_sensitive, - schema=self.table_metadata.schema(), + return lambda datafile: ( + residual_evaluator_of( + spec=spec, + expr=self.row_filter, + case_sensitive=self.case_sensitive, + schema=self.table_metadata.schema(), + ) ) @staticmethod From fc0586b4d1a01f044dc4154df4a8a5ac866e32fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jaros=C5=82aw=20Cellary?= Date: Mon, 9 Mar 2026 11:00:32 +0100 Subject: [PATCH 8/8] reformatted --- pyiceberg/catalog/hive.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index 780abdf523..175eb5bd6d 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -551,7 +551,12 @@ def commit_table( raise CommitFailedException(f"Failed to acquire lock for {table_identifier}, state: {lock.state}") return self._do_commit( - open_client, table_identifier, database_name, table_name, requirements, updates, + open_client, + table_identifier, + database_name, + table_name, + requirements, + updates, lock_enabled=True, ) except WaitingForLockException as e: @@ -560,7 +565,12 @@ def commit_table( open_client.unlock(UnlockRequest(lockid=lock.lockid)) else: return self._do_commit( - open_client, table_identifier, database_name, table_name, requirements, updates, + open_client, + table_identifier, + database_name, + table_name, + requirements, + updates, lock_enabled=False, )