Programmation PHP/RabbitMQ
RabbitMQ est un logiciel de messages en protocole AMQP. Il permet donc à des processus de produire des messages JSON dans des files d'attente pour que d'autres les consomme ensuite[1].
Installation
modifierClient PHP
modifiercomposer require php-amqplib/php-amqplib
Serveur
modifierL'installation du serveur est multi-plateforme. Sous Linux[2] :
apt-get install rabbitmq-server
Test de fonctionnement :
telnet localhost 5672
Site de gestion
modifierUne interface graphique existe pour lire et manipuler les messages manuellement, c'est le management plugin[3]. Pour l'activer :
/usr/sbin/rabbitmq-plugins enable rabbitmq_management
Test de fonctionnement depuis le serveur :
curl localhost:15672
Depuis le client : http://mon_serveur:15672
On la trouve aussi sur Docker[4].
Pour trouver le fichier de configuration :
cat /usr/sbin/rabbitmq-server | grep RABBITMQ_ENV
Connexion
modifierLes identifiants par défaut de RabbitMQ dépendent des versions. On trouve soit le login / mot de passe "user / password", soit "guest / guest". Pour tester :
curl -i -u guest:guest http://localhost:15672/api/whoami
Si cela ne fonctionne pas, configurer le serveur avec rabbitmqctl
. Exemple sous Linux :
/usr/sbin/rabbitmqctl add_user userDev mon_mot_de_passe
/usr/sbin/rabbitmqctl set_permissions -p / userDev '.*' '.*' '.*'
/usr/sbin/rabbitmqctl set_user_tags userDev management
/usr/sbin/rabbitmqctl list_users
Connexion PHP
modifier$connection = new AMQPStreamConnection($host, $port, $login, $password);
...
$connection->close();
Création de queue et routage
modifierPour créer une queue simple prête à recevoir des messages :
$this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue1', false, false, false, false);
Exchange
modifierImage externe | |
---|---|
Schéma des différents types de routage RabbitMQ sur le site : (en) Jyoti Sachdeva, « Getting Started With RabbitMQ: Python », |
Une autre manière de poster des messages est en passant par un exchange. On en distingue plusieurs types[5] :
- direct : une seule queue recevra le message (patron de conception producteur/consommateur).
- fanout : toutes les queues liée à l’exchange recevront le message (patron de conception producteur/abonné).
- topic : les queues de l’exchange inscrites aux sujets concernés recevront le message (selon un motif dans la "routing key" où "*" représente un seul mot séparé par un point, et "#" au moins un)[6].
- headers : routage par en-tête de message plutôt que par "routing key".
Dans cet exemple, on rattache la queue à un exchange "Bus" :
$this->rabbitMqConnection->getChannel()->exchange_declare('Bus', 'fanout', false, true, false);
$this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue2', false, true, false, false);
$this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue2', 'Bus');
$this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue3', false, true, false, false);
$this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue3', 'Bus');
Exemple de topic : on ne publie pas dans la queue mais dans l’exchange qui leur routera ensuite le message.
$this->rabbitMqConnection->getChannel()->exchange_declare('Topic_bus', 'topic', false, false, false);
$this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue4', false, true, false, false);
$this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue4', 'Topic_bus');
$this->rabbitMqConnection->getChannel()->queue_declare('Wikibooks.Queue5', false, true, false, false);
$this->rabbitMqConnection->getChannel()->queue_bind('Wikibooks.Queue5', 'Topic_bus');
QoS
modifierPour demander à RabbitMQ de ne pas surcharger les consommateurs d'une queue en leur répartissant les messages que s'ils ont terminé de traiter le précédent :
$this->rabbitMqConnection->getChannel()->basic_qos(null, 1, null);
DLX
modifierLe mode DLX (Dead Letter Exchanges) permet de transférer un message d'une queue dans un autre après un certain temps[7].
Production
modifier $amqpMessage = new AMQPMessage(json_encode('Hello World!'),
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
$this->rabbitMqConnection->getChannel()->basic_publish($amqpMessage, 'Bus', 'Wikibooks.Queue1');
Consommation
modifierPar défaut on consomme un seul message de la queue. Pour tous les lire un par un, utiliser basic_ack() après basic_consume().
$this->rabbitMqConnection->getChannel()->basic_consume(
'Wikibooks.Queue1',
gethostname() . '#' . rand(1, 9999),
false,
false,
false,
false,
[$this, 'consumeCallback']
);
while (count($this->rabbitMqConnection->getChannel()->callbacks)) {
$this->rabbitMqConnection->getChannel()->wait();
}
public function consumeCallback(?AMQPMessage $msg)
{
if (empty($msg)) {
return null;
}
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
var_dump(json_decode($msg->getBody()));
}
En mode "topic", on peut remplacer 'Wikibooks.Queue1' par 'Wikibooks.*' pour récupérer toutes les queues.
Références
modifier- ↑ https://www.rabbitmq.com/tutorials/tutorial-one-php.html
- ↑ https://www.rabbitmq.com/install-debian.html
- ↑ https://www.rabbitmq.com/management.html
- ↑ https://hub.docker.com/_/rabbitmq
- ↑ https://www.rabbitmq.com/tutorials/tutorial-three-php.html
- ↑ https://www.rabbitmq.com/tutorials/tutorial-five-php.html
- ↑ https://www.rabbitmq.com/dlx.html