Recently, while working on a toy project, I used a message queue (MQ) to decouple two services. One of the interactions is similar to a broadcast: one service publishes a message to all the others, and then the services that meet the conditions respond.
The demo is shown directly below; the main points are the configuration of the Exchange and of Auto Ack.
The MQ server runs on my old laptop, and it performs quite impressively over the 1000 Mbps intranet.
package main
import (
"fmt"
"time"
"github.com/streadway/amqp"
)
// Api connects to the broker, publishes "data from api[<port>]" to the
// "ApiSend" fanout exchange in a tight loop, and prints every message that
// arrives on "ApiGetQueue" (bound to the "ApiGet" fanout exchange).
//
// port is only used as a label inside the published message body.
//
// Api blocks until the delivery channel is closed. If any setup step fails,
// the error is printed and Api returns instead of continuing with an unusable
// connection or channel.
func Api(port string) {
	conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", "dqn", "dqn", "192.168.1.104", 5672))
	if err != nil {
		println(err.Error())
		return
	}
	defer conn.Close()

	chann, err := conn.Channel()
	if err != nil {
		println(err.Error())
		return
	}
	defer chann.Close()

	// Outgoing side: the exchange the broadcast messages are published to.
	if err := chann.ExchangeDeclare("ApiSend", "fanout", false, false, false, false, nil); err != nil {
		println(err.Error())
		return
	}

	// Incoming side: the queue replies are read from, bound to "ApiGet".
	if _, err := chann.QueueDeclare("ApiGetQueue", false, false, false, false, nil); err != nil {
		println(err.Error())
		return
	}
	if err := chann.ExchangeDeclare("ApiGet", "fanout", false, false, false, false, nil); err != nil {
		println(err.Error())
		return
	}
	if err := chann.QueueBind("ApiGetQueue", "", "ApiGet", false, nil); err != nil {
		println(err.Error())
		return
	}

	// autoAck is false, so every delivery is acknowledged explicitly below.
	consumeChan, err := chann.Consume("ApiGetQueue", "", false, false, false, false, nil)
	if err != nil {
		println(err.Error())
		return
	}

	// done tells the publisher goroutine to stop once Api returns, so it does
	// not outlive the channel it publishes on.
	done := make(chan struct{})
	defer close(done)

	// Publish continuously; the 1ns sleep is effectively just a yield between
	// messages, not a real rate limit.
	go func() {
		for {
			select {
			case <-done:
				return
			default:
			}
			err := chann.Publish("ApiSend", "", false, false, amqp.Publishing{
				ContentType: "text/plain",
				Body:        []byte("data from api[" + port + "]"),
			})
			if err != nil {
				// Stop instead of spinning on a channel that keeps failing.
				println(err.Error())
				return
			}
			time.Sleep(1 * time.Nanosecond)
		}
	}()

	for d := range consumeChan {
		println("[api get] --> ", string(d.Body))
		if err := d.Ack(false); err != nil {
			println(err.Error())
		}
	}
}
// Data connects to the broker, consumes the messages broadcast on the
// "ApiSend" fanout exchange through its own queue "ApiSendGet<port>", and
// answers each one by publishing "data from data[<port>]" to the "ApiGet"
// fanout exchange.
//
// port is used both as the suffix of the queue name and as a label in the
// log output and reply body.
//
// Data blocks until the delivery channel is closed. If any setup step fails,
// the error is printed and Data returns instead of continuing with an
// unusable connection or channel.
func Data(port string) {
	conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", "dqn", "dqn", "192.168.1.104", 5672))
	if err != nil {
		println(err.Error())
		return
	}
	defer conn.Close()

	chann, err := conn.Channel()
	if err != nil {
		println(err.Error())
		return
	}
	defer chann.Close()

	queueName := "ApiSendGet" + port

	// Outgoing side: the exchange replies are published to.
	if err := chann.ExchangeDeclare("ApiGet", "fanout", false, false, false, false, nil); err != nil {
		println(err.Error())
		return
	}

	// Incoming side: a per-port queue bound to the broadcast exchange.
	if _, err := chann.QueueDeclare(queueName, false, false, false, false, nil); err != nil {
		println(err.Error())
		return
	}
	if err := chann.ExchangeDeclare("ApiSend", "fanout", false, false, false, false, nil); err != nil {
		println(err.Error())
		return
	}
	if err := chann.QueueBind(queueName, "", "ApiSend", false, nil); err != nil {
		println(err.Error())
		return
	}

	// autoAck is false, so every delivery is acknowledged explicitly below.
	consumeChan, err := chann.Consume(queueName, "", false, false, false, false, nil)
	if err != nil {
		println(err.Error())
		return
	}

	for d := range consumeChan {
		println("[data "+port+" get] --> ", string(d.Body))
		err := chann.Publish("ApiGet", "", false, false, amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte("data from data[" + port + "]"),
		})
		if err != nil {
			// Replying is best-effort: report the failure but still ack the
			// delivery, as before.
			println(err.Error())
		}
		if err := d.Ack(false); err != nil {
			println(err.Error())
		}
	}
}
// main starts one Api worker and one Data worker in the background, then runs
// a second Data worker in the foreground; the process lives as long as that
// foreground call does.
func main() {
	const (
		apiPort        = "1111"
		firstDataPort  = "2222"
		secondDataPort = "3333"
	)

	go Api(apiPort)
	go Data(firstDataPort)

	// Blocking call: keeps the program running.
	Data(secondDataPort)
}