From fe67342cfd0bf6f1ce689d00f56bde1cc9fd920e Mon Sep 17 00:00:00 2001 From: Alexander Makarov Date: Sat, 18 Apr 2026 14:42:18 +0300 Subject: [PATCH 1/2] Marker interface example --- .../ImmediateProcessingAdapterInterface.php | 12 ++++++ src/Adapter/SynchronousAdapter.php | 40 +++---------------- src/Queue.php | 11 +++-- 3 files changed, 26 insertions(+), 37 deletions(-) create mode 100644 src/Adapter/ImmediateProcessingAdapterInterface.php diff --git a/src/Adapter/ImmediateProcessingAdapterInterface.php b/src/Adapter/ImmediateProcessingAdapterInterface.php new file mode 100644 index 00000000..9f43a46e --- /dev/null +++ b/src/Adapter/ImmediateProcessingAdapterInterface.php @@ -0,0 +1,12 @@ +runExisting(function (MessageInterface $message): bool { - $this->worker->process($message, $this->queue); - - return true; - }); - } + private int $processed = 0; public function runExisting(callable $handlerCallback): void { - $result = true; - while (isset($this->messages[$this->current]) && $result === true) { - $result = $handlerCallback($this->messages[$this->current]); - unset($this->messages[$this->current]); - $this->current++; - } + // Messages are handled immediately in Queue::push(). } public function status(string|int $id): MessageStatus @@ -50,21 +26,17 @@ public function status(string|int $id): MessageStatus throw new InvalidArgumentException('This adapter IDs start with 0.'); } - if ($id < $this->current) { + if ($id < $this->processed) { return MessageStatus::DONE; } - if (isset($this->messages[$id])) { - return MessageStatus::WAITING; - } - throw new InvalidArgumentException('There is no message with the given ID.'); } public function push(MessageInterface $message): MessageInterface { - $key = count($this->messages) + $this->current; - $this->messages[] = $message; + $key = $this->processed; + $this->processed++; return new IdEnvelope($message, $key); } diff --git a/src/Queue.php b/src/Queue.php index 7e2c7531..087a5fdb 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -7,6 +7,7 @@ use BackedEnum; use Psr\Log\LoggerInterface; use Yiisoft\Queue\Adapter\AdapterInterface; +use Yiisoft\Queue\Adapter\ImmediateProcessingAdapterInterface; use Yiisoft\Queue\Cli\LoopInterface; use Yiisoft\Queue\Exception\AdapterConfiguration\AdapterNotConfiguredException; use Yiisoft\Queue\Message\MessageInterface; @@ -58,9 +59,9 @@ public function push( ); $request = new PushRequest($message, $this->adapter); - $message = $this->pushMiddlewareDispatcher - ->dispatch($request, $this->createPushHandler(...$middlewareDefinitions)) - ->getMessage(); + $request = $this->pushMiddlewareDispatcher + ->dispatch($request, $this->createPushHandler(...$middlewareDefinitions)); + $message = $request->getMessage(); /** @var string $messageId */ $messageId = $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null'; @@ -69,6 +70,10 @@ public function push( ['handlerName' => $message->getHandlerName(), 'id' => $messageId], ); + if ($request->getAdapter() instanceof ImmediateProcessingAdapterInterface) { + $this->handle($message); + } + return $message; } From f1109650f6dfbf58adb71a49f7b4234c0042c4e3 Mon Sep 17 00:00:00 2001 From: samdark <47294+samdark@users.noreply.github.com> Date: Sat, 18 Apr 2026 11:43:07 +0000 Subject: [PATCH 2/2] Apply PHP CS Fixer and Rector changes (CI) --- src/Adapter/ImmediateProcessingAdapterInterface.php | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/Adapter/ImmediateProcessingAdapterInterface.php b/src/Adapter/ImmediateProcessingAdapterInterface.php index 9f43a46e..19239d8a 100644 --- a/src/Adapter/ImmediateProcessingAdapterInterface.php +++ b/src/Adapter/ImmediateProcessingAdapterInterface.php @@ -7,6 +7,4 @@ /** * Marker for adapters whose messages must be handled in the same process right after push. */ -interface ImmediateProcessingAdapterInterface -{ -} +interface ImmediateProcessingAdapterInterface {}