百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术文章 > 正文

在PHP中如何使用RabbitMQ来实现消息的订阅和发布?

wxin55 2024-10-26 16:38 8 浏览 0 评论

本文将介绍在PHP中如何使用RabbitMQ来实现消息的订阅和发布。我使用的系统依然是Centos7,为了方便,应用服务器我使用Docker进行部署,容器环境:centos7+nginx+php5.6。

运行环境,安装AMQP扩展:

如何安装Docker我就不说了,网上很多教程非常简单,如果有现成的php环境可以直接使用。Docker中我使用的镜像名为webdevops/php-nginx,tag为:centos-7-php56。下载镜像:

(国际带宽出口不稳定,可能会下载失败,重试记次就好了)

docker pull webdevops/php-nginx:centos-7-php56 //下载镜像
docker run -d -p 80:80 --name rabbitmq webdevops/php-nginx:centos-7-php56 //运行容器
docker exec -ti rabbitmq /bin/bash //进入容器

进入到容器后检测下环境是否有相应扩展

cd app
vi index.php 

刚刚我们在运行容器的时候使用80端口,在浏览器中输入http://ip

搜索下没有amqp相关的信息。下面开始安装amqp扩展。

yum install gcc librabbitmq-devel.x86_64 php56w-devel
-y
wget http://pecl.php.net/get/amqp-1.4.0.tgz
tar -zxvf amqp-1.4.0.tgz
cd amqp-1.4.0
phpize
./configure --with-amqp
make && make install

在php.ini中开启extension=amqp.so 接着重启php-fpm 或 Web服务器

vi /etc/php.ini
extension=amqp.so

我这里就直接重启容器了,如果是宿主机直接安装php环境直接重启环境。

exit //退出容器
docker restart rabbitmq //重启容器

再查看phpinfo,amqp扩展已经安装好了:

publish发布消息

在/app路径下新建一个publish.php的文件

touch publish.php
vi publish.php

以下是PHP代码,我们先定义好用来发消息的交换机、队列、RoutingKey、消息等变量。

$queueName = 'superrd';
$exchangeName = 'superrd';
$routeKey = 'superrd';
$message = 'Hello World!';

按照我们第二章讲到的首先建立一个连接。

