Broker

YTask使用broker与任务队列通信,发送或接收任务。
支持的broker有:

redisBroker

import "github.com/gojuukaze/YTask/v2"

// 127.0.0.1 : host
// 6379 : port
// "" : password
// 0 : db
// 10 : client连接池大小. (server端无需设置)
//      对于client端, 你需要根据情况自行设置连接池
ytask.Broker.NewRedisBroker("127.0.0.1", "6379", "", 0, 10)

rabbitMqBroker

import "github.com/gojuukaze/YTask/v2"
// 127.0.0.1 : host
// 5672 : port
// guest : username
// guest : password

ytask.Broker.NewRabbitMqBroker("127.0.0.1", "5672", "guest", "guest", "")

rocketMqBroker

不建议在延时任务中使用

import "github.com/gojuukaze/YTask/v2"

ytask.Broker.NewRocketMqBroker([]string{"127.0.0.1:9876"},[]string{"127.0.0.1:10911"})

自定义broker

你可以自行定义broker。需要注意,因为系统中会调用SetPoolSize设置连接池,所以初始化broker时不要建立连接,调用Activate()时再建立。

如果你的broker不支持连接池,那可以不用管Activate,SetPoolSize,GetPoolSize三个方法,直接返回空就行。

获取任务时,应不断循环获取,而不是阻塞在这,若队列为空,则返回 ErrTypeEmptyQuery 错误。

type BrokerInterface interface {
    // 获取任务
    Next(queryName string) (message.Message, error)
    // 发送任务
    Send(queryName string, msg message.Message) error
    // 把任务插到队头
    //  - 如果你的broker不支持,也没有优先队列这样的替代方案,则可以复用Send(这样做会影响延时任务的处理时间)。
    LSend(queryName string, msg message.Message) error
    // 建立连接
    Activate()
    SetPoolSize(int)
    GetPoolSize()int
    // 用当前broker的配置生成个新的broker
    Clone() BrokerInterface
}