diff --git a/import-automation/executor/app/service/file_uploader.py b/import-automation/executor/app/service/file_uploader.py index c03d52a09b..15e582c8e8 100644 --- a/import-automation/executor/app/service/file_uploader.py +++ b/import-automation/executor/app/service/file_uploader.py @@ -82,12 +82,8 @@ def upload_file(self, src: str, dest: str) -> None: """ _strings_not_empty(src, dest) dest = self._fix_path(dest) - logging.info('GCSFileUploader.upload_file: Uploading %s to %s', src, - dest) blob = self.bucket.blob(dest) blob.upload_from_filename(src) - logging.info('GCSFileUploader.upload_file: Uploaded %s to %s', src, - dest) def upload_string(self, string: str, dest: str) -> None: """Uploads a string to a file in the bucket, overwriting it. @@ -102,10 +98,8 @@ def upload_string(self, string: str, dest: str) -> None: """ _strings_not_empty(dest) dest = self._fix_path(dest) - logging.info('GCSFileUploader.upload_string: Uploading to %s', dest) blob = self.bucket.blob(dest) blob.upload_from_string(string) - logging.info('GCSFileUploader.upload_string: Uploaded to %s', dest) def _fix_path(self, path): """Returns {self.path_prefix}/{path}.""" diff --git a/import-automation/workflow/import-helper/import_helper.py b/import-automation/workflow/import-helper/import_helper.py index 08babd0019..41919dfe46 100644 --- a/import-automation/workflow/import-helper/import_helper.py +++ b/import-automation/workflow/import-helper/import_helper.py @@ -65,7 +65,8 @@ def invoke_import_automation_workflow(import_name: str, """ import_config = { "user_script_args": [f"--version={latest_version}"], - "import_version_override": latest_version + "import_version_override": latest_version, + "graph_data_path": "/**/*.mcf*" } workflow_args = { "importName": import_name, @@ -74,7 +75,7 @@ def invoke_import_automation_workflow(import_name: str, } logging.info( - f"Invoking {IMPORT_AUTOMATION_SPANNER_INGESTION_WORKFLOW_ID} for {import_name}" + f"Invoking {IMPORT_AUTOMATION_WORKFLOW_ID} for {import_name}" ) execution_client = executions_v1.ExecutionsClient() parent = f"projects/{PROJECT_ID}/locations/{LOCATION}/workflows/{IMPORT_AUTOMATION_WORKFLOW_ID}" diff --git a/import-automation/workflow/import-helper/main.py b/import-automation/workflow/import-helper/main.py index c04f4c5115..5b4787d16b 100644 --- a/import-automation/workflow/import-helper/main.py +++ b/import-automation/workflow/import-helper/main.py @@ -48,7 +48,7 @@ def handle_feed_event(request): if import_step == 'spanner_ingestion_workflow': # Update import status in spanner import_status = 'STAGING' - graph_path = attributes.get('graph_path', "/**/*.*") + graph_path = attributes.get('graph_path', "/**/*.mcf*") job_id = attributes.get('feed_name', 'cda_feed') cron_schedule = attributes.get('cron_schedule', '') helper.update_import_status(import_name, import_status, latest_version, diff --git a/scripts/entities/download.sh b/scripts/entities/download.sh index 700c08ce02..46ae39bff5 100755 --- a/scripts/entities/download.sh +++ b/scripts/entities/download.sh @@ -37,7 +37,7 @@ GCS_PATH="gs://${BUCKET_NAME}/${GCS_FOLDER_PREFIX}/${VERSION}" echo "Downloading import ${ENTITY} for version ${VERSION} from ${GCS_PATH} to $(pwd)" mkdir -p "${ENTITY}" -gcloud storage cp -r "${GCS_PATH}" "${ENTITY}/" +gcloud storage cp -r "${GCS_PATH}" "${ENTITY}/" &> copy.log echo "Successfully downloaded ${ENTITY} version ${VERSION}" # TODO: remove after scrpts are checked in diff --git a/scripts/entities/manifest.json b/scripts/entities/manifest.json index 85d0983789..288cb6d153 100644 --- a/scripts/entities/manifest.json +++ b/scripts/entities/manifest.json @@ -77,7 +77,7 @@ "invoke_import_validation": false, "invoke_import_tool": false, "invoke_differ_tool": false, - "skip_input_upload": true + "skip_input_upload": false } }, { diff --git a/tools/import_validation/README.md b/tools/import_validation/README.md index 397b721bf1..34f4f3baf8 100644 --- a/tools/import_validation/README.md +++ b/tools/import_validation/README.md @@ -138,7 +138,7 @@ The following validations are currently supported: | Validator Name | Description | Required Data | `params` Configuration | | ------------------------- | ------------------------------------------------------------------------ | ----------------- | ------------------------------------------------------ | | `SQL_VALIDATOR` | Runs a user-defined SQL query to perform complex validations. | `stats`, `differ` | `query` (string), `condition` (string) | -| `EMPTY_IMPORT_CHECK` | Checks if the import is empty (no observations and no schema). | `differ` | None | +| `EMPTY_IMPORT_CHECK` | Checks if the import is empty (no observations and no schema). | `lint` | None | | `MAX_DATE_LATEST` | Checks that the latest date in the data is from the current year. | `stats` | None | | `MAX_DATE_CONSISTENT` | Checks that the latest date is the same for all StatVars. | `stats` | None | | `MISSING_REFS_COUNT` | Checks that the total number of missing references is within a threshold. | `lint` | `threshold` (integer, defaults to 0) | diff --git a/tools/import_validation/runner.py b/tools/import_validation/runner.py index 9dcc3cbe17..f1364518e6 100644 --- a/tools/import_validation/runner.py +++ b/tools/import_validation/runner.py @@ -63,7 +63,7 @@ def __init__(self, validation_config_path: str, differ_output: str, 'DELETED_RECORDS_PERCENT': (self.validator.validate_deleted_records_percent, 'differ'), 'EMPTY_IMPORT_CHECK': - (self.validator.validate_empty_import, 'differ'), + (self.validator.validate_empty_import, 'lint'), 'MISSING_REFS_COUNT': (self.validator.validate_missing_refs_count, 'lint'), 'LINT_ERROR_COUNT': @@ -216,9 +216,7 @@ def run_validations(self) -> tuple[bool, list[ValidationResult]]: result = validation_func(self.data_sources['stats'], self.data_sources['differ'], rule_params) - elif validator_name in [ - 'DELETED_RECORDS_PERCENT', 'EMPTY_IMPORT_CHECK' - ]: + elif validator_name in ['DELETED_RECORDS_PERCENT']: result = validation_func( self.data_sources['differ'], self.data_sources.get('differ_summary'), rule_params) diff --git a/tools/import_validation/validator.py b/tools/import_validation/validator.py index e16e13555a..712a14787e 100644 --- a/tools/import_validation/validator.py +++ b/tools/import_validation/validator.py @@ -283,44 +283,45 @@ def validate_deleted_records_percent(self, differ_df: pd.DataFrame, 'threshold': threshold }) - def validate_empty_import(self, differ_df: pd.DataFrame, summary: dict, + def validate_empty_import(self, report: dict, params: dict) -> ValidationResult: """Checks if the import is empty (no observations and no schema). Args: - differ_df: A DataFrame containing the differ output (unused but passed for consistency). - summary: A dictionary containing the differ summary. + report: A json object containing the lint report. params: A dictionary containing the validation parameters. Returns: A ValidationResult object. """ - if summary is None: + if report is None: return ValidationResult(ValidationStatus.DATA_ERROR, 'EMPTY_IMPORT_CHECK', - message="Differ summary is missing.") + message="Lint report is missing.") - current_obs_size = summary['current_obs_size'] - current_schema_size = summary['current_schema_size'] + counters = report.get('levelSummary', {}).get('LEVEL_INFO', + {}).get('counters', {}) - if current_obs_size == 0 and current_schema_size == 0: + num_nodes = int(counters.get('NumNodeSuccesses', 0)) + num_rows = int(counters.get('NumRowSuccesses', 0)) + + if num_nodes == 0 and num_rows == 0: return ValidationResult( ValidationStatus.FAILED, 'EMPTY_IMPORT_CHECK', message= - "The import is empty: both current_obs_size and current_schema_size are 0.", + "The import is empty: both NumNodeSuccesses and NumRowSuccesses are 0 in the lint report.", details={ - 'current_obs_size': int(current_obs_size), - 'current_schema_size': int(current_schema_size) + 'num_nodes': num_nodes, + 'num_rows': num_rows }) - return ValidationResult( - ValidationStatus.PASSED, - 'EMPTY_IMPORT_CHECK', - details={ - 'current_obs_size': int(current_obs_size), - 'current_schema_size': int(current_schema_size) - }) + return ValidationResult(ValidationStatus.PASSED, + 'EMPTY_IMPORT_CHECK', + details={ + 'num_nodes': num_nodes, + 'num_rows': num_rows + }) def validate_missing_refs_count(self, report: dict, params: dict) -> ValidationResult: diff --git a/tools/import_validation/validator_test.py b/tools/import_validation/validator_test.py index 124f61d8b1..eadaff88fc 100644 --- a/tools/import_validation/validator_test.py +++ b/tools/import_validation/validator_test.py @@ -187,29 +187,53 @@ def setUp(self): self.validator = Validator() def test_empty_import_fails_when_both_zero(self): - summary = {'current_obs_size': 0, 'current_schema_size': 0} - result = self.validator.validate_empty_import(pd.DataFrame(), summary, - {}) + report = { + 'levelSummary': { + 'LEVEL_INFO': { + 'counters': { + 'NumNodeSuccesses': "0", + 'NumRowSuccesses': "0" + } + } + } + } + result = self.validator.validate_empty_import(report, {}) self.assertEqual(result.status, ValidationStatus.FAILED) - self.assertEqual(result.details['current_obs_size'], 0) - self.assertEqual(result.details['current_schema_size'], 0) + self.assertEqual(result.details['num_nodes'], 0) + self.assertEqual(result.details['num_rows'], 0) - def test_empty_import_passes_when_obs_not_zero(self): - summary = {'current_obs_size': 100, 'current_schema_size': 0} - result = self.validator.validate_empty_import(pd.DataFrame(), summary, - {}) + def test_empty_import_passes_when_rows_not_zero(self): + report = { + 'levelSummary': { + 'LEVEL_INFO': { + 'counters': { + 'NumNodeSuccesses': "0", + 'NumRowSuccesses': "100" + } + } + } + } + result = self.validator.validate_empty_import(report, {}) self.assertEqual(result.status, ValidationStatus.PASSED) - self.assertEqual(result.details['current_obs_size'], 100) + self.assertEqual(result.details['num_rows'], 100) - def test_empty_import_passes_when_schema_not_zero(self): - summary = {'current_obs_size': 0, 'current_schema_size': 5} - result = self.validator.validate_empty_import(pd.DataFrame(), summary, - {}) + def test_empty_import_passes_when_nodes_not_zero(self): + report = { + 'levelSummary': { + 'LEVEL_INFO': { + 'counters': { + 'NumNodeSuccesses': "5", + 'NumRowSuccesses': "0" + } + } + } + } + result = self.validator.validate_empty_import(report, {}) self.assertEqual(result.status, ValidationStatus.PASSED) - self.assertEqual(result.details['current_schema_size'], 5) + self.assertEqual(result.details['num_nodes'], 5) - def test_empty_import_fails_on_missing_summary(self): - result = self.validator.validate_empty_import(pd.DataFrame(), None, {}) + def test_empty_import_fails_on_missing_report(self): + result = self.validator.validate_empty_import(None, {}) self.assertEqual(result.status, ValidationStatus.DATA_ERROR)