[Schema Testing] Modification of ingestion pipeline for testing full load. #500

Open
gmechali wants to merge 1 commit into datacommonsorg:master from gmechali:fullload

Conversation

@gmechali
Contributor

No description provided.

@codacy-production

Not up to standards ⛔

🔴 Issues 1 critical

Alerts:
⚠ 1 issue (≤ 0 issues of at least minor severity)

Results:
1 new issue

Category Results
Security 1 critical


🟢 Metrics 20 complexity · 0 duplication

Metric Results
Complexity 20
Duplication 0


Contributor

@gemini-code-assist bot left a comment


Code Review

This pull request introduces a new schema for observation mutations and refactors the ingestion pipeline to handle nodes, edges, and observations separately. Key changes include the implementation of toNewSchemaMutations and filterNewSchemaMutations, as well as updating the pipeline to ensure edges are written only after nodes are committed. Feedback highlights a critical issue where database validation was commented out and a regression in the pipeline's return value that ignores node and edge write completions. Additionally, suggestions were made to avoid hardcoding table names and to centralize the logic for generating series DCIDs to prevent duplication.


  LOGGER.info("Starting Spanner DDL creation...");
- spannerClient.validateOrInitializeDatabase();
+ // spannerClient.validateOrInitializeDatabase();

critical

Commenting out spannerClient.validateOrInitializeDatabase() disables critical database schema validation and initialization. If this was done for testing purposes, please ensure it is reverted before merging, or implement a conditional check if skipping is intended for specific environments.
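
One way to make the skip explicit instead of commenting the call out is a small guard driven by a flag. The sketch below is only an illustration, not code from this PR: `SchemaInitGuard` and the `skipRequested` flag are hypothetical names, and how the flag would be supplied (pipeline option, environment variable, test-only config) is left open.

```java
final class SchemaInitGuard {
  private SchemaInitGuard() {}

  /**
   * Runs the initializer unless skipping was explicitly requested.
   * Returns true if the initializer ran, false if it was skipped.
   */
  static boolean runUnlessSkipped(boolean skipRequested, Runnable initializer) {
    if (skipRequested) {
      // Hypothetical opt-out path, intended for test runs only.
      return false;
    }
    initializer.run();
    return true;
  }
}
```

In the pipeline this might be called as `SchemaInitGuard.runUnlessSkipped(skipFlag, spannerClient::validateOrInitializeDatabase)`, assuming that method throws no checked exceptions and `skipFlag` is whatever configuration value the project chooses. Defaulting the flag to false keeps validation on unless a run opts out.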

Comment on lines +262 to +271
      var writtenNodes =
          nodeMutations.apply("WriteNodesToSpanner", spannerClient.getWriteTransform());

      var waitingEdges = edgeMutations.apply("EdgesWaitOnNodes", Wait.on(writtenNodes.getOutput()));

      waitingEdges.apply("WriteEdgesToSpanner", spannerClient.getWriteTransform());

      var writeObs =
          observations.apply("WriteObsToSpanner", spannerClient.getWriteGroupedTransform());
      return writeObs.getOutput();

high

The method now only returns the output of writeObs, ignoring the completion of writtenNodes and the edge writes. This is a regression from the previous implementation which merged all mutation writes. If a downstream transform depends on the output of groupByGraphOnly, it may start before nodes and edges are fully committed to Spanner. You should merge the outputs of all write transforms before returning.

      var writtenNodes =
          nodeMutations.apply("WriteNodesToSpanner", spannerClient.getWriteTransform());

      var writtenEdges =
          edgeMutations
              .apply("EdgesWaitOnNodes", Wait.on(writtenNodes.getOutput()))
              .apply("WriteEdgesToSpanner", spannerClient.getWriteTransform());

      var writtenObs =
          observations.apply("WriteObsToSpanner", spannerClient.getWriteGroupedTransform());

      return PCollectionList.of(writtenNodes.getOutput())
          .and(writtenEdges.getOutput())
          .and(writtenObs.getOutput())
          .apply("MergeOutputs", Flatten.pCollections());
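
The ordering concern can also be illustrated outside Beam. The sketch below swaps in `java.util.concurrent.CompletableFuture` purely as an analogy; it uses none of the Beam or Spanner APIs, and `WriteOrderingSketch` with its three stand-in writes is hypothetical. Edges start only after nodes complete, observations run independently, and the combined result completes only when all three have finished.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

final class WriteOrderingSketch {
  private WriteOrderingSketch() {}

  /** Runs the three stand-in writes and returns the order in which they were recorded. */
  static List<String> runAll() {
    List<String> log = new CopyOnWriteArrayList<>();

    // Stand-in for the node write.
    CompletableFuture<Void> nodes = CompletableFuture.runAsync(() -> log.add("nodes"));
    // Stand-in for "edges wait on nodes, then write": starts only after nodes completes.
    CompletableFuture<Void> edges = nodes.thenRunAsync(() -> log.add("edges"));
    // Stand-in for the observation write: independent of nodes and edges.
    CompletableFuture<Void> obs = CompletableFuture.runAsync(() -> log.add("obs"));

    // Stand-in for the merged output: completes only when all three writes have.
    CompletableFuture.allOf(nodes, edges, obs).join();
    return log;
  }
}
```

Waiting on `obs` alone would correspond to the behavior flagged in this comment: a caller could proceed while the node and edge stand-ins are still running.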


      var nodeMutations =
          graphMutations
              .apply("FilterNodes", Filter.by(kv -> kv.getValue().getTable().equals("Node")))

medium

Avoid hardcoding the table name "Node". Use spannerClient.getNodeTableName() to ensure the filter remains correct if the table name is customized via configuration.

Suggested change
- .apply("FilterNodes", Filter.by(kv -> kv.getValue().getTable().equals("Node")))
+ .apply("FilterNodes", Filter.by(kv -> kv.getValue().getTable().equals(spannerClient.getNodeTableName())))


      var edgeMutations =
          graphMutations
              .apply("FilterEdges", Filter.by(kv -> kv.getValue().getTable().equals("Edge")))

medium

Avoid hardcoding the table name "Edge". Use spannerClient.getEdgeTableName() to ensure the filter remains correct if the table name is customized via configuration.

Suggested change
- .apply("FilterEdges", Filter.by(kv -> kv.getValue().getTable().equals("Edge")))
+ .apply("FilterEdges", Filter.by(kv -> kv.getValue().getTable().equals(spannerClient.getEdgeTableName())))
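
Both table-name suggestions amount to building each filter from a configured name rather than a literal. A minimal plain-Java sketch of that idea follows. `TableFilters` is a hypothetical helper, the strings stand in for whatever `getTable()` returns, and nothing here uses the Beam API. Passing the name in as a string also means the lambda captures only that string and not the client object, which would matter in Beam only if `spannerClient` were not serializable (unknown from this PR).

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class TableFilters {
  private TableFilters() {}

  /** Returns a predicate matching table names equal to the configured name. */
  static Predicate<String> forTable(String configuredTableName) {
    // Configured name on the left: a null table name simply fails the match.
    return table -> configuredTableName.equals(table);
  }

  /** Keeps only the table names accepted by the predicate (plain-Java stand-in for a filter step). */
  static List<String> keep(List<String> tables, Predicate<String> predicate) {
    return tables.stream().filter(predicate).collect(Collectors.toList());
  }
}
```

With this shape, a customized table name changes only the argument passed to `forTable`, and the node and edge filters cannot drift from the configuration.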

Comment on lines +329 to +335
      String seriesDcid =
          "dc/os/"
              + Joiner.on("_")
                  .join(
                      obs.getVariableMeasured().replace('/', '_'),
                      obs.getObservationAbout().replace('/', '_'),
                      obs.getFacetId());

medium

The logic for generating seriesDcid is duplicated from Observation.toObsGraph(). To ensure consistency and simplify maintenance, this logic should be centralized, for example by adding a public static method to the Observation class that returns the series DCID.
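
A centralized helper along these lines might look like the sketch below. It is an illustration under assumptions, not the project's code: `SeriesDcids` is a hypothetical class name (the comment suggests putting the method on `Observation` itself), `facetId` is assumed to be a `String`, and `String.join` is substituted for Guava's `Joiner` to keep the example dependency-free. One behavioral difference: `String.join` renders a null element as "null", whereas `Joiner.join` throws on null by default.

```java
final class SeriesDcids {
  private static final String PREFIX = "dc/os/";

  private SeriesDcids() {}

  /** Builds the series DCID from the three observation fields, mirroring the snippet under review. */
  static String of(String variableMeasured, String observationAbout, String facetId) {
    return PREFIX
        + String.join(
            "_",
            // Slashes in the variable and place IDs are replaced, as in the original snippet.
            variableMeasured.replace('/', '_'),
            observationAbout.replace('/', '_'),
            facetId);
  }
}
```

Both call sites could then delegate to the single method, for example `SeriesDcids.of(obs.getVariableMeasured(), obs.getObservationAbout(), obs.getFacetId())`, so any change to the ID format happens in one place.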
