Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions src/Middleware/Push/AdapterPushHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@

namespace Yiisoft\Queue\Middleware\Push;

use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\Message\MessageInterface;

/**
* @internal
*/
final class AdapterPushHandler implements MessageHandlerPushInterface
{
public function handlePush(PushRequest $request): PushRequest
public function __construct(
private readonly AdapterInterface $adapter,
) {}

public function handlePush(MessageInterface $message): MessageInterface
{
return $request->withMessage(
$request->getAdapter()->push(
$request->getMessage(),
),
);
return $this->adapter->push($message);
}
}
12 changes: 6 additions & 6 deletions src/Middleware/Push/Implementation/IdMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,22 @@
namespace Yiisoft\Queue\Middleware\Push\Implementation;

use Yiisoft\Queue\Message\IdEnvelope;
use Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface;
use Yiisoft\Queue\Middleware\Push\PushRequest;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Middleware\Push\MessageHandlerPushInterface;
use Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface;

/**
* A middleware for message ID setting.
*/
final class IdMiddleware implements MiddlewarePushInterface
{
public function processPush(PushRequest $request, MessageHandlerPushInterface $handler): PushRequest
public function processPush(MessageInterface $message, MessageHandlerPushInterface $handler): MessageInterface
{
$meta = $request->getMessage()->getMetadata();
$meta = $message->getMetadata();
if (empty($meta[IdEnvelope::MESSAGE_ID_KEY])) {
$request = $request->withMessage(new IdEnvelope($request->getMessage(), uniqid('yii3-message-', true)));
$message = new IdEnvelope($message, uniqid('yii3-message-', true));
}

return $handler->handlePush($request);
return $handler->handlePush($message);
}
}
4 changes: 3 additions & 1 deletion src/Middleware/Push/MessageHandlerPushInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

namespace Yiisoft\Queue\Middleware\Push;

use Yiisoft\Queue\Message\MessageInterface;

