Conversation
WalkthroughThis pull request introduces support for partition coalescing in Pramen's metastore persistence layer. A new Changes
Sequence Diagram(s)sequenceDiagram
participant AppConfig as Application Configuration
participant DataFormatParser as DataFormatParser
participant PartitionInfo as PartitionInfo Model
participant MetastorePersistence as MetastorePersistence
AppConfig->>DataFormatParser: provide config with prefer.coalesce setting
DataFormatParser->>DataFormatParser: read prefer.coalesce (or default)
DataFormatParser->>PartitionInfo: create PerRecordCount(rpp, preferCoalesce)
PartitionInfo-->>DataFormatParser: return PartitionInfo instance
DataFormatParser-->>MetastorePersistence: pass PartitionInfo
MetastorePersistence->>MetastorePersistence: pattern match on preferCoalesce
alt preferCoalesce == true
MetastorePersistence->>MetastorePersistence: apply coalesce(numPartitions)
else preferCoalesce == false
MetastorePersistence->>MetastorePersistence: apply repartition(numPartitions)
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (2)
pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/DataFormatSuite.scala (1)
238-254: Consider adding test coverage forpramen.default.prefer.coalesce.The current tests cover per-table
prefer.coalesceand the implicit default (false), but there's no test verifying thatpramen.default.prefer.coalesce = trueinappConfigis honored when the per-table setting is absent.💡 Suggested test case
"use default prefer coalesce from app config" in { val conf = ConfigFactory.parseString( """format = delta |path = /a/b/c |records.per.partition = 100 |""".stripMargin) val appConf = ConfigFactory.parseString("pramen.default.prefer.coalesce = true") val format = DataFormatParser.fromConfig(conf, appConf) assert(format.asInstanceOf[Delta].partitionInfo == PartitionInfo.PerRecordCount(100, preferCoalesce = true)) }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/DataFormatSuite.scala` around lines 238 - 254, Add a unit test in DataFormatSuite.scala to verify that app-level setting pramen.default.prefer.coalesce is respected when per-table prefer.coalesce is absent: create a config with format = delta, path = /a/b/c and records.per.partition = 100, pass an appConf with "pramen.default.prefer.coalesce = true" to DataFormatParser.fromConfig, and assert that the returned Delta has partitionInfo == PartitionInfo.PerRecordCount(100, preferCoalesce = true) (use DataFormatParser.fromConfig, Delta and PartitionInfo.PerRecordCount to locate the code).pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/DataFormatParser.scala (1)
78-96: Consider logging a warning whenprefer.coalesceis set withnumber.of.partitions.The
preferCoalescevalue is parsed on line 81 but silently ignored whennumber.of.partitionsis used (line 88). While technically correct—coalesce only applies to record-count-based partitioning—a warning could help users understand their config has no effect.💡 Optional: Add warning for ignored config
case (Some(nop), None) => + if (ConfigUtils.getOptionBoolean(conf, PREFER_COALESCE_KEY).isDefined) { + log.warn(s"'$PREFER_COALESCE_KEY' is ignored when '$NUMBER_OF_PARTITIONS_KEY' is specified") + } PartitionInfo.Explicit(nop)This would require adding a logger to the object.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/DataFormatParser.scala` around lines 78 - 96, The getPartitionInfo method parses preferCoalesce but ignores it when NUMBER_OF_PARTITIONS_KEY is present; add a logger to the DataFormatParser object and emit a warning inside the case (Some(nop), None) branch (or whenever number-of-partitions is used) if preferCoalesce is true to inform users that prefer.coalesce is ignored; reference getPartitionInfo, preferCoalesce, NUMBER_OF_PARTITIONS_KEY and PartitionInfo so the log is placed where the match returns PartitionInfo.Explicit(nop) and keep the behavior unchanged aside from the warning.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/DataFormatParser.scala`:
- Around line 78-96: The getPartitionInfo method parses preferCoalesce but
ignores it when NUMBER_OF_PARTITIONS_KEY is present; add a logger to the
DataFormatParser object and emit a warning inside the case (Some(nop), None)
branch (or whenever number-of-partitions is used) if preferCoalesce is true to
inform users that prefer.coalesce is ignored; reference getPartitionInfo,
preferCoalesce, NUMBER_OF_PARTITIONS_KEY and PartitionInfo so the log is placed
where the match returns PartitionInfo.Explicit(nop) and keep the behavior
unchanged aside from the warning.
In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/DataFormatSuite.scala`:
- Around line 238-254: Add a unit test in DataFormatSuite.scala to verify that
app-level setting pramen.default.prefer.coalesce is respected when per-table
prefer.coalesce is absent: create a config with format = delta, path = /a/b/c
and records.per.partition = 100, pass an appConf with
"pramen.default.prefer.coalesce = true" to DataFormatParser.fromConfig, and
assert that the returned Delta has partitionInfo ==
PartitionInfo.PerRecordCount(100, preferCoalesce = true) (use
DataFormatParser.fromConfig, Delta and PartitionInfo.PerRecordCount to locate
the code).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c46462c3-21cf-45e2-86ac-7d1036e892ec
📒 Files selected for processing (10)
README.mdpramen/api/src/main/scala/za/co/absa/pramen/api/PartitionInfo.scalapramen/core/src/main/resources/reference.confpramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/DataFormatParser.scalapramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceParquet.scalapramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scalapramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/DataFormatSuite.scalapramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/MetaTableSuite.scalapramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceSuite.scalapramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJobSuite.scala
👮 Files not reviewed due to content moderation or server errors (8)
- pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/MetaTableSuite.scala
- pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJobSuite.scala
- pramen/api/src/main/scala/za/co/absa/pramen/api/PartitionInfo.scala
- README.md
- pramen/core/src/main/resources/reference.conf
- pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceSuite.scala
- pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceParquet.scala
Unit Test Coverage
Files
|
Closes #718
Summary by CodeRabbit
New Features
number.of.partitionsmetastore table option for specifying partition counts during writesprefer.coalesceconfiguration option to control partitioning strategy (coalesce vs. repartition) for records-per-partition settingsDocumentation