Conversation
…on in orchestrator
WalkthroughAdds module-level tracking of critical lazy-job failures and propagates that state so task runners, the orchestrator, and pipeline status logic stop pipeline execution when a critical lazy job fails. Changes
Sequence DiagramsequenceDiagram
participant TR as TaskRunnerBase
participant TJM as TransientJobManager
participant OI as OrchestratorImpl
participant PS as PipelineStateImpl
TR->>TJM: hasCriticalLazyJobFailed()
alt flag == true
TR-->>TR: return NotRan (early exit)
else
TR->>TR: run lazy task
TR->>TR: onTaskCompletion(result)
alt result indicates lazy-job failure & job.isCritical
TR->>TJM: setCriticalLazyJobFailed(true)
end
end
OI->>TJM: hasCriticalLazyJobFailed()
alt flag == true
OI->>OI: hasCriticalJobFailures = true
OI->>OI: stop starting new jobs / close channel
end
PS->>TJM: hasCriticalLazyJobFailed()
alt flag == true
PS-->>PS: treat as critical failure when computing pipeline status
end
Estimated Code Review Effort🎯 3 (Moderate) | ⏱️ ~20 minutes Possibly Related PRs
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🧹 Nitpick comments (2)
pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala (1)
515-524: Consider using type patterns for cleaner matching.The pattern matching can be simplified using Scala's type patterns instead of
isInstanceOfchecks.♻️ Proposed simplification
private def isFailureOfLazyJob(runStatus: RunStatus): Boolean = { runStatus match { - case RunStatus.ValidationFailed(ex) if ex.isInstanceOf[LazyJobErrorWrapper] => - true - case RunStatus.Failed(ex) if ex.isInstanceOf[LazyJobErrorWrapper] => - true + case RunStatus.ValidationFailed(_: LazyJobErrorWrapper) => true + case RunStatus.Failed(_: LazyJobErrorWrapper) => true case _ => false } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala` around lines 515 - 524, The isFailureOfLazyJob method uses isInstanceOf checks; replace them with Scala type patterns to simplify the match in isFailureOfLazyJob: match on RunStatus.ValidationFailed(_: LazyJobErrorWrapper) and RunStatus.Failed(_: LazyJobErrorWrapper) (or combine them with an or-pattern) to return true, and keep the default case returning false; update the pattern arms in the isFailureOfLazyJob function accordingly.pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientJobManager.scala (1)
227-227: Consider direct assignment in synchronized context.Since
reset()is alreadysynchronized, callingsetCriticalLazyJobFailed(false)acquires the lock redundantly (reentrant but unnecessary overhead). You could directly assign the field instead.♻️ Proposed simplification
private[core] def reset(): Unit = synchronized { lazyJobs.clear() runningJobs.clear() taskRunnerOpt = None - setCriticalLazyJobFailed(false) + criticalLazyJobFailed = false }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientJobManager.scala` at line 227, The reset() method is already synchronized, so avoid the redundant reentrant call to setCriticalLazyJobFailed(false); instead directly assign the backing field (e.g., criticalLazyJobFailed = false) inside reset() to eliminate unnecessary lock overhead—update the code in TransientJobManager.reset() replacing the setCriticalLazyJobFailed(false) call with a direct assignment to the backing field used by setCriticalLazyJobFailed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala`:
- Around line 501-505: The local variable name is misspelled: rename
isFaliedBecauseOfALazyJob to isFailedBecauseOfALazyJob and update all its usages
(e.g., the declaration where it is assigned from
isFailureOfLazyJob(updatedResult.runStatus) and the subsequent if/else that
checks it) so references in TaskRunnerBase (TaskRunnerBase.scala) compile;
search for any other occurrences of the misspelling in this class and update
them to the corrected identifier.
---
Nitpick comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientJobManager.scala`:
- Line 227: The reset() method is already synchronized, so avoid the redundant
reentrant call to setCriticalLazyJobFailed(false); instead directly assign the
backing field (e.g., criticalLazyJobFailed = false) inside reset() to eliminate
unnecessary lock overhead—update the code in TransientJobManager.reset()
replacing the setCriticalLazyJobFailed(false) call with a direct assignment to
the backing field used by setCriticalLazyJobFailed.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala`:
- Around line 515-524: The isFailureOfLazyJob method uses isInstanceOf checks;
replace them with Scala type patterns to simplify the match in
isFailureOfLazyJob: match on RunStatus.ValidationFailed(_: LazyJobErrorWrapper)
and RunStatus.Failed(_: LazyJobErrorWrapper) (or combine them with an
or-pattern) to return true, and keep the default case returning false; update
the pattern arms in the isFailureOfLazyJob function accordingly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: ff67db48-9623-470f-9116-8c9ad37c9ce8
📒 Files selected for processing (3)
pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientJobManager.scalapramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scalapramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
Outdated
Show resolved
Hide resolved
Unit Test Coverage
Files
|
There was a problem hiding this comment.
🧹 Nitpick comments (1)
pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala (1)
515-524: Consider using type patterns for more idiomatic Scala.The current implementation works correctly, but could be simplified using type patterns instead of guards with
isInstanceOf.♻️ More idiomatic pattern matching
private def isFailureOfLazyJob(runStatus: RunStatus): Boolean = { runStatus match { - case RunStatus.ValidationFailed(ex) if ex.isInstanceOf[LazyJobErrorWrapper] => - true - case RunStatus.Failed(ex) if ex.isInstanceOf[LazyJobErrorWrapper] => - true + case RunStatus.ValidationFailed(_: LazyJobErrorWrapper) => + true + case RunStatus.Failed(_: LazyJobErrorWrapper) => + true case _ => false } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala` around lines 515 - 524, The isFailureOfLazyJob method uses isInstanceOf guards; replace them with Scala type patterns to be more idiomatic: match the RunStatus cases directly as RunStatus.ValidationFailed(_: LazyJobErrorWrapper) and RunStatus.Failed(_: LazyJobErrorWrapper) (or bind the exception if needed) so you avoid isInstanceOf checks; update the cases in isFailureOfLazyJob to use these type-pattern matches referencing the RunStatus.ValidationFailed, RunStatus.Failed and LazyJobErrorWrapper symbols.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala`:
- Around line 515-524: The isFailureOfLazyJob method uses isInstanceOf guards;
replace them with Scala type patterns to be more idiomatic: match the RunStatus
cases directly as RunStatus.ValidationFailed(_: LazyJobErrorWrapper) and
RunStatus.Failed(_: LazyJobErrorWrapper) (or bind the exception if needed) so
you avoid isInstanceOf checks; update the cases in isFailureOfLazyJob to use
these type-pattern matches referencing the RunStatus.ValidationFailed,
RunStatus.Failed and LazyJobErrorWrapper symbols.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 4a0ca33e-c46a-48bf-846a-61845812d0f6
📒 Files selected for processing (4)
pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientJobManager.scalapramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scalapramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scalapramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala
🚧 Files skipped from review as they are similar to previous changes (2)
- pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientJobManager.scala
Closes #726
Summary by CodeRabbit
Bug Fixes
Improvements