程序员社区

Go:基于Redis Stream构建可扩展事件流

Go:基于Redis Stream构建可扩展事件流插图

事件流架构是独立扩展某些组件的好方法。当提到事件流工具时,这方面的主流似乎是Kafka,但也有一些其他实用的流工具,比如NATS流/Jetstream, NSQ或Redis流。

今天我将写一些关于Redis Stream基本用法的笔记,我们将构建一个发布者和一个消费者的例子,并使用运行在docker上的Redis服务器做本地测试。发布者将使用XADD命令发送一些消息到Redis的持久化消息流中,消费者将使用XREADGROUP读取流。

当事件在给定流中发布时,通过使用XREADGROUP,可以让多个消费者执行相同的操作。这种机制使水平扩展消费者成为可能。当我们添加一个消费者时,它将自己注册到一个消费者组中,并且流中的的消息将均匀地发送到组中的不同消费者。

构建示例

假设我们构建了一个分布式系统,在这个系统中,将从外部接收一些”tickets“消息,当消息到达时,我们希望执行一些操作,例如解析消息的内容,调用API写信息到数据库。

本地Redis

开始之前,我们需要一个Redis服务,我们在本地使用docker镜像启动redis服务:

docker run --name localredis -d redis redis-server --appendonly yes

这里启动本地redis服务,appendonly参数的作用是设置将redis数据持久化。当有数据变化,就会写副本到一个文件。如果redis服务重启,消息数据会恢复。

Go发布者

让我们开始编写代码,为发布者创建一个新的go module。这个发布者将简单地使用XADD命令向Redis流发送一些消息。
初始化一个Go module,创建一个main.go文件,并添加redis调用库:

go mod init publisher
touch main.go
go get github.com/go-redis/redis

在main.go文件中,我们将连接redis并使用ping来检查是否连接成功:

package main
import (
 "fmt"
 "log"
"github.com/go-redis/redis"
)
func main() {
  log.Println("Publisher started")
  redisClient := redis.NewClient(&redis.Options{
    Addr: fmt.Sprintf("%s:%s", "127.0.0.1", "6379"),
  })
  _, err := redisClient.Ping().Result()
  if err != nil {
    log.Fatal("Unable to connect to Redis", err)
  }
  log.Println("Connected to Redis server")
}

当执行go run main.go,如果一切正常的话将看到如下日志:

2021/08/25 20:54:21 Publisher started
2021/08/25 20:54:21 Connected to Redis server

现在我们创建一个“tickets”流然后添加一个消息:

func publishTicketReceivedEvent(client *redis.Client) error {
  log.Println("Publishing event to Redis")
  err := client.XAdd(&redis.XAddArgs{
    Stream:       "tickets",
    MaxLen:       0,
    MaxLenApprox: 0,
    ID:           "",
    Values: map[string]interface{}{
      "whatHappened": string("ticket received"),
      "ticketID":     int(rand.Intn(100000000)),
      "ticketData":   string("some ticket data"),
    },
  }).Err()
  return err
}

这里函数将发送一条“ticket received”消息并附带一个随机id和一些数据。在主函数中,我们将以上函数在循环中多次使用看看会发生什么?

for i := 0; i < 3000; i++ {
  err = publishTicketReceivedEvent(redisClient)
  if err != nil {
    log.Fatal(err)
  }
}

如果你执行main.go函数,将会打印3000多行日志:

...
2021/08/25 21:08:38 Publishing event to Redis
2021/08/25 21:08:38 Publishing event to Redis
2021/08/25 21:08:38 Publishing event to Redis

我们进入redis容器内看看,在redis服务中,打开redis-cli客户端,查看当前状态:

docker exec -it localredis /bin/bash
redis-cli
127.0.0.1:6379> XINFO STREAM tickets

XINFO是redis命令用于监控消息流或消费者组状态。这里我们将看到有3000条流数据:以下是第一条和最后一条数据:

1) "length"
 2) (integer) 3000
 3) "radix-tree-keys"
 4) (integer) 45
 5) "radix-tree-nodes"
 6) (integer) 111
 7) "last-generated-id"
 8) "1615061318123-0"
 9) "groups"
10) (integer) 0
11) "first-entry"
12) 1) "1615061313111-0"
    2) 1) "whatHappened"
       2) "ticket received"
       3) "ticketID"
       4) "98498081"
       5) "ticketData"
       6) "some ticket data"
13) "last-entry"
14) 1) "1615061318123-0"
    2) 1) "whatHappened"
       2) "ticket received"
       3) "ticketID"
       4) "39114354"
       5) "ticketData"
       6) "some ticket data"

Ok,我们已经在名为“tickets”流中发布了一些消息,现在我们来构建一个消费者去消费流数据:

Go消费者

和发布者类似,首先需要创建一个新的module:

go mod init consumer
touch main.go
go get github.com/go-redis/redis

