最近在做一个玩具项目的时候有用到 MQ 来分离两个服务,其中有个场景是类似广播,然后符合条件的服务给出回应。
直接的 demo 如下,主要是 Exchange 这些的配置的坑,以及 Auto Ack 之类的。
mq 服务器是自己的旧笔记本,demo 内网 1000M 跑起来还蛮壮观嘿嘿
package main
import (
"fmt"
"time"
"github.com/streadway/amqp"
)
func Api(port string) {
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", "dqn", "dqn", "192.168.1.104", 5672))
if err != nil {
println(err.Error())
}
chann, err := conn.Channel()
// publish each sec
go func() {
_ = chann.ExchangeDeclare("ApiSend", "fanout", false, false, false, false, nil)
for true {
chann.Publish("ApiSend", "", false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte("data from api[" + port + "]"),
})
time.Sleep(1 * time.Nanosecond)
}
}()
_, _ = chann.QueueDeclare("ApiGetQueue", false, false, false, false, nil)
_ = chann.ExchangeDeclare("ApiGet", "fanout", false, false, false, false, nil)
_ = chann.QueueBind("ApiGetQueue", "", "ApiGet", false, nil)
consumeChan, err := chann.Consume("ApiGetQueue", "", false, false, false, false, nil)
if err != nil {
println(err.Error())
}
for d := range consumeChan {
println("[api get] --> ", string(d.Body))
d.Ack(false)
}
}
func Data(port string) {
conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", "dqn", "dqn", "192.168.1.104", 5672))
if err != nil {
println(err.Error())
}
chann, err := conn.Channel()
// publish each sec
_ = chann.ExchangeDeclare("ApiGet", "fanout", false, false, false, false, nil)
_, _ = chann.QueueDeclare("ApiSendGet"+port, false, false, false, false, nil)
_ = chann.ExchangeDeclare("ApiSend", "fanout", false, false, false, false, nil)
_ = chann.QueueBind("ApiSendGet"+port, "", "ApiSend", false, nil)
consumeChan, err := chann.Consume("ApiSendGet"+port, "", false, false, false, false, nil)
if err != nil {
println(err.Error())
}
for d := range consumeChan {
println("[data "+port+" get] --> ", string(d.Body))
chann.Publish("ApiGet", "", false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte("data from data[" + port + "]"),
})
d.Ack(false)
}
}
func main() {
go Api("1111")
go Data("2222")
Data("3333")
}