diff --git a/composer.json b/composer.json index 4017e49..5fb217a 100644 --- a/composer.json +++ b/composer.json @@ -15,7 +15,9 @@ "psr-4": {"Utopia\\Queue\\": "src/Queue"} }, "autoload-dev": { - "psr-4": {"Tests\\E2E\\": "tests/Queue/E2E"} + "psr-4": { + "Tests\\E2E\\": "tests/Queue/E2E" + } }, "scripts":{ "test": "phpunit", diff --git a/src/Queue/Broker/AMQP.php b/src/Queue/Broker/AMQP.php index 62b2774..80e10da 100644 --- a/src/Queue/Broker/AMQP.php +++ b/src/Queue/Broker/AMQP.php @@ -135,7 +135,7 @@ public function close(): void $this->channel?->getConnection()?->close(); } - public function enqueue(Queue $queue, array $payload): bool + public function enqueue(Queue $queue, array $payload, bool $priority = false): bool { $payload = [ 'pid' => \uniqid(more_entropy: true), diff --git a/src/Queue/Broker/Pool.php b/src/Queue/Broker/Pool.php index aa7cf92..5fcdcc7 100644 --- a/src/Queue/Broker/Pool.php +++ b/src/Queue/Broker/Pool.php @@ -15,7 +15,7 @@ public function __construct( ) { } - public function enqueue(Queue $queue, array $payload): bool + public function enqueue(Queue $queue, array $payload, bool $priority = false): bool { return $this->delegatePublish(__FUNCTION__, \func_get_args()); } diff --git a/src/Queue/Broker/Redis.php b/src/Queue/Broker/Redis.php index e036147..b36e1b3 100644 --- a/src/Queue/Broker/Redis.php +++ b/src/Queue/Broker/Redis.php @@ -104,7 +104,7 @@ public function close(): void $this->closed = true; } - public function enqueue(Queue $queue, array $payload): bool + public function enqueue(Queue $queue, array $payload, bool $priority = false): bool { $payload = [ 'pid' => \uniqid(more_entropy: true), @@ -112,6 +112,9 @@ public function enqueue(Queue $queue, array $payload): bool 'timestamp' => time(), 'payload' => $payload ]; + if ($priority) { + return $this->connection->rightPushArray("{$queue->namespace}.queue.{$queue->name}", $payload); + } return $this->connection->leftPushArray("{$queue->namespace}.queue.{$queue->name}", $payload); } diff --git a/src/Queue/Publisher.php b/src/Queue/Publisher.php index 1778656..9ccda90 100644 --- a/src/Queue/Publisher.php +++ b/src/Queue/Publisher.php @@ -11,7 +11,7 @@ interface Publisher * @param array $payload * @return bool */ - public function enqueue(Queue $queue, array $payload): bool; + public function enqueue(Queue $queue, array $payload, bool $priority = false): bool; /** * Retries failed jobs. diff --git a/tests/Queue/E2E/Adapter/Base.php b/tests/Queue/E2E/Adapter/Base.php index e507d0d..12ab6fd 100644 --- a/tests/Queue/E2E/Adapter/Base.php +++ b/tests/Queue/E2E/Adapter/Base.php @@ -86,6 +86,15 @@ public function testConcurrency(): void }); } + public function testEnqueuePriority(): void + { + $publisher = $this->getPublisher(); + + $result = $publisher->enqueue($this->getQueue(), ['type' => 'test_string', 'value' => 'priority'], priority: true); + + $this->assertTrue($result); + } + /** * @depends testEvents */ diff --git a/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php b/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php index b1a744c..d7a6e2e 100644 --- a/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php +++ b/tests/Queue/E2E/Adapter/SwooleRedisClusterTest.php @@ -9,14 +9,59 @@ class SwooleRedisClusterTest extends Base { + private function getConnection(): RedisCluster + { + return new RedisCluster([ + 'redis-cluster-0:6379', + 'redis-cluster-1:6379', + 'redis-cluster-2:6379', + ]); + } + protected function getPublisher(): Publisher { - $connection = new RedisCluster(['redis-cluster-0:6379', 'redis-cluster-1:6379', 'redis-cluster-2:6379']); - return new Redis($connection); + return new Redis($this->getConnection()); } protected function getQueue(): Queue { return new Queue('swoole-redis-cluster'); } + + public function testPriorityJobIsConsumedBeforeNormalJobs(): void + { + $connection = $this->getConnection(); + $key = "{$this->getQueue()->namespace}.queue.{$this->getQueue()->name}"; + + // Flush any leftover state from previous runs. + while ($connection->rightPopArray($key, 1) !== false) { + // drain + } + + // Enqueue three normal jobs (pushed to head/left). + $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-1']); + $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-2']); + $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-3']); + + // Enqueue one priority job (pushed to tail/right — same end BRPOP reads from). + $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'priority'], priority: true); + + // The first pop should yield the priority job. + $first = $connection->rightPopArray($key, 1); + $this->assertNotFalse($first, 'Expected a job but queue was empty'); + $this->assertSame('priority', $first['payload']['order'], 'Priority job should be consumed first'); + + // The remaining three should be normal jobs (consumed oldest-first). + $second = $connection->rightPopArray($key, 1); + $this->assertSame('normal-1', $second['payload']['order']); + + $third = $connection->rightPopArray($key, 1); + $this->assertSame('normal-2', $third['payload']['order']); + + $fourth = $connection->rightPopArray($key, 1); + $this->assertSame('normal-3', $fourth['payload']['order']); + + // Queue should now be empty. + $this->assertFalse($connection->rightPopArray($key, 1)); + } } diff --git a/tests/Queue/E2E/Adapter/SwooleTest.php b/tests/Queue/E2E/Adapter/SwooleTest.php index 9a3f183..641b1c3 100644 --- a/tests/Queue/E2E/Adapter/SwooleTest.php +++ b/tests/Queue/E2E/Adapter/SwooleTest.php @@ -9,14 +9,55 @@ class SwooleTest extends Base { + private function getConnection(): Redis + { + return new Redis('redis', 6379); + } + protected function getPublisher(): Publisher { - $connection = new Redis('redis', 6379); - return new RedisBroker($connection); + return new RedisBroker($this->getConnection()); } protected function getQueue(): Queue { return new Queue('swoole'); } + + public function testPriorityJobIsConsumedBeforeNormalJobs(): void + { + $connection = $this->getConnection(); + $key = "{$this->getQueue()->namespace}.queue.{$this->getQueue()->name}"; + + // Flush any leftover state from previous runs. + while ($connection->rightPopArray($key, 1) !== false) { + // drain + } + + // Enqueue three normal jobs (pushed to head/left). + $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-1']); + $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-2']); + $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'normal-3']); + + // Enqueue one priority job (pushed to tail/right — same end BRPOP reads from). + $this->getPublisher()->enqueue($this->getQueue(), ['order' => 'priority'], priority: true); + + // The first pop should yield the priority job. + $first = $connection->rightPopArray($key, 1); + $this->assertNotFalse($first, 'Expected a job but queue was empty'); + $this->assertSame('priority', $first['payload']['order'], 'Priority job should be consumed first'); + + // The remaining three should be normal jobs (consumed oldest-first). + $second = $connection->rightPopArray($key, 1); + $this->assertSame('normal-1', $second['payload']['order']); + + $third = $connection->rightPopArray($key, 1); + $this->assertSame('normal-2', $third['payload']['order']); + + $fourth = $connection->rightPopArray($key, 1); + $this->assertSame('normal-3', $fourth['payload']['order']); + + // Queue should now be empty. + $this->assertFalse($connection->rightPopArray($key, 1)); + } }