thinkPhp使用框架自带队列think-queue
首先讲解一下何为异步消息队列:
所谓消息队列,就是一个以队列数据结构为基础的一个实体,这个实体是真实存在的,比如程序中的数组,数据库中的表,或者redis等等,都可以。
异步队列的作用:
个人认为消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列
转载:https://zhuanlan.zhihu.com/p/129383173
--------------------------------------------------------------------------------------------------------------
队列流程:一个消息的 创建 -> 推送 -> 消费 -> 删除
转载:https://github.com/coolseven/notes/blob/master/thinkphp-queue/README.md
1.安装队列依赖
由于框架版本原因可以选择适合的版本
composer require topthink/think-queue
由于我是tp框架5.1的,所以选择了think-queue 2.*
# Thinkphp5.1 composer require topthink/think-queue:2.* # Thinkphp6 composer require topthink/think-queue:3.*
判断安装成功
php think queue:work -h
2.配置文件
看了网上其他的一些帖子说配置文件在统一目录下/config/queue.php
但是,我这边没有生成,但是根据Queue.php源码可以看出,配置是在config.php文件中的一个键值对
// 文件路径 App/config/queue.php // 队列设置 'queue' => [ 'connector' => 'Redis', // 驱动方式 'expire' => 60, // 缓存有效期 'default' => "queue", // 如果未设置队列名称,默认队列名称 'host' => '127.0.0.1', // 主机地址 'port' => 6379, // 端口 'password' => '', // 密码 'select' => 0, // 默认选择第一个库 'timeout' => 0, // 超时 'persistent' => false, // 是否长连接 ],
3.在项目下新建一个Job目录,用来存放处理消息的类
4.控制器编写测试代码
<?php namespace app\http\controller; use app\http\Job\MsgPushJob; use think\Queue; class Index { /** * 投递消息(生产者) * @return string */ public function push(): string { // queue的 push方法 第一个参数可以接收字符或者对象字符串 $job = 'app\http\Job\MsgPushJob'; $queueName = 'test'; $data['msg'] = 'Test queue msg,time:' . date('Y-m-d H:i:s', time()); $data['user_id'] = 1; // $res = Queue::push(MsgPushJob::class, $data, $queueName); // 可以自动获取 $res = Queue::push($job, $data, $queueName); // 可以手动指定 if ($res == false) { return '消息投递失败'; } else { return '消息投递成功'; } } }
5.编写对应的消费者类
<?php namespace app\http\Job; use think\Db; use think\queue\Job; class MsgPushJob { /** * php think queue:listen --queue test * @param Job $job * @param $data * @return bool * @throws \think\db\exception\DataNotFoundException * @throws \think\db\exception\ModelNotFoundException * @throws \think\exception\DbException */ public function fire(Job $job, $data): bool { // 验证消息 只处理user_id = 1的值 $is_exit = $this->checkMsg($data); if ($is_exit) { try { // 这里是处理消息的逻辑 $res = Db::name('test')->where('id', $data['user_id'])->update(['age' => 10]); if (!$res) return false; $job->delete(); } catch (\Exception $exception) { if ($job->attempts() > 3) { // 如果消息处理失败次数大于 3 次 // 1.可以把失败的消息放入队列重新消费 // 2.延迟消息执行 // 3.删除消息 } } } $job->delete(); return false; } /** * 是否需要消费 * @param $data * @return bool */ public function checkMsg($data): bool { // 判断消息到达这里的时候,是否还需要继续处理 if ($data['user_id'] == 1) return true; return false; } }
6.测试消息投递
数据表默认数据
启动队列监听,对应的参数可以查阅相关文档
php think queue:listen --queue test
访问控制器接口的时候回来窗口下打印出对应消息者的地址
消息投递成功后,会在redis中生成一条数据(list数据类型),可以在redis中查看
成功消费后数据库的数据
7消息在linux上以守护进程方式运行
生成 test 文件 mknod test c 1 3 nohup php think queue:work --daemon --queue test--tries 2 > /dev/test 2>&1 &
版权声明:
作者:admin
链接:http://blog.mryxh.cn/1866.html
文章版权归作者所有,未经允许请勿转载。
THE END