Skip to content
官方QQ交流群
技术交流1:87208295   点此加入
技术交流2:787747122   点此加入
官网
云控制台
开放平台
关注微信公众号
代码仓库: 码云

简单模式

支持情况

驱动是否支持
redis
amqp
mqtt

消费者

  • 创建消费者
sh
php shiyun queue:simple XxCon
php shiyun queue:simple XxCon
  • 代码参考
php
namespace addons\comA\moduleA\queue;

use shiyunQueue\drive\Job;

class Consumer1
{

    /**
     * 根据消息中的数据进行实际的业务处理...
     * @param array $data
     * @return bool
     */
    // doJob的参数由放入任务方法中的data决定
    // 执行逻辑
    // 成功返回true,失败返回fasle,会再次加入队列。执行到最大错误后自动删除
    public function onQueueMessage($message,Job $job): bool
    {

        $jobData = $message['data'] ?? [];
        $msgData = $jobData['data'] ?? [];
        //....这里执行具体的任务 
        if ($job->attempts() > 3) {
            //通过这个方法可以检查这个任务已经重试了几次了
        }
        // 如果任务执行成功后 记得删除任务,不然这个任务会重复执行,
        // 直到达到最大重试次数后失败后,执行failed方法
        $job->delete();
        // 也可以重新发布这个任务
        $job->release($delay); //$delay为延迟时间
    }
    public function onQueueFailed($data){
        // 任务达到最大重试次数后,失败了
    }
}
namespace addons\comA\moduleA\queue;

use shiyunQueue\drive\Job;

class Consumer1
{

    /**
     * 根据消息中的数据进行实际的业务处理...
     * @param array $data
     * @return bool
     */
    // doJob的参数由放入任务方法中的data决定
    // 执行逻辑
    // 成功返回true,失败返回fasle,会再次加入队列。执行到最大错误后自动删除
    public function onQueueMessage($message,Job $job): bool
    {

        $jobData = $message['data'] ?? [];
        $msgData = $jobData['data'] ?? [];
        //....这里执行具体的任务 
        if ($job->attempts() > 3) {
            //通过这个方法可以检查这个任务已经重试了几次了
        }
        // 如果任务执行成功后 记得删除任务,不然这个任务会重复执行,
        // 直到达到最大重试次数后失败后,执行failed方法
        $job->delete();
        // 也可以重新发布这个任务
        $job->release($delay); //$delay为延迟时间
    }
    public function onQueueFailed($data){
        // 任务达到最大重试次数后,失败了
    }
}

生产者

  • 助手函数调用,发布任务
php
$isPushed = queue_producer('redis', 'queue', 'queue_sms', [
    'sms_id'=> '短信ID',
    'sms_code'=> '短信码',
    'sms_message'=> '短信内容'
]);

if ($isPushed !== false) {
    // echo "<br>" . date('Y-m-d H:i:s') . "推送成功<br>";
} else {
    // echo '<br>推送失败.<br>';
}
$isPushed = queue_producer('redis', 'queue', 'queue_sms', [
    'sms_id'=> '短信ID',
    'sms_code'=> '短信码',
    'sms_message'=> '短信内容'
]);

if ($isPushed !== false) {
    // echo "<br>" . date('Y-m-d H:i:s') . "推送成功<br>";
} else {
    // echo '<br>推送失败.<br>';
}

queue_producer方法说明

php

/**
 * 添加到队列
 * @param string        $connectName   连接名称
 * @param string        $exchangeName  交换机名称
 * @param string|array  $queueName     队列名称
 * @param string|array  $msg           队列数据
 * @param int           $delay         延迟时间
 */
function queue_producer($connectName, $exchangeName = '', $queueName = null, $msg = 0, $delay)

/**
 * 添加到队列
 * @param string        $connectName   连接名称
 * @param string        $exchangeName  交换机名称
 * @param string|array  $queueName     队列名称
 * @param string|array  $msg           队列数据
 * @param int           $delay         延迟时间
 */
function queue_producer($connectName, $exchangeName = '', $queueName = null, $msg = 0, $delay)
  • 单利调用,发布任务
php

 //单例调用
$isPushed = \shiyunQueue\QueueFactory::getInstance()
    ->setConnectName('redis') // 连接名称
    ->newConnection()
    ->setExchangeName('queue') // 交换机名称
    ->setQueueName('queue_sms') // 队列名称
    // ->setJobServer(\addons\sdks\sdk_sms\jobs\SmsConsumer::class) //执行任务类名
    // ->setJobServer($job) //执行任务类名
    // ->setJobFunc('helloJobQueue')
    // ->setJobFunc('helloJobQueue')
    ->addMessage($diyData) //执行任务需要的参数,可无限制传数
    ->addMessage($data) //执行任务需要的参数,可无限制传数
    ->addMessage(['' => '']) //执行任务需要的参数,可无限制传数
    ->setMsgSecs(2) //延迟2秒后执行任务
    ->setMsgSecs($delay) //延迟2秒后执行任务
    ->setMsgEncrypt(true) // 是否加密
    // ->sendPublish($diyData); //放入任务
    ->sendPublish(); //放入任务
    

$isPushed = \shiyunQueue\QueueFactory::getInstance()
        ->setConnectName('queue_connect_redis') // 配置通道
        ->setExchangeName('queue') // 交换机名称
        ->setQueueName('SmsProducer') // 队列名称
        ->setMsgDelay(2) //延迟2秒后执行任务
        ->addMessage($diyData) // 插入数据
        ->addMessage(['queue_type' => 'SmsProducer']) // 追加数据
        ->setMsgEncrypt(true) // 是否加密
        ->sendPublish();



if ($isPushed !== false) {
    // echo "<br>" . date('Y-m-d H:i:s') . "推送成功<br>";
} else {
    // echo '<br>推送失败.<br>';
}

 //单例调用
$isPushed = \shiyunQueue\QueueFactory::getInstance()
    ->setConnectName('redis') // 连接名称
    ->newConnection()
    ->setExchangeName('queue') // 交换机名称
    ->setQueueName('queue_sms') // 队列名称
    // ->setJobServer(\addons\sdks\sdk_sms\jobs\SmsConsumer::class) //执行任务类名
    // ->setJobServer($job) //执行任务类名
    // ->setJobFunc('helloJobQueue')
    // ->setJobFunc('helloJobQueue')
    ->addMessage($diyData) //执行任务需要的参数,可无限制传数
    ->addMessage($data) //执行任务需要的参数,可无限制传数
    ->addMessage(['' => '']) //执行任务需要的参数,可无限制传数
    ->setMsgSecs(2) //延迟2秒后执行任务
    ->setMsgSecs($delay) //延迟2秒后执行任务
    ->setMsgEncrypt(true) // 是否加密
    // ->sendPublish($diyData); //放入任务
    ->sendPublish(); //放入任务
    

$isPushed = \shiyunQueue\QueueFactory::getInstance()
        ->setConnectName('queue_connect_redis') // 配置通道
        ->setExchangeName('queue') // 交换机名称
        ->setQueueName('SmsProducer') // 队列名称
        ->setMsgDelay(2) //延迟2秒后执行任务
        ->addMessage($diyData) // 插入数据
        ->addMessage(['queue_type' => 'SmsProducer']) // 追加数据
        ->setMsgEncrypt(true) // 是否加密
        ->sendPublish();



if ($isPushed !== false) {
    // echo "<br>" . date('Y-m-d H:i:s') . "推送成功<br>";
} else {
    // echo '<br>推送失败.<br>';
}

Copyright © 2017 10yun.com | 十云提供计算服务-IPV6 | ctocode组开发