golang gin 监听rabbitmq队列无限消费的案例代码
lj907722644 人气:0golang gin 监听rabbitmq队列无限消费
连接rabbitmq
package database import ( "github.com/streadway/amqp" "log" "reflect" "yy-data-processing/common/config" ) var RabbitConn *amqp.Connection var RabbitChannel *amqp.Channel func InitRabbitmq() { var err error RabbitConn, err = amqp.Dial(config.Config.RabbitUrl) if err != nil { log.Println("连接RabbitMQ失败") panic(err) } RabbitChannel, err = RabbitConn.Channel() if err != nil { log.Println("获取RabbitMQ channel失败") panic(err) } } // 0表示channel未关闭,1表示channel已关闭 func CheckRabbitClosed(ch amqp.Channel) int64 { d := reflect.ValueOf(ch) i := d.FieldByName("closed").Int() return i }
创建生产者
package service import ( "encoding/json" "github.com/streadway/amqp" "log" "yy-data-processing/common/config" "yy-data-processing/common/database" "yy-data-processing/model" ) func Producer() { // 声明队列,没有则创建 // 队列名称、是否持久化、所有消费者与队列断开时是否自动删除队列、是否独享(不同连接的channel能否使用该队列) declare, err := database.RabbitChannel.QueueDeclare(config.Config.HawkSaveQueueName, true, false, false, false, nil) if err != nil { log.Printf("声明队列 %v 失败, error: %v", config.Config.HawkSaveQueueName, err) panic(err) } request := model.Request{} marshal, _ := json.Marshal(request ) // exchange、routing key、mandatory、immediate err = database.RabbitChannel.Publish("", declare.Name, false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(marshal), }) if err != nil { log.Printf("生产者发送消息失败, error: %v", err) } else { log.Println("生产者发送消息成功") } }
创建消费者
package service import ( "encoding/json" "log" "os" "strings" "sync" "time" "yy-data-processing/common/config" "yy-data-processing/common/database" "yy-data-processing/model" ) func Consumer() { // 声明队列,没有则创建 // 队列名称、是否持久化、所有消费者与队列断开时是否自动删除队列、是否独享(不同连接的channel能否使用该队列) _, err := database.RabbitChannel.QueueDeclare(config.Config.QueueName, true, false, false, false, nil) if err != nil { log.Printf("声明队列 %v 失败, error: %v", config.Config.QueueName, err) panic(err) } // 队列名称、consumer、auto-ack、是否独享 // deliveries是一个管道,有消息到队列,就会消费,消费者的消息只需要从deliveries这个管道获取 deliveries, err := database.RabbitChannel.Consume(config.Config.QueueName, "", true, false, false, false, nil) if err != nil { log.Printf("从队列 %v 获取数据失败, error: %v", config.Config.QueueName, err) } else { log.Println("从消费队列获取任务成功") } // 阻塞住 for { select { case message := <-deliveries: closed := database.CheckRabbitClosed(*database.RabbitChannel) if closed == 1 { // channel 已关闭,重连一下 database.InitRabbitmq() } else { msgData := string(message.Body) request := model.Request{} err := json.Unmarshal([]byte(msgData), &request) if err != nil { log.Printf("解析rabbitmq数据 %v 失败, error: %v", msgData, err) } else { // TODO... // 处理逻辑 } } } } }
main方法协程调用
package main import ( "log" "yy-data-processing/common/config" "yy-data-processing/common/database" "yy-data-processing/router" "yy-data-processing/service" ) func main() { // 初始化路由 routers := router.InitRouters() // 初始化RabbitMQ database.InitRabbitmq() go service.Producer() go service.Consumer() port := config.Config.Port if err := routers.Run(":" + port); err != nil { log.Printf("启动服务失败: ", err) } }
加载全部内容