在main.go要先连接redis服务:

package main
import (
  "fmt"
  "log
  "github.com/go-redis/redis"
)
func main() {
  log.Println("Consumer started")
  redisClient := redis.NewClient(&redis.Options{
    Addr: fmt.Sprintf("%s:%s", "127.0.0.1", "6379"),
  })
  _, err := redisClient.Ping().Result()
  if err != nil {
    log.Fatal("Unbale to connect to Redis", err)
  }
  log.Println("Connected to Redis server")
}

下面使用XGROUPCREATE来创建消费者组:

subject := "tickets"
consumersGroup := "tickets-consumer-group"
err = redisClient.XGroupCreate(subject, consumersGroup, "0").Err()
if err != nil {
log.Println(err)
}

现在可以使用XREADGROUP来监听流中消息,并使用一个唯一id将消费者注册到消费者组里:
为了生成唯一id,将使用xid库:

go get github.com/rs/xid

当接收到ticket消息时,将调用以下函数:

func handleNewTicket(ticketID string, ticketData string) error {
  log.Printf("Handling new ticket id : %s data %s\n", ticketID, ticketData)
  return nil
}

然后在main.go中创建一个无限循环,我们调用XREADGROUP并在>位置,表示从该组的第一个待处理消息开始,然后为每个ticket调用handNewTicket函数,并发送XACK命令到redis服务通知消息已经被消费。

uniqueID := xid.New().String()
for {
  entries, err := redisClient.XReadGroup(&redis.XReadGroupArgs{
    Group:    consumersGroup,
    Consumer: uniqueID,
    Streams:  []string{subject, ">"},
    Count:    2,
    Block:    0,
    NoAck:    false,
  }).Result()
  if err != nil {
    log.Fatal(err)
  }
for i := 0; i < len(entries[0].Messages); i++ {
  messageID := entries[0].Messages[i].ID
  values := entries[0].Messages[i].Values
  eventDescription := fmt.Sprintf("%v", values["whatHappened"])
  ticketID := fmt.Sprintf("%v", values["ticketID"])
  ticketData := fmt.Sprintf("%v", values["ticketData"])
  if eventDescription == "ticket received" {
    err := handleNewTicket(ticketID, ticketData)
    if err != nil {
      log.Fatal(err)
    }
    redisClient.XAck(subject, consumersGroup, messageID)
  }
}

如果程序执行正常,将看到3000行的日志:

...
2021/08/25 21:51:44 Handling new ticket id : 28377708 data some ticket data
2021/08/25 21:51:44 Handling new ticket id : 56451806 data some ticket data
2021/08/25 21:51:44 Handling new ticket id : 94132471 data some ticket data

您还应该注意到,该程序没有退出,仍然在侦听消息。这是因为我们使用BLOCK = 0参数调用XREADGROUP,这意味着程序执行将被阻塞无限长的时间,直到新消息到来。如果您打开第二个终端并再次运行发布者,会看到消息同时被发布和消费,这是一件好事。

在现实生活中,消费消息时,我们会对消息做比log.Println()更复杂的事情。我们可能调用外部API,写入数据库或将数据导出到S3桶中等……当我们给消费者增加一些延迟时,会发生什么?

假设我们仍然以同样的速度发布这3000条消息,但是我们在消费者中添加了一个time.Sleep(100 * time.Millisecond),这将耗时超过5分钟来消费这3000条消息…除非我们同时运行多个消费者。好消息是,我们已经为这种方法编写了所有代码。

如何扩展?

我们做一个快速实验。如果我们将time.Sleep(100 * time.Millisecond)添加到消费者的handNewTicket函数,将会如何?

func handleNewTicket(ticketID string, ticketData string) error {
  log.Printf("Handling new ticket id : %s data %s\n", ticketID, ticketData)
   time.Sleep(100 * time.Millisecond)
   return nil
}

在消费者中打开5个终端运行go run main.go:

Go:基于Redis Stream构建可扩展事件流插图1

然后在发布者运行go run main.go,你将看到tickets消息平均的发布到各个消费者,所以消费者以固定的速度同时运行。几秒钟后,所有消费者都应该同时停止。

Go:基于Redis Stream构建可扩展事件流插图2

是不是很厉害?

只需几行代码,我们就拥有了一个带有事件发布者、事件流和任意数量消费者的分布式系统。这种解决方案可以很容易地在kubernetes集群上扩展,并允许我们处理潜在的巨大数据负载。
这个示例项目的源代码可以在这里找到:https://github.com/gmrdn/redis-streams-go
想了解更多关于Redis Streams的信息,查看 https://redis.io/topics/streams-intro

备注:文章部分结果在译者本地运行测试。原文

赞(0) 打赏
未经允许不得转载:IDEA激活码 » Go:基于Redis Stream构建可扩展事件流

一个分享Java & Python知识的社区