Message consumer is an object that is used for receiving messages sent to a destination (processor or job).
You need to configure $transport and $config to read and processing messages from the queue. Detailed information.
You can use a simple php cli, Symfony/Console or any other component, it really doesn't matter. The main idea is to run Consumer in a separate process in the background.
Use your imagination to handling your messages.
Processor is responsible for processing consumed messages.
<?php
declare(strict_types=1);
include __DIR__ . '/../vendor/autoload.php';
$producer = new \Simple\Queue\Producer($transport, $config);
$consumer = new \Simple\Queue\Consumer($transport, $producer, $config);
echo 'Start consuming' . PHP_EOL;
$consumer->consume();
You can create and run a sub job, which it is executed separately. You can create as many sub jobs as you like.
Also, the job can be from another job. All jobs must be registered in the configuration.
The method $consumer->consume()
will process jobs with priority.
Job example:
<?php
declare(strict_types=1);
namespace Application\Job;
class SendInfoToTelegramJob extends \Simple\Queue\Job
{
public function queue(): string
{
return 'notification';
}
public function attempts(): int
{
return 5;
}
public function handle(\Simple\Queue\Context $context): string
{
// your logic
return \Simple\Queue\Consumer::STATUS_ACK;
}
}
$consumer->consume();
- base realization of consumer.
If the message table does not exist, it will be created.
Next, will start endless loop while(true)
to get the next message from the queue.
if there are no messages, there will be a sustained seconds pause.
When the message is received, it will be processed. Job has priority over the processor.
If an uncaught error occurs, it will be caught and increment first processing attempt.
After several unsuccessful attempts, the message will status \Simple\Queue\Status::FAILURE
.
If there are no handlers for the message, the message will status \Simple\Queue\Status::UNDEFINED_HANDLER
.
Messages are processed with statuses:
\Simple\Queue\Status::NEW
and\Simple\Queue\Status::REDELIVERED
You can configure message handling yourself.
<?php
declare(strict_types=1);
include __DIR__ . '/../vendor/autoload.php';
$producer = new \Simple\Queue\Producer($transport, $config);
$consumer = new \Simple\Queue\Consumer($transport, $producer, $config);
// create table for queue messages
$transport->init();
echo 'Start consuming' . PHP_EOL;
while (true) {
if ($message = $transport->fetchMessage(['my_queue'])) {
// Your message handling logic
$consumer->acknowledge($message);
echo sprintf('Received message: %s ', $message->getBody());
echo PHP_EOL;
}
}
If you use jobs or processors when processing a message, you must return the appropriate status:
-
ACK -
\Simple\Queue\Consumer::STATUS_ACK
- message has been successfully processed and will be removed from the queue. -
REJECT -
\Simple\Queue\Consumer::STATUS_REJECT
- message has not been processed but is no longer required. -
REQUEUE -
\Simple\Queue\Consumer::STATUS_REQUEUE
- message has not been processed, it is necessary redelivered.
A consumer can be run in the background in several ways:
Configure your consume.php and run the command
exec php /path/to/folder/example/consume.php > /dev/null &
the result of a successful launch of the command will be the process code, for example:
[1] 97285
use this to get detailed information about the process.
ps -l 97285
This command will allow your docker container to run in the background:
docker exec -t -d you-container-name sh -c "php ./path/to/consume.php"
Сonfigure your supervisor config file /etc/supervisor/conf.d/consume.conf
[program:consume]
command=/usr/bin/php /path/to/folder/example/consume.php -DFOREGROUND
directory=/path/to/folder/example/
autostart=true
autorestart=true
startretries=5
user=root
numprocs=1
startsecs=0
process_name=%(program_name)s_%(process_num)02d
stderr_logfile=/path/to/folder/example/%(program_name)s_stderr.log
stderr_logfile_maxbytes=10MB
stdout_logfile=/path/to/folder/example/%(program_name)s_stdout.log
stdout_logfile_maxbytes=10MB
Let supervisor read our config file /etc/supervisor/conf.d/consume.conf
to start our service/script.
$ sudo supervisorctl reread
consume: available
Let supervisor start our service/script /path/to/folder/example/consume.php
based on the config we prepared above.
This will automatically create log files /path/to/folder/example/consume_stderr.log
and
/path/to/folder/example/consume_stdout.log
.
$ sudo supervisorctl update
consume: added process group
Lets check if the process is running.
$ ps aux | grep consume
root 17443 0.0 0.4 194836 9844 ? S 19:41 0:00 /usr/bin/php /path/to/folder/example/consume.php