-
Notifications
You must be signed in to change notification settings - Fork 453
fix(glue): Support create_table for S3 Tables federated databases #3058
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
jamesbornholt
wants to merge
5
commits into
apache:main
Choose a base branch
from
jamesbornholt:fix-glue-create-table-s3tables
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
9708909
fix(glue): Support create_table for S3 Tables federated databases
jamesbornholt 4f9299e
test(glue): Add unit tests for S3 Tables create_table support
jamesbornholt de9c1b2
fix(glue): Extract S3 Tables connection type constant and simplify _c…
jamesbornholt 355f242
fix(glue): Return Table directly from create_table instead of re-loading
jamesbornholt 420da60
fix(glue): Simplify S3 Tables connection type check in _is_s3tables_d…
jamesbornholt File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,6 +16,7 @@ | |
| # under the License. | ||
|
|
||
|
|
||
| import logging | ||
| from typing import ( | ||
| TYPE_CHECKING, | ||
| Any, | ||
|
|
@@ -120,13 +121,16 @@ | |
| ICEBERG_FIELD_OPTIONAL = "iceberg.field.optional" | ||
| ICEBERG_FIELD_CURRENT = "iceberg.field.current" | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| GLUE_PROFILE_NAME = "glue.profile-name" | ||
| GLUE_REGION = "glue.region" | ||
| GLUE_ACCESS_KEY_ID = "glue.access-key-id" | ||
| GLUE_SECRET_ACCESS_KEY = "glue.secret-access-key" | ||
| GLUE_SESSION_TOKEN = "glue.session-token" | ||
| GLUE_MAX_RETRIES = "glue.max-retries" | ||
| GLUE_RETRY_MODE = "glue.retry-mode" | ||
| GLUE_CONNECTION_S3_TABLES = "aws:s3tables" | ||
|
|
||
| MAX_RETRIES = 10 | ||
| STANDARD_RETRY_MODE = "standard" | ||
|
|
@@ -417,6 +421,116 @@ def _get_glue_table(self, database_name: str, table_name: str) -> "TableTypeDef" | |
| except self.glue.exceptions.EntityNotFoundException as e: | ||
| raise NoSuchTableError(f"Table does not exist: {database_name}.{table_name}") from e | ||
|
|
||
def _is_s3tables_database(self, database_name: str) -> bool:
    """Report whether a Glue database is federated with S3 Tables.

    The database is looked up in Glue and its ``FederatedDatabase`` entry is
    inspected; the database counts as an S3 Tables database when that entry's
    ``ConnectionType`` equals ``GLUE_CONNECTION_S3_TABLES``.

    Args:
        database_name: Name of the Glue database to inspect.

    Returns:
        True when the database carries the S3 Tables connection type. False
        when it does not, when it has no federation entry, or when Glue
        reports that the database does not exist.
    """
    try:
        response = self.glue.get_database(Name=database_name)
    except self.glue.exceptions.EntityNotFoundException:
        # A missing database is answered with False instead of an error.
        return False

    federation_info = response["Database"].get("FederatedDatabase", {})
    connection_type = federation_info.get("ConnectionType", "")
    return connection_type == GLUE_CONNECTION_S3_TABLES
|
|
||
def _create_table_s3tables(
    self,
    identifier: str | Identifier,
    schema: Union[Schema, "pa.Schema"],
    location: str | None,
    partition_spec: PartitionSpec,
    sort_order: SortOrder,
    properties: Properties,
) -> Table:
    """Create an Iceberg table inside an S3 Tables federated Glue database.

    S3 Tables manages storage itself, so the table location is only known
    after the table exists in the service. Creation therefore runs in stages:

    1. Register a minimal Glue table (``format=ICEBERG``) so that S3 Tables
       allocates storage for it.
    2. Read the table back to obtain the managed storage location.
    3. Stage the Iceberg metadata against that location and write it out.
    4. Update the Glue table so that it points at the written metadata.

    If any step after the first one fails, the table registered in step 1 is
    deleted (a failed deletion is only logged) and the original error is
    re-raised.

    Args:
        identifier: Table identifier.
        schema: Table's schema.
        location: Must be None; S3 Tables assigns the location.
        partition_spec: PartitionSpec for the table.
        sort_order: SortOrder for the table.
        properties: Table properties.

    Returns:
        A Table built from the staged metadata and its metadata location.

    Raises:
        ValueError: If ``location`` is given, or if the Glue table has no
            storage location after it was created.
        CommitFailedException: If the Glue table has no ``VersionId``.
    """
    database_name, table_name = self.identifier_to_database_and_table(identifier)

    # Guard clause: a caller-supplied location can never be honoured here.
    if location is not None:
        raise ValueError(
            f"Cannot specify a location for S3 Tables table {database_name}.{table_name}. "
            "S3 Tables manages the storage location automatically."
        )

    # Step 1: a minimal entry is enough for S3 Tables to allocate storage.
    placeholder_input = {
        "Name": table_name,
        "Parameters": {"format": "ICEBERG"},
    }
    self._create_glue_table(
        database_name=database_name,
        table_name=table_name,
        table_input=placeholder_input,
    )

    try:
        # Step 2: read the entry back to learn where the storage was placed.
        glue_table = self._get_glue_table(database_name=database_name, table_name=table_name)
        managed_location = glue_table.get("StorageDescriptor", {}).get("Location")
        if not managed_location:
            raise ValueError(f"S3 Tables did not assign a storage location for {database_name}.{table_name}")

        # Step 3: stage the Iceberg metadata against the managed location.
        staged_table = self._create_staged_table(
            identifier=identifier,
            schema=schema,
            location=managed_location,
            partition_spec=partition_spec,
            sort_order=sort_order,
            properties=properties,
        )

        # NOTE(review): a reviewer of this change remarked that S3 Tables
        # managed storage doesn't support ListObjectsV2 and that the exist
        # check this write path adds before writing could be skipped --
        # confirm against _write_metadata before changing anything here.
        self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location)

        # Step 4: point the Glue entry at the metadata that was just written.
        table_input = _construct_table_input(
            table_name,
            staged_table.metadata_location,
            properties,
            staged_table.metadata,
        )
        version_id = glue_table.get("VersionId")
        if not version_id:
            raise CommitFailedException(
                f"Cannot commit {database_name}.{table_name} because Glue table version id is missing"
            )
        self._update_glue_table(
            database_name=database_name,
            table_name=table_name,
            table_input=table_input,
            version_id=version_id,
        )
    except Exception:
        # Roll back step 1. A failed rollback must not mask the original
        # error, so it is only logged before that error is re-raised.
        try:
            self.glue.delete_table(DatabaseName=database_name, Name=table_name)
        except Exception:
            logger.warning(
                f"Failed to clean up S3 Tables table {database_name}.{table_name}",
                exc_info=logger.isEnabledFor(logging.DEBUG),
            )
        raise

    return Table(
        identifier=self.identifier_to_tuple(identifier),
        metadata=staged_table.metadata,
        metadata_location=staged_table.metadata_location,
        io=self._load_file_io(staged_table.metadata.properties, staged_table.metadata_location),
        catalog=self,
    )
