banner
RustyNail

RustyNail

coder. 【blog】https://rustynail.me 【nostr】wss://ts.relays.world/ wss://relays.world/nostr

RabbitMQ 在 Golang 中的使用

最近在做一个玩具项目时,用到了 MQ 来解耦两个服务。其中有个场景类似广播:一方发出消息,符合条件的服务给出回应。

完整的 demo 如下,需要注意的坑主要在 Exchange 的配置,以及 Auto Ack 之类的参数上。

mq 服务器是自己的旧笔记本,demo 内网 1000M 跑起来还蛮壮观嘿嘿

image

 

package main



import (

	"fmt"

	"time"



	"github.com/streadway/amqp"

)



// Api runs the broadcasting side of the demo.
//
// It connects to the RabbitMQ broker, starts a goroutine that keeps publishing
// "data from api[<port>]" to the "ApiSend" fanout exchange, and then consumes
// replies from the "ApiGetQueue" queue (bound to the "ApiGet" fanout exchange),
// printing and acknowledging each one.
//
// port is only used as a label inside the published message body.
//
// Any setup error is printed and makes Api return instead of continuing with
// an unusable connection or channel.
func Api(port string) {
	conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", "dqn", "dqn", "192.168.1.104", 5672))
	if err != nil {
		println(err.Error())
		return
	}
	// Release the connection once the consume loop below ends; the publisher
	// goroutine exits on its first failed Publish.
	defer conn.Close()

	chann, err := conn.Channel()
	if err != nil {
		println(err.Error())
		return
	}

	// Publish continuously, pausing only 1ns between messages.
	go func() {
		if err := chann.ExchangeDeclare("ApiSend", "fanout", false, false, false, false, nil); err != nil {
			println(err.Error())
			return
		}
		for {
			err := chann.Publish("ApiSend", "", false, false, amqp.Publishing{
				ContentType: "text/plain",
				Body:        []byte("data from api[" + port + "]"),
			})
			if err != nil {
				println(err.Error())
				return
			}
			time.Sleep(1 * time.Nanosecond)
		}
	}()

	if _, err := chann.QueueDeclare("ApiGetQueue", false, false, false, false, nil); err != nil {
		println(err.Error())
		return
	}
	if err := chann.ExchangeDeclare("ApiGet", "fanout", false, false, false, false, nil); err != nil {
		println(err.Error())
		return
	}
	if err := chann.QueueBind("ApiGetQueue", "", "ApiGet", false, nil); err != nil {
		println(err.Error())
		return
	}

	// The third argument (autoAck) is false, so every delivery is
	// acknowledged explicitly after it has been printed.
	consumeChan, err := chann.Consume("ApiGetQueue", "", false, false, false, false, nil)
	if err != nil {
		println(err.Error())
		// Ranging over the nil delivery channel would block forever.
		return
	}
	for d := range consumeChan {
		println("[api get] --> ", string(d.Body))
		if err := d.Ack(false); err != nil {
			println(err.Error())
		}
	}
}



// Data runs the responding side of the demo.
//
// It connects to the RabbitMQ broker, declares its own queue
// "ApiSendGet<port>" bound to the "ApiSend" fanout exchange, and for every
// delivery received prints it, publishes "data from data[<port>]" to the
// "ApiGet" fanout exchange, and acknowledges the delivery.
//
// port is used as a suffix for the queue name and as a label in the log output
// and the reply body.
//
// Any setup error is printed and makes Data return instead of continuing with
// an unusable connection or channel.
func Data(port string) {
	conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", "dqn", "dqn", "192.168.1.104", 5672))
	if err != nil {
		println(err.Error())
		return
	}
	// Release the connection once the consume loop below ends.
	defer conn.Close()

	chann, err := conn.Channel()
	if err != nil {
		println(err.Error())
		return
	}

	// Exchange the replies are published to.
	if err := chann.ExchangeDeclare("ApiGet", "fanout", false, false, false, false, nil); err != nil {
		println(err.Error())
		return
	}

	// Per-instance queue bound to the broadcast exchange.
	queue := "ApiSendGet" + port
	if _, err := chann.QueueDeclare(queue, false, false, false, false, nil); err != nil {
		println(err.Error())
		return
	}
	if err := chann.ExchangeDeclare("ApiSend", "fanout", false, false, false, false, nil); err != nil {
		println(err.Error())
		return
	}
	if err := chann.QueueBind(queue, "", "ApiSend", false, nil); err != nil {
		println(err.Error())
		return
	}

	// The third argument (autoAck) is false, so every delivery is
	// acknowledged explicitly after the reply has been sent.
	consumeChan, err := chann.Consume(queue, "", false, false, false, false, nil)
	if err != nil {
		println(err.Error())
		// Ranging over the nil delivery channel would block forever.
		return
	}
	for d := range consumeChan {
		println("[data "+port+" get] --> ", string(d.Body))
		err := chann.Publish("ApiGet", "", false, false, amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte("data from data[" + port + "]"),
		})
		if err != nil {
			println(err.Error())
		}
		if err := d.Ack(false); err != nil {
			println(err.Error())
		}
	}
}



// main starts one Api instance and two Data instances. The Api and the first
// Data run in background goroutines; the second Data runs on the main
// goroutine, so the program lives as long as that call does.
func main() {
	const (
		apiLabel        = "1111"
		backgroundLabel = "2222"
		foregroundLabel = "3333"
	)

	go Api(apiLabel)
	go Data(backgroundLabel)
	Data(foregroundLabel)
}
加载中...
此文章数据所有权由区块链加密技术和智能合约保障仅归创作者所有。