Skip to content

#726 Track critical lazy job failures to enable early failure detection in orchestrator#727

Merged
yruslan merged 2 commits intomainfrom
bugfix/726-fix-critical-lazy-jobs
Mar 23, 2026
Merged

#726 Track critical lazy job failures to enable early failure detection in orchestrator#727
yruslan merged 2 commits intomainfrom
bugfix/726-fix-critical-lazy-jobs

Conversation

@yruslan
Copy link
Copy Markdown
Collaborator

@yruslan yruslan commented Mar 23, 2026

Closes #726

Summary by CodeRabbit

  • Bug Fixes

    • Global critical lazy-job failure state is now cleared on manager reset and halts further lazy task starts once triggered.
    • Pipeline status calculation now considers global lazy-job failures to avoid misreporting success.
  • Improvements

    • Lazy tasks short-circuit when a critical lazy failure exists; related completion/journal updates are skipped.
    • Logging for lazy-job failures improved for clearer traceability.

@coderabbitai
Copy link
Copy Markdown

coderabbitai bot commented Mar 23, 2026

Walkthrough

Adds module-level tracking of critical lazy-job failures and propagates that state so task runners, the orchestrator, and pipeline status logic stop pipeline execution when a critical lazy job fails.

Changes

Cohort / File(s) Summary
Transient Job State Management
pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientJobManager.scala
Added a synchronized boolean flag criticalLazyJobFailed, a public getter hasCriticalLazyJobFailed: Boolean, a private[core] synchronized setter setCriticalLazyJobFailed(Boolean), and clear it in reset().
Lazy Task Execution
pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
runLazyTask short-circuits if TransientJobManager.hasCriticalLazyJobFailed is true. onTaskCompletion detects lazy-job failures (matching LazyJobErrorWrapper) via isFailureOfLazyJob and sets the transient failure flag for critical operations; skips pipeline/journal updates for lazy-job failures when appropriate.
Pipeline Orchestration & State
pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala, pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala
Orchestrator runJobs now ORs TransientJobManager.hasCriticalLazyJobFailed into hasCriticalJobFailures to halt starting new jobs; pipeline status determination (PipelineStateImpl) treats the transient lazy-job failure flag as a critical failure source when computing final status.

Sequence Diagram

sequenceDiagram
    participant TR as TaskRunnerBase
    participant TJM as TransientJobManager
    participant OI as OrchestratorImpl
    participant PS as PipelineStateImpl

    TR->>TJM: hasCriticalLazyJobFailed()
    alt flag == true
        TR-->>TR: return NotRan (early exit)
    else
        TR->>TR: run lazy task
        TR->>TR: onTaskCompletion(result)
        alt result indicates lazy-job failure & job.isCritical
            TR->>TJM: setCriticalLazyJobFailed(true)
        end
    end

    OI->>TJM: hasCriticalLazyJobFailed()
    alt flag == true
        OI->>OI: hasCriticalJobFailures = true
        OI->>OI: stop starting new jobs / close channel
    end

    PS->>TJM: hasCriticalLazyJobFailed()
    alt flag == true
        PS-->>PS: treat as critical failure when computing pipeline status
    end
Loading

Estimated Code Review Effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly Related PRs

  • #697: Modifies the same core components (TransientJobManager, OrchestratorImpl, TaskRunnerBase, PipelineStateImpl) to introduce/handle critical lazy-job failure propagation.

Poem

🐰 A lazy task tripped in the night,
A flag was set to halt the flight.
TransientManager keeps the score,
Orchestrator stops once more.
Now pipelines hop safe and bright.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and specifically summarizes the main change: tracking critical lazy job failures to enable early failure detection in the orchestrator.
Linked Issues check ✅ Passed All code changes directly address issue #726: TransientJobManager tracks critical lazy job failures, TaskRunnerBase sets the flag when critical lazy jobs fail, and OrchestratorImpl/PipelineStateImpl use this flag to detect failures and prevent further execution.
Out of Scope Changes check ✅ Passed All changes are within scope: four files modified to implement failure tracking for critical lazy jobs with no unrelated refactoring or additional features introduced.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch bugfix/726-fix-critical-lazy-jobs

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala (1)

515-524: Consider using type patterns for cleaner matching.

The pattern matching can be simplified using Scala's type patterns instead of isInstanceOf checks.

