Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions import-automation/executor/app/service/file_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,8 @@ def upload_file(self, src: str, dest: str) -> None:
"""
_strings_not_empty(src, dest)
dest = self._fix_path(dest)
logging.info('GCSFileUploader.upload_file: Uploading %s to %s', src,
dest)
blob = self.bucket.blob(dest)
blob.upload_from_filename(src)
logging.info('GCSFileUploader.upload_file: Uploaded %s to %s', src,
dest)

def upload_string(self, string: str, dest: str) -> None:
"""Uploads a string to a file in the bucket, overwriting it.
Expand All @@ -102,10 +98,8 @@ def upload_string(self, string: str, dest: str) -> None:
"""
_strings_not_empty(dest)
dest = self._fix_path(dest)
logging.info('GCSFileUploader.upload_string: Uploading to %s', dest)
blob = self.bucket.blob(dest)
blob.upload_from_string(string)
logging.info('GCSFileUploader.upload_string: Uploaded to %s', dest)

def _fix_path(self, path):
"""Returns {self.path_prefix}/{path}."""
Expand Down
5 changes: 3 additions & 2 deletions import-automation/workflow/import-helper/import_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ def invoke_import_automation_workflow(import_name: str,
"""
import_config = {
"user_script_args": [f"--version={latest_version}"],
"import_version_override": latest_version
"import_version_override": latest_version,
"graph_data_path": "/**/*.mcf*"
}
workflow_args = {
"importName": import_name,
Expand All @@ -74,7 +75,7 @@ def invoke_import_automation_workflow(import_name: str,
}

logging.info(
f"Invoking {IMPORT_AUTOMATION_SPANNER_INGESTION_WORKFLOW_ID} for {import_name}"
f"Invoking {IMPORT_AUTOMATION_WORKFLOW_ID} for {import_name}"
)
execution_client = executions_v1.ExecutionsClient()
parent = f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{IMPORT_AUTOMATION_WORKFLOW_ID}"
Expand Down
2 changes: 1 addition & 1 deletion import-automation/workflow/import-helper/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def handle_feed_event(request):
if import_step == 'spanner_ingestion_workflow':
# Update import status in spanner
import_status = 'STAGING'
graph_path = attributes.get('graph_path', "/**/*.*")
graph_path = attributes.get('graph_path', "/**/*.mcf*")
Comment thread
vish-cs marked this conversation as resolved.
job_id = attributes.get('feed_name', 'cda_feed')
cron_schedule = attributes.get('cron_schedule', '')
helper.update_import_status(import_name, import_status, latest_version,
Expand Down
2 changes: 1 addition & 1 deletion scripts/entities/download.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ GCS_PATH="gs://${BUCKET_NAME}/${GCS_FOLDER_PREFIX}/${VERSION}"

echo "Downloading import ${ENTITY} for version ${VERSION} from ${GCS_PATH} to $(pwd)"
mkdir -p "${ENTITY}"
gcloud storage cp -r "${GCS_PATH}" "${ENTITY}/"
gcloud storage cp -r "${GCS_PATH}" "${ENTITY}/" &> copy.log
Comment thread
vish-cs marked this conversation as resolved.
echo "Successfully downloaded ${ENTITY} version ${VERSION}"

# TODO: remove after scrpts are checked in
Expand Down
2 changes: 1 addition & 1 deletion scripts/entities/manifest.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
"invoke_import_validation": false,
"invoke_import_tool": false,
"invoke_differ_tool": false,
"skip_input_upload": true
"skip_input_upload": false
}
},
{
Expand Down
2 changes: 1 addition & 1 deletion tools/import_validation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ The following validations are currently supported:
| Validator Name | Description | Required Data | `params` Configuration |
| ------------------------- | ------------------------------------------------------------------------ | ----------------- | ------------------------------------------------------ |
| `SQL_VALIDATOR` | Runs a user-defined SQL query to perform complex validations. | `stats`, `differ` | `query` (string), `condition` (string) |
| `EMPTY_IMPORT_CHECK` | Checks if the import is empty (no observations and no schema). | `differ` | None |
| `EMPTY_IMPORT_CHECK` | Checks if the import is empty (no observations and no schema). | `lint` | None |
| `MAX_DATE_LATEST` | Checks that the latest date in the data is from the current year. | `stats` | None |
| `MAX_DATE_CONSISTENT` | Checks that the latest date is the same for all StatVars. | `stats` | None |
| `MISSING_REFS_COUNT` | Checks that the total number of missing references is within a threshold. | `lint` | `threshold` (integer, defaults to 0) |
Expand Down
6 changes: 2 additions & 4 deletions tools/import_validation/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def __init__(self, validation_config_path: str, differ_output: str,
'DELETED_RECORDS_PERCENT':
(self.validator.validate_deleted_records_percent, 'differ'),
'EMPTY_IMPORT_CHECK':
(self.validator.validate_empty_import, 'differ'),
(self.validator.validate_empty_import, 'lint'),
'MISSING_REFS_COUNT':
(self.validator.validate_missing_refs_count, 'lint'),
'LINT_ERROR_COUNT':
Expand Down Expand Up @@ -216,9 +216,7 @@ def run_validations(self) -> tuple[bool, list[ValidationResult]]:
result = validation_func(self.data_sources['stats'],
self.data_sources['differ'],
rule_params)
elif validator_name in [
'DELETED_RECORDS_PERCENT', 'EMPTY_IMPORT_CHECK'
]:
elif validator_name in ['DELETED_RECORDS_PERCENT']:
result = validation_func(
self.data_sources['differ'],
self.data_sources.get('differ_summary'), rule_params)
Expand Down
37 changes: 19 additions & 18 deletions tools/import_validation/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,44 +283,45 @@ def validate_deleted_records_percent(self, differ_df: pd.DataFrame,
'threshold': threshold
})

def validate_empty_import(self, differ_df: pd.DataFrame, summary: dict,
def validate_empty_import(self, report: dict,
params: dict) -> ValidationResult:
"""Checks if the import is empty (no observations and no schema).

Args:
differ_df: A DataFrame containing the differ output (unused but passed for consistency).
summary: A dictionary containing the differ summary.
report: A json object containing the lint report.
params: A dictionary containing the validation parameters.

Returns:
A ValidationResult object.
"""
if summary is None:
if report is None:
return ValidationResult(ValidationStatus.DATA_ERROR,
'EMPTY_IMPORT_CHECK',
message="Differ summary is missing.")
message="Lint report is missing.")

current_obs_size = summary['current_obs_size']
current_schema_size = summary['current_schema_size']
counters = report.get('levelSummary', {}).get('LEVEL_INFO',
{}).get('counters', {})

if current_obs_size == 0 and current_schema_size == 0:
num_nodes = int(counters.get('NumNodeSuccesses', 0))
num_rows = int(counters.get('NumRowSuccesses', 0))

if num_nodes == 0 and num_rows == 0:
return ValidationResult(
ValidationStatus.FAILED,
'EMPTY_IMPORT_CHECK',
message=
"The import is empty: both current_obs_size and current_schema_size are 0.",
"The import is empty: both NumNodeSuccesses and NumRowSuccesses are 0 in the lint report.",
details={
'current_obs_size': int(current_obs_size),
'current_schema_size': int(current_schema_size)
'num_nodes': num_nodes,
'num_rows': num_rows
})

return ValidationResult(
ValidationStatus.PASSED,
'EMPTY_IMPORT_CHECK',
details={
'current_obs_size': int(current_obs_size),
'current_schema_size': int(current_schema_size)
})
return ValidationResult(ValidationStatus.PASSED,
'EMPTY_IMPORT_CHECK',
details={
'num_nodes': num_nodes,
'num_rows': num_rows
})

def validate_missing_refs_count(self, report: dict,
params: dict) -> ValidationResult:
Expand Down
58 changes: 41 additions & 17 deletions tools/import_validation/validator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,29 +187,53 @@ def setUp(self):
self.validator = Validator()

def test_empty_import_fails_when_both_zero(self):
summary = {'current_obs_size': 0, 'current_schema_size': 0}
result = self.validator.validate_empty_import(pd.DataFrame(), summary,
{})
report = {
'levelSummary': {
'LEVEL_INFO': {
'counters': {
'NumNodeSuccesses': "0",
'NumRowSuccesses': "0"
}
}
}
}
result = self.validator.validate_empty_import(report, {})
self.assertEqual(result.status, ValidationStatus.FAILED)
self.assertEqual(result.details['current_obs_size'], 0)
self.assertEqual(result.details['current_schema_size'], 0)
self.assertEqual(result.details['num_nodes'], 0)
self.assertEqual(result.details['num_rows'], 0)

def test_empty_import_passes_when_obs_not_zero(self):
summary = {'current_obs_size': 100, 'current_schema_size': 0}
result = self.validator.validate_empty_import(pd.DataFrame(), summary,
{})
def test_empty_import_passes_when_rows_not_zero(self):
report = {
'levelSummary': {
'LEVEL_INFO': {
'counters': {
'NumNodeSuccesses': "0",
'NumRowSuccesses': "100"
}
}
}
}
result = self.validator.validate_empty_import(report, {})
self.assertEqual(result.status, ValidationStatus.PASSED)
self.assertEqual(result.details['current_obs_size'], 100)
self.assertEqual(result.details['num_rows'], 100)

def test_empty_import_passes_when_schema_not_zero(self):
summary = {'current_obs_size': 0, 'current_schema_size': 5}
result = self.validator.validate_empty_import(pd.DataFrame(), summary,
{})
def test_empty_import_passes_when_nodes_not_zero(self):
report = {
'levelSummary': {
'LEVEL_INFO': {
'counters': {
'NumNodeSuccesses': "5",
'NumRowSuccesses': "0"
}
}
}
}
result = self.validator.validate_empty_import(report, {})
self.assertEqual(result.status, ValidationStatus.PASSED)
self.assertEqual(result.details['current_schema_size'], 5)
self.assertEqual(result.details['num_nodes'], 5)

def test_empty_import_fails_on_missing_summary(self):
result = self.validator.validate_empty_import(pd.DataFrame(), None, {})
def test_empty_import_fails_on_missing_report(self):
result = self.validator.validate_empty_import(None, {})
self.assertEqual(result.status, ValidationStatus.DATA_ERROR)


Expand Down
Loading