Skip to content

Marker interface example#267

Closed
samdark wants to merge 2 commits intodocsfrom
marker-interface
Closed

Marker interface example#267
samdark wants to merge 2 commits intodocsfrom
marker-interface

Conversation

@samdark
Copy link
Copy Markdown
Member

@samdark samdark commented Apr 18, 2026

Q A
Is bugfix? ✔️/❌
New feature? ✔️/❌
Breaks BC? ✔️/❌
Tests pass? ✔️/❌
Fixed issues comma-separated list of tickets # fixed by the PR, if any

Copilot AI review requested due to automatic review settings April 18, 2026 11:42
@samdark samdark marked this pull request as draft April 18, 2026 11:42
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a marker interface to identify adapters whose messages should be handled immediately after Queue::push(), and updates the built-in SynchronousAdapter to use that behavior instead of buffering messages for later processing.

Changes:

  • Added ImmediateProcessingAdapterInterface as a marker for “handle immediately after push” adapters.
  • Updated Queue::push() to invoke the worker immediately when the push adapter implements the marker interface.
  • Refactored SynchronousAdapter to implement the marker interface and stop buffering messages.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.

File Description
src/Queue.php Triggers immediate handling in push() when the adapter is marked as immediate-processing.
src/Adapter/SynchronousAdapter.php Converts adapter to immediate-processing model and removes buffering/flush-on-destruct behavior.
src/Adapter/ImmediateProcessingAdapterInterface.php Adds a marker interface to opt adapters into immediate handling.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 36 to 41
public function push(MessageInterface $message): MessageInterface
{
$key = count($this->messages) + $this->current;
$this->messages[] = $message;
$key = $this->processed;
$this->processed++;

return new IdEnvelope($message, $key);
Copy link

Copilot AI Apr 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

$processed is incremented in push() before the message is actually handled (handling happens in Queue::push()). If Worker::process() throws, callers could still observe status(id) === DONE even though processing failed. Consider updating the design so the adapter’s DONE accounting is updated only after successful processing (e.g., a post-handle hook / explicit acknowledgement), or adjust status() semantics/documentation to avoid reporting DONE on failed pushes.

Copilot uses AI. Check for mistakes.
Comment thread src/Queue.php
Comment on lines +73 to +75
if ($request->getAdapter() instanceof ImmediateProcessingAdapterInterface) {
$this->handle($message);
}
Copy link

Copilot AI Apr 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Immediate-processing path calls $this->handle($message), but handle() always calls $this->loop->canContinue(). Since the return value is ignored here, this adds unnecessary coupling/side effects (e.g., memory-limit checks) during push. Consider calling $this->worker->process($message, $queue) directly (or introducing a dedicated method) to avoid invoking the loop for producer-side immediate handling.

Copilot uses AI. Check for mistakes.
Comment thread src/Queue.php
);

if ($request->getAdapter() instanceof ImmediateProcessingAdapterInterface) {
$this->handle($message);
Copy link

Copilot AI Apr 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The adapter used for the push is taken from the middleware-dispatched PushRequest, but immediate processing passes $this (which still holds the original queue adapter) into Worker::process(). If any push middleware swaps the adapter via PushRequest::withAdapter(), handlers will run with a Queue instance configured with the wrong adapter. Consider processing using a queue instance that has $request->getAdapter() attached (e.g., clone via withAdapter() when it differs).

Suggested change
$this->handle($message);
$queue = $this;
if ($request->getAdapter() !== $this->adapter) {
$queue = clone $this;
$queue->adapter = $request->getAdapter();
}
$queue->handle($message);

Copilot uses AI. Check for mistakes.
Comment on lines +12 to 19
final class SynchronousAdapter implements AdapterInterface, ImmediateProcessingAdapterInterface
{
private array $messages = [];
private int $current = 0;

public function __construct(
private readonly WorkerInterface $worker,
private readonly QueueInterface $queue,
) {}

public function __destruct()
{
$this->runExisting(function (MessageInterface $message): bool {
$this->worker->process($message, $this->queue);

return true;
});
}
private int $processed = 0;

public function runExisting(callable $handlerCallback): void
{
$result = true;
while (isset($this->messages[$this->current]) && $result === true) {
$result = $handlerCallback($this->messages[$this->current]);
unset($this->messages[$this->current]);
$this->current++;
}
// Messages are handled immediately in Queue::push().
}
Copy link

Copilot AI Apr 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SynchronousAdapter’s constructor dependencies were removed and processing responsibility moved to Queue::push() via the new marker interface. This is a backward-incompatible change for any code instantiating SynchronousAdapter with (WorkerInterface, QueueInterface) as currently done in this repo’s tests/docs, and will cause runtime/compile errors unless all call sites are updated or a BC shim (e.g., optional constructor parameters or a named constructor) is kept.

Copilot uses AI. Check for mistakes.
@samdark samdark closed this Apr 18, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants