#718 Add support for "prefer.coalesce" when repartitioning metastore tables. #728

Merged
yruslan merged 1 commit into main from feature/718-add-support-for-partition-colescing
Mar 26, 2026

@yruslan yruslan commented Mar 26, 2026

Closes #718

Summary by CodeRabbit

  • New Features

    • Added number.of.partitions metastore table option for specifying partition counts during writes
    • Added prefer.coalesce configuration option to control partitioning strategy (coalesce vs. repartition) for records-per-partition settings
  • Documentation

    • Updated configuration documentation with new metastore table partitioning options and example configurations

coderabbitai bot commented Mar 26, 2026

Walkthrough

This pull request introduces support for partition coalescing in Pramen's metastore persistence layer. A new preferCoalesce boolean flag is added to PartitionInfo.PerRecordCount alongside configuration keys (prefer.coalesce, pramen.default.prefer.coalesce) that allow users to choose between coalesce() and repartition() strategies when partitioning based on record count.
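
To make the trade-off between the two strategies concrete, here is a toy model in plain Scala. It is not Spark code and not part of this PR: `PartitioningToyModel`, `coalesceLike` and `repartitionLike` are hypothetical names, and the grouping rule is a simplification. It only illustrates the general idea that a coalesce-style merge keeps existing partitions whole (no shuffle, but possibly uneven sizes), while a repartition-style shuffle redistributes every record.

```scala
// Toy model only: partitions are plain vectors, not Spark partitions.
object PartitioningToyModel {
  type Partitions[A] = Vector[Vector[A]]

  // Coalesce-like: merge whole neighbouring partitions into at most n groups.
  // A partition is never split, so no record moves relative to its neighbours.
  def coalesceLike[A](parts: Partitions[A], n: Int): Partitions[A] = {
    require(n > 0, "n must be positive")
    if (n >= parts.size) {
      parts // like coalesce, never increases the partition count
    } else {
      val groupSize = math.ceil(parts.size.toDouble / n).toInt
      parts.grouped(groupSize).map(_.flatten).toVector
    }
  }

  // Repartition-like: every record is redistributed round-robin into exactly n partitions.
  def repartitionLike[A](parts: Partitions[A], n: Int): Partitions[A] = {
    require(n > 0, "n must be positive")
    val all = parts.flatten
    Vector.tabulate(n) { i =>
      all.zipWithIndex.collect { case (x, idx) if idx % n == i => x }
    }
  }
}
```

With the skewed input `Vector(Vector(1, 2, 3, 4, 5, 6), Vector(7), Vector(8), Vector(9))`, `coalesceLike(parts, 2)` yields partition sizes `Vector(7, 2)`, whereas `repartitionLike(parts, 2)` yields `Vector(5, 4)`. That is the kind of skew a user may be accepting when setting `prefer.coalesce = true` in exchange for avoiding a shuffle.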

Changes

| Cohort / File(s) | Summary |
|---|---|
| Documentation & Configuration: README.md, pramen/core/src/main/resources/reference.conf | Added configuration documentation and default values for number.of.partitions and prefer.coalesce options in metastore table settings. New configuration keys pramen.prefer.coalesce and pramen.default.prefer.coalesce default to false. |
| API Definition: pramen/api/src/main/scala/za/co/absa/pramen/api/PartitionInfo.scala | Extended PartitionInfo.PerRecordCount case class from single parameter to two parameters: added preferCoalesce: Boolean field alongside existing recordsPerPartition: Long. |
| Core Configuration Parsing: pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/DataFormatParser.scala | Added configuration key constants and logic to extract defaultPreferCoalesce from app config, pass it through partition parsing methods, and construct PartitionInfo.PerRecordCount with the new preferCoalesce parameter from config overrides or defaults. |
| Persistence Implementation: pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceParquet.scala | Updated pattern matching in applyPartitioning to handle preferCoalesce flag; uses coalesce(numPartitions) when flag is true, otherwise uses repartition(numPartitions). |
| Pipeline YAML Generation: pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala | Modified YAML generation in getPartitionJaml to include prefer_coalesce field alongside records_per_partition in metastore table output. |
| Test Updates: pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/DataFormatSuite.scala, MetaTableSuite.scala, MetastorePersistenceSuite.scala, PythonTransformationJobSuite.scala | Updated test assertions to expect preferCoalesce parameter in PartitionInfo.PerRecordCount constructor calls; added new test case verifying prefer.coalesce = true configuration parsing behavior. |
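
The parsing behaviour summarized in the table can be sketched roughly as follows. This is a simplified stand-in, not the code from `DataFormatParser`: plain `Map[String, String]` values replace the real config objects, `PartitionConfigSketch` is a hypothetical name, and the `Default` case and the error raised when both options are set are assumptions that the PR text does not confirm. Only the key names, the `(Some(nop), None)` match shape, and the two-parameter `PerRecordCount` come from the PR.

```scala
// Simplified model of the partition settings (the Default case is an assumption).
sealed trait PartitionInfo
object PartitionInfo {
  case object Default extends PartitionInfo
  case class Explicit(numberOfPartitions: Int) extends PartitionInfo
  case class PerRecordCount(recordsPerPartition: Long, preferCoalesce: Boolean) extends PartitionInfo
}

object PartitionConfigSketch {
  val NumberOfPartitionsKey = "number.of.partitions"
  val RecordsPerPartitionKey = "records.per.partition"
  val PreferCoalesceKey = "prefer.coalesce"
  val DefaultPreferCoalesceKey = "pramen.default.prefer.coalesce"

  def getPartitionInfo(tableConf: Map[String, String],
                       appConf: Map[String, String]): PartitionInfo = {
    // The application-level default is false when the key is absent.
    val defaultPreferCoalesce = appConf.get(DefaultPreferCoalesceKey).exists(_.toBoolean)
    // The per-table setting, when present, overrides the application-level default.
    val preferCoalesce = tableConf.get(PreferCoalesceKey).map(_.toBoolean).getOrElse(defaultPreferCoalesce)

    (tableConf.get(NumberOfPartitionsKey), tableConf.get(RecordsPerPartitionKey)) match {
      case (Some(nop), None) => PartitionInfo.Explicit(nop.toInt) // preferCoalesce is not used here
      case (None, Some(rpp)) => PartitionInfo.PerRecordCount(rpp.toLong, preferCoalesce)
      case (None, None)      => PartitionInfo.Default
      case (Some(_), Some(_)) =>
        // Assumption: treat setting both options as a configuration error.
        throw new IllegalArgumentException(
          s"Only one of '$NumberOfPartitionsKey' and '$RecordsPerPartitionKey' may be set")
    }
  }
}
```

For example, a table config containing only `records.per.partition = 100`, combined with an app config that sets `pramen.default.prefer.coalesce = true`, resolves to `PerRecordCount(100, preferCoalesce = true)` in this sketch.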

Sequence Diagram(s)

sequenceDiagram
    participant AppConfig as Application Configuration
    participant DataFormatParser as DataFormatParser
    participant PartitionInfo as PartitionInfo Model
    participant MetastorePersistence as MetastorePersistence

    AppConfig->>DataFormatParser: provide config with prefer.coalesce setting
    DataFormatParser->>DataFormatParser: read prefer.coalesce (or default)
    DataFormatParser->>PartitionInfo: create PerRecordCount(rpp, preferCoalesce)
    PartitionInfo-->>DataFormatParser: return PartitionInfo instance
    DataFormatParser-->>MetastorePersistence: pass PartitionInfo
    MetastorePersistence->>MetastorePersistence: pattern match on preferCoalesce
    alt preferCoalesce == true
        MetastorePersistence->>MetastorePersistence: apply coalesce(numPartitions)
    else preferCoalesce == false
        MetastorePersistence->>MetastorePersistence: apply repartition(numPartitions)
    end
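
The final branch of the diagram might look roughly like the sketch below when written against the Spark `DataFrame` API. `RecordCountPartitioning`, `targetPartitions` and the way `numPartitions` is derived from a record count are assumptions for illustration; the PR summary only states that `coalesce(numPartitions)` is used when the flag is true and `repartition(numPartitions)` otherwise.

```scala
import org.apache.spark.sql.DataFrame

object RecordCountPartitioning {
  // Assumed formula: enough partitions to hold recordsPerPartition records each, at least one.
  def targetPartitions(recordCount: Long, recordsPerPartition: Long): Int = {
    require(recordsPerPartition > 0, "recordsPerPartition must be positive")
    math.max(1, math.ceil(recordCount.toDouble / recordsPerPartition).toInt)
  }

  def applyPartitioning(df: DataFrame,
                        recordCount: Long,
                        recordsPerPartition: Long,
                        preferCoalesce: Boolean): DataFrame = {
    val numPartitions = targetPartitions(recordCount, recordsPerPartition)
    if (preferCoalesce) {
      df.coalesce(numPartitions)    // narrow dependency, no shuffle
    } else {
      df.repartition(numPartitions) // full shuffle into exactly numPartitions partitions
    }
  }
}
```

One caveat worth keeping in mind: `coalesce` can only reduce the number of partitions. If `numPartitions` is larger than the current partition count, the DataFrame keeps its current number of partitions, so `prefer.coalesce = true` may not reach the requested records-per-partition target in that case.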

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Suggested reviewers

  • jozefbakus

Poem

🐰 A coalesce charm we now bring,
No shuffle dance, just merge on wing!
Records aligned, partitions neat,
Pramen's performance now complete.

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title clearly and concisely summarizes the main change: adding support for 'prefer.coalesce' option when repartitioning metastore tables, which matches the core objective of the changeset. |
| Linked Issues check | ✅ Passed | The PR fully implements the requirements from issue #718: it adds support for coalescing partitions during repartitioning via the 'prefer.coalesce' option, allowing users to use coalesce() instead of repartition() to avoid shuffle overhead. |
| Out of Scope Changes check | ✅ Passed | All changes are directly related to implementing the 'prefer.coalesce' feature: configuration keys, API updates, logic implementation, documentation, and comprehensive test coverage. No unrelated changes detected. |
| Docstring Coverage | ✅ Passed | No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check. |

@coderabbitai coderabbitai bot left a comment

🧹 Nitpick comments (2)
pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/DataFormatSuite.scala (1)

238-254: Consider adding test coverage for pramen.default.prefer.coalesce.

The current tests cover per-table prefer.coalesce and the implicit default (false), but there's no test verifying that pramen.default.prefer.coalesce = true in appConfig is honored when the per-table setting is absent.

💡 Suggested test case
"use default prefer coalesce from app config" in {
  val conf = ConfigFactory.parseString(
    """format = delta
      |path = /a/b/c
      |records.per.partition = 100
      |""".stripMargin)

  val appConf = ConfigFactory.parseString("pramen.default.prefer.coalesce = true")

  val format = DataFormatParser.fromConfig(conf, appConf)

  assert(format.asInstanceOf[Delta].partitionInfo == PartitionInfo.PerRecordCount(100, preferCoalesce = true))
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/DataFormatSuite.scala`
around lines 238 - 254, Add a unit test in DataFormatSuite.scala to verify that
app-level setting pramen.default.prefer.coalesce is respected when per-table
prefer.coalesce is absent: create a config with format = delta, path = /a/b/c
and records.per.partition = 100, pass an appConf with
"pramen.default.prefer.coalesce = true" to DataFormatParser.fromConfig, and
assert that the returned Delta has partitionInfo ==
PartitionInfo.PerRecordCount(100, preferCoalesce = true) (use
DataFormatParser.fromConfig, Delta and PartitionInfo.PerRecordCount to locate
the code).
pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/DataFormatParser.scala (1)

78-96: Consider logging a warning when prefer.coalesce is set with number.of.partitions.

The preferCoalesce value is parsed on line 81 but silently ignored when number.of.partitions is used (line 88). While technically correct—coalesce only applies to record-count-based partitioning—a warning could help users understand their config has no effect.

💡 Optional: Add warning for ignored config
       case (Some(nop), None) =>
+        if (ConfigUtils.getOptionBoolean(conf, PREFER_COALESCE_KEY).isDefined) {
+          log.warn(s"'$PREFER_COALESCE_KEY' is ignored when '$NUMBER_OF_PARTITIONS_KEY' is specified")
+        }
         PartitionInfo.Explicit(nop)

This would require adding a logger to the object.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/DataFormatParser.scala`
around lines 78 - 96, The getPartitionInfo method parses preferCoalesce but
ignores it when NUMBER_OF_PARTITIONS_KEY is present; add a logger to the
DataFormatParser object and emit a warning inside the case (Some(nop), None)
branch (or whenever number-of-partitions is used) if preferCoalesce is true to
inform users that prefer.coalesce is ignored; reference getPartitionInfo,
preferCoalesce, NUMBER_OF_PARTITIONS_KEY and PartitionInfo so the log is placed
where the match returns PartitionInfo.Explicit(nop) and keep the behavior
unchanged aside from the warning.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: c46462c3-21cf-45e2-86ac-7d1036e892ec

📥 Commits

Reviewing files that changed from the base of the PR and between 558630a and b3e3caf.

📒 Files selected for processing (10)
  • README.md
  • pramen/api/src/main/scala/za/co/absa/pramen/api/PartitionInfo.scala
  • pramen/core/src/main/resources/reference.conf
  • pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/model/DataFormatParser.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceParquet.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/DataFormatSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/MetaTableSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJobSuite.scala
👮 Files not reviewed due to content moderation or server errors (8)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/model/MetaTableSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJobSuite.scala
  • pramen/api/src/main/scala/za/co/absa/pramen/api/PartitionInfo.scala
  • README.md
  • pramen/core/src/main/resources/reference.conf
  • pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceSuite.scala
  • pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceParquet.scala

@github-actions

Unit Test Coverage

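| Metric | Coverage | Status |
|---|---|---|
| Overall Project | 84.4% -0.01% | 🍏 |
| Files changed | 95.68% | 🍏 |

Module Coverage

| Module | Coverage | Status |
|---|---|---|
| pramen:core Jacoco Report | 86.35% -0.01% | 🍏 |

Files

| Module | File | Coverage | Status |
|---|---|---|---|
| pramen:core Jacoco Report | PythonTransformationJob.scala | 90.58% -0.14% | 🍏 |
| | DataFormatParser.scala | 89.43% | 🍏 |
| | MetastorePersistenceParquet.scala | 84.75% -0.53% | 🍏 |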

@yruslan yruslan merged commit e4d189f into main Mar 26, 2026
7 checks passed
@yruslan yruslan deleted the feature/718-add-support-for-partition-colescing branch March 26, 2026 14:48

Development

Successfully merging this pull request may close these issues.

Support coalescing of partitions
