From e02f44fa83756eb49f3d37c3bd5f0fba80abf933 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Mon, 20 Apr 2026 12:26:00 -0400 Subject: [PATCH 1/4] feat(ingestion): automate DB initialization and fix lock mechanism --- .../workflow/ingestion-helper/Dockerfile | 39 +++++ .../workflow/ingestion-helper/cloudbuild.yaml | 48 +++++ .../workflow/ingestion-helper/main.py | 5 + .../workflow/ingestion-helper/schema.sql | 110 ++++++++++++ .../ingestion-helper/spanner_client.py | 124 +++++++++++-- .../ingestion-helper/spanner_client_test.py | 164 ++++++++++++++++++ 6 files changed, 473 insertions(+), 17 deletions(-) create mode 100644 import-automation/workflow/ingestion-helper/Dockerfile create mode 100644 import-automation/workflow/ingestion-helper/cloudbuild.yaml create mode 100644 import-automation/workflow/ingestion-helper/schema.sql create mode 100644 import-automation/workflow/ingestion-helper/spanner_client_test.py diff --git a/import-automation/workflow/ingestion-helper/Dockerfile b/import-automation/workflow/ingestion-helper/Dockerfile new file mode 100644 index 0000000000..21c7e6f9c4 --- /dev/null +++ b/import-automation/workflow/ingestion-helper/Dockerfile @@ -0,0 +1,39 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +FROM python:3.12-slim + +# Allow statements and log messages to immediately appear in the logs +ENV PYTHONUNBUFFERED True + +WORKDIR /app + +# Install protobuf compiler and curl +RUN apt-get update && apt-get install -y protobuf-compiler curl && rm -rf /var/lib/apt/lists/* + +# Install production dependencies. +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy local code to the container image. +COPY . . + +# Fetch proto file from GitHub +RUN curl -o storage.proto https://raw.githubusercontent.com/datacommonsorg/import/master/pipeline/data/src/main/proto/storage.proto + +# Generate proto descriptor set +RUN protoc --include_imports --descriptor_set_out=storage.pb storage.proto + +# Run the functions framework +CMD ["functions-framework", "--target", "ingestion_helper"] diff --git a/import-automation/workflow/ingestion-helper/cloudbuild.yaml b/import-automation/workflow/ingestion-helper/cloudbuild.yaml new file mode 100644 index 0000000000..f68759d10b --- /dev/null +++ b/import-automation/workflow/ingestion-helper/cloudbuild.yaml @@ -0,0 +1,48 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +steps: + # Build the container image + - name: 'gcr.io/cloud-builders/docker' + args: ['build', '-t', '$_AR_REGION-docker.pkg.dev/datcom-ci/$_AR_REPO/datacommons-ingestion-helper:$_TAG', '-t', '$_AR_REGION-docker.pkg.dev/datcom-ci/$_AR_REPO/datacommons-ingestion-helper:latest', '.'] + + # Push the container image - Needed before deployment + - name: 'gcr.io/cloud-builders/docker' + args: ['push', '$_AR_REGION-docker.pkg.dev/datcom-ci/$_AR_REPO/datacommons-ingestion-helper:$_TAG'] + + # Deploy container image to Cloud Run (Conditional) + - name: 'gcr.io/google.com/cloudsdktool/cloud-sdk' + entrypoint: 'bash' + args: + - '-c' + - | + if [ "$_DEPLOY" = "true" ]; then + gcloud run deploy '$_SERVICE_NAME' \ + --image '$_AR_REGION-docker.pkg.dev/datcom-ci/$_AR_REPO/datacommons-ingestion-helper:$_TAG' \ + --region '$_AR_REGION' \ + --project '$PROJECT_ID' + else + echo "Skipping deployment as _DEPLOY is not set to true" + fi + +substitutions: + _AR_REGION: 'us' + _AR_REPO: 'gcr.io' + _SERVICE_NAME: 'spanner-ingestion-helper' + _DEPLOY: 'false' + _TAG: 'latest' + +images: + - '$_AR_REGION-docker.pkg.dev/datcom-ci/$_AR_REPO/datacommons-ingestion-helper:$_TAG' + - '$_AR_REGION-docker.pkg.dev/datcom-ci/$_AR_REPO/datacommons-ingestion-helper:latest' diff --git a/import-automation/workflow/ingestion-helper/main.py b/import-automation/workflow/ingestion-helper/main.py index 923eefbca7..47ba1717f8 100644 --- a/import-automation/workflow/ingestion-helper/main.py +++ b/import-automation/workflow/ingestion-helper/main.py @@ -193,5 +193,10 @@ def ingestion_helper(request): f"OK [Import: {import_name} Version: {version} Status: {params['status']}]", 200) + elif actionType == 'initialize_database': + # Initializes the database by creating all required tables and proto bundles. 
+ logging.info("Action: initialize_database") + spanner.initialize_database() + return ('OK', 200) else: return (f'Unknown actionType: {actionType}', 400) diff --git a/import-automation/workflow/ingestion-helper/schema.sql b/import-automation/workflow/ingestion-helper/schema.sql new file mode 100644 index 0000000000..e8c8d7978a --- /dev/null +++ b/import-automation/workflow/ingestion-helper/schema.sql @@ -0,0 +1,110 @@ +-- Copyright 2026 Google LLC +-- +-- Licensed under the Apache License, Version 2.0 (the "License") +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. 
+ +CREATE PROTO BUNDLE ( + `org.datacommons.Observations` +); + +CREATE TABLE Node ( + subject_id STRING(1024) NOT NULL, + value STRING(MAX), + bytes BYTES(MAX), + name STRING(MAX), + types ARRAY<STRING(MAX)>, + name_tokenlist TOKENLIST AS (TOKENIZE_FULLTEXT(name)) HIDDEN, +) PRIMARY KEY(subject_id); + +CREATE TABLE Edge ( + subject_id STRING(1024) NOT NULL, + predicate STRING(1024) NOT NULL, + object_id STRING(1024) NOT NULL, + provenance STRING(1024) NOT NULL, +) PRIMARY KEY(subject_id, predicate, object_id, provenance), +INTERLEAVE IN Node; + +CREATE TABLE Observation ( + observation_about STRING(1024) NOT NULL, + variable_measured STRING(1024) NOT NULL, + facet_id STRING(1024) NOT NULL, + observation_period STRING(1024), + measurement_method STRING(1024), + unit STRING(1024), + scaling_factor STRING(1024), + observations org.datacommons.Observations, + import_name STRING(1024), + provenance_url STRING(1024), + is_dc_aggregate BOOL, +) PRIMARY KEY(observation_about, variable_measured, facet_id); + +CREATE TABLE ImportStatus ( + ImportName STRING(MAX) NOT NULL, + LatestVersion STRING(MAX), + GraphPath STRING(MAX), + State STRING(1024) NOT NULL, + JobId STRING(1024), + WorkflowId STRING(1024), + ExecutionTime INT64, + DataVolume INT64, + DataImportTimestamp TIMESTAMP OPTIONS ( allow_commit_timestamp = TRUE ), + StatusUpdateTimestamp TIMESTAMP OPTIONS ( allow_commit_timestamp = TRUE ), + NextRefreshTimestamp TIMESTAMP, +) PRIMARY KEY(ImportName); + +CREATE TABLE IngestionHistory ( + CompletionTimestamp TIMESTAMP NOT NULL OPTIONS ( allow_commit_timestamp = TRUE ), + IngestionFailure Bool NOT NULL, + WorkflowExecutionID STRING(1024) NOT NULL, + DataflowJobID STRING(1024), + IngestedImports ARRAY<STRING(MAX)>, + ExecutionTime INT64, + NodeCount INT64, + EdgeCount INT64, + ObservationCount INT64, +) PRIMARY KEY(CompletionTimestamp DESC); + +CREATE TABLE ImportVersionHistory ( + ImportName STRING(MAX) NOT NULL, + Version STRING(MAX) NOT NULL, + UpdateTimestamp TIMESTAMP NOT NULL OPTIONS
(allow_commit_timestamp=true), + Comment STRING(MAX), +) PRIMARY KEY (ImportName, UpdateTimestamp DESC); + +CREATE TABLE IngestionLock ( + LockID STRING(1024) NOT NULL, + LockOwner STRING(1024), + AcquiredTimestamp TIMESTAMP OPTIONS ( allow_commit_timestamp = TRUE ), +) PRIMARY KEY(LockID); + +CREATE PROPERTY GRAPH DCGraph + NODE TABLES( + Node + KEY(subject_id) + LABEL Node PROPERTIES( + bytes, + name, + subject_id, + types, + value) + ) + EDGE TABLES( + Edge + KEY(subject_id, predicate, object_id, provenance) + SOURCE KEY(subject_id) REFERENCES Node(subject_id) + DESTINATION KEY(object_id) REFERENCES Node(subject_id) + LABEL Edge PROPERTIES( + object_id, + predicate, + provenance, + subject_id) + ); diff --git a/import-automation/workflow/ingestion-helper/spanner_client.py b/import-automation/workflow/ingestion-helper/spanner_client.py index b97998fae6..3ca179d2de 100644 --- a/import-automation/workflow/ingestion-helper/spanner_client.py +++ b/import-automation/workflow/ingestion-helper/spanner_client.py @@ -15,6 +15,8 @@ import logging import os from google.cloud import spanner +from google.cloud.spanner_admin_database_v1 import DatabaseAdminClient +from google.cloud.spanner_admin_database_v1.types import UpdateDatabaseDdlRequest from google.cloud.spanner_v1 import Transaction from google.cloud.spanner_v1.param_types import STRING, TIMESTAMP, Array, INT64 from datetime import datetime, timezone @@ -57,13 +59,16 @@ def _acquire(transaction: Transaction) -> bool: params = {"lockId": self._LOCK_ID} param_types = {"lockId": STRING} - current_owner, acquired_at = None, None + row_found = False results = transaction.execute_sql(sql, params, param_types) for row in results: + row_found = True current_owner, acquired_at = row[0], row[1] lock_is_available = False - if current_owner is None: + if not row_found: + lock_is_available = True + elif current_owner is None: lock_is_available = True else: timeout_threshold = datetime.now(timezone.utc) - acquired_at @@ -72,22 
+77,43 @@ def _acquire(transaction: Transaction) -> bool: f"Stale lock found, owned by {current_owner}. Acquiring." ) lock_is_available = True + if lock_is_available: - update_sql = """ - UPDATE IngestionLock - SET LockOwner = @workflowId, AcquiredTimestamp = PENDING_COMMIT_TIMESTAMP() - WHERE LockID = @lockId - """ - transaction.execute_update(update_sql, - params={ - "workflowId": workflow_id, - "lockId": self._LOCK_ID - }, - param_types={ - "workflowId": STRING, - "lockId": STRING - }) - logging.info(f"Lock successfully acquired by {workflow_id}") + if not row_found: + insert_sql = """ + INSERT INTO IngestionLock (LockID, LockOwner, AcquiredTimestamp) + VALUES (@lockId, @workflowId, PENDING_COMMIT_TIMESTAMP()) + """ + transaction.execute_update(insert_sql, + params={ + "workflowId": workflow_id, + "lockId": self._LOCK_ID + }, + param_types={ + "workflowId": STRING, + "lockId": STRING + }) + logging.info( + f"Lock successfully acquired by {workflow_id} (new row created)" + ) + else: + update_sql = """ + UPDATE IngestionLock + SET LockOwner = @workflowId, AcquiredTimestamp = PENDING_COMMIT_TIMESTAMP() + WHERE LockID = @lockId + """ + transaction.execute_update(update_sql, + params={ + "workflowId": workflow_id, + "lockId": self._LOCK_ID + }, + param_types={ + "workflowId": STRING, + "lockId": STRING + }) + logging.info( + f"Lock successfully acquired by {workflow_id} (existing row updated)" + ) return True else: logging.info(f"Lock is currently held by {current_owner}") @@ -383,3 +409,67 @@ def _record(transaction: Transaction): logging.error( f'Error updating version history for {import_name}: {e}') raise + + def initialize_database(self): + """Initializes the database by creating all required tables and proto bundles.""" + logging.info("Initializing database...") + + with self.database.snapshot() as snapshot: + results = snapshot.execute_sql( + "SELECT table_name FROM information_schema.tables WHERE table_schema = ''" + ) + existing_tables = [row[0] for row in 
results] + + logging.info(f"Existing tables: {existing_tables}") + + required_tables = ["Node", "Edge", "Observation", "ImportStatus", "IngestionHistory", "ImportVersionHistory", "IngestionLock"] + + missing_tables = [t for t in required_tables if t not in existing_tables] + + if not missing_tables: + logging.info("All tables already exist.") + return + + if len(missing_tables) < len(required_tables): + raise RuntimeError( + f"Database inconsistent state. Missing tables: {missing_tables}. Please clean up manually." + ) + + logging.info("Creating all tables and proto bundles...") + + schema_path = os.path.join(os.path.dirname(__file__), 'schema.sql') + logging.info(f"Reading schema from {schema_path}") + try: + with open(schema_path, 'r') as f: + schema_content = f.read() + ddl_statements = [s.strip() for s in schema_content.split(';') if s.strip()] + except Exception as e: + logging.error(f"Failed to read schema file: {e}") + raise + + proto_path = os.path.join(os.path.dirname(__file__), 'storage.pb') + logging.info(f"Reading proto descriptors from {proto_path}") + try: + with open(proto_path, 'rb') as f: + proto_descriptors_bytes = f.read() + except Exception as e: + logging.error(f"Failed to read proto descriptors file: {e}") + raise + + database_path = self.database.name + logging.info(f"Updating DDL for {database_path} with protos") + + try: + admin_client = DatabaseAdminClient() + request = UpdateDatabaseDdlRequest( + database=database_path, + statements=ddl_statements, + proto_descriptors=proto_descriptors_bytes + ) + operation = admin_client.update_database_ddl(request=request) + operation.result() + logging.info("Database initialized successfully with protos.") + except Exception as e: + logging.error(f"Failed to update DDL with protos: {e}") + raise + diff --git a/import-automation/workflow/ingestion-helper/spanner_client_test.py b/import-automation/workflow/ingestion-helper/spanner_client_test.py new file mode 100644 index 0000000000..ac21a7418e --- 
/dev/null +++ b/import-automation/workflow/ingestion-helper/spanner_client_test.py @@ -0,0 +1,164 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from unittest.mock import MagicMock, patch +import sys +import os + +# Add the current directory to path so we can import spanner_client +sys.path.append(os.path.dirname(__file__)) +from spanner_client import SpannerClient + +class TestSpannerClient(unittest.TestCase): + + @patch('google.cloud.spanner.Client') + def test_initialize_database_all_exist(self, mock_spanner_client): + # Setup mock + mock_instance = MagicMock() + mock_db = MagicMock() + mock_spanner_client.return_value.instance.return_value = mock_instance + mock_instance.database.return_value = mock_db + + # Mock snapshot results (all tables exist) + mock_snapshot = MagicMock() + mock_db.snapshot.return_value.__enter__.return_value = mock_snapshot + mock_snapshot.execute_sql.return_value = [ + ["Node"], ["Edge"], ["Observation"], ["ImportStatus"], + ["IngestionHistory"], ["ImportVersionHistory"], ["IngestionLock"] + ] + + client = SpannerClient("project", "instance", "database") + + # Run method + client.initialize_database() + + # Verify update_ddl was NOT called + mock_db.update_ddl.assert_not_called() + + @patch('spanner_client.DatabaseAdminClient') + @patch('google.cloud.spanner.Client') + def test_initialize_database_none_exist(self, mock_spanner_client, + mock_admin_client): + # Setup mock + mock_instance = 
MagicMock() + mock_db = MagicMock() + mock_db.name = "projects/test-project/instances/test-instance/databases/test-db" + mock_spanner_client.return_value.instance.return_value = mock_instance + mock_instance.database.return_value = mock_db + + # Mock DatabaseAdminClient + mock_admin_instance = MagicMock() + mock_admin_client.return_value = mock_admin_instance + mock_operation = MagicMock() + mock_admin_instance.update_database_ddl.return_value = mock_operation + + # Mock snapshot results (no tables exist) + mock_snapshot = MagicMock() + mock_db.snapshot.return_value.__enter__.return_value = mock_snapshot + mock_snapshot.execute_sql.return_value = [] + + client = SpannerClient("project", "instance", "database") + + def open_side_effect(file_path, mode='r', *args, **kwargs): + m = MagicMock() + if 'storage.pb' in str(file_path): + m.__enter__.return_value.read.return_value = b'dummy proto data' + else: + m.__enter__.return_value.read.return_value = 'CREATE TABLE Node;' + return m + + # Run method with patched open + with patch('builtins.open', side_effect=open_side_effect): + client.initialize_database() + + # Verify update_database_ddl WAS called + mock_admin_instance.update_database_ddl.assert_called_once() + mock_operation.result.assert_called_once() + + @patch('google.cloud.spanner.Client') + def test_initialize_database_inconsistent_state(self, mock_spanner_client): + # Setup mock + mock_instance = MagicMock() + mock_db = MagicMock() + mock_spanner_client.return_value.instance.return_value = mock_instance + mock_instance.database.return_value = mock_db + + # Mock snapshot results (some tables exist) + mock_snapshot = MagicMock() + mock_db.snapshot.return_value.__enter__.return_value = mock_snapshot + mock_snapshot.execute_sql.return_value = [["Node"]] + + client = SpannerClient("project", "instance", "database") + + # Run method and expect exception + with self.assertRaises(RuntimeError): + client.initialize_database() + + @patch('google.cloud.spanner.Client') + 
def test_acquire_lock_new_row(self, mock_spanner_client): + # Setup mock + mock_instance = MagicMock() + mock_db = MagicMock() + mock_spanner_client.return_value.instance.return_value = mock_instance + mock_instance.database.return_value = mock_db + + mock_transaction = MagicMock() + def run_in_transaction_side_effect(callback, *args, **kwargs): + return callback(mock_transaction, *args, **kwargs) + mock_db.run_in_transaction.side_effect = run_in_transaction_side_effect + + # Mock execute_sql to return empty results (no row found) + mock_transaction.execute_sql.return_value = [] + + client = SpannerClient("project", "instance", "database") + + # Run method + result = client.acquire_lock("workflow-123", 3600) + + # Verify + self.assertTrue(result) + mock_transaction.execute_update.assert_called_once() + args, _ = mock_transaction.execute_update.call_args + self.assertIn("INSERT INTO IngestionLock", args[0]) + + @patch('google.cloud.spanner.Client') + def test_acquire_lock_existing_row(self, mock_spanner_client): + # Setup mock + mock_instance = MagicMock() + mock_db = MagicMock() + mock_spanner_client.return_value.instance.return_value = mock_instance + mock_instance.database.return_value = mock_db + + mock_transaction = MagicMock() + def run_in_transaction_side_effect(callback, *args, **kwargs): + return callback(mock_transaction, *args, **kwargs) + mock_db.run_in_transaction.side_effect = run_in_transaction_side_effect + + # Mock execute_sql to return existing lock (owner is None) + mock_transaction.execute_sql.return_value = [[None, None]] + + client = SpannerClient("project", "instance", "database") + + # Run method + result = client.acquire_lock("workflow-123", 3600) + + # Verify + self.assertTrue(result) + mock_transaction.execute_update.assert_called_once() + args, _ = mock_transaction.execute_update.call_args + self.assertIn("UPDATE IngestionLock", args[0]) + +if __name__ == '__main__': + unittest.main() From 92af9b16c972f418e84385ba5ac7c6cf7153e989 Mon Sep 
17 00:00:00 2001 From: Gabriel Mechali Date: Mon, 20 Apr 2026 12:37:38 -0400 Subject: [PATCH 2/4] Remove redundancy --- .../ingestion-helper/spanner_client.py | 41 +++++++------------ 1 file changed, 15 insertions(+), 26 deletions(-) diff --git a/import-automation/workflow/ingestion-helper/spanner_client.py b/import-automation/workflow/ingestion-helper/spanner_client.py index 3ca179d2de..bd879a37ad 100644 --- a/import-automation/workflow/ingestion-helper/spanner_client.py +++ b/import-automation/workflow/ingestion-helper/spanner_client.py @@ -80,40 +80,29 @@ def _acquire(transaction: Transaction) -> bool: if lock_is_available: if not row_found: - insert_sql = """ + sql_statement = """ INSERT INTO IngestionLock (LockID, LockOwner, AcquiredTimestamp) VALUES (@lockId, @workflowId, PENDING_COMMIT_TIMESTAMP()) """ - transaction.execute_update(insert_sql, - params={ - "workflowId": workflow_id, - "lockId": self._LOCK_ID - }, - param_types={ - "workflowId": STRING, - "lockId": STRING - }) - logging.info( - f"Lock successfully acquired by {workflow_id} (new row created)" - ) + log_msg = f"Lock successfully acquired by {workflow_id} (new row created)" else: - update_sql = """ + sql_statement = """ UPDATE IngestionLock SET LockOwner = @workflowId, AcquiredTimestamp = PENDING_COMMIT_TIMESTAMP() WHERE LockID = @lockId """ - transaction.execute_update(update_sql, - params={ - "workflowId": workflow_id, - "lockId": self._LOCK_ID - }, - param_types={ - "workflowId": STRING, - "lockId": STRING - }) - logging.info( - f"Lock successfully acquired by {workflow_id} (existing row updated)" - ) + log_msg = f"Lock successfully acquired by {workflow_id} (existing row updated)" + + transaction.execute_update(sql_statement, + params={ + "workflowId": workflow_id, + "lockId": self._LOCK_ID + }, + param_types={ + "workflowId": STRING, + "lockId": STRING + }) + logging.info(log_msg) return True else: logging.info(f"Lock is currently held by {current_owner}") From 
eb9d02e10019ceaf639e3ce48de5b6e3465e97d3 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Tue, 21 Apr 2026 08:34:46 -0400 Subject: [PATCH 3/4] Adds readme --- import-automation/workflow/ingestion-helper/README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/import-automation/workflow/ingestion-helper/README.md b/import-automation/workflow/ingestion-helper/README.md index 3e9b1eec5a..b38e9e3d64 100644 --- a/import-automation/workflow/ingestion-helper/README.md +++ b/import-automation/workflow/ingestion-helper/README.md @@ -56,3 +56,8 @@ Updates the version of an import, records version history, and updates the statu * `version` (Required): The version string. If set to `'STAGING'`, it resolves to the current staging version. * `comment` (Required): A comment for the audit log explaining the version update. * `override` (Optional): Override version without checking import status (boolean) + +#### `initialize_database` +Initializes the Spanner database by creating all necessary tables and uploading proto descriptors. + +* This action requires no payload parameters. It automatically reads `schema.sql` and `storage.pb` from the container directory to provision the database schema and proto descriptors. 
From 84b2cafe1b133ff1ee0f858b4103baf1f3933240 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Wed, 22 Apr 2026 11:04:19 -0400 Subject: [PATCH 4/4] Comments --- .../workflow/ingestion-helper/Dockerfile | 5 ++++- .../workflow/ingestion-helper/README.md | 21 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/import-automation/workflow/ingestion-helper/Dockerfile b/import-automation/workflow/ingestion-helper/Dockerfile index 21c7e6f9c4..c2c1ccf396 100644 --- a/import-automation/workflow/ingestion-helper/Dockerfile +++ b/import-automation/workflow/ingestion-helper/Dockerfile @@ -14,6 +14,9 @@ FROM python:3.12-slim +# Copy uv binary +COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/ + # Allow statements and log messages to immediately appear in the logs ENV PYTHONUNBUFFERED True @@ -24,7 +27,7 @@ RUN apt-get update && apt-get install -y protobuf-compiler curl && rm -rf /var/l # Install production dependencies. COPY requirements.txt . -RUN pip install --no-cache-dir -r requirements.txt +RUN uv pip install --system --no-cache -r requirements.txt # Copy local code to the container image. COPY . . diff --git a/import-automation/workflow/ingestion-helper/README.md b/import-automation/workflow/ingestion-helper/README.md index b38e9e3d64..33857f206b 100644 --- a/import-automation/workflow/ingestion-helper/README.md +++ b/import-automation/workflow/ingestion-helper/README.md @@ -61,3 +61,24 @@ Updates the version of an import, records version history, and updates the statu Initializes the Spanner database by creating all necessary tables and uploading proto descriptors. * This action requires no payload parameters. It automatically reads `schema.sql` and `storage.pb` from the container directory to provision the database schema and proto descriptors. +* **Note on Protos**: The `storage.pb` file is generated during the Docker build process. 
The `Dockerfile` fetches `storage.proto` from the `datacommonsorg/import` GitHub repository and compiles it into `storage.pb`. + +## Local Development and Testing + +To run the helper service locally and test its functionality: + +### Running the Server +Ensure you have installed the requirements (`uv pip install -r requirements.txt`), then start the functions framework: + +```bash +uv run functions-framework --target ingestion_helper +``` +By default, this will start serving on `http://localhost:8080`. + +### Triggering Actions +You can test specific actions by sending a POST request with a JSON payload. For example, to trigger database initialization locally: +```bash +curl -X POST http://localhost:8080 \ + -H "Content-Type: application/json" \ + -d '{"actionType": "initialize_database"}' +```