网站首页 文章专栏 Go 语言快速尝试 rocketmq
初次接触消息中间件,使用公共 rocketmq 可能更容易上手。
包含内容:
- Docker 部署 rocketmq nameserver 和 broker
- 图形化运维
- 使用 Orange 框架完成 rocketmq 的快速测试
参考 http://www.justdojava.com/2019/08/26/rocketmq-creator/
图形化界面
快速测试的代码
参考 https://gocn.vip/topics/10716
package main
import (
"fmt"
"time"
"gitee.com/zhucheer/orange/queue"
)
func main() {
// 注册生产者 填入broker节点,group名称,重试次数信息
mqProducerClient := queue.RegisterRocketProducerMust([]string{"10.227.82.166:9876"}, "test", 1)
// 注册消费者 填入broker节点,group名称信息
mqConsumerClient := queue.RegisterRocketConsumerMust([]string{"10.227.82.166:9876"}, "test")
go func() {
for i := 0; i < 10; i++ {
// 向队列发送一条消息 填入消息队列topic和消息体信息
ret, _ := mqProducerClient.SendMsg("topicTest", "Hello mq~~")
fmt.Println("========producer push one message====", ret.MsgId)
time.Sleep(time.Second)
}
}()
// 执行消费者监听 填入消息队列topic
mqConsumerClient.ListenReceiveMsgDo("topicTest", func(mqMsg queue.MqMsg) {
// 收到一条消息
fmt.Println("receive====>", mqMsg.MsgId, mqMsg.BodyString())
})
time.Sleep(20 * time.Second)
}
运行该测试代码后的输出
bytedance@C02D56MZMD6R rocketmq_test % go run main.go
INFO[0000] the consumer start beginning consumerGroup=test messageModel=Clustering unitMode=false
WARN[0000] query topic route from server error underlayError="topic not exist"
WARN[0000] queryTopicRouteInfoFromServer return nil topic=topicTest
INFO[0000] the topic route info changed changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-a \",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":7,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster \",\"brokerName\":\"broker-a \",\"brokerAddrs\":{\"0\":\"10.227.82.166:10911\"}}]}" changedFrom="<nil>" topic=topicTest
WARN[0000] query topic route from server error underlayError="topic not exist"
bytedance@C02D56MZMD6R rocketmq_test % go run main.go
INFO[0000] the consumer start beginning consumerGroup=test messageModel=Clustering unitMode=false
INFO[0000] the topic route info changed changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-a \",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster \",\"brokerName\":\"broker-a \",\"brokerAddrs\":{\"0\":\"10.227.82.166:10911\"}}]}" changedFrom="<nil>" topic=topicTest
WARN[0000] query topic route from server error underlayError="topic not exist"
WARN[0000] queryTopicRouteInfoFromServer return nil topic="%RETRY%test"
========producer push one message==== 0A0191F2AE8F0000000036080fd00001
WARN[0000] query topic route from server error underlayError="topic not exist"
WARN[0000] queryTopicRouteInfoFromServer return nil topic="%RETRY%test"
INFO[0000] receive broker's notification to consumer group consumerGroup=test
INFO[0000] the topic route info changed changeTo="{\"OrderTopicConf\":\"\",\"queueDatas\":[{\"brokerName\":\"broker-a \",\"readQueueNums\":4,\"writeQueueNums\":4,\"perm\":6,\"topicSynFlag\":0}],\"brokerDatas\":[{\"cluster\":\"DefaultCluster \",\"brokerName\":\"broker-a \",\"brokerAddrs\":{\"0\":\"10.227.82.166:10911\"}}]}" changedFrom="<nil>" topic=topicTest
INFO[0000] delete mq from offset table MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a , queueId=0]"
INFO[0000] delete mq from offset table MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a , queueId=1]"
INFO[0000] delete mq from offset table MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a , queueId=2]"
INFO[0000] delete mq from offset table MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a , queueId=3]"
receive====> 0A0191F2AE8F0000000036080fd00001 Hello mq~~
receive====> 0A0191F2AE7100000000360796b80001 Hello mq~~
INFO[0000] the MessageQueue changed, version also updated changeTo=1605066498468329000 changedFrom=0
INFO[0000] The PullThresholdForTopic is changed changeTo=25600 changedFrom=102400
INFO[0000] The PullThresholdSizeForTopic is changed changeTo=12800 changedFrom=51200
WARN[0000] do balance in group failed, the topic does not exist consumerGroup=test topic="%RETRY%test"
WARN[0000] do balance in group failed, the topic does not exist consumerGroup=test topic="%RETRY%test"
========producer push one message==== 0A0191F2AE8F00000000360813b80002
receive====> 0A0191F2AE8F00000000360813b80002 Hello mq~~
========producer push one message==== 0A0191F2AE8F00000000360817a00003
receive====> 0A0191F2AE8F00000000360817a00003 Hello mq~~
========producer push one message==== 0A0191F2AE8F0000000036081b880004
receive====> 0A0191F2AE8F0000000036081b880004 Hello mq~~
========producer push one message==== 0A0191F2AE8F0000000036081f700005
receive====> 0A0191F2AE8F0000000036081f700005 Hello mq~~
========producer push one message==== 0A0191F2AE8F00000000360823580006
receive====> 0A0191F2AE8F00000000360823580006 Hello mq~~
========producer push one message==== 0A0191F2AE8F00000000360827400007
receive====> 0A0191F2AE8F00000000360827400007 Hello mq~~
========producer push one message==== 0A0191F2AE8F0000000036082b280008
receive====> 0A0191F2AE8F0000000036082b280008 Hello mq~~
========producer push one message==== 0A0191F2AE8F0000000036082f100009
receive====> 0A0191F2AE8F0000000036082f100009 Hello mq~~
========producer push one message==== 0A0191F2AE8F00000000360832f8000a
receive====> 0A0191F2AE8F00000000360832f8000a Hello mq~~
INFO[0010] update offset to broker success MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a , queueId=2]" consumerGroup=test offset=3
INFO[0010] update offset to broker success MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a , queueId=3]" consumerGroup=test offset=2
INFO[0010] update offset to broker success MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a , queueId=0]" consumerGroup=test offset=2
INFO[0010] update offset to broker success MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a , queueId=1]" consumerGroup=test offset=4
INFO[0015] update offset to broker success MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a , queueId=0]" consumerGroup=test offset=2
INFO[0015] update offset to broker success MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a , queueId=1]" consumerGroup=test offset=4
INFO[0015] update offset to broker success MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a , queueId=2]" consumerGroup=test offset=3
INFO[0015] update offset to broker success MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a , queueId=3]" consumerGroup=test offset=2
INFO[0020] update offset to broker success MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a , queueId=0]" consumerGroup=test offset=2
INFO[0020] update offset to broker success MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a , queueId=1]" consumerGroup=test offset=4
INFO[0020] update offset to broker success MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a , queueId=2]" consumerGroup=test offset=3
INFO[0020] update offset to broker success MessageQueue="MessageQueue [topic=topicTest, brokerName=broker-a , queueId=3]" consumerGroup=test offset=2
WARN[0020] do balance in group failed, the topic does not exist consumerGroup=test topic="%RETRY%test"
这里使用了 Orange
框架里的 queue
包。Orange
实际上是一个 Go
语言 web
开发框架,特点在于集成度高,有一些实用小工具:
- 图片验证码
- 发送邮件
- http
请求客户端
- 消息队列