|
|
||
| def create_table( | ||
| self, | ||
| identifier: str | Identifier, | ||
|
|
@@ -433,6 +547,7 @@ def create_table( | |
| identifier: Table identifier. | ||
| schema: Table's schema. | ||
| location: Location for the table. Optional Argument. | ||
| Must not be set for S3 Tables, which manage their own storage. | ||
| partition_spec: PartitionSpec for the table. | ||
| sort_order: SortOrder for the table. | ||
| properties: Table properties that can be a string based dictionary. | ||
|
|
@@ -442,9 +557,22 @@ def create_table( | |
|
|
||
| Raises: | ||
| AlreadyExistsError: If a table with the name already exists. | ||
| ValueError: If the identifier is invalid, or no path is given to store metadata. | ||
| ValueError: If the identifier is invalid, no path is given to store metadata, | ||
| or a location is specified for an S3 Tables table. | ||
|
|
||
| """ | ||
| database_name, table_name = self.identifier_to_database_and_table(identifier) | ||
|
|
||
| if self._is_s3tables_database(database_name): | ||
| return self._create_table_s3tables( | ||
| identifier=identifier, | ||
| schema=schema, | ||
| location=location, | ||
| partition_spec=partition_spec, | ||
| sort_order=sort_order, | ||
| properties=properties, | ||
| ) | ||
|
|
||
| staged_table = self._create_staged_table( | ||
| identifier=identifier, | ||
| schema=schema, | ||
|
|
@@ -453,13 +581,18 @@ def create_table( | |
| sort_order=sort_order, | ||
| properties=properties, | ||
| ) | ||
| database_name, table_name = self.identifier_to_database_and_table(identifier) | ||
|
|
||
| self._write_metadata(staged_table.metadata, staged_table.io, staged_table.metadata_location) | ||
| table_input = _construct_table_input(table_name, staged_table.metadata_location, properties, staged_table.metadata) | ||
| self._create_glue_table(database_name=database_name, table_name=table_name, table_input=table_input) | ||
|
|
||
| return self.load_table(identifier=identifier) | ||
| return Table( | ||
| identifier=self.identifier_to_tuple(identifier), | ||
| metadata=staged_table.metadata, | ||
| metadata_location=staged_table.metadata_location, | ||
| io=self._load_file_io(staged_table.metadata.properties, staged_table.metadata_location), | ||
| catalog=self, | ||
| ) | ||
|
|
||
| def register_table(self, identifier: str | Identifier, metadata_location: str) -> Table: | ||
| """Register a new table using existing metadata. | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we have some redundancy in the params passed in: `ident`, `db_name`, `tbl_name`. Maybe we can derive them?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep, done