Skip to content

[Dataflow Streaming] Prevent commit threads from sharing commit streams#37847

Open
arunpandianp wants to merge 4 commits intoapache:masterfrom
arunpandianp:fixMultiCommitStreams
Open

[Dataflow Streaming] Prevent commit threads from sharing commit streams#37847
arunpandianp wants to merge 4 commits intoapache:masterfrom
arunpandianp:fixMultiCommitStreams

Conversation

@arunpandianp
Copy link
Contributor

Since the commit threads were sharing the WindmillStreamPool, the different commit streams can end up sharing commit streams.

This change gives each commit thread its own WindmillStreamPool and avoids the commit stream sharing problem.

Since the commit threads were sharing the WindmillStreamPool, the
different commit streams can end up sharing commit streams.

This change gives each commit thread its own WindmillStreamPool and
avoids the commit stream sharing problem.
@arunpandianp
Copy link
Contributor Author

R: @scwhittle

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request resolves a critical concurrency problem in Dataflow Streaming where multiple commit threads were inadvertently sharing commit streams due to a common WindmillStreamPool. By refactoring the stream factory mechanism, each commit thread is now provisioned with its own isolated WindmillStreamPool, thereby eliminating stream contention and enhancing the reliability and stability of commit operations within the streaming pipeline.

Highlights

  • Prevented Commit Stream Sharing: Addressed a concurrency issue where commit threads were sharing WindmillStreamPool instances, leading to unintended sharing of commit streams.
  • Dedicated Stream Pools: Ensured that each commit thread now receives its own dedicated WindmillStreamPool, isolating commit operations.
  • Refactored Stream Factory: Modified the commitWorkStreamFactory to commitWorkStreamFactoryFactory, changing its type from Supplier<CloseableStream<CommitWorkStream>> to Supplier<Supplier<CloseableStream<CommitWorkStream>>> to facilitate per-thread stream pool creation.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Changelog
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
    • Updated the StreamingEngineWorkerHarnessFactoryOutput builder to use setCommitWorkStreamFactoryFactory instead of setCommitWorkStreamFactory.
    • Modified the WorkCommitter builder to instantiate WindmillStreamPool with a single stream and provide a factory for it via setCommitWorkStreamFactoryFactory.
  • runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java
    • Changed the constructor parameter commitWorkStreamFactory to commitWorkStreamFactoryFactory with a nested Supplier type.
    • Updated the Builder interface to rename setCommitWorkStreamFactory to setCommitWorkStreamFactoryFactory and adjust its parameter type accordingly.
  • runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitterTest.java
    • Refactored the test class field commitWorkStreamFactory to commitWorkStreamFactoryFactory to match the new nested Supplier type.
    • Adjusted the setUp method and various test cases to correctly initialize and utilize the commitWorkStreamFactoryFactory.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@github-actions
Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@arunpandianp arunpandianp requested a review from scwhittle March 13, 2026 23:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants