网站首页 文章专栏 Go 语言快速尝试 rocketmq
Go 语言快速尝试 rocketmq
创建于:2021-07-04 08:37:54 更新于:2024-05-05 03:08:56 羽瀚尘 1039
消息队列 消息队列

背景

初次接触消息中间件,使用公共 rocketmq 可能更容易上手。
包含内容:
- Docker 部署 rocketmq nameserver 和 broker
- 图形化运维
- 使用 Orange 框架完成 rocketmq 的快速测试

部署 rocketmq

参考 http://www.justdojava.com/2019/08/26/rocketmq-creator/
图形化界面

测试

快速测试的代码
参考 https://gocn.vip/topics/10716

package main
import (
    "fmt"
    "time"
    "gitee.com/zhucheer/orange/queue"
)
// main is a quick smoke test for RocketMQ through the Orange framework's
// queue package: it registers a producer and a consumer in group "test",
// publishes ten messages to topic "topicTest" (one per second) from a
// background goroutine, and prints every message the consumer receives.
func main() {
    // Register the producer with the server address list, the group name and
    // the retry count.
    // NOTE(review): the original comment calls this address a broker node, but
    // 9876 is RocketMQ's default name server port and the log shows the broker
    // on 10911 — presumably this is the name server address; confirm.
    mqProducerClient := queue.RegisterRocketProducerMust([]string{"10.227.82.166:9876"}, "test", 1)
    // Register the consumer with the same address list and group name.
    mqConsumerClient := queue.RegisterRocketConsumerMust([]string{"10.227.82.166:9876"}, "test")

    go func() {
        for i := 0; i < 10; i++ {
            // Publish one message: topic name plus message body.
            ret, err := mqProducerClient.SendMsg("topicTest", "Hello mq~~")
            if err != nil {
                // Do not touch ret on failure; report the error and keep the
                // same one-second pacing before the next attempt.
                fmt.Println("========producer push message failed====", err)
            } else {
                fmt.Println("========producer push one message====", ret.MsgId)
            }
            time.Sleep(time.Second)
        }
    }()

    // Start listening on the topic; the callback runs for each received message.
    mqConsumerClient.ListenReceiveMsgDo("topicTest", func(mqMsg queue.MqMsg) {
        fmt.Println("receive====>", mqMsg.MsgId, mqMsg.BodyString())
    })

    // Keep the process alive for 20 seconds so the ten sends (about 10 seconds)
    // and their deliveries have time to complete before main returns.
    time.Sleep(20 * time.Second)
}

运行该测试代码后的输出(以下为连续两次执行 go run main.go 的日志)

bytedance@C02D56MZMD6R rocketmq_test % go run main.go
INFO[0000] the consumer start beginning                  consumerGroup=test messageModel=Clustering unitMode=false
WARN[0000] query topic route from server error           underlayError="topic not exist"
WARN[0000] queryTopicRouteInfoFromServer return nil      topic=topicTest
INFO[0000] the topic route info changed                  changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-a  \",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":7,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster  \",\"brokerName\":\"broker-a  \",\"brokerAddrs\":{\"0\":\"10.227.82.166:10911\"}}]}" changedFrom="<nil>" topic=topicTest
WARN[0000] query topic route from server error           underlayError="topic not exist"
bytedance@C02D56MZMD6R rocketmq_test % go run main.go
INFO[0000] the consumer start beginning                  consumerGroup=test messageModel=Clustering unitMode=false
INFO[0000] the topic route info changed                  changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-a  \",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster  \",\"brokerName\":\"broker-a  \",\"brokerAddrs\":{\"0\":\"10.227.82.166:10911\"}}]}" changedFrom="<nil>" topic=topicTest
WARN[0000] query topic route from server error           underlayError="topic not exist"
WARN[0000] queryTopicRouteInfoFromServer return nil      topic="%RETRY%test"
========producer push one message==== 0A0191F2AE8F0000000036080fd00001
WARN[0000] query topic route from server error           underlayError="topic not exist"
WARN[0000] queryTopicRouteInfoFromServer return nil      topic="%RETRY%test"
INFO[0000] receive broker's notification to consumer group  consumerGroup=test
INFO[0000] the topic route info changed                  changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-a  \",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster  \",\"brokerName\":\"broker-a  \",\"brokerAddrs\":{\"0\":\"10.227.82.166:10911\"}}]}" changedFrom="<nil>" topic=topicTest
INFO[0000] delete mq from offset table                   MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a  , queueId=0]"
INFO[0000] delete mq from offset table                   MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a  , queueId=1]"
INFO[0000] delete mq from offset table                   MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a  , queueId=2]"
INFO[0000] delete mq from offset table                   MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a  , queueId=3]"
receive====> 0A0191F2AE8F0000000036080fd00001 Hello mq~~
receive====> 0A0191F2AE7100000000360796b80001 Hello mq~~
INFO[0000] the MessageQueue changed, version also updated  changeTo=1605066498468329000 changedFrom=0
INFO[0000] The PullThresholdForTopic is changed          changeTo=25600 changedFrom=102400
INFO[0000] The PullThresholdSizeForTopic is changed      changeTo=12800 changedFrom=51200
WARN[0000] do balance in group failed, the topic does not exist  consumerGroup=test topic="%RETRY%test"
WARN[0000] do balance in group failed, the topic does not exist  consumerGroup=test topic="%RETRY%test"
========producer push one message==== 0A0191F2AE8F00000000360813b80002
receive====> 0A0191F2AE8F00000000360813b80002 Hello mq~~
========producer push one message==== 0A0191F2AE8F00000000360817a00003
receive====> 0A0191F2AE8F00000000360817a00003 Hello mq~~
========producer push one message==== 0A0191F2AE8F0000000036081b880004
receive====> 0A0191F2AE8F0000000036081b880004 Hello mq~~
========producer push one message==== 0A0191F2AE8F0000000036081f700005
receive====> 0A0191F2AE8F0000000036081f700005 Hello mq~~
========producer push one message==== 0A0191F2AE8F00000000360823580006
receive====> 0A0191F2AE8F00000000360823580006 Hello mq~~
========producer push one message==== 0A0191F2AE8F00000000360827400007
receive====> 0A0191F2AE8F00000000360827400007 Hello mq~~
========producer push one message==== 0A0191F2AE8F0000000036082b280008
receive====> 0A0191F2AE8F0000000036082b280008 Hello mq~~
========producer push one message==== 0A0191F2AE8F0000000036082f100009
receive====> 0A0191F2AE8F0000000036082f100009 Hello mq~~
========producer push one message==== 0A0191F2AE8F00000000360832f8000a
receive====> 0A0191F2AE8F00000000360832f8000a Hello mq~~
INFO[0010] update offset to broker success               MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a  , queueId=2]" consumerGroup=test offset=3
INFO[0010] update offset to broker success               MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a  , queueId=3]" consumerGroup=test offset=2
INFO[0010] update offset to broker success               MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a  , queueId=0]" consumerGroup=test offset=2
INFO[0010] update offset to broker success               MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a  , queueId=1]" consumerGroup=test offset=4
INFO[0015] update offset to broker success               MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a  , queueId=0]" consumerGroup=test offset=2
INFO[0015] update offset to broker success               MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a  , queueId=1]" consumerGroup=test offset=4
INFO[0015] update offset to broker success               MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a  , queueId=2]" consumerGroup=test offset=3
INFO[0015] update offset to broker success               MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a  , queueId=3]" consumerGroup=test offset=2
INFO[0020] update offset to broker success               MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a  , queueId=0]" consumerGroup=test offset=2
INFO[0020] update offset to broker success               MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a  , queueId=1]" consumerGroup=test offset=4
INFO[0020] update offset to broker success               MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a  , queueId=2]" consumerGroup=test offset=3
INFO[0020] update offset to broker success               MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a  , queueId=3]" consumerGroup=test offset=2
WARN[0020] do balance in group failed, the topic does not exist  consumerGroup=test topic="%RETRY%test"

其他

这里使用了 Orange 框架里的 queue 包。Orange 实际上是一个 Go 语言 web 开发框架,特点在于集成度高,有一些实用小工具:
- 图片验证码
- 发送邮件
- http 请求客户端
- 消息队列