简单模式 ¶
支持情况 ¶
驱动 | 是否支持 |
---|---|
redis | |
amqp | |
mqtt |
消费者 ¶
- 创建消费者
sh
php shiyun queue:simple XxCon
php shiyun queue:simple XxCon
- 代码参考
php
namespace addons\comA\moduleA\queue;
use shiyunQueue\drive\Job;
class Consumer1
{
/**
* 根据消息中的数据进行实际的业务处理...
* @param array $data
* @return bool
*/
// doJob的参数由放入任务方法中的data决定
// 执行逻辑
// 成功返回true,失败返回fasle,会再次加入队列。执行到最大错误后自动删除
public function onQueueMessage($message,Job $job): bool
{
$jobData = $message['data'] ?? [];
$msgData = $jobData['data'] ?? [];
//....这里执行具体的任务
if ($job->attempts() > 3) {
//通过这个方法可以检查这个任务已经重试了几次了
}
// 如果任务执行成功后 记得删除任务,不然这个任务会重复执行,
// 直到达到最大重试次数后失败后,执行failed方法
$job->delete();
// 也可以重新发布这个任务
$job->release($delay); //$delay为延迟时间
}
public function onQueueFailed($data){
// 任务达到最大重试次数后,失败了
}
}
namespace addons\comA\moduleA\queue;
use shiyunQueue\drive\Job;
class Consumer1
{
/**
* 根据消息中的数据进行实际的业务处理...
* @param array $data
* @return bool
*/
// doJob的参数由放入任务方法中的data决定
// 执行逻辑
// 成功返回true,失败返回fasle,会再次加入队列。执行到最大错误后自动删除
public function onQueueMessage($message,Job $job): bool
{
$jobData = $message['data'] ?? [];
$msgData = $jobData['data'] ?? [];
//....这里执行具体的任务
if ($job->attempts() > 3) {
//通过这个方法可以检查这个任务已经重试了几次了
}
// 如果任务执行成功后 记得删除任务,不然这个任务会重复执行,
// 直到达到最大重试次数后失败后,执行failed方法
$job->delete();
// 也可以重新发布这个任务
$job->release($delay); //$delay为延迟时间
}
public function onQueueFailed($data){
// 任务达到最大重试次数后,失败了
}
}
生产者 ¶
- 助手函数调用,发布任务
php
$isPushed = queue_producer('redis', 'queue', 'queue_sms', [
'sms_id'=> '短信ID',
'sms_code'=> '短信码',
'sms_message'=> '短信内容'
]);
if ($isPushed !== false) {
// echo "<br>" . date('Y-m-d H:i:s') . "推送成功<br>";
} else {
// echo '<br>推送失败.<br>';
}
$isPushed = queue_producer('redis', 'queue', 'queue_sms', [
'sms_id'=> '短信ID',
'sms_code'=> '短信码',
'sms_message'=> '短信内容'
]);
if ($isPushed !== false) {
// echo "<br>" . date('Y-m-d H:i:s') . "推送成功<br>";
} else {
// echo '<br>推送失败.<br>';
}
queue_producer方法说明
php
/**
* 添加到队列
* @param string $connectName 连接名称
* @param string $exchangeName 交换机名称
* @param string|array $queueName 队列名称
* @param string|array $msg 队列数据
* @param int $delay 延迟时间
*/
function queue_producer($connectName, $exchangeName = '', $queueName = null, $msg = 0, $delay)
/**
* 添加到队列
* @param string $connectName 连接名称
* @param string $exchangeName 交换机名称
* @param string|array $queueName 队列名称
* @param string|array $msg 队列数据
* @param int $delay 延迟时间
*/
function queue_producer($connectName, $exchangeName = '', $queueName = null, $msg = 0, $delay)
- 单利调用,发布任务
php
//单例调用
$isPushed = \shiyunQueue\QueueFactory::getInstance()
->setConnectName('redis') // 连接名称
->newConnection()
->setExchangeName('queue') // 交换机名称
->setQueueName('queue_sms') // 队列名称
// ->setJobServer(\addons\sdks\sdk_sms\jobs\SmsConsumer::class) //执行任务类名
// ->setJobServer($job) //执行任务类名
// ->setJobFunc('helloJobQueue')
// ->setJobFunc('helloJobQueue')
->addMessage($diyData) //执行任务需要的参数,可无限制传数
->addMessage($data) //执行任务需要的参数,可无限制传数
->addMessage(['' => '']) //执行任务需要的参数,可无限制传数
->setMsgSecs(2) //延迟2秒后执行任务
->setMsgSecs($delay) //延迟2秒后执行任务
->setMsgEncrypt(true) // 是否加密
// ->sendPublish($diyData); //放入任务
->sendPublish(); //放入任务
$isPushed = \shiyunQueue\QueueFactory::getInstance()
->setConnectName('queue_connect_redis') // 配置通道
->setExchangeName('queue') // 交换机名称
->setQueueName('SmsProducer') // 队列名称
->setMsgDelay(2) //延迟2秒后执行任务
->addMessage($diyData) // 插入数据
->addMessage(['queue_type' => 'SmsProducer']) // 追加数据
->setMsgEncrypt(true) // 是否加密
->sendPublish();
if ($isPushed !== false) {
// echo "<br>" . date('Y-m-d H:i:s') . "推送成功<br>";
} else {
// echo '<br>推送失败.<br>';
}
//单例调用
$isPushed = \shiyunQueue\QueueFactory::getInstance()
->setConnectName('redis') // 连接名称
->newConnection()
->setExchangeName('queue') // 交换机名称
->setQueueName('queue_sms') // 队列名称
// ->setJobServer(\addons\sdks\sdk_sms\jobs\SmsConsumer::class) //执行任务类名
// ->setJobServer($job) //执行任务类名
// ->setJobFunc('helloJobQueue')
// ->setJobFunc('helloJobQueue')
->addMessage($diyData) //执行任务需要的参数,可无限制传数
->addMessage($data) //执行任务需要的参数,可无限制传数
->addMessage(['' => '']) //执行任务需要的参数,可无限制传数
->setMsgSecs(2) //延迟2秒后执行任务
->setMsgSecs($delay) //延迟2秒后执行任务
->setMsgEncrypt(true) // 是否加密
// ->sendPublish($diyData); //放入任务
->sendPublish(); //放入任务
$isPushed = \shiyunQueue\QueueFactory::getInstance()
->setConnectName('queue_connect_redis') // 配置通道
->setExchangeName('queue') // 交换机名称
->setQueueName('SmsProducer') // 队列名称
->setMsgDelay(2) //延迟2秒后执行任务
->addMessage($diyData) // 插入数据
->addMessage(['queue_type' => 'SmsProducer']) // 追加数据
->setMsgEncrypt(true) // 是否加密
->sendPublish();
if ($isPushed !== false) {
// echo "<br>" . date('Y-m-d H:i:s') . "推送成功<br>";
} else {
// echo '<br>推送失败.<br>';
}