-
-
Notifications
You must be signed in to change notification settings - Fork 27
Marker interface example #267
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| <?php | ||
|
|
||
| declare(strict_types=1); | ||
|
|
||
| namespace Yiisoft\Queue\Adapter; | ||
|
|
||
| /** | ||
| * Marker for adapters whose messages must be handled in the same process right after push. | ||
| */ | ||
| interface ImmediateProcessingAdapterInterface {} |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,39 +7,15 @@ | |
| use InvalidArgumentException; | ||
| use Yiisoft\Queue\MessageStatus; | ||
| use Yiisoft\Queue\Message\MessageInterface; | ||
| use Yiisoft\Queue\QueueInterface; | ||
| use Yiisoft\Queue\Worker\WorkerInterface; | ||
| use Yiisoft\Queue\Message\IdEnvelope; | ||
|
|
||
| use function count; | ||
|
|
||
| final class SynchronousAdapter implements AdapterInterface | ||
| final class SynchronousAdapter implements AdapterInterface, ImmediateProcessingAdapterInterface | ||
| { | ||
| private array $messages = []; | ||
| private int $current = 0; | ||
|
|
||
| public function __construct( | ||
| private readonly WorkerInterface $worker, | ||
| private readonly QueueInterface $queue, | ||
| ) {} | ||
|
|
||
| public function __destruct() | ||
| { | ||
| $this->runExisting(function (MessageInterface $message): bool { | ||
| $this->worker->process($message, $this->queue); | ||
|
|
||
| return true; | ||
| }); | ||
| } | ||
| private int $processed = 0; | ||
|
|
||
| public function runExisting(callable $handlerCallback): void | ||
| { | ||
| $result = true; | ||
| while (isset($this->messages[$this->current]) && $result === true) { | ||
| $result = $handlerCallback($this->messages[$this->current]); | ||
| unset($this->messages[$this->current]); | ||
| $this->current++; | ||
| } | ||
| // Messages are handled immediately in Queue::push(). | ||
| } | ||
|
|
||
| public function status(string|int $id): MessageStatus | ||
|
|
@@ -50,21 +26,17 @@ public function status(string|int $id): MessageStatus | |
| throw new InvalidArgumentException('This adapter IDs start with 0.'); | ||
| } | ||
|
|
||
| if ($id < $this->current) { | ||
| if ($id < $this->processed) { | ||
| return MessageStatus::DONE; | ||
| } | ||
|
|
||
| if (isset($this->messages[$id])) { | ||
| return MessageStatus::WAITING; | ||
| } | ||
|
|
||
| throw new InvalidArgumentException('There is no message with the given ID.'); | ||
| } | ||
|
|
||
| public function push(MessageInterface $message): MessageInterface | ||
| { | ||
| $key = count($this->messages) + $this->current; | ||
| $this->messages[] = $message; | ||
| $key = $this->processed; | ||
| $this->processed++; | ||
|
|
||
| return new IdEnvelope($message, $key); | ||
|
Comment on lines
36
to
41
|
||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -7,6 +7,7 @@ | |||||||||||||||||
| use BackedEnum; | ||||||||||||||||||
| use Psr\Log\LoggerInterface; | ||||||||||||||||||
| use Yiisoft\Queue\Adapter\AdapterInterface; | ||||||||||||||||||
| use Yiisoft\Queue\Adapter\ImmediateProcessingAdapterInterface; | ||||||||||||||||||
| use Yiisoft\Queue\Cli\LoopInterface; | ||||||||||||||||||
| use Yiisoft\Queue\Exception\AdapterConfiguration\AdapterNotConfiguredException; | ||||||||||||||||||
| use Yiisoft\Queue\Message\MessageInterface; | ||||||||||||||||||
|
|
@@ -58,9 +59,9 @@ public function push( | |||||||||||||||||
| ); | ||||||||||||||||||
|
|
||||||||||||||||||
| $request = new PushRequest($message, $this->adapter); | ||||||||||||||||||
| $message = $this->pushMiddlewareDispatcher | ||||||||||||||||||
| ->dispatch($request, $this->createPushHandler(...$middlewareDefinitions)) | ||||||||||||||||||
| ->getMessage(); | ||||||||||||||||||
| $request = $this->pushMiddlewareDispatcher | ||||||||||||||||||
| ->dispatch($request, $this->createPushHandler(...$middlewareDefinitions)); | ||||||||||||||||||
| $message = $request->getMessage(); | ||||||||||||||||||
|
|
||||||||||||||||||
| /** @var string $messageId */ | ||||||||||||||||||
| $messageId = $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null'; | ||||||||||||||||||
|
|
@@ -69,6 +70,10 @@ public function push( | |||||||||||||||||
| ['handlerName' => $message->getHandlerName(), 'id' => $messageId], | ||||||||||||||||||
| ); | ||||||||||||||||||
|
|
||||||||||||||||||
| if ($request->getAdapter() instanceof ImmediateProcessingAdapterInterface) { | ||||||||||||||||||
| $this->handle($message); | ||||||||||||||||||
|
||||||||||||||||||
| $this->handle($message); | |
| $queue = $this; | |
| if ($request->getAdapter() !== $this->adapter) { | |
| $queue = clone $this; | |
| $queue->adapter = $request->getAdapter(); | |
| } | |
| $queue->handle($message); |
Copilot
AI
Apr 18, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Immediate-processing path calls $this->handle($message), but handle() always calls $this->loop->canContinue(). Since the return value is ignored here, this adds unnecessary coupling/side effects (e.g., memory-limit checks) during push. Consider calling $this->worker->process($message, $queue) directly (or introducing a dedicated method) to avoid invoking the loop for producer-side immediate handling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SynchronousAdapter’s constructor dependencies were removed and processing responsibility moved to Queue::push() via the new marker interface. This is a backward-incompatible change for any code instantiating SynchronousAdapter with (WorkerInterface, QueueInterface) as currently done in this repo’s tests/docs, and will cause runtime/compile errors unless all call sites are updated or a BC shim (e.g., optional constructor parameters or a named constructor) is kept.