[Schema Testing] Modification of ingestion pipeline for testing full load.#500
gmechali wants to merge 1 commit into datacommonsorg:master from
Not up to standards ⛔

🔴 Issues
| Category | Results |
|---|---|
| Security | 1 critical |

🟢 Metrics
| Metric | Results |
|---|---|
| Complexity | 20 |
| Duplication | 0 |
Code Review
This pull request introduces a new schema for observation mutations and refactors the ingestion pipeline to handle nodes, edges, and observations separately. Key changes include the implementation of toNewSchemaMutations and filterNewSchemaMutations, as well as updating the pipeline to ensure edges are written only after nodes are committed. Feedback highlights a critical issue where database validation was commented out and a regression in the pipeline's return value that ignores node and edge write completions. Additionally, suggestions were made to avoid hardcoding table names and to centralize the logic for generating series DCIDs to prevent duplication.
  LOGGER.info("Starting Spanner DDL creation...");
- spannerClient.validateOrInitializeDatabase();
+ // spannerClient.validateOrInitializeDatabase();
var writtenNodes =
    nodeMutations.apply("WriteNodesToSpanner", spannerClient.getWriteTransform());

var waitingEdges = edgeMutations.apply("EdgesWaitOnNodes", Wait.on(writtenNodes.getOutput()));

waitingEdges.apply("WriteEdgesToSpanner", spannerClient.getWriteTransform());

var writeObs =
    observations.apply("WriteObsToSpanner", spannerClient.getWriteGroupedTransform());
return writeObs.getOutput();
The method now returns only the output of writeObs and ignores the completion of writtenNodes and the edge writes. This is a regression from the previous implementation, which merged all mutation writes. If a downstream transform depends on the output of groupByGraphOnly, it may start before nodes and edges are fully committed to Spanner. Merge the outputs of all write transforms before returning.
var writtenNodes =
    nodeMutations.apply("WriteNodesToSpanner", spannerClient.getWriteTransform());
var writtenEdges =
    edgeMutations
        .apply("EdgesWaitOnNodes", Wait.on(writtenNodes.getOutput()))
        .apply("WriteEdgesToSpanner", spannerClient.getWriteTransform());
var writtenObs =
    observations.apply("WriteObsToSpanner", spannerClient.getWriteGroupedTransform());
return PCollectionList.of(writtenNodes.getOutput())
    .and(writtenEdges.getOutput())
    .and(writtenObs.getOutput())
    .apply("MergeOutputs", Flatten.pCollections());
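The ordering the suggestion asks for (edges only after nodes, and a result that completes only when all three writes are done) can be illustrated outside Beam. The following is a plain-Java analogy using CompletableFuture, not Beam or Spanner code; the WriteOrdering class and its methods are hypothetical names introduced only for this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical analogy: simulated writes stand in for the Spanner write transforms. */
final class WriteOrdering {
  /** Records the order in which the simulated writes run. */
  final List<String> log = Collections.synchronizedList(new ArrayList<>());

  /** Simulates one asynchronous write by appending its name to the log. */
  CompletableFuture<Void> write(String name) {
    return CompletableFuture.runAsync(() -> log.add(name));
  }

  /**
   * Starts the edge write only after the node write has finished, runs the
   * observation write independently, and returns a future that completes
   * only when all three writes are done.
   */
  CompletableFuture<Void> writeAll() {
    CompletableFuture<Void> nodes = write("nodes");
    CompletableFuture<Void> edges = nodes.thenCompose(ignored -> write("edges"));
    CompletableFuture<Void> obs = write("observations");
    return CompletableFuture.allOf(nodes, edges, obs);
  }
}
```

Returning only the observation future here would let a caller proceed while the node or edge write is still running, which is the same hazard the comment describes for returning only writeObs.getOutput().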
var nodeMutations =
    graphMutations
        .apply("FilterNodes", Filter.by(kv -> kv.getValue().getTable().equals("Node")))
Avoid hardcoding the table name "Node". Use spannerClient.getNodeTableName() to ensure the filter remains correct if the table name is customized via configuration.
- .apply("FilterNodes", Filter.by(kv -> kv.getValue().getTable().equals("Node")))
+ .apply("FilterNodes", Filter.by(kv -> kv.getValue().getTable().equals(spannerClient.getNodeTableName())))
var edgeMutations =
    graphMutations
        .apply("FilterEdges", Filter.by(kv -> kv.getValue().getTable().equals("Edge")))
Avoid hardcoding the table name "Edge". Use spannerClient.getEdgeTableName() to ensure the filter remains correct if the table name is customized via configuration.
- .apply("FilterEdges", Filter.by(kv -> kv.getValue().getTable().equals("Edge")))
+ .apply("FilterEdges", Filter.by(kv -> kv.getValue().getTable().equals(spannerClient.getEdgeTableName())))
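To show why both filters should take the table name from configuration, here is a minimal plain-Java sketch. The Row and TableFilters types are hypothetical stand-ins for the pipeline's keyed mutations, and the configured names "NodeV2" and "EdgeV2" used in the note below are invented for illustration.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Hypothetical stand-in for a mutation: only the target table name matters here. */
final class Row {
  final String table;
  final String key;

  Row(String table, String key) {
    this.table = table;
    this.key = key;
  }
}

/** Hypothetical helper that builds filters from a configured table name. */
final class TableFilters {
  private TableFilters() {}

  /** Returns a predicate matching rows bound for the configured table. */
  static Predicate<Row> forTable(String configuredTableName) {
    return row -> configuredTableName.equals(row.table);
  }

  /** Keeps only the rows bound for the configured table. */
  static List<Row> select(List<Row> rows, String configuredTableName) {
    return rows.stream().filter(forTable(configuredTableName)).collect(Collectors.toList());
  }
}
```

If the tables were configured as "NodeV2" and "EdgeV2", a filter on the literal "Node" would match nothing and the node writes would be silently skipped, while a filter built from the configured name keeps working.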
String seriesDcid =
    "dc/os/"
        + Joiner.on("_")
            .join(
                obs.getVariableMeasured().replace('/', '_'),
                obs.getObservationAbout().replace('/', '_'),
                obs.getFacetId());
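One way to centralize this logic, as the review summary suggests, is a single helper that every call site uses. The sketch below is an assumption about how that could look: the SeriesDcids class name is hypothetical, the facet ID is assumed to be a String, and String.join stands in for Guava's Joiner so the example has no external dependencies.

```java
import java.util.Objects;

/** Hypothetical helper; the class and method names are illustrative, not from the PR. */
final class SeriesDcids {
  private static final String PREFIX = "dc/os/";

  private SeriesDcids() {}

  /**
   * Builds a series DCID the same way as the inline snippet: slashes in the
   * variable and the entity become underscores, then the three parts are
   * joined with '_' behind the "dc/os/" prefix.
   */
  static String of(String variableMeasured, String observationAbout, String facetId) {
    // Reject nulls explicitly; String.join would otherwise emit the text "null".
    Objects.requireNonNull(variableMeasured, "variableMeasured");
    Objects.requireNonNull(observationAbout, "observationAbout");
    Objects.requireNonNull(facetId, "facetId");
    return PREFIX
        + String.join(
            "_",
            variableMeasured.replace('/', '_'),
            observationAbout.replace('/', '_'),
            facetId);
  }
}
```

A call site would then read SeriesDcids.of(obs.getVariableMeasured(), obs.getObservationAbout(), obs.getFacetId()), so any change to the ID format happens in one place.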
No description provided.