AMQP库非常强大,只需要40-50行简单的代码就可以启动并运行。不幸的是,amqp标准库缺少很多我们希望用到的功能,比如重连逻辑、线程的创建、队列和绑定模版以及流控。
别慌最近作者开源了自己的库, 巧妙地包装了Streadway的amqp库,并提供了那些更高级抽象。github地址。
构建这个库的主要目标如下:
- 处理自动重连。当Node宕机时,amqp库无法处理重连。
- 从队列中生成并发的goroutine——没有理由让每个人都必须在他们构建的应用程序中编写相同的20行代码。
- 提供默认设置,但完全可控。使用go-rabbitmq,你仍然可以获得amqp级别的所有控制,但提供了默认设置,这将减少你在应用程序逻辑中需要的模版代码数量。
- 流控制。AMQP库只提供了一个channel,告诉您何时应该停止发布。当发布到服务端被要求停止发布时,go-rabbitmq库实际上会返回一个错误。
示例:
go-rabbitmq库提供了两种类型:Publisher和Consumer,它们将保存你发布和消费消息的所有配置。
使用默认配置消费
consumer, err := rabbitmq.NewConsumer("amqp://user:pass@localhost")
if err != nil {
log.Fatal(err)
}
err = consumer.StartConsuming(
func(d rabbitmq.Delivery) bool {
log.Printf("consumed: %v", string(d.Body))
// true to ACK, false to NACK
return true
},
"my_queue",
[]string{"routing_key1", "routing_key2"}
)
if err != nil {
log.Fatal(err)
}
以上代码实现功能:
1、 创建一个消费者连接到所提供消息的集群
2、如果my_queue队列不存在,则创建它,并将routing_key1和routing_key2绑定到它。
3、传入匿名函数提供的处理程序,以单线程方式消费消息。我们提供了一个处理程序来打印消息并将其从队列中删除。
4、如果应用程序失去与集群的连接或某个节点宕机,客户端将不断尝试以指数后退策略重新连接。
使用默认设置发布消息
publisher, returns, err := rabbitmq.NewPublisher("amqp://user:pass@localhost")
if err != nil {
log.Fatal(err)
}
err = publisher.Publish([]byte("hello, world"), []string{"routing_key"})
if err != nil {
log.Fatal(err)
}
实现功能:
1、创建连接到所提供集群的发布者
2、函数返回channel,该channel将在服务器拒绝发布时返回消息。注意,当使用特定选项(强制和立即)时,就会发生这种情况。如果集群宕机或连接丢失,发布将会失败,直到发布者重新连接(它会自动重连)。
3、将文本“hello, world”的字节发布到routing_key路由键。
消费选项
完整消费者配置项是相当大的,不会限制其功能:
type ConsumeOptions struct {
QueueDurable bool
QueueAutoDelete bool
QueueExclusive bool
QueueNoWait bool
QueueArgs Table
BindingExchange string
BindingNoWait bool
BindingArgs Table
Concurrency int
QOSPrefetch int
QOSGlobal bool
ConsumerName string
ConsumerAutoAck bool
ConsumerExclusive bool
ConsumerNoWait bool
ConsumerNoLocal bool
ConsumerArgs Table
}
如果你看看RabbitMQ的特性集,你会发现大多数选项都是不言自明的。例如,持久化队列不会在服务器重启时丢失,独占队列不能被多个连接使用,自动删除队列在没有消费者时被删除,等等。让我们看看如何设置一些更高级的配置。
err = consumer.StartConsuming(
func(d rabbitmq.Delivery) bool {
log.Printf("consumed: %v", string(d.Body))
return true
},
"my_queue",
[]string{"routing_key1", "routing_key2"},
func(opts *rabbitmq.ConsumeOptions) {
opts.QueueDurable = true
opts.Concurrency = 10
opts.QOSPrefetch = 100
},
)
我们传入一个匿名函数(使用Go的可变参数特性),它将按照我们想要的方式改变指向配置结构的指针。在上面的代码中,我们设置队列可持久化,配制消费者要以10个线程运行处理程序,我们希望服务器一次批处理100条消息(这有助于吞吐量,每个处理函数仍然一次只能处理一个消息)。
还有一些配置功能,你可以直接使用:
err = consumer.StartConsuming(
func(d rabbitmq.Delivery) bool {
log.Printf("consumed: %v", string(d.Body))
return true
},
"my_queue",
[]string{"routing_key1", "routing_key2"},
rabbitmq.WithConsumeOptionsConcurrency(10),
rabbitmq.WithConsumeOptionsQueueDurable,
rabbitmq.WithConsumeOptionsQuorum,
)
发布配置
坦率地说,我不太喜欢强制性和即时的选项。在我看来,这似乎打破了发布/订阅架构的一个简单原则,即发布者不应该感知到他们的消费者。应该由使用者来确保它们正确连接并创建了它们的队列。也就是说,你仍然可以使用这些选项:
err = publisher.Publish(
[]byte("hello, world"),
[]string{"routing_key"},
// leave blank for defaults
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsMandatory,
rabbitmq.WithPublishOptionsPersistentDelivery,
)
如果您对该库有任何问题或改进建议,请提issue,让我们一起讨论!我计划将这个库保持在V0版本,直到我非常确定对这个API感到满意。