banner
RustyNail

RustyNail

coder. 【blog】https://rustynail.me 【nostr】wss://ts.relays.world/ wss://relays.world/nostr

RabbitMQ Golang Usage

Recently, while working on a toy project, I used a message queue (MQ) to decouple two services. One of the scenarios is similar to broadcasting: a message is sent out to every service, and the services that meet the conditions respond.

The demo is shown below; the main points are the configuration of the Exchange and of Auto Ack.

The MQ server runs on my old laptop, and it performs quite impressively over the 1000 Mbps intranet.

image

 

package main

import (
	"fmt"
	"time"

	"github.com/streadway/amqp"
)

// Api runs the "api" side of the demo.
//
// It connects to the broker, continuously publishes "data from api[<port>]"
// messages to the fanout exchange "ApiSend", and prints every message that
// arrives on the queue "ApiGetQueue" (bound to the fanout exchange "ApiGet").
// Deliveries are acknowledged manually after they have been printed.
//
// port is only used as a label inside the published message body.
//
// Api blocks until the delivery channel is closed. Any setup error is printed
// and makes Api return instead of continuing with an unusable connection.
func Api(port string) {
	conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", "dqn", "dqn", "192.168.1.104", 5672))
	if err != nil {
		// conn is nil here; going on would panic in conn.Channel().
		println(err.Error())
		return
	}
	defer conn.Close()

	chann, err := conn.Channel()
	if err != nil {
		println(err.Error())
		return
	}
	defer chann.Close()

	// Outgoing side: fanout exchange the data workers bind their queues to.
	if err := chann.ExchangeDeclare("ApiSend", "fanout", false, false, false, false, nil); err != nil {
		println(err.Error())
		return
	}

	// Incoming side: one queue bound to the exchange the data workers reply on.
	if _, err := chann.QueueDeclare("ApiGetQueue", false, false, false, false, nil); err != nil {
		println(err.Error())
		return
	}
	if err := chann.ExchangeDeclare("ApiGet", "fanout", false, false, false, false, nil); err != nil {
		println(err.Error())
		return
	}
	if err := chann.QueueBind("ApiGetQueue", "", "ApiGet", false, nil); err != nil {
		println(err.Error())
		return
	}

	// autoAck is false, so every delivery is acknowledged explicitly below.
	consumeChan, err := chann.Consume("ApiGetQueue", "", false, false, false, false, nil)
	if err != nil {
		println(err.Error())
		return
	}

	// done tells the publisher goroutine to stop once Api returns, so it does
	// not keep publishing on a closed channel forever.
	done := make(chan struct{})
	defer close(done)

	go func() {
		for {
			select {
			case <-done:
				return
			default:
			}
			err := chann.Publish("ApiSend", "", false, false, amqp.Publishing{
				ContentType: "text/plain",
				Body:        []byte("data from api[" + port + "]"),
			})
			if err != nil {
				println(err.Error())
				return
			}
			// The pause is only 1ns, so messages go out nearly back-to-back
			// rather than once per second.
			time.Sleep(1 * time.Nanosecond)
		}
	}()

	for d := range consumeChan {
		println("[api get] --> ", string(d.Body))
		if err := d.Ack(false); err != nil {
			println(err.Error())
		}
	}
}

// Data runs one "data" worker of the demo.
//
// It connects to the broker, declares its own queue "ApiSendGet<port>" bound to
// the fanout exchange "ApiSend", and for every message received there prints it
// and publishes "data from data[<port>]" to the fanout exchange "ApiGet".
// Deliveries are acknowledged manually after the reply has been attempted.
//
// port is used both as a suffix of the queue name, so each worker gets its own
// queue, and as a label in the printed and published text.
//
// Data blocks until the delivery channel is closed. Any setup error is printed
// and makes Data return instead of continuing with an unusable connection.
func Data(port string) {
	conn, err := amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%d/", "dqn", "dqn", "192.168.1.104", 5672))
	if err != nil {
		// conn is nil here; going on would panic in conn.Channel().
		println(err.Error())
		return
	}
	defer conn.Close()

	chann, err := conn.Channel()
	if err != nil {
		println(err.Error())
		return
	}
	defer chann.Close()

	// Exchange this worker replies on.
	if err := chann.ExchangeDeclare("ApiGet", "fanout", false, false, false, false, nil); err != nil {
		println(err.Error())
		return
	}

	// Per-worker queue bound to the exchange the api side publishes to.
	queueName := "ApiSendGet" + port
	if _, err := chann.QueueDeclare(queueName, false, false, false, false, nil); err != nil {
		println(err.Error())
		return
	}
	if err := chann.ExchangeDeclare("ApiSend", "fanout", false, false, false, false, nil); err != nil {
		println(err.Error())
		return
	}
	if err := chann.QueueBind(queueName, "", "ApiSend", false, nil); err != nil {
		println(err.Error())
		return
	}

	// autoAck is false, so every delivery is acknowledged explicitly below.
	consumeChan, err := chann.Consume(queueName, "", false, false, false, false, nil)
	if err != nil {
		println(err.Error())
		return
	}

	for d := range consumeChan {
		println("[data "+port+" get] --> ", string(d.Body))
		err := chann.Publish("ApiGet", "", false, false, amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte("data from data[" + port + "]"),
		})
		if err != nil {
			// Best effort: report the failed reply but still ack the delivery,
			// as the original flow acknowledged unconditionally.
			println(err.Error())
		}
		if err := d.Ack(false); err != nil {
			println(err.Error())
		}
	}
}

// main starts the demo: one Api publisher/consumer and two Data workers.
// The last Data worker runs on the main goroutine, so the process stays alive
// for as long as that worker keeps running.
func main() {
	go Api("1111")

	dataPorts := []string{"2222", "3333"}
	last := len(dataPorts) - 1

	// Every worker except the last one runs in the background.
	for _, p := range dataPorts[:last] {
		go Data(p)
	}
	Data(dataPorts[last])
}
Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.