diff --git a/src/Adapter/ImmediateProcessingAdapterInterface.php b/src/Adapter/ImmediateProcessingAdapterInterface.php new file mode 100644 index 00000000..19239d8a --- /dev/null +++ b/src/Adapter/ImmediateProcessingAdapterInterface.php @@ -0,0 +1,10 @@ +runExisting(function (MessageInterface $message): bool { - $this->worker->process($message, $this->queue); - - return true; - }); - } + private int $processed = 0; public function runExisting(callable $handlerCallback): void { - $result = true; - while (isset($this->messages[$this->current]) && $result === true) { - $result = $handlerCallback($this->messages[$this->current]); - unset($this->messages[$this->current]); - $this->current++; - } + // Messages are handled immediately in Queue::push(). } public function status(string|int $id): MessageStatus @@ -50,21 +26,17 @@ public function status(string|int $id): MessageStatus throw new InvalidArgumentException('This adapter IDs start with 0.'); } - if ($id < $this->current) { + if ($id < $this->processed) { return MessageStatus::DONE; } - if (isset($this->messages[$id])) { - return MessageStatus::WAITING; - } - throw new InvalidArgumentException('There is no message with the given ID.'); } public function push(MessageInterface $message): MessageInterface { - $key = count($this->messages) + $this->current; - $this->messages[] = $message; + $key = $this->processed; + $this->processed++; return new IdEnvelope($message, $key); } diff --git a/src/Queue.php b/src/Queue.php index 7e2c7531..087a5fdb 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -7,6 +7,7 @@ use BackedEnum; use Psr\Log\LoggerInterface; use Yiisoft\Queue\Adapter\AdapterInterface; +use Yiisoft\Queue\Adapter\ImmediateProcessingAdapterInterface; use Yiisoft\Queue\Cli\LoopInterface; use Yiisoft\Queue\Exception\AdapterConfiguration\AdapterNotConfiguredException; use Yiisoft\Queue\Message\MessageInterface; @@ -58,9 +59,9 @@ public function push( ); $request = new PushRequest($message, $this->adapter); - $message = $this->pushMiddlewareDispatcher - ->dispatch($request, $this->createPushHandler(...$middlewareDefinitions)) - ->getMessage(); + $request = $this->pushMiddlewareDispatcher + ->dispatch($request, $this->createPushHandler(...$middlewareDefinitions)); + $message = $request->getMessage(); /** @var string $messageId */ $messageId = $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null'; @@ -69,6 +70,10 @@ public function push( ['handlerName' => $message->getHandlerName(), 'id' => $messageId], ); + if ($request->getAdapter() instanceof ImmediateProcessingAdapterInterface) { + $this->handle($message); + } + return $message; }