$connection = new AMQPConnection(array('host' => '10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));
$connection->connect() or die("Cannot connect to the broker!\n");

新建一个信道。

$channel = new AMQPChannel($connection);

新建一个交换机Exchange,并定义属性,第二章我们讲过有四种类型的交换机,这里使用直连型DIRECT。AMQP_DURABLE代表这是一个持久化的交换机,不会以为服务器异常等因素丢失。

$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();

新建一个队列Queue,前面也讲过生产者将消息发送到Exchange中,Exchange会根据绑定关系投递到队列,也就是如果生产者在生产消息时没有队列与之绑定消息就会丢失。为了保证系统更加健硕,一般无论是消息的生产者还是消费者都会新建一遍Exchange和Queue,新建后属性不会改变。同样AMQP_DURABLE代表这是一个持久化的队列,队列会被写入磁盘。需要注意的是虽然消息是缓存在队列中,但是并不是队列是持久化的队列队列中的消息就是持久化的,消息的持久化需要单独设置。

$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();

通过routeKey绑定交换机和队列。

$queue->bind($exchangeName, $routeKey);

好了,下面可以发送消息了

$exchange->publish($message,$routeKey);

如果你希望消息也是持久化的可以使用如下的代码,实际测试结果在持久化消息后消息发布的性能下降一倍,我的磁盘是pcie的固态硬盘,如果你是机械磁盘这个性能下降估计会更明显,24核心CPU,48GB内存,pcie固态硬盘,单线程的情况下每秒可以发布2.5万左右的非持久化消息,持久化之后变为变为1.2万左右。

$exchange->publish($message,$routeKey,AMQP_NOPARAM, array('delivery_mode'=>2));

断开连接。

$connection->disconnect();

同样在发布消息之后可以通过WEB工具来查看是否发布成功,

查看交换机多了一个superid交换机。

查看交换机已经有superrd队列。

点击队列查看队列详情。Bindings标签可以看到交换机和队列的绑定关系。

点击Get messages标签Get message(s)按钮可以看到队列中的消息。

到此说明我们已经将一个消息发布到了消息队列中。完整的PHP代码如下。

'10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));
$connection->connect() or die("Cannot connect to the broker!\n");
try {
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($exchangeName, $routeKey);
$exchange->publish($message,$routeKey);
var_dump("[x] Sent 'Hello World!'");
} catch (AMQPConnectionException $e) {
var_dump($e);
exit();
}
$connection->disconnect();

Subscribe订阅消息

在/app路径下新建一个subscribe.php的文件

touch subscribe.php
vi subscribe.php

以下是PHP代码,和发布消息一样我们先定义好用交换机、队列、RoutingKey等变量。

$queueName = 'superrd';
$exchangeName = 'superrd';
$routeKey = 'superrd';

按照我们第二章讲到的首先建立一个连接。

$connection = new AMQPConnection(array('host' => '10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));
$connection->connect() or die("Cannot connect to the broker!\n");

新建一个信道。

$channel = new AMQPChannel($connection);

与发布消息一样新建交换机。

$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();

新建一个队列Queue。

$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();

通过routeKey绑定交换机和队列。

$queue->bind($exchangeName, $routeKey);

重点来了,阻塞订阅消息。

//阻塞模式接收消息
echo "Message:\n";
while(True){
$queue->consume('processMessage');
//自动ACK应答
//$queue->consume('processMessage', AMQP_AUTOACK);
}
$conn->disconnect();
/*
* 消费回调函数
* 处理消息
*/
function processMessage($envelope, $q) {
$msg = $envelope->getBody();
echo $msg."\n"; //处理消息
$q->ack($envelope->getDeliveryTag()); //手动发送ACK应答
}

注意因为是阻塞监听,因为输出缓冲区的原因用浏览器访问该文件是看不到输出的。使用脚本访问。

php /app/subscribe.php

通过WEB工具查看队列。superrd队列中的消息数已经为0。

完整的PHP代码如下。

'10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));
$connection->connect() or die("Cannot connect to the broker!\n");
$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();
$queue->bind($exchangeName, $routeKey);
//阻塞模式接收消息
echo "Message:\n";
while(True){
$queue->consume('processMessage');
//自动ACK应答
//$queue->consume('processMessage', AMQP_AUTOACK);
}
$conn->disconnect();
/*
* 消费回调函数
* 处理消息
*/
function processMessage($envelope, $q) {
$msg = $envelope->getBody();
echo $msg."\n"; //处理消息
$q->ack($envelope->getDeliveryTag()); //手动发送ACK应答
}

相关推荐

ES6中 Promise的使用场景?(es6promise用法例子)

一、介绍Promise,译为承诺,是异步编程的一种解决方案,比传统的解决方案(回调函数)更加合理和更加强大在以往我们如果处理多层异步操作,我们往往会像下面那样编写我们的代码doSomething(f...

JavaScript 对 Promise 并发的处理方法

Promise对象代表一个未来的值,它有三种状态:pending待定,这是Promise的初始状态,它可能成功,也可能失败,前途未卜fulfilled已完成,这是一种成功的状态,此时可以获取...

Promise的九大方法(promise的实例方法)

1、promise.resolv静态方法Promise.resolve(value)可以认为是newPromise方法的语法糖,比如Promise.resolve(42)可以认为是以下代码的语...

360前端一面~面试题解析(360前端开发面试题)

1.组件库按需加载怎么做的,具体打包配了什么-按需加载实现:借助打包工具(如Webpack的require.context或ES模块动态导入),在使用组件时才引入对应的代码。例如在V...

前端面试-Promise 的 finally 怎么实现的?如何在工作中使用?

Promise的finally方法是一个非常有用的工具,它无论Promise是成功(fulfilled)还是失败(rejected)都会执行,且不改变Promise的最终结果。它的实现原...

最简单手写Promise,30行代码理解Promise核心原理和发布订阅模式

看了全网手写Promise的,大部分对于新手还是比较难理解的,其中几个比较难的点:状态还未改变时通过发布订阅模式去收集事件实例化的时候通过调用构造函数里传出来的方法去修改类里面的状态,这个叫Re...

前端分享-Promise可以中途取消啦(promise可以取消吗)

传统Promise就像一台需要手动组装的设备,每次使用都要重新接线。而Promise.withResolvers的出现,相当于给开发者发了一个智能遥控器,可以随时随地控制异步操作。它解决了三大...

手写 Promise(手写输入法 中文)

前言都2020年了,Promise大家肯定都在用了,但是估计很多人对其原理还是一知半解,今天就让我们一起实现一个符合PromiseA+规范的Promise。附PromiseA+规范地址...

什么是 Promise.allSettled()!新手老手都要会?

Promise.allSettled()方法返回一个在所有给定的promise都已经fulfilled或rejected后的promise,并带有一个对象数组,每个对象表示对应的pr...

前端面试-关于Promise解析与高频面试题示范

Promise是啥,直接上图:Promise就是处理异步函数的API,它可以包裹一个异步函数,在异步函数完成时抛出完成状态,让代码结束远古时无限回掉的窘境。配合async/await语法糖,可...

宇宙厂:为什么前端离不开 Promise.withResolvers() ?

大家好,很高兴又见面了,我是"高级前端进阶",由我带着大家一起关注前端前沿、深入前端底层技术,大家一起进步,也欢迎大家关注、点赞、收藏、转发。1.为什么需要Promise.with...

Promise 新增了一个超实用的 API!

在JavaScript的世界里,Promise一直是处理异步操作的神器。而现在,随着ES2025的发布,Promise又迎来了一个超实用的新成员——Promise.try()!这个新方法简...

一次搞懂 Promise 异步处理(promise 异步顺序执行)

PromisePromise就像这个词的表面意识一样,表示一种承诺、许诺,会在后面给出一个结果,成功或者失败。现在已经成为了主流的异步编程的操作方式,写进了标准里面。状态Promise有且仅有...

Promise 核心机制详解(promise机制的实现原理)

一、Promise的核心状态机Promise本质上是一个状态机,其行为由内部状态严格管控。每个Promise实例在创建时处于Pending(等待)状态,此时异步操作尚未完成。当异步操作成功...

javascript——Promise(js实现promise)

1.PromiseES6开始支持,Promise对象用于一个异步操作的最终完成(包括成功和失败)及结果值的表示。简单说就是处理异步请求的。之所以叫Promise,就是我承诺,如果成功则怎么处理,失败怎...

取消回复欢迎 发表评论: