42 changes: 42 additions & 0 deletions import-automation/workflow/ingestion-helper/Dockerfile
@@ -0,0 +1,42 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

FROM python:3.12-slim

# Copy uv binary
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/

# Allow statements and log messages to immediately appear in the logs
ENV PYTHONUNBUFFERED True

WORKDIR /app

# Install protobuf compiler and curl
RUN apt-get update && apt-get install -y protobuf-compiler curl && rm -rf /var/lib/apt/lists/*

# Install production dependencies.
COPY requirements.txt .
RUN uv pip install --system --no-cache -r requirements.txt

# Copy local code to the container image.
COPY . .

# Fetch proto file from GitHub
RUN curl -o storage.proto https://raw.githubusercontent.com/datacommonsorg/import/master/pipeline/data/src/main/proto/storage.proto

# Generate proto descriptor set
RUN protoc --include_imports --descriptor_set_out=storage.pb storage.proto

# Run the functions framework
CMD ["functions-framework", "--target", "ingestion_helper"]
I don't think the GCP functions-framework utility is the right tool to use here for hosting a REST service.

Is the plan to migrate all of these functions to Cloud Run services?

If so, I think it makes sense to put them in the DCP service as a new endpoint:

https://github.com/datacommonsorg/datacommons/tree/main/packages/datacommons-api/datacommons_api/

This way we won't need a new image or CI/CD pipeline for building the import automation Docker image.

Also, moving from functions-framework to FastAPI gives us better logging, request and response validation with Pydantic objects, and puts these APIs in a single place.

Author

I agree that functions-framework is a bit limited and odd in this case, but this is essentially the minimum work to migrate the existing Cloud Run function to a Cloud Run service that executes it. It allows dockerization for DCP without bloating Base DC infra.

I think there needs to be much more discussion before moving this to the DCP API. It certainly has some pros, but also some major drawbacks, such as moving ingestion code used across Base DC into this new DCP repo. It might be premature to move to the DCP service, especially as the DCP service is out of scope for M1.

If we leave it like this, M1 ships four artifacts:

  • MCP PyPI package
  • Mixer image
  • ingestion helper image
  • Dataflow template

all connected through the Terraform deployment. If we move the ingestion helper to the DCP service, then we just need to maintain the DCP service image. Agreed that functions-framework is limited and not ideal; that migration can happen at any point in the future. There is no need to do it now, and there would be no complexity in migrating it into the DCP service when we launch the DCP service (in Q3/Q4).

Especially as the ingestion helper is required for Base DC, I don't think it makes sense for us to take this on now; we would also have to deploy the DCP service to Base DC for ingestion helper support.

Happy to hop on a call to discuss further!

26 changes: 26 additions & 0 deletions import-automation/workflow/ingestion-helper/README.md
@@ -56,3 +56,29 @@ Updates the version of an import, records version history, and updates the statu
* `version` (Required): The version string. If set to `'STAGING'`, it resolves to the current staging version.
* `comment` (Required): A comment for the audit log explaining the version update.
* `override` (Optional): Override version without checking import status (boolean)
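
For illustration only, a request for this action might look like the following. The `actionType` value and the `importName` field here are placeholders, since the exact payload keys are defined earlier in this README and in `main.py`; substitute the real action name and your import's name:

```bash
# Hypothetical payload; replace <action-name> with the actual actionType
# for this endpoint, and "my_import" with a real import name.
curl -X POST http://localhost:8080 \
  -H "Content-Type: application/json" \
  -d '{"actionType": "<action-name>", "importName": "my_import", "version": "STAGING", "comment": "Promote staging build"}'
```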

#### `initialize_database`
Initializes the Spanner database by creating all necessary tables and uploading proto descriptors.

* This action requires no payload parameters. It automatically reads `schema.sql` and `storage.pb` from the container directory to provision the database schema and proto descriptors.
* **Note on Protos**: The `storage.pb` file is generated during the Docker build process. The `Dockerfile` fetches `storage.proto` from the `datacommonsorg/import` GitHub repository and compiles it into `storage.pb`.
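
For reference, the statement-splitting step that `initialize_database` applies to `schema.sql` before sending DDL to Spanner can be sketched as a minimal standalone snippet (the real logic lives in `spanner_client.py`; the sample schema below is illustrative):

```python
# Minimal sketch of the semicolon-based DDL split used by
# spanner_client.initialize_database when reading schema.sql.
def split_ddl(schema_content: str) -> list[str]:
    # Discard whitespace-only fragments (e.g. after the final semicolon).
    return [s.strip() for s in schema_content.split(";") if s.strip()]

sample = """
CREATE TABLE Node (
  subject_id STRING(1024) NOT NULL,
) PRIMARY KEY(subject_id);

CREATE TABLE Edge (
  subject_id STRING(1024) NOT NULL,
) PRIMARY KEY(subject_id);
"""

statements = split_ddl(sample)
print(len(statements))  # → 2
```

Note that a naive semicolon split like this assumes no statement contains a literal `;` inside a string or comment, which holds for the current schema.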

## Local Development and Testing

To run the helper service locally and test its functionality:

### Running the Server
Ensure you have installed the requirements (`uv pip install -r requirements.txt`), then start the functions framework:

```bash
uv run functions-framework --target ingestion_helper
```
By default, this will start serving on `http://localhost:8080`.

### Triggering Actions
You can test specific actions by sending a POST request with a JSON payload. For example, to trigger database initialization locally:
```bash
curl -X POST http://localhost:8080 \
-H "Content-Type: application/json" \
-d '{"actionType": "initialize_database"}'
```
48 changes: 48 additions & 0 deletions import-automation/workflow/ingestion-helper/cloudbuild.yaml
@@ -0,0 +1,48 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

steps:
# Build the container image
- name: 'gcr.io/cloud-builders/docker'
args: ['build', '-t', '$_AR_REGION-docker.pkg.dev/datcom-ci/$_AR_REPO/datacommons-ingestion-helper:$_TAG', '-t', '$_AR_REGION-docker.pkg.dev/datcom-ci/$_AR_REPO/datacommons-ingestion-helper:latest', '.']

# Push the container image - Needed before deployment
- name: 'gcr.io/cloud-builders/docker'
args: ['push', '$_AR_REGION-docker.pkg.dev/datcom-ci/$_AR_REPO/datacommons-ingestion-helper:$_TAG']

# Deploy container image to Cloud Run (Conditional)
- name: 'gcr.io/google.com/cloudsdktool/cloud-sdk'
entrypoint: 'bash'
args:
- '-c'
- |
if [ "$_DEPLOY" = "true" ]; then
gcloud run deploy '$_SERVICE_NAME' \
--image '$_AR_REGION-docker.pkg.dev/datcom-ci/$_AR_REPO/datacommons-ingestion-helper:$_TAG' \
--region '$_AR_REGION' \
--project '$PROJECT_ID'
else
echo "Skipping deployment as _DEPLOY is not set to true"
fi

substitutions:
_AR_REGION: 'us'
_AR_REPO: 'gcr.io'
_SERVICE_NAME: 'spanner-ingestion-helper'
_DEPLOY: 'false'
_TAG: 'latest'

images:
- '$_AR_REGION-docker.pkg.dev/datcom-ci/$_AR_REPO/datacommons-ingestion-helper:$_TAG'
- '$_AR_REGION-docker.pkg.dev/datcom-ci/$_AR_REPO/datacommons-ingestion-helper:latest'
5 changes: 5 additions & 0 deletions import-automation/workflow/ingestion-helper/main.py
@@ -193,5 +193,10 @@ def ingestion_helper(request):
f"OK [Import: {import_name} Version: {version} Status: {params['status']}]",
200)

elif actionType == 'initialize_database':
# Initializes the database by creating all required tables and proto bundles.
logging.info("Action: initialize_database")
spanner.initialize_database()
return ('OK', 200)
else:
return (f'Unknown actionType: {actionType}', 400)
110 changes: 110 additions & 0 deletions import-automation/workflow/ingestion-helper/schema.sql
@@ -0,0 +1,110 @@
-- Copyright 2026 Google LLC
--
-- Licensed under the Apache License, Version 2.0 (the "License")
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.

CREATE PROTO BUNDLE (
`org.datacommons.Observations`
);

CREATE TABLE Node (
subject_id STRING(1024) NOT NULL,
value STRING(MAX),
bytes BYTES(MAX),
name STRING(MAX),
types ARRAY<STRING(1024)>,
name_tokenlist TOKENLIST AS (TOKENIZE_FULLTEXT(name)) HIDDEN,
) PRIMARY KEY(subject_id);

CREATE TABLE Edge (
subject_id STRING(1024) NOT NULL,
predicate STRING(1024) NOT NULL,
object_id STRING(1024) NOT NULL,
provenance STRING(1024) NOT NULL,
) PRIMARY KEY(subject_id, predicate, object_id, provenance),
INTERLEAVE IN Node;

CREATE TABLE Observation (
observation_about STRING(1024) NOT NULL,
variable_measured STRING(1024) NOT NULL,
facet_id STRING(1024) NOT NULL,
observation_period STRING(1024),
measurement_method STRING(1024),
unit STRING(1024),
scaling_factor STRING(1024),
observations org.datacommons.Observations,
import_name STRING(1024),
provenance_url STRING(1024),
is_dc_aggregate BOOL,
) PRIMARY KEY(observation_about, variable_measured, facet_id);

CREATE TABLE ImportStatus (
ImportName STRING(MAX) NOT NULL,
LatestVersion STRING(MAX),
GraphPath STRING(MAX),
State STRING(1024) NOT NULL,
JobId STRING(1024),
WorkflowId STRING(1024),
ExecutionTime INT64,
DataVolume INT64,
DataImportTimestamp TIMESTAMP OPTIONS ( allow_commit_timestamp = TRUE ),
StatusUpdateTimestamp TIMESTAMP OPTIONS ( allow_commit_timestamp = TRUE ),
NextRefreshTimestamp TIMESTAMP,
) PRIMARY KEY(ImportName);

CREATE TABLE IngestionHistory (
CompletionTimestamp TIMESTAMP NOT NULL OPTIONS ( allow_commit_timestamp = TRUE ),
IngestionFailure BOOL NOT NULL,
WorkflowExecutionID STRING(1024) NOT NULL,
DataflowJobID STRING(1024),
IngestedImports ARRAY<STRING(MAX)>,
ExecutionTime INT64,
NodeCount INT64,
EdgeCount INT64,
ObservationCount INT64,
) PRIMARY KEY(CompletionTimestamp DESC);

CREATE TABLE ImportVersionHistory (
ImportName STRING(MAX) NOT NULL,
Version STRING(MAX) NOT NULL,
UpdateTimestamp TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true),
Comment STRING(MAX),
) PRIMARY KEY (ImportName, UpdateTimestamp DESC);

CREATE TABLE IngestionLock (
LockID STRING(1024) NOT NULL,
LockOwner STRING(1024),
AcquiredTimestamp TIMESTAMP OPTIONS ( allow_commit_timestamp = TRUE ),
) PRIMARY KEY(LockID);

CREATE PROPERTY GRAPH DCGraph
NODE TABLES(
Node
KEY(subject_id)
LABEL Node PROPERTIES(
bytes,
name,
subject_id,
types,
value)
)
EDGE TABLES(
Edge
KEY(subject_id, predicate, object_id, provenance)
SOURCE KEY(subject_id) REFERENCES Node(subject_id)
DESTINATION KEY(object_id) REFERENCES Node(subject_id)
LABEL Edge PROPERTIES(
object_id,
predicate,
provenance,
subject_id)
);
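
The `CREATE PROPERTY GRAPH` statement above makes the `Node`/`Edge` tables queryable with Spanner Graph's GQL. As a sketch of what that enables (not part of this PR; the node ID is a placeholder), a one-hop traversal might look like:

```sql
GRAPH DCGraph
MATCH (n:Node)-[e:Edge]->(m:Node)
WHERE n.subject_id = 'country/USA'
RETURN e.predicate, m.subject_id
LIMIT 10
```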
97 changes: 88 additions & 9 deletions import-automation/workflow/ingestion-helper/spanner_client.py
Expand Up @@ -15,6 +15,8 @@
import logging
import os
from google.cloud import spanner
from google.cloud.spanner_admin_database_v1 import DatabaseAdminClient
from google.cloud.spanner_admin_database_v1.types import UpdateDatabaseDdlRequest
from google.cloud.spanner_v1 import Transaction
from google.cloud.spanner_v1.param_types import STRING, TIMESTAMP, Array, INT64
from datetime import datetime, timezone
@@ -57,13 +59,16 @@ def _acquire(transaction: Transaction) -> bool:
params = {"lockId": self._LOCK_ID}
param_types = {"lockId": STRING}

current_owner, acquired_at = None, None
row_found = False
results = transaction.execute_sql(sql, params, param_types)
for row in results:
row_found = True
current_owner, acquired_at = row[0], row[1]

lock_is_available = False
if current_owner is None:
if not row_found:
lock_is_available = True
elif current_owner is None:
lock_is_available = True
else:
timeout_threshold = datetime.now(timezone.utc) - acquired_at
@@ -72,13 +77,23 @@ def _acquire(transaction: Transaction) -> bool:
f"Stale lock found, owned by {current_owner}. Acquiring."
)
lock_is_available = True

if lock_is_available:
update_sql = """
UPDATE IngestionLock
SET LockOwner = @workflowId, AcquiredTimestamp = PENDING_COMMIT_TIMESTAMP()
WHERE LockID = @lockId
"""
transaction.execute_update(update_sql,
if not row_found:
sql_statement = """
INSERT INTO IngestionLock (LockID, LockOwner, AcquiredTimestamp)
VALUES (@lockId, @workflowId, PENDING_COMMIT_TIMESTAMP())
"""
log_msg = f"Lock successfully acquired by {workflow_id} (new row created)"
else:
sql_statement = """
UPDATE IngestionLock
SET LockOwner = @workflowId, AcquiredTimestamp = PENDING_COMMIT_TIMESTAMP()
WHERE LockID = @lockId
"""
log_msg = f"Lock successfully acquired by {workflow_id} (existing row updated)"

transaction.execute_update(sql_statement,
params={
"workflowId": workflow_id,
"lockId": self._LOCK_ID
Expand All @@ -87,7 +102,7 @@ def _acquire(transaction: Transaction) -> bool:
"workflowId": STRING,
"lockId": STRING
})
logging.info(f"Lock successfully acquired by {workflow_id}")
logging.info(log_msg)
return True
else:
logging.info(f"Lock is currently held by {current_owner}")
@@ -383,3 +398,67 @@ def _record(transaction: Transaction):
logging.error(
f'Error updating version history for {import_name}: {e}')
raise

def initialize_database(self):
"""Initializes the database by creating all required tables and proto bundles."""
logging.info("Initializing database...")

with self.database.snapshot() as snapshot:
results = snapshot.execute_sql(
"SELECT table_name FROM information_schema.tables WHERE table_schema = ''"
)
existing_tables = [row[0] for row in results]

logging.info(f"Existing tables: {existing_tables}")

required_tables = ["Node", "Edge", "Observation", "ImportStatus", "IngestionHistory", "ImportVersionHistory", "IngestionLock"]

missing_tables = [t for t in required_tables if t not in existing_tables]

if not missing_tables:
logging.info("All tables already exist.")
return

if len(missing_tables) < len(required_tables):
raise RuntimeError(
f"Database inconsistent state. Missing tables: {missing_tables}. Please clean up manually."
)

logging.info("Creating all tables and proto bundles...")

schema_path = os.path.join(os.path.dirname(__file__), 'schema.sql')
logging.info(f"Reading schema from {schema_path}")
try:
with open(schema_path, 'r') as f:
schema_content = f.read()
ddl_statements = [s.strip() for s in schema_content.split(';') if s.strip()]
except Exception as e:
logging.error(f"Failed to read schema file: {e}")
raise

proto_path = os.path.join(os.path.dirname(__file__), 'storage.pb')
logging.info(f"Reading proto descriptors from {proto_path}")
try:
with open(proto_path, 'rb') as f:
proto_descriptors_bytes = f.read()
except Exception as e:
logging.error(f"Failed to read proto descriptors file: {e}")
raise

database_path = self.database.name
logging.info(f"Updating DDL for {database_path} with protos")

try:
admin_client = DatabaseAdminClient()
request = UpdateDatabaseDdlRequest(
database=database_path,
statements=ddl_statements,
proto_descriptors=proto_descriptors_bytes
)
operation = admin_client.update_database_ddl(request=request)
operation.result()
logging.info("Database initialized successfully with protos.")
except Exception as e:
logging.error(f"Failed to update DDL with protos: {e}")
raise
