程序员社区

基于NATS JetStream构建分布式事件流系统

基于NATS JetStream构建分布式事件流系统插图

【译文】原文地址
本文我将简要介绍NATS JetStream项目,它是一个来自NATS生态的分布式事件流系统。简言之,NATS JetStream是一个以发布/订阅作为消息传递模式的分布式流媒体系统。当我在咨询工作中向我的客户介绍NATS时,对NATS、NATS Streaming和新推出的NATS JetStream的不同感到困惑。我将简要介绍这三个产品,然后通过一个非常简单的示例深入介绍NATS JetStream。

从NATS服务到NATS JetStream来理解NATS生态

NAT云原生消息分发系统提供了一个基于“最多一次”交付模型的简单NATS服务,该服务将已发布的消息转发给消费者,而对已发布的消息不进行任何持久性处理,其性能处于行业领先地位。对于某些类型的应用程序,如果不需要事件流平台,那么基本的NATS平台就足够了并且还能获得高性能。使用“最多一次”发布模型,如果消息服务在转发数据给订阅者过程中,订阅者系统故障的话,消息将会丢失,因此无法保证已发布的消息的传递。

后来NATS生态添加了第二个消息发布可选方案:NATS流,这是一个基于NATS生态构建的高性能、轻量级、可靠的流平台。当使用NATS流发布消息时,发布的消息被持久化到一个可定制的存储中,由于具备发布者和订阅者提供ACK消息、发布者速率限制和每个订阅者速率匹配/限制的能力,这样我们可以重用发布的消息为消费者提供“At-least-once-delivery”模式。基础NATS和NATS流都是用Go编写的。虽然NATS Streaming非常高性能和轻量化,但在某些能力和成熟度方面,它没有像Kafka这样的分布式流系统那么强大。与此同时,随着NATS 2.0的出现,NATS的生态系统也得到了很大的发展,NATS 2.0提供了分布式安全、去中心化管理、多租户、更大的网络、全球可扩展以及安全的数据共享。但NATS流在适应NATS 2.0方面存在很多限制,而且流系统还没有发展到能够应对下一代物联网和边缘计算的挑战。

NATS 2.0的下一代NATS流被称为NATS JetStream。因此,如果你想使用流系统来构建分布式系统、物联网和边缘计算,那么最好基于NATS使用NATS JetStream。官方对NATS流的支持将于2023年6月结束。因此,如果不想持久化发布的消息,请选择基本的NATS;如果希望将消息持久化到持久化存储中,并通过ACK、速率匹配/限制等方式重复来自持久化存储的消息,请使用NATS JetStream。NATS JetStream是用Go编写的。

NATS JetStream
基于NATS JetStream构建分布式事件流系统插图1

NATS JetStream是NATS生态系统的下一代流系统,为NATS 2.0打造,具有分布式安全、多租户和水平扩展能力。

什么是分布式事件流系统?

流系统允许您从分布式系统和微服务、物联网传感器、边缘设备和软件应用程序中捕获事件流,并将这些流持久化到存储中。因为将这些事件流持久化到持久化存储中,就可以使用事件驱动的结构回放这些事件流,以便检索、处理和响应这些事件流。使用NATS JetStream,流将由生产者系统发布,通过持久化重放消息并通过消费系统发布。简而言之,NATS JetStream是一个以发布/订阅作为消息传递模式的分布式流系统。

尽管现有的流技术很多,其中一些流技术在某些方面比较擅长,但大多数技术还没有经过很大的发展,无法应对未来的计算场景,尤其是物联网和边缘计算。例如,当您使用物联网和边缘计算发布和订阅消息时,您可能需要更好的安全模型。您可能需要多租户和对多个部署模型的支持。NATS JetStream就是为解决这些问题而设计的,这些问题我们在今天的流技术堆栈中已经看到了。

设计目标

NATS的官方文档列出了NATS JetStream的设计目标:

  • 系统必须易于配置和操作,易于观察。
  • 系统必须是安全的,并且在NATS 2.0安全模型下运行良好。
  • 系统必须能水平扩展,并适用于高摄取率。
  • 系统必须支持多个用例。
  • 系统必须能自愈并高可用。
  • 系统必须有一个接近NATS核心的API。
  • 系统必须允许NATS消息成为流的一部分。
  • 系统必须显示与有效载荷无关的行为。
  • 系统不能有第三方依赖关系。

用于运行NATS服务和NATS JetStream服的简单可执行文件

