Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/Adapter/ImmediateProcessingAdapterInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Adapter;

/**
* Marker for adapters whose messages must be handled in the same process right after push.
*/
interface ImmediateProcessingAdapterInterface {}
40 changes: 6 additions & 34 deletions src/Adapter/SynchronousAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,15 @@
use InvalidArgumentException;
use Yiisoft\Queue\MessageStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Worker\WorkerInterface;
use Yiisoft\Queue\Message\IdEnvelope;

use function count;

final class SynchronousAdapter implements AdapterInterface
final class SynchronousAdapter implements AdapterInterface, ImmediateProcessingAdapterInterface
{
private array $messages = [];
private int $current = 0;

public function __construct(
private readonly WorkerInterface $worker,
private readonly QueueInterface $queue,
) {}

public function __destruct()
{
$this->runExisting(function (MessageInterface $message): bool {
$this->worker->process($message, $this->queue);

return true;
});
}
private int $processed = 0;

public function runExisting(callable $handlerCallback): void
{
$result = true;
while (isset($this->messages[$this->current]) && $result === true) {
$result = $handlerCallback($this->messages[$this->current]);
unset($this->messages[$this->current]);
$this->current++;
}
// Messages are handled immediately in Queue::push().
}
Comment on lines +12 to 19
Copy link

Copilot AI Apr 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SynchronousAdapter’s constructor dependencies were removed and processing responsibility moved to Queue::push() via the new marker interface. This is a backward-incompatible change for any code instantiating SynchronousAdapter with (WorkerInterface, QueueInterface) as currently done in this repo’s tests/docs, and will cause runtime/compile errors unless all call sites are updated or a BC shim (e.g., optional constructor parameters or a named constructor) is kept.

Copilot uses AI. Check for mistakes.

public function status(string|int $id): MessageStatus
Expand All @@ -50,21 +26,17 @@ public function status(string|int $id): MessageStatus
throw new InvalidArgumentException('This adapter IDs start with 0.');
}

if ($id < $this->current) {
if ($id < $this->processed) {
return MessageStatus::DONE;
}

if (isset($this->messages[$id])) {
return MessageStatus::WAITING;
}

throw new InvalidArgumentException('There is no message with the given ID.');
}

public function push(MessageInterface $message): MessageInterface
{
$key = count($this->messages) + $this->current;
$this->messages[] = $message;
$key = $this->processed;
$this->processed++;

return new IdEnvelope($message, $key);
Comment on lines 36 to 41
Copy link

Copilot AI Apr 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

$processed is incremented in push() before the message is actually handled (handling happens in Queue::push()). If Worker::process() throws, callers could still observe status(id) === DONE even though processing failed. Consider updating the design so the adapter’s DONE accounting is updated only after successful processing (e.g., a post-handle hook / explicit acknowledgement), or adjust status() semantics/documentation to avoid reporting DONE on failed pushes.

Copilot uses AI. Check for mistakes.
}
Expand Down
11 changes: 8 additions & 3 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use BackedEnum;
use Psr\Log\LoggerInterface;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\Adapter\ImmediateProcessingAdapterInterface;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Exception\AdapterConfiguration\AdapterNotConfiguredException;
use Yiisoft\Queue\Message\MessageInterface;
Expand Down Expand Up @@ -58,9 +59,9 @@ public function push(
);

$request = new PushRequest($message, $this->adapter);
$message = $this->pushMiddlewareDispatcher
->dispatch($request, $this->createPushHandler(...$middlewareDefinitions))
->getMessage();
$request = $this->pushMiddlewareDispatcher
->dispatch($request, $this->createPushHandler(...$middlewareDefinitions));
$message = $request->getMessage();

/** @var string $messageId */
$messageId = $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null';
Expand All @@ -69,6 +70,10 @@ public function push(
['handlerName' => $message->getHandlerName(), 'id' => $messageId],
);

if ($request->getAdapter() instanceof ImmediateProcessingAdapterInterface) {
$this->handle($message);
Copy link

Copilot AI Apr 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The adapter used for the push is taken from the middleware-dispatched PushRequest, but immediate processing passes $this (which still holds the original queue adapter) into Worker::process(). If any push middleware swaps the adapter via PushRequest::withAdapter(), handlers will run with a Queue instance configured with the wrong adapter. Consider processing using a queue instance that has $request->getAdapter() attached (e.g., clone via withAdapter() when it differs).

Suggested change
$this->handle($message);
$queue = $this;
if ($request->getAdapter() !== $this->adapter) {
$queue = clone $this;
$queue->adapter = $request->getAdapter();
}
$queue->handle($message);

Copilot uses AI. Check for mistakes.
}
Comment on lines +73 to +75
Copy link

Copilot AI Apr 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Immediate-processing path calls $this->handle($message), but handle() always calls $this->loop->canContinue(). Since the return value is ignored here, this adds unnecessary coupling/side effects (e.g., memory-limit checks) during push. Consider calling $this->worker->process($message, $queue) directly (or introducing a dedicated method) to avoid invoking the loop for producer-side immediate handling.

Copilot uses AI. Check for mistakes.

return $message;
}

Expand Down
Loading