interface MessageHandlerPushInterface
{
public function handlePush(PushRequest $request): PushRequest;
public function handlePush(MessageInterface $message): MessageInterface;
}
17 changes: 9 additions & 8 deletions src/Middleware/Push/MiddlewareFactoryPush.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Yiisoft\Definitions\Exception\InvalidConfigException;
use Yiisoft\Definitions\Helpers\DefinitionValidator;
use Yiisoft\Injector\Injector;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Middleware\CallableFactory;
use Yiisoft\Queue\Middleware\InvalidCallableConfigurationException;
use Yiisoft\Queue\Middleware\InvalidMiddlewareDefinitionException;
Expand All @@ -36,16 +37,16 @@ public function __construct(
*
* - A middleware object.
* - A name of a middleware class. The middleware instance will be obtained from container and executed.
* - A callable with `function(ServerRequestInterface $request, RequestHandlerInterface $handler):
* ResponseInterface` signature.
* - A callable with `function(MessageInterface $message, MessageHandlerPushInterface $handler):
* MessageInterface` signature.
* - A controller handler action in format `[TestController::class, 'index']`. `TestController` instance will
* be created and `index()` method will be executed.
* - A function returning a middleware. The middleware returned will be executed.
*
* For handler action and callable
* typed parameters are automatically injected using dependency injection container.
* Current request and handler could be obtained by type-hinting for {@see ServerRequestInterface}
* and {@see RequestHandlerInterface}.
* Current message and handler could be obtained by type-hinting for {@see MessageInterface}
* and {@see MessageHandlerPushInterface}.
*
* @throws InvalidMiddlewareDefinitionException
*
Expand Down Expand Up @@ -96,15 +97,15 @@ public function __construct(
$this->callback = $callback;
}

public function processPush(PushRequest $request, MessageHandlerPushInterface $handler): PushRequest
public function processPush(MessageInterface $message, MessageHandlerPushInterface $handler): MessageInterface
{
$response = (new Injector($this->container))->invoke($this->callback, [$request, $handler]);
if ($response instanceof PushRequest) {
$response = (new Injector($this->container))->invoke($this->callback, [$message, $handler]);
if ($response instanceof MessageInterface) {
return $response;
}

if ($response instanceof MiddlewarePushInterface) {
return $response->processPush($request, $handler);
return $response->processPush($message, $handler);
}

throw new InvalidMiddlewareDefinitionException($this->callback);
Expand Down
4 changes: 3 additions & 1 deletion src/Middleware/Push/MiddlewarePushInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

namespace Yiisoft\Queue\Middleware\Push;

use Yiisoft\Queue\Message\MessageInterface;

interface MiddlewarePushInterface
{
public function processPush(PushRequest $request, MessageHandlerPushInterface $handler): PushRequest;
public function processPush(MessageInterface $message, MessageHandlerPushInterface $handler): MessageInterface;
}
9 changes: 5 additions & 4 deletions src/Middleware/Push/MiddlewarePushStack.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Yiisoft\Queue\Middleware\Push;

use Closure;
use Yiisoft\Queue\Message\MessageInterface;

final class MiddlewarePushStack implements MessageHandlerPushInterface
{
Expand All @@ -26,14 +27,14 @@ public function __construct(
private readonly MessageHandlerPushInterface $finishHandler,
) {}

public function handlePush(PushRequest $request): PushRequest
public function handlePush(MessageInterface $message): MessageInterface
{
if ($this->stack === null) {
$this->build();
}

/** @psalm-suppress PossiblyNullReference */
return $this->stack->handlePush($request);
return $this->stack->handlePush($message);
}

private function build(): void
Expand All @@ -60,13 +61,13 @@ public function __construct(
private readonly MessageHandlerPushInterface $handler,
) {}

public function handlePush(PushRequest $request): PushRequest
public function handlePush(MessageInterface $message): MessageInterface
{
if ($this->middleware === null) {
$this->middleware = ($this->middlewareFactory)();
}

return $this->middleware->processPush($request, $this->handler);
return $this->middleware->processPush($message, $this->handler);
}
};
}
Expand Down
11 changes: 6 additions & 5 deletions src/Middleware/Push/PushMiddlewareDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Yiisoft\Queue\Middleware\Push;

use Closure;
use Yiisoft\Queue\Message\MessageInterface;

final class PushMiddlewareDispatcher
{
Expand All @@ -27,20 +28,20 @@ public function __construct(
}

/**
* Dispatch request through middleware to get response.
* Dispatch message through middleware to get response.
*
* @param PushRequest $request Request to pass to middleware.
* @param MessageInterface $message Message to pass to middleware.
* @param MessageHandlerPushInterface $finishHandler Handler to use in case no middleware produced a response.
*/
public function dispatch(
PushRequest $request,
MessageInterface $message,
MessageHandlerPushInterface $finishHandler,
): PushRequest {
): MessageInterface {
if ($this->stack === null) {
$this->stack = new MiddlewarePushStack($this->buildMiddlewares(), $finishHandler);
}

return $this->stack->handlePush($request);
return $this->stack->handlePush($message);
}
Comment thread
vjik marked this conversation as resolved.

/**
Expand Down
42 changes: 0 additions & 42 deletions src/Middleware/Push/PushRequest.php

This file was deleted.

15 changes: 7 additions & 8 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
use Yiisoft\Queue\Middleware\Push\MessageHandlerPushInterface;
use Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher;
use Yiisoft\Queue\Middleware\Push\PushRequest;
use Yiisoft\Queue\Worker\WorkerInterface;
use Yiisoft\Queue\Message\IdEnvelope;
use Yiisoft\Queue\Provider\QueueProviderInterface;
Expand All @@ -38,7 +37,7 @@ public function __construct(
) {
$this->name = StringNormalizer::normalize($name);
$this->middlewareDefinitions = $middlewareDefinitions;
$this->adapterPushHandler = new AdapterPushHandler();
$this->adapterPushHandler = new AdapterPushHandler($this->adapter);
}

public function getName(): string
Expand All @@ -55,10 +54,10 @@ public function push(
['handlerName' => $message->getHandlerName()],
);

$request = new PushRequest($message, $this->adapter);
$message = $this->pushMiddlewareDispatcher
->dispatch($request, $this->createPushHandler(...$middlewareDefinitions))
->getMessage();
$message = $this->pushMiddlewareDispatcher->dispatch(
Comment thread
vjik marked this conversation as resolved.
$message,
$this->createPushHandler(...$middlewareDefinitions),
);

/** @var string $messageId */
$messageId = $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null';
Expand Down Expand Up @@ -145,11 +144,11 @@ public function __construct(
private readonly array $middlewares,
) {}

public function handlePush(PushRequest $request): PushRequest
public function handlePush(MessageInterface $message): MessageInterface
{
return $this->dispatcher
->withMiddlewares($this->middlewares)
->dispatch($request, $this->adapterPushHandler);
->dispatch($message, $this->adapterPushHandler);
}
};
}
Expand Down
8 changes: 3 additions & 5 deletions tests/Integration/Support/TestMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,23 @@
namespace Yiisoft\Queue\Tests\Integration\Support;

use Yiisoft\Queue\Message\Message;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Middleware\Consume\ConsumeRequest;
use Yiisoft\Queue\Middleware\Consume\MessageHandlerConsumeInterface;
use Yiisoft\Queue\Middleware\Consume\MiddlewareConsumeInterface;
use Yiisoft\Queue\Middleware\Push\MessageHandlerPushInterface;
use Yiisoft\Queue\Middleware\Push\MiddlewarePushInterface;
use Yiisoft\Queue\Middleware\Push\PushRequest;

final class TestMiddleware implements MiddlewarePushInterface, MiddlewareConsumeInterface
{
public function __construct(private readonly string $stage) {}

public function processPush(PushRequest $request, MessageHandlerPushInterface $handler): PushRequest
public function processPush(MessageInterface $message, MessageHandlerPushInterface $handler): MessageInterface
{
$message = $request->getMessage();
$stack = $message->getData();
$stack[] = $this->stage;
$messageNew = new Message($message->getHandlerName(), $stack);

return $handler->handlePush($request->withMessage($messageNew));
return $handler->handlePush(new Message($message->getHandlerName(), $stack));
}

public function processConsume(ConsumeRequest $request, MessageHandlerConsumeInterface $handler): ConsumeRequest
Expand Down
8 changes: 3 additions & 5 deletions tests/Unit/Middleware/Push/AdapterPushHandlerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,19 @@
use PHPUnit\Framework\TestCase;
use Yiisoft\Queue\Message\Message;
use Yiisoft\Queue\Middleware\Push\AdapterPushHandler;
use Yiisoft\Queue\Middleware\Push\PushRequest;
use Yiisoft\Queue\Tests\App\FakeAdapter;

final class AdapterPushHandlerTest extends TestCase
{
public function testHandlePushUsesAdapter(): void
{
$handler = new AdapterPushHandler();
$adapter = new FakeAdapter();
$handler = new AdapterPushHandler($adapter);
$message = new Message('handler', 'data');
$request = new PushRequest($message, $adapter);

$result = $handler->handlePush($request);
$result = $handler->handlePush($message);

self::assertSame($message, $result->getMessage());
self::assertSame($message, $result);
self::assertSame([$message], $adapter->pushMessages);
}
}
Loading
Loading