安装NATS Server v2.2.2,您可以使用相同的可执行文件“nats-server”运行基本的NATS服务(没有持久性)和NATS JetStream服务。当你运行nats-server可执行文件时,它将作为基本的NATS服务运行,当你运行带有-js参数的nats-server可执行文件或配置启用JetStream时,它将运行启用了JetStream子系统的NATS服务。
以下是JetStream的配置文件:
清单1。启用JetStream子系统

// 启用jetstream,空模块将启用并使用默认值 的jetstream
jetstream {
    // jetstream数据存放位置:/data/nats-server/jetstream
    store_dir: "/data/nats-server"
// 1GB
    max_memory_store: 1073741824
// 10GB
    max_file_store: 10737418240
}

上面的配置为NATS服务启用了JetStream子系统,并将/data/ nats-server目录配置为持久化流的存储目录。
在这里,我们使用js.conf文件运行带有JetStream子系统的nats-server:
清单2。使用js.conf文件运行NATS服务器

nats-server -c js.conf

图1所示。NATS JetStream服务正在运行:

基于NATS JetStream构建分布式事件流系统插图2

使用单个NATS客户端SDK与NATS和NATS JetStream工作

我们使用单独的客户端sdk与NATS服务和NATS流媒体服务一起工作。但Go SDK nats.gov1.11.0版本中,你可以使用相同的库来处理基本的NATS和NATS JetStream。
下面是从NATS连接创建JetStream上下文的代码:
清单3。创建JetStream上下文

import "github.com/nats-io/nats.go"
nc, _ := nats.Connect(nats.DefaultURL)
js, err := nc.JetStream() // 返回JetStreamContext

JetStreamContext允许JetStream消息和流管理。一旦创建了JetStreamContext,就可以通过使用其发布和订阅api轻松地使用JetStream。

JetStream流

在JetStream中,事件流以Streams存储,其中多个相关的主题存储在一个Stream中。例如,当你在构建一个订单处理系统时,可以考虑“ORDERS”作为一个Stream,在这里可以使用与Stream相关的主题比如“ORDERS.created”、“ORDERS.approved"、“ORDERS.rejected"、“ORDER.payment.debited"和“ORDERS.shiped",来发布和消费事件流。因此所有这些相关的主题都属于“ORDERS”这个Stream。
下图显示了在服务器上以存储文件形式存储的“ORDERS”流。
图2。订单流存储

基于NATS JetStream构建分布式事件流系统插图3

拉式消费者和推式消费者

JetStream提供了两种消费者(订阅者)系统:基于拉的消费者和基于推的消费者。基于Pull的消费者让JetStream从消费者系统中提取消息。基于pull的消费者系统就像工作队列。因为JetStream提供了ACK(确认)机制,所以可以轻松地水平扩展基于Pull的消费者系统,而不会出现消息重复的问题。拉式订阅是NATS生态系统的新功能。基于Push的消费者使JetStream将消息推送到消费者系统,这对于监控系统来说是一个很好的选择。
以下是来自NATS文档的JetStream混合推/拉订单处理架构:

基于NATS JetStream构建分布式事件流系统插图4

原图链接: https://docs.nats.io/jetstream/concepts

通配符订阅

NATS流的消费者不支持通配符订阅。庆幸的是在JetStream中支持通配符订阅。下面的代码块显示了通配符订阅:

nc, _ := nats.Connect(nats.DefaultURL)
js, _ := nc.JetStream()
js.Subscribe("ORDERS.*", func(msg *nats.Msg) {
}

NATS命令行

NATS生态系统为管理需求提供了一个新的NATS CLI工具。你可以从源代码安装或使用Homebrew tap。

$ brew tap nats-io/nats-tools
$ brew install nats-io/nats-tools/nats

一个NATS JetStream的示例

让我们是用 nats.goGo客户端SDK编写一个简单的示例来理解如何使用NATS JetStream。
配置最新版本NATS服务和nats.go客户端SDK:

# Go client latest 
go get github.com/nats-io/nats.go/@latest

# For latest NATS Server, add /v2 at the end
go get github.com/nats-io/nats-server/v2

例子演示

在这个例子中,使用Stream ”ORDERS“作为主题”ORDERS.的流。为了演示,我们用“ORDERS.created"和"ORDERS.approved"主题来发布消息。一个系统用“ORDERS.created"主题发布消息。一个基于Pull消费者将订阅该主题的消息,并基于这个事件做一些处理之后,进一步在"ORDERS.approved"主题上发布消息。另一个基于Push的消费者使用“ORDERS.”通配符订阅消息,因此所有“ORDERS”上的流都被订阅。

使用JetStreamContext创建流

我们首先创建一个流“ORDERS”,用于主题“ORDERS.*”。您可以使用管理工具(如NATS CLI)或使用NATS客户端sdk来创建Stream。这里我们使用nats.go创建Stream。

const (
   streamName  = "ORDERS"
   streamSubjects = "ORDERS.*"
)
// 使用JetStreamContext创建流
func createStream(js nats.JetStreamContext) error {
   // 检查流是否存在; 不存在就创建
   stream, err := js.StreamInfo(streamName)
   if err != nil {
      log.Println(err)
   }
   if stream == nil {
      log.Printf("creating stream %q and subjects %q", streamName, streamSubjects)
      _, err = js.AddStream(&nats.StreamConfig{
         Name:     streamName,
         Subjects: []string{streamSubjects},
      })
      if err != nil {
         return err
      }
   }
   return nil
}

在上面的代码块中,如果流不存在,我们就使用JetStreamContext的AddStream方法创建一个新的流。JetStreamContext可以通过调用NATS连接对象的JetStream方法来创建。
从NATS连接创建JetStreamContext,然后调用createStream方法创建一个Stream。

// 连接NATS 
nc, _ := nats.Connect(nats.DefaultURL)
// 创建JetStreamContext
js, err := nc.JetStream()
err = createStream(js)

事件发布流

下面的代码块以“ORDERS”为主题发布消息(事件流),目的是“让消费者系统知道新订单被放到了我们的分布式系统环境中,这样消费者系统就可以通过执行自己的操作和发布其他事件集来响应这些事件。
发布主题为“ORDERS.created”的消息:

const (
   subjectName ="ORDERS.created"
)
// createOrder 使用 "ORDERS.created" 主题来发布事件流
func createOrder(js nats.JetStreamContext) error{
   var order model.Order
   for i := 1; i <= 10; i++ {
      order = model.Order{
         OrderID:    i,
         CustomerID: "Cust-" + strconv.Itoa(i),
         Status:     "created",
      }
      orderJSON, _ := json.Marshal(order)
      _, err := js.Publish(subjectName, orderJSON)
      if err!=nil {
         return err
      }
      log.Printf("Order with OrderID:%d has been published\n",i)
   }
   return nil
}

下面是在我们的例子中使用的Order结构体:

type Order struct {
   OrderID    int
   CustomerID string
   Status     string
}

下面是示例完整代码块,它创建了Stream并在主题“ORDERS.created”上发布消息:

package main
import (
 "encoding/json"
 "log"
 "strconv"

 "github.com/nats-io/nats.go"

 "github.com/shijuvar/go-distsys/jsdemo/model"
)

const (
 streamName  = "ORDERS"
 streamSubjects = "ORDERS.*"
 subjectName ="ORDERS.created"
)

func main() {
 // 连接NATS
 nc, _ := nats.Connect(nats.DefaultURL)
 // 创建JetStreamContext
 js, err := nc.JetStream()
 checkErr(err)
 // 创建stream流
 err = createStream(js)
 checkErr(err)
 // 通过发布消息创建订单
 err= createOrder(js)
 checkErr(err)
}

// createOrder 以 "ORDERS.created"主题发布事件流
func createOrder(js nats.JetStreamContext) error{
 var order model.Order
 for i := 1; i <= 10; i++ {
    order = model.Order{
       OrderID:    i,
       CustomerID: "Cust-" + strconv.Itoa(i),
       Status:     "created",
    }
    orderJSON, _ := json.Marshal(order)
    _, err := js.Publish(subjectName, orderJSON)
    if err!=nil {
       return err
    }
    log.Printf("Order with OrderID:%d has been published\n",i)
 }
 return nil
}

// createStream 使用JetStreamContext创建流
func createStream(js nats.JetStreamContext) error {
 // Check if the ORDERS stream already exists; if not, create it.
 stream, err := js.StreamInfo(streamName)
 if err != nil {
    log.Println(err)
 }
 if stream == nil {
    log.Printf("creating stream %q and subjects %q", streamName, streamSubjects)
    _, err = js.AddStream(&nats.StreamConfig{
       Name:     streamName,
       Subjects: []string{streamSubjects},
    })
    if err != nil {
       return err
    }
 }
 return nil
}

func checkErr(err error) {
 if err != nil {
    log.Fatal(err)
 }
}

基于拉的消费者对事件的反应

一个基于Pull的消费者从主题“ORDERS”中订阅消息的例子。最终通过主题“ORDERS.approved”发布消息。

package main

import (
   "context"
   "encoding/json"
   "log"
   "time"

   "github.com/nats-io/nats.go"

   "github.com/shijuvar/go-distsys/jsdemo/model"
)

const (
   subSubjectName ="ORDERS.created"
   pubSubjectName ="ORDERS.approved"

)
func main() {
   // Connect to NATS
   nc, _ := nats.Connect(nats.DefaultURL)
   js, err := nc.JetStream()
   if err != nil {
      log.Fatal(err)
   }
   // Create Pull based consumer with maximum 128 inflight.
   // PullMaxWaiting defines the max inflight pull requests.
   sub, _ := js.PullSubscribe(subSubjectName, "order-review", nats.PullMaxWaiting(128))
   ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
   defer cancel()

   for {
      select {
      case <-ctx.Done():
         return
      default:
      }
      msgs, _ := sub.Fetch(10, nats.Context(ctx))
      for _, msg := range msgs {
         msg.Ack()
         var order model.Order
         err := json.Unmarshal(msg.Data, &order)
         if err != nil {
            log.Fatal(err)
         }
         log.Println("order-review service")
         log.Printf("OrderID:%d, CustomerID: %s, Status:%s\n", order.OrderID, order.CustomerID, order.Status)
         reviewOrder(js,order)
      }
   }
}

// reviewOrder reviews the order and publishes ORDERS.approved event
func reviewOrder(js nats.JetStreamContext, order model.Order) {
   // Changing the Order status
   order.Status ="approved"
   orderJSON, _ := json.Marshal(order)
   _, err := js.Publish(pubSubjectName, orderJSON)
   if err != nil {
      log.Fatal(err)
   }
   log.Printf("Order with OrderID:%d has been %s\n",order.OrderID, order.Status)
}

JetStreamContext的PullSubscribe方法创建了一个可以获取消息的订阅。它将返回一个Subscription对象。Subscription的Fetch方法从流中为Pull消费者提取一批消息。msg.Ack()向服务器发出手动确认。

基于Push的消费者使用通配符订阅

在下面示例中,另一个消费者是一个基于Push的订阅者,它使用通配符订阅消息。

package main

import (
   "encoding/json"
   "log"
   "runtime"

   "github.com/nats-io/nats.go"

   "github.com/shijuvar/go-distsys/jsdemo/model"
)

func main() {
   // Connect to NATS
   nc, _ := nats.Connect(nats.DefaultURL)
   js, err := nc.JetStream()
   if err != nil {
      log.Fatal(err)
   }
   // Create durable consumer monitor
   js.Subscribe("ORDERS.*", func(msg *nats.Msg) {
      msg.Ack()
      var order model.Order
      err := json.Unmarshal(msg.Data, &order)
      if err != nil {
         log.Fatal(err)
      }

      log.Printf("monitor service subscribes from subject:%s\n", msg.Subject)
      log.Printf("OrderID:%d, CustomerID: %s, Status:%s\n", order.OrderID, order.CustomerID, order.Status)
   }, nats.Durable("monitor"),nats.ManualAck())

   runtime.Goexit()

}

对于已经在NATS和NATS流媒体服务器上工作过的NATS开发人员来说,订阅API是一个非常熟悉的API。

源码

https://github.com/shijuvar/go-distsys/tree/master/jsdemo

总结

当我们考虑使用分布式流系统作为构建分布式系统、基于微服务的分布式系统、基于物联网的系统、下一代Edge系统的核心架构时,可以考虑使用NATS JetStream。在未来的计算中,处理大量的事件流和消息流将是一个巨大的挑战,特别是边缘计算。 NATS JetStream提供了分布式安全、多租户和水平扩展能力。在NATS JetStream的早期版本中,一些设计目标没有通过集群选项实现。但随着后来的发布和改进,我希望NATS JetStream能够实现所有的设计目标,成为一种极具竞争力的流技术。考虑到简单性和适应性边缘架构,我肯定会选择NATS JetStream而不是Kafka,并等待这个技术更加成熟。

赞(0) 打赏
未经允许不得转载:IDEA激活码 » 基于NATS JetStream构建分布式事件流系统

一个分享Java & Python知识的社区