♻️ Proposed simplification
   private def isFailureOfLazyJob(runStatus: RunStatus): Boolean = {
     runStatus match {
-      case RunStatus.ValidationFailed(ex) if ex.isInstanceOf[LazyJobErrorWrapper] =>
-        true
-      case RunStatus.Failed(ex) if ex.isInstanceOf[LazyJobErrorWrapper] =>
-        true
+      case RunStatus.ValidationFailed(_: LazyJobErrorWrapper) => true
+      case RunStatus.Failed(_: LazyJobErrorWrapper) => true
       case _ =>
         false
     }
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala`
around lines 515 - 524, The isFailureOfLazyJob method uses isInstanceOf checks;
replace them with Scala type patterns to simplify the match in
isFailureOfLazyJob: match on RunStatus.ValidationFailed(_: LazyJobErrorWrapper)
and RunStatus.Failed(_: LazyJobErrorWrapper) (or combine them with an
or-pattern) to return true, and keep the default case returning false; update
the pattern arms in the isFailureOfLazyJob function accordingly.
pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientJobManager.scala (1)

227-227: Consider direct assignment in synchronized context.

Since reset() is already synchronized, calling setCriticalLazyJobFailed(false) acquires the lock redundantly (reentrant but unnecessary overhead). You could directly assign the field instead.

♻️ Proposed simplification
   private[core] def reset(): Unit = synchronized {
     lazyJobs.clear()
     runningJobs.clear()
     taskRunnerOpt = None
-    setCriticalLazyJobFailed(false)
+    criticalLazyJobFailed = false
   }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientJobManager.scala`
at line 227, The reset() method is already synchronized, so avoid the redundant
reentrant call to setCriticalLazyJobFailed(false); instead directly assign the
backing field (e.g., criticalLazyJobFailed = false) inside reset() to eliminate
unnecessary lock overhead—update the code in TransientJobManager.reset()
replacing the setCriticalLazyJobFailed(false) call with a direct assignment to
the backing field used by setCriticalLazyJobFailed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala`:
- Around line 501-505: The local variable name is misspelled: rename
isFaliedBecauseOfALazyJob to isFailedBecauseOfALazyJob and update all its usages
(e.g., the declaration where it is assigned from
isFailureOfLazyJob(updatedResult.runStatus) and the subsequent if/else that
checks it) so references in TaskRunnerBase (TaskRunnerBase.scala) compile;
search for any other occurrences of the misspelling in this class and update
them to the corrected identifier.

---

Nitpick comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientJobManager.scala`:
- Line 227: The reset() method is already synchronized, so avoid the redundant
reentrant call to setCriticalLazyJobFailed(false); instead directly assign the
backing field (e.g., criticalLazyJobFailed = false) inside reset() to eliminate
unnecessary lock overhead—update the code in TransientJobManager.reset()
replacing the setCriticalLazyJobFailed(false) call with a direct assignment to
the backing field used by setCriticalLazyJobFailed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala`:
- Around line 515-524: The isFailureOfLazyJob method uses isInstanceOf checks;
replace them with Scala type patterns to simplify the match in
isFailureOfLazyJob: match on RunStatus.ValidationFailed(_: LazyJobErrorWrapper)
and RunStatus.Failed(_: LazyJobErrorWrapper) (or combine them with an
or-pattern) to return true, and keep the default case returning false; update
the pattern arms in the isFailureOfLazyJob function accordingly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: ff67db48-9623-470f-9116-8c9ad37c9ce8

📥 Commits

Reviewing files that changed from the base of the PR and between d16c0fc and c6761eb.

📒 Files selected for processing (3)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientJobManager.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala

@github-actions
Copy link
Copy Markdown

github-actions bot commented Mar 23, 2026

Unit Test Coverage

Overall Project 84.39% -0.04% 🍏
Files changed 76.77%

Module Coverage
pramen:core Jacoco Report 86.35% -0.04%
Files
Module File Coverage
pramen:core Jacoco Report OrchestratorImpl.scala 94.38% 🍏
TransientJobManager.scala 91.11% 🍏
PipelineStateImpl.scala 90.28% -0.19% 🍏
TaskRunnerBase.scala 82.46% -1.3%

Copy link
Copy Markdown

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala (1)

515-524: Consider using type patterns for more idiomatic Scala.

The current implementation works correctly, but could be simplified using type patterns instead of guards with isInstanceOf.

♻️ More idiomatic pattern matching
 private def isFailureOfLazyJob(runStatus: RunStatus): Boolean = {
   runStatus match {
-    case RunStatus.ValidationFailed(ex) if ex.isInstanceOf[LazyJobErrorWrapper] =>
-      true
-    case RunStatus.Failed(ex) if ex.isInstanceOf[LazyJobErrorWrapper] =>
-      true
+    case RunStatus.ValidationFailed(_: LazyJobErrorWrapper) =>
+      true
+    case RunStatus.Failed(_: LazyJobErrorWrapper) =>
+      true
     case _ =>
       false
   }
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala`
around lines 515 - 524, The isFailureOfLazyJob method uses isInstanceOf guards;
replace them with Scala type patterns to be more idiomatic: match the RunStatus
cases directly as RunStatus.ValidationFailed(_: LazyJobErrorWrapper) and
RunStatus.Failed(_: LazyJobErrorWrapper) (or bind the exception if needed) so
you avoid isInstanceOf checks; update the cases in isFailureOfLazyJob to use
these type-pattern matches referencing the RunStatus.ValidationFailed,
RunStatus.Failed and LazyJobErrorWrapper symbols.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala`:
- Around line 515-524: The isFailureOfLazyJob method uses isInstanceOf guards;
replace them with Scala type patterns to be more idiomatic: match the RunStatus
cases directly as RunStatus.ValidationFailed(_: LazyJobErrorWrapper) and
RunStatus.Failed(_: LazyJobErrorWrapper) (or bind the exception if needed) so
you avoid isInstanceOf checks; update the cases in isFailureOfLazyJob to use
these type-pattern matches referencing the RunStatus.ValidationFailed,
RunStatus.Failed and LazyJobErrorWrapper symbols.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 4a0ca33e-c46a-48bf-846a-61845812d0f6

📥 Commits

Reviewing files that changed from the base of the PR and between c6761eb and 3aea8e8.

📒 Files selected for processing (4)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientJobManager.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala
🚧 Files skipped from review as they are similar to previous changes (2)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientJobManager.scala

@yruslan yruslan merged commit f4a803d into main Mar 23, 2026
7 checks passed
@yruslan yruslan deleted the bugfix/726-fix-critical-lazy-jobs branch March 23, 2026 15:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

When critical lazy job fails it doesn't prevent other jobs to run

1 participant