程序员社区

微服务总结(下)

六、服务网关 SpringCloud Gateway

在微服务架构中,一个系统会被拆分为很多个微服务。那么作为客户端要如何去调用这么多的微服务呢?

如果没有网关的存在,我们只能在客户端记录每个微服务的地址,然后分别去调用。

微服务总结(下)插图

这样的话,会有很多问题:

  • 客户端多次请求不同的微服务,增加客户端代码或配置编写的复杂性
  • 认证复杂,每个服务都需要独立认证。
  • 各个微服务都存在跨域请求,在一定场景下处理相对复杂。

这些问题,我们就可以采用网关来解决。

所谓的API网关,就是指系统的统一入口,它封装了应用程序的内部结构,为客户端提供统一服务,一些与业务本身功能无关的公共逻辑可以在这里实现,诸如认证、鉴权、监控、路由转发等等。

添加上API网关之后,系统的架构图变成了如下所示:

微服务总结(下)插图1

6.1 目前可以使用的网关

Ngnix+lua

使用nginx的反向代理和负载均衡可实现对api服务器的负载均衡及高可用

lua是一种脚本语言,可以来编写一些简单的逻辑, nginx支持lua脚本

Kong

基于Nginx+Lua开发,性能高,稳定,有多个可用的插件(限流、鉴权等等)可以开箱即用。 问题:只支持Http协议;二次开发,自由扩展困难;提供管理API,缺乏更易用的管控、配置方式。

Zuul

Netflix开源的网关,功能丰富,使用JAVA开发,易于二次开发。问题:缺乏管控,无法动态配置;依赖组件较多;处理Http请求依赖的是Web容器,性能不如Nginx。

Spring Cloud Gateway

Spring Cloud Gateway是Spring公司基于Spring 5.0,Spring Boot 2.0 和 Project Reactor 等技术开发的网关,它旨在为微服务架构提供一种简单有效的统一的 API 路由管理方式。它的目标是替代 Netflix Zuul,其不仅提供统一的路由方式,并且基于 Filter 链的方式提供了网关基本的功能,例如:安全,监控和限流。

优点:

  • 性能强劲:是Zuul1.0的1.6倍
  • 功能强大:内置了很多实用的功能,例如转发、监控、限流等
  • 设计优雅,容易扩展

缺点:

  • 其实现依赖Netty与WebFlux,不是传统的Servlet编程模型,学习成本高
  • 不能将其部署在Tomcat、Jetty等Servlet容器里,只能打成jar包执行
  • 需要Spring Boot 2.0及以上的版本,才支持

6.3 快速开始

基础版

1、创建api-gateway模块

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

2、添加配置文件

server:
  port: 7000
spring:
  application:
    name: api-gateway
  cloud:
    gateway:
      routes:
        - id: product_route
          uri: http://127.0.0.1:8081
          order: 1
          predicates: # 断言,当路由满足全部断言时进行转发
            - Path=/product-serv/**
          filters:  # 过滤器
            - StripPrefix=1 # 删除一级路径

3、启动网关和商品微服务,请求 http://127.0.0.1:7000/product-serv/product/1

微服务总结(下)插图2

接入注册中心nacos

1、引入nacos依赖

<!--nacos客户端-->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>

2、修改配置文件

server:
  port: 7000
spring:
  application:
    name: api-gateway
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848 # 将gateway注册到nacos
    gateway:
      discovery:
        locator:
          enabled: true # 让gateway从nacos中获取服务信息
      routes:
        - id: product_route
          uri: lb://service-product
#          uri: http://127.0.0.1:8081
          order: 1
          predicates: # 断言,当路由满足全部断言时进行转发
            - Path=/product-serv/**
          filters:  # 过滤器
            - StripPrefix=1 # 删除一级路径

3、请求 http://127.0.0.1:7000/product-serv/product/1

6.4 Gateway的基本概念&请求处理流程

基本概念

路由(Route) 是 gateway 中最基本的组件之一,表示一个具体的路由信息载体。主要定义了下面的几个信息:

  • id,路由标识符,区别于其他 Route。
  • uri,路由指向的目的地 uri,即客户端请求最终被转发到的微服务。
  • order,用于多个 Route 之间的排序,数值越小排序越靠前,匹配优先级越高。
  • predicate,断言的作用是进行条件判断,只有断言都返回真,才会真正的执行路由。 、- filter,过滤器用于修改请求和响应信息。

请求处理流程

微服务总结(下)插图3
  1. Gateway Client向Gateway Server发送请求
  2. 请求首先会被HttpWebHandlerAdapter进行提取组装成网关上下文
  3. 然后网关的上下文会传递到DispatcherHandler,它负责将请求分发给RoutePredicateHandlerMapping
  4. RoutePredicateHandlerMapping负责路由查找,并根据路由断言判断路由是否可用
  5. 如果过断言成功,由FilteringWebHandler创建过滤器链并调用
  6. 请求会一次经过PreFilter--微服务--PostFilter的方法,最终返回响应

6.5 断言

6.5.1 内置断言工厂

微服务总结(下)插图4
基于Datetime类型的断言工厂

此类型的断言根据时间做判断,主要有三个:

AfterRoutePredicateFactory: 接收一个日期参数,判断请求日期是否晚于指定日期

BeforeRoutePredicateFactory: 接收一个日期参数,判断请求日期是否早于指定日期

BetweenRoutePredicateFactory: 接收两个日期参数,判断请求日期是否在指定时间段内

-After=2019-12-31T23:59:59.789+08:00[Asia/Shanghai]
基于远程地址的断言工厂

RemoteAddrRoutePredicateFactory:接收一个IP地址段,判断请求主机地址是否在地址段中

-RemoteAddr=192.168.1.1/24
基于Cookie的断言工厂

CookieRoutePredicateFactory:接收两个参数,cookie 名字和一个正则表达式。 判断请求cookie是否具有给定名称且值与正则表达式匹配。

-Cookie=chocolate, ch.
基于Header的断言工厂

HeaderRoutePredicateFactory:接收两个参数,标题名称和正则表达式。判断请求Header是否具有给定名称且值与正则表达式匹配。

-Header=X-Request-Id, \d+
基于Host的断言工厂

HostRoutePredicateFactory:接收一个参数,主机名模式。判断请求的Host是否满足匹配规则。

-Host=**.testhost.org
基于Method请求方法的断言工厂

MethodRoutePredicateFactory:接收一个参数,判断请求类型是否跟指定的类型匹配。

-Method=GET
基于Path请求路径的断言工厂

PathRoutePredicateFactory:接收一个参数,判断请求的URI部分是否满足路径规则。

-Path=/foo/{segment}
基于Query请求参数的断言工厂

QueryRoutePredicateFactory :接收两个参数,请求param和正则表达式,判断请求参数是否具有给定名称且值与正则表达式匹配。

-Query=baz, ba.
基于路由权重的断言工厂

WeightRoutePredicateFactory:接收一个[组名,权重], 然后对于同一个组内的路由按照权重转发

routes:
    -id: weight_route1 
     uri: host1 
     predicates: 
        -Path=/product/**
        -Weight=group3, 1
    -id: weight_route2
     uri: host2
     predicates: 
        -Path=/product/**
        -Weight= group3, 9

6.5.2 自定义断言工厂

我们来设定一个场景: 假设我们的应用仅仅让age在(min,max)之间的人来访问。

1、在配置文件中,添加一个Age的断言配置

server:
  port: 7000
spring:
  application:
    name: api-gateway
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848 # 将gateway注册到nacos
    gateway:
      discovery:
        locator:
          enabled: true # 让gateway从nacos中获取服务信息
      routes:
        - id: product_route
          uri: lb://service-product
#          uri: http://127.0.0.1:8081
          order: 1
          predicates: # 断言,当路由满足全部断言时进行转发
            - Path=/product-serv/**
            - Age=18,60 # ********看这里
          filters:  # 过滤器
            - StripPrefix=1 # 删除一级路径

2、新建一个断言工厂,实现断言方法

//这是一个自定义的路由断言工厂类,要求有两个
//1 名字必须是 配置+RoutePredicateFactory
//2 必须继承AbstractRoutePredicateFactory<配置类>
@Component
public class AgeRoutePredicateFactory extends AbstractRoutePredicateFactory<AgeRoutePredicateFactory.Config> {

    //构造函数
    public AgeRoutePredicateFactory() {
        super(Config.class);
    }

    //读取配置文件的中参数值 给他赋值到配置类中的属性上
    public List<String> shortcutFieldOrder() {
        //这个位置的顺序必须跟配置文件中的值的顺序对应
        return Arrays.asList("minAge", "maxAge");
    }

    //断言逻辑
    public Predicate<ServerWebExchange> apply(Config config) {
        return new Predicate<ServerWebExchange>() {
            @Override
            public boolean test(ServerWebExchange serverWebExchange) {
                //1 接收前台传入的age参数
                String ageStr = serverWebExchange.getRequest().getQueryParams().getFirst("age");

                //2 先判断是否为空
                if (StringUtils.isNotEmpty(ageStr)) {
                    //3 如果不为空,再进行路由逻辑判断
                    int age = Integer.parseInt(ageStr);
                    if (age < config.getMaxAge() && age > config.getMinAge()) {
                        return true;
                    } else {
                        return false;
                    }
                }
                return false;
            }
        };
    }

    //配置类,用于接收配置文件中的对应参数
    @Data
    @NoArgsConstructor
    public static class Config {
        private int minAge;//18
        private int maxAge;//60
    }
}

3、重启网关服务,访问:

http://localhost:7000/product-serv/product/1?age=30

http://localhost:7000/product-serv/product/1?age=10

6.6 过滤器

作用:在请求的传递过程中,对请求和响应做一些手脚。

在Gateway中,Filter的生命周期只有两个:

  1. PRE:这种过滤器在请求被路由之前调用。我们可利用这种过滤器实现身份验证、在集群中选择请求的微服务、记录调试信息等。
  2. POST:这种过滤器在路由到微服务以后执行。这种过滤器可用来为响应添加标准的HTTP Header、收集统计信息和指标、将响应从微服务发送给客户端等。

Gateway 的Filter从作用范围可分为两种:

  1. GatewayFilter:应用到单个路由或者一个分组的路由上。
  2. GlobalFilter:应用到所有的路由上。

6.6.1 局部过滤器

局部过滤器是针对单个路由的过滤器。

内置局部过滤器
过滤器工厂 作用 参数
AddRequestHeader 为原始请求添加Header Header的名称及值
AddRequestParameter 为原始请求添加请求参数 参数名称及值
AddResponseHeader 为原始响应添加Header Header的名称及值
DedupeResponseHeader 剔除响应头中重复的值 需要去重的Header名称及去重策略
Hystrix 为路由引入Hystrix的断路器保护 HystrixCommand的名称
FallbackHeaders 为fallbackUri的请求头中添加具体的异常信息 Header的名称
PrefixPath 为原始请求路径添加前缀 前缀路径
PreserveHostHeader 为请求添加一个preserveHostHeader=true的属性,路由过滤器会检查该属性以决定是否要发送原始的Host
RequestRateLimiter 用于对请求限流,限流算法为令牌桶 keyResolver、rateLimiter、statusCode、denyEmptyKey、emptyKeyStatus
RedirectTo 将原始请求重定向到指定的URL http状态码及重定向的url
RemoveHopByHopHeadersFilter 为原始请求删除IETF组织规定的一系列Header 默认就会启用,可以通过配置指定仅删除哪些Header
RemoveRequestHeader 为原始请求删除某个Header Header名称
RemoveResponseHeader 为原始响应删除某个Header Header名称
RewritePath 重写原始的请求路径 原始路径正则表达式以及重写后路径的正则表达式
RewriteResponseHeader 重写原始响应中的某个Header Header名称,值的正则表达式,重写后的值
SaveSession 在转发请求之前,强制执行WebSession::save操作
secureHeaders 为原始响应添加一系列起安全作用的响应头 无,支持修改这些安全响应头的值
SetPath 修改原始的请求路径 修改后的路径
SetResponseHeader 修改原始响应中某个Header的值 Header名称,修改后的值
SetStatus 修改原始响应的状态码 HTTP 状态码,可以是数字,也可以是字符串
StripPrefix 用于截断原始请求的路径 使用数字表示要截断的路径的数量
Retry 针对不同的响应进行重试 retries、statuses、methods、series
RequestSize 设置允许接收最大请求包的大小。如果请求包大小超过设置的值,则返回 413 Payload Too Large 请求包大小,单位为字节,默认值为5M
ModifyRequestBody 在转发请求之前修改原始请求体内容 修改后的请求体内容
ModifyResponseBody 修改原始响应体的内容 修改后的响应体内容
Default 为所有路由添加过滤器 过滤器工厂名称及值

Tips:每个过滤器工厂都对应一个实现类,并且这些类的名称必须以GatewayFilterFactory结尾,这是Spring Cloud Gateway的一个约定,例如AddRequestHeader对应的实现类为AddRequestHeaderGatewayFilterFactory

自定义局部过滤器

1、在配置文件中,添加一个Log的过滤器配置

server:
  port: 7000
spring:
  application:
    name: api-gateway
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848 # 将gateway注册到nacos
    gateway:
      discovery:
        locator:
          enabled: true # 让gateway从nacos中获取服务信息
      routes:
        - id: product_route
          uri: lb://service-product
#          uri: http://127.0.0.1:8081
          order: 1
          predicates: # 断言,当路由满足全部断言时进行转发
            - Path=/product-serv/**
#            - Age=18,60
          filters:  # 过滤器
            - StripPrefix=1 # 删除一级路径
            - Log=true,false # 控制日志是否开启

2、自定义一个过滤器工厂

//自定义局部过滤器
@Component
public class LogGatewayFilterFactory
        extends AbstractGatewayFilterFactory<LogGatewayFilterFactory.Config> {

    //构造函数
    public LogGatewayFilterFactory() {
        super(Config.class);
    }

    //读取配置文件中的参数 赋值到 配置类中
    @Override
    public List<String> shortcutFieldOrder() {
        return Arrays.asList("consoleLog", "cacheLog");
    }

    //过滤器逻辑
    @Override
    public GatewayFilter apply(Config config) {
        return new GatewayFilter() {
            @Override
            public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
                if (config.isCacheLog()) {
                    System.out.println("cacheLog已经开启了....");
                }
                if (config.isConsoleLog()) {
                    System.out.println("consoleLog已经开启了....");
                }

                return chain.filter(exchange);
            }
        };
    }

    //配置类 接收配置参数
    @Data
    @NoArgsConstructor
    public static class Config {
        private boolean consoleLog;
        private boolean cacheLog;
    }
}

3、访问 http://localhost:7000/product-serv/product/1 查看控制台

6.6.2 全局过滤器

全局过滤器作用于所有路由,无需配置。通过全局过滤器可以实现对权限的统一校验,安全性验证等功能。

内置全局过滤器
微服务总结(下)插图5

其中LBCFilter在我们写uri: lb://service-product时,已经使用到了。

自定义全局过滤器

内置的过滤器已经可以完成大部分的功能,但是对于企业开发的一些业务功能处理,还是需要我们自己编写过滤器来实现的,那么我们一起通过代码的形式自定义一个过滤器,去完成统一的权限校验。

eg。实现服务鉴权

当客户端第一次请求服务时,服务端对用户进行信息认证(登录)

认证通过,将用户信息进行加密形成token,返回给客户端,作为登录凭证

以后每次请求,客户端都携带认证的token

服务端对token进行解密,判断是否有效。

微服务总结(下)插图6

1、实现GlobalFilter, Ordered 接口,书写自己的鉴权逻辑

//自定义全局过滤器需要实现GlobalFilter和Ordered接口
@Component
public class AuthGlobalFilter implements GlobalFilter, Ordered {

    //完成判断逻辑
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        String token = exchange.getRequest().getQueryParams().getFirst("token");
        if (!StringUtils.equals(token, "admin")) {
            System.out.println("鉴权失败");
            exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
            return exchange.getResponse().setComplete();
        }
        //调用chain.filter继续向下游执行
        return chain.filter(exchange);
    }

    //顺序,数值越小,优先级越高
    @Override
    public int getOrder() {
        return 0;
    }
}

2、访问

http://localhost:7000/product-serv/product/1

http://localhost:7000/product-serv/product/1?token=admin

6.7 网关限流(感觉用途不大)

网关是所有请求的公共入口,所以可以在网关进行限流,而且限流的方式也很多,我们本次采用前面学过的Sentinel组件来实现网关的限流。Sentinel支持对SpringCloud Gateway、Zuul等主流网关进行限流。

微服务总结(下)插图7

从1.6.0版本开始,Sentinel提供了SpringCloud Gateway的适配模块,可以提供两种资源维度的限流:

  • route维度:即在Spring配置文件中配置的路由条目,资源名为对应的routeId
  • 自定义API维度:用户可以利用Sentinel提供的API来自定义一些API分组

路由维度

1、引入sentinel依赖

<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-spring-cloud-gateway-adapter</artifactId>
</dependency>

2、编写配置类

基于Sentinel的Gateway限流是通过其提供的Filter来完成的,使用时只需注入对应的 SentinelGatewayFilter实例以及 SentinelGatewayBlockExceptionHandler 实例即可。

代码见 cn.x5456.gateway.config.GatewayConfiguration

3、在一秒钟内多次访问http://localhost:7000/product-serv/product/1就可以看到限流启作用了。

API维度

代码见 cn.x5456.gateway.config.GatewayConfiguration

七、链路追踪 Sleuth

在大型系统的微服务化构建中,一个系统被拆分成了许多模块。这些模块负责不同的功能,组合成系统,最终可以提供丰富的功能。在这种架构中,一次请求往往需要涉及到多个服务。互联网应用构建在不同的软件模块集上,这些软件模块,有可能是由不同的团队开发、可能使用不同的编程语言来实现、有可能布在了几千台服务器,横跨多个不同的数据中心,也就意味着这种架构形式也会存在一些问题:

  • 如何快速发现问题?
  • 如何判断故障影响范围?
  • 如何梳理服务依赖以及依赖的合理性?
  • 如何分析链路性能问题以及实时容量规划?

分布式链路追踪(Distributed Tracing),就是将一次分布式请求还原成调用链路,进行日志记 录,性能监控并将一次分布式请求的调用情况集中展示。比如各个服务节点上的耗时、请求具体到达哪 台机器上、每个服务节点的请求状态等等。

7.1 常用链路追踪技术

cat 由大众点评开源,基于Java开发的实时应用监控平台,包括实时应用监控,业务监控。集成方案是通过代码埋点的方式来实现监控,比如: 拦截器,过滤器等。对代码的侵入性很大,集成成本较高。风险较大。

zipkin 由Twitter公司开源,开放源代码分布式的跟踪系统,用于收集服务的定时数据,以解决微服务架构中的延迟问题,包括:数据的收集、存储、查找和展现。该产品结合spring-cloud-sleuth 使用较为简单,集成很方便,但是功能较简单。

pinpoint Pinpoint是韩国人开源的基于字节码注入的调用链分析,以及应用监控分析工具。特点是支持多种插件,UI功能强大,接入端无代码侵入。

SkyWalking是本土开源的基于字节码注入的调用链分析,以及应用监控分析工具。特点是支持多种插件,UI功能较强,接入端无代码侵入。目前已加入Apache孵化器。

SpringCloud Sleuth 提供的分布式系统中链路追踪解决方案。

注:我们可以使用SpringCloud Sleuth收集链路中的日志信息,交给zipkin来展示可视化界面

7.2 SpringCloud Sleuth 基本概念 & 快速开始

基本概念

SpringCloud Sleuth主要功能就是在分布式系统中提供追踪解决方案。它大量借用了Google Dapper的设计,先来了解一下Sleuth中的术语和相关概念。

Trace

由一组Trace Id相同的Span串联形成一个树状结构。为了实现请求跟踪,当请求到达分布式系统的入口端点时,只需要服务跟踪框架为该请求创建一个唯一的标识(即TraceId),同时在分布式系统内部流转的时候,框架始终保持传递该唯一值,直到整个请求的返回。那么我们就可以使用该唯一标识将所有的请求串联起来,形成一条完整的请求链路。

Span

代表了一组基本的工作单元。为了统计各处理单元的延迟,当请求到达各个服务组件的时 候,也通过一个唯一标识(SpanId)来标记它的开始、具体过程和结束。通过SpanId的开始和结束时间戳,就能统计该span的调用时间,除此之外,我们还可以获取如事件的名称。请求信息等元数据。

Annotation

用它记录一段时间内的事件,内部使用的重要注释:

  • cs(Client Send)客户端发出请求,开始一个请求的生命
  • sr(Server Received)服务端接受到请求开始进行处理, sr-cs = 网络延迟(服务调用的时间)
  • ss(Server Send)服务端处理完毕准备发送到客户端,ss - sr = 服务器上的请求处理时间
  • cr(Client Reveived)客户端接受到服务端的响应,请求结束。 cr - sr = 请求的总时间

QuickStart

1、在api-gateway、shop-order、shop-product微服务中引入sleuth依赖

<!--链路追踪-->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>

2、启动3个微服务,访问 http://127.0.0.1:7000/order-serv/order/prod/2 查看日志

微服务名称, traceId, spanid,是否将链路的追踪结果输出到第三方平台
[api-gateway,3977125f73391553,3977125f73391553,false]
[service-order,3977125f73391553,57547b5bf71f8242,false] [service-product,3977125f73391553,449f5b3f3ef8d5c5,false]

通过TraceId,通过查看日志,我们可以将调用链路串起来。

但查看日志文件并不是一个很好的方法,当微服务越来越多日志文件也会越来越多,通过Zipkin可以将日志聚合,并进行可视化展示和全文检索。

7.3 集成 ZipKin

Zipkin 是 Twitter 的一个开源项目,它基于Google Dapper实现,它致力于收集服务的定时数据,以解决微服务架构中的延迟问题,包括数据的收集、存储、查找和展现。

我们可以使用它来收集各个服务器上请求链路的跟踪数据,并通过它提供的REST API接口来辅助我们查询跟踪数据以实现对分布式系统的监控程序,从而及时地发现系统中出现的延迟升高问题并找出系统性能瓶颈的根源。

除了面向开发的 API 接口之外,它也提供了方便的UI组件来帮助我们直观的搜索跟踪信息和分析请 求链路明细,比如:可以查询某段时间内各用户请求的处理时间等。

Zipkin 提供了可插拔数据存储方式:In-Memory、MySql、Cassandra 以及 Elasticsearch。

微服务总结(下)插图8

上图展示了 Zipkin 的基础架构,它主要由 4 个核心组件构成:

  • Collector:收集器组件,它主要用于处理从外部系统发送过来的跟踪信息,将这些信息转换为Zipkin内部处理的 Span 格式,以支持后续的存储、分析、展示等功能。
  • Storage:存储组件,它主要对处理收集器接收到的跟踪信息,默认会将这些信息存储在内存中,我们也可以修改此存储策略,通过使用其他存储组件将跟踪信息存储到数据库中。
  • RESTful API:API 组件,它主要用来提供外部访问接口。比如给客户端展示跟踪信息,或是外接系统访问以实现监控等。
  • Web UI:UI 组件, 基于API组件实现的上层应用。通过UI组件用户可以方便而有直观地查询和分 析跟踪信息。

Zipkin分为两端,一个是 Zipkin服务端,一个是 Zipkin客户端,客户端也就是微服务的应用。客户端会配置服务端的 URL 地址,一旦发生服务间的调用的时候,会被配置在微服务里面的 Sleuth 的监听器监听,并生成相应的 Trace 和 Span 信息发送给服务端。

7.3.1 安装 ZipKin 服务端

jar包在git中有

java -jar zipkin-server-2.12.9-exec.jar

访问 http://127.0.0.1:9411

7.3.2 客户端集成Zipkin

1、在3个微服务中添加依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-zipkin</artifactId>
</dependency>

2、在3个微服务中添加配置

spring:
  zipkin:
    base-url: http://127.0.0.1:9411/  #zipkin server的请求地址
    discoveryClientEnabled: false #让nacos把它当成一个URL,而不要当做服务名
  sleuth:
    sampler:
      probability: 1.0  #采样的百分比

3、重启3个微服务,访问 http://127.0.0.1:7000/order-serv/order/prod/2

4、进入zipkin,查看

微服务总结(下)插图9

7.4 持久化

Zipkin Server默认会将追踪数据信息保存到内存,但这种方式不适合生产环境。Zipkin支持将追踪数据持久化到mysql数据库或elasticsearch中。

7.4.1 持久化到MySQL中

1、创建MySQL表

CREATE TABLE IF NOT EXISTS zipkin_spans (
  `trace_id_high` BIGINT NOT NULL DEFAULT 0 COMMENT 'If non zero, this means the trace uses 128 bit traceIds instead of 64 bit',
  `trace_id` BIGINT NOT NULL,
  `id` BIGINT NOT NULL,
  `name` VARCHAR(255) NOT NULL,
  `parent_id` BIGINT,
  `debug` BIT(1),
  `start_ts` BIGINT COMMENT 'Span.timestamp(): epoch micros used for endTs query and to implement TTL',
  `duration` BIGINT COMMENT 'Span.duration(): micros used for minDuration and maxDuration query',
  PRIMARY KEY (`trace_id_high`, `trace_id`, `id`)
) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci;

ALTER TABLE zipkin_spans ADD INDEX(`trace_id_high`, `trace_id`) COMMENT 'for getTracesByIds';
ALTER TABLE zipkin_spans ADD INDEX(`name`) COMMENT 'for getTraces and getSpanNames';
ALTER TABLE zipkin_spans ADD INDEX(`start_ts`) COMMENT 'for getTraces ordering and range';

CREATE TABLE IF NOT EXISTS zipkin_annotations (
  `trace_id_high` BIGINT NOT NULL DEFAULT 0 COMMENT 'If non zero, this means the trace uses 128 bit traceIds instead of 64 bit',
  `trace_id` BIGINT NOT NULL COMMENT 'coincides with zipkin_spans.trace_id',
  `span_id` BIGINT NOT NULL COMMENT 'coincides with zipkin_spans.id',
  `a_key` VARCHAR(255) NOT NULL COMMENT 'BinaryAnnotation.key or Annotation.value if type == -1',
  `a_value` BLOB COMMENT 'BinaryAnnotation.value(), which must be smaller than 64KB',
  `a_type` INT NOT NULL COMMENT 'BinaryAnnotation.type() or -1 if Annotation',
  `a_timestamp` BIGINT COMMENT 'Used to implement TTL; Annotation.timestamp or zipkin_spans.timestamp',
  `endpoint_ipv4` INT COMMENT 'Null when Binary/Annotation.endpoint is null',
  `endpoint_ipv6` BINARY(16) COMMENT 'Null when Binary/Annotation.endpoint is null, or no IPv6 address',
  `endpoint_port` SMALLINT COMMENT 'Null when Binary/Annotation.endpoint is null',
  `endpoint_service_name` VARCHAR(255) COMMENT 'Null when Binary/Annotation.endpoint is null'
) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci;

ALTER TABLE zipkin_annotations ADD UNIQUE KEY(`trace_id_high`, `trace_id`, `span_id`, `a_key`, `a_timestamp`) COMMENT 'Ignore insert on duplicate';
ALTER TABLE zipkin_annotations ADD INDEX(`trace_id_high`, `trace_id`, `span_id`) COMMENT 'for joining with zipkin_spans';
ALTER TABLE zipkin_annotations ADD INDEX(`trace_id_high`, `trace_id`) COMMENT 'for getTraces/ByIds';
ALTER TABLE zipkin_annotations ADD INDEX(`endpoint_service_name`) COMMENT 'for getTraces and getServiceNames';
ALTER TABLE zipkin_annotations ADD INDEX(`a_type`) COMMENT 'for getTraces and autocomplete values';
ALTER TABLE zipkin_annotations ADD INDEX(`a_key`) COMMENT 'for getTraces and autocomplete values';
ALTER TABLE zipkin_annotations ADD INDEX(`trace_id`, `span_id`, `a_key`) COMMENT 'for dependencies job';

CREATE TABLE IF NOT EXISTS zipkin_dependencies (
  `day` DATE NOT NULL,
  `parent` VARCHAR(255) NOT NULL,
  `child` VARCHAR(255) NOT NULL,
  `call_count` BIGINT,
  `error_count` BIGINT,
  PRIMARY KEY (`day`, `parent`, `child`)
) ENGINE=InnoDB ROW_FORMAT=COMPRESSED CHARACTER SET=utf8 COLLATE utf8_general_ci;

2、删除之前的zipkin容器,重新执行下面代码

java -jar zipkin-server-2.12.9-exec.jar --STORAGE_TYPE=mysql --MYSQL_HOST=127.0.0.1 --MYSQL_TCP_PORT=3306 --MYSQL_DB=zipkin --MYSQL_USER=root --MYSQL_PASS=5456

7.4.2 持久化到elasticsearch中

java -jar zipkin-server-2.12.9-exec.jar --STORAGE_TYPE=elasticsearch --ES-HOST=localhost:9200

八、服务配置中心 Nacos Config

微服务架构下关于配置文件存在的一些问题:

  1. 配置文件相对分散。在一个微服务架构下,配置文件会随着微服务的增多变的越来越多,而且分散在各个微服务中,不好统一配置和管理
  2. 配置文件无法区分环。微服务项目可能会有多个环境,例如:测试环境、预发布环境、生产环境。每一个环境所使用的配置理论上都是不同的,一旦需要修改,就需要我们去各个微服务下手动维护,这比较困难。
  3. 配置文件无法实时更新。我们修改了配置文件之后,必须重新启动微服务才能使配置生效,这对一个正在运行的项目来说是非常不友好的。

基于上面这些问题,我们就需要配置中心的加入来解决这些问题。

配置中心的思路是:

  1. 首先把项目中各种配置全部都放到一个集中的地方进行统一管理,并提供一套标准的接口。
  2. 当各个服务需要获取配置的时候,就来配置中心的接口拉取自己的配置。
  3. 当配置中心中的各种参数有更新的时候,也能通知到各个服务实时的过来同步最新的信息,使之动态更新。

当加入了服务配置中心之后,我们的系统架构图会变成下面这样:

微服务总结(下)插图10

8.1 常见的服务配置中心

Apollo是由携程开源的分布式配置中心。特点有很多,比如:配置更新之后可以实时生效,支持灰度发布功能,并且能对所有的配置进行版本管理、操作审计等功能,提供开放平台API。并且资料也写的很详细。

Disconf是由百度开源的分布式配置中心。它是基于Zookeeper来实现配置变更后实时通知和生效的。

SpringCloud Config是Spring Cloud中带的配置中心组件。它和Spring是无缝集成,使用起来非常方便,并且它的配置存储支持Git。不过它没有可视化的操作界面,配置的生效也不是实时的,需要重启或去刷新。

Nacos是SpingCloud alibaba技术栈中的一个组件,前面我们已经使用它做过服务注册中心。其实它也集成了服务配置的功能,我们可以直接使用它作为服务配置中心。

8.2 Nacos Config 快速开始

1、在商品微服务中引入nacos config的依赖

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>

2、新建bootstrap.yml

spring:
  application:
    name: service-product 
  cloud:
    nacos:
      config:
        server-addr: 127.0.0.1:8848 #nacos中心地址
        file-extension: yaml # 配置文件格式 
  profiles:
    active: dev # 环境标识

注:配置文件优先级(由高到低):bootstrap.properties -> bootstrap.yml -> application.properties -> application.yml

3、将application.yml中的配置删除,在nacos中添加配置

微服务总结(下)插图11
微服务总结(下)插图12
server:
  port: 8081
spring:
  zipkin:
    base-url: http://127.0.0.1:9411  #zipkin server的请求地址
    discoveryClientEnabled: false #让nacos把它当成一个URL,而不要当做服务名
  sleuth:
    sampler:
      probability: 1.0  #采样的百分比
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/shop?characterEncoding=UTF-8
    username: root
    password: 5456
  jpa:
    properties:
      hibernate:
        hbm2ddl:
          auto: update
        dialect: org.hibernate.dialect.MySQL5InnoDBDialect
  cloud:
    nacos:
      discovery:
        server-addr: localhost:8848

4、重启商品微服务

8.3 配置动态刷新

在nacos中的配置中,新增下面配置:

config:
    appName: product

方式一

@RestController
public class NacosConfigController {

    @Autowired
    private ConfigurableApplicationContext applicationContext;

    @RequestMapping("/test-config1")
    public String testConfig1() {
        return applicationContext.getEnvironment().getProperty("config.appName");
    }
}

方式二

@RestController
@RefreshScope//动态刷新的注解
public class NacosConfigController {

    @Autowired
    private ConfigurableApplicationContext applicationContext;

    @Value("${config.appName}")
    private String appName;

    @RequestMapping("/test-config1")
    public String testConfig1() {
        return applicationContext.getEnvironment().getProperty("config.appName");
    }


    @RequestMapping("/test-config2")
    public String testConfig2() {
        return appName;
    }
}

注:类似数据库连接的配置修改后是不会动态更新的

8.4 配置共享

同一个微服务的不同环境之间共享配置

当配置越来越多的时候,我们就发现有很多配置是重复的,这时候就考虑可不可以将公共配置文件提取出来,然后实现共享呢?当然是可以的。接下来我们就来探讨如何实现这一功能。

如果想在同一个微服务的不同环境之间实现配置共享,其实很简单。只需要提取一个以 spring.application.name 命名的配置文件,然后将其所有环境的公共配置放在里面即可。

1、新建一个名为service-product.yaml配置存放商品微服务的公共配置

微服务总结(下)插图13

2、删除dev配置文件的这部分

3、重启系统

不同微服务中间共享配置

不同为服务之间实现配置共享的原理类似于文件引入,就是定义一个公共配置,然后在当前配置中引入。

1、在nacos中定义一个DataID为all-service.yaml(这个名字可以随便写)的配置,用于所有微服务共享

微服务总结(下)插图14
spring:
  zipkin:
    base-url: http://127.0.0.1:9411  #zipkin server的请求地址
    discoveryClientEnabled: false #让nacos把它当成一个URL,而不要当做服务名
  sleuth:
    sampler:
      probability: 1.0  #采样的百分比

2、删除商品微服务-dev的这部分配置

3、修改bootstrap.yml

spring:
  application:
    name: service-product
  cloud:
    nacos:
      config:
        server-addr: 127.0.0.1:8848 #nacos中心地址
        file-extension: yaml # 配置文件格式
        shared-dataids: all-service.yaml # 配置要引入的配置 
        refreshable-dataids: all-service.yaml # 配置要实现动态配置刷新的配置
  profiles:
    active: dev # 环境标识

4、重启微服务

8.5 nacos-config 的几个概念

命名空间(Namespace) 命名空间可用于进行不同环境的配置隔离。一般一个环境划分到一个命名空间

配置分组(Group) 配置分组用于将不同的服务可以归类到同一分组。一般将一个项目的配置分到一组

配置集(Data ID) 在系统中,一个配置文件通常就是一个配置集。一般一个微服务的配置就是一个配置集

微服务总结(下)插图15

九、分布式事务 Seata

概念见 https://www.jianshu.com/p/19492cfc71fb

9.1 Seata简介

2019 年 1 月,阿里巴巴中间件团队发起了开源项目 Fescar(Fast & EaSy Commit And Rollback),其愿景是让分布式事务的使用像本地事务的使用一样,简单和高效,并逐步解决开发者们遇到的分布式事务方面的所有难题。后来更名为 Seata,意为:Simple Extensible Autonomous Transaction Architecture,是一套分布式事务解决方案。

Seata的设计目标是对业务无侵入,因此从业务无侵入的2PC方案着手,在传统2PC的基础上演进。它把一个分布式事务理解成一个包含了若干分支事务的全局事务。全局事务的职责是协调其下管辖的分支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个关系数据库的本地事务。

微服务总结(下)插图16

Seata主要由三个重要组件组成:

  • TC:Transaction Coordinator 事务协调器,管理全局的分支事务的状态,用于全局性事务的提交和回滚。
  • TM:Transaction Manager 事务管理器,用于开启、提交或者回滚全局事务。
  • RM:Resource Manager 资源管理器,用于分支事务上的资源管理,向TC注册分支事务,上报分支事务的状态,接受TC的命令来提交或者回滚分支事务。
微服务总结(下)插图17

A服务的TM向TC申请开启一个全局事务,TC就会创建一个全局事务并返回一个唯一的XID

A服务的RM向TC注册分支事务,并及其纳入XID对应全局事务的管辖

A服务执行分支事务,向数据库做操作

A服务开始远程调用B服务,此时XID会在微服务的调用链上传播

B服务的RM向TC注册分支事务,并将其纳入XID对应的全局事务的管辖

B服务执行分支事务,向数据库做操作

全局事务调用链处理完毕,TM根据有无异常向TC发起全局事务的提交或者回滚

TC协调其管辖之下的所有分支事务,决定是否回滚

Seata实现2PC与传统2PC的差别

  1. 架构层次方面,传统2PC方案的 RM 实际上是在数据库层,RM本质上就是数据库自身,通过XA协议实现,而 Seata 的RM是以jar包的形式作为中间件层部署在应用程序这一侧的。

  2. 两阶段提交方面,传统2PC无论第二阶段的决议是commit还是rollback,事务性资源的锁都要保持到Phase2完成才释放。而Seata的做法是在Phase1 就将本地事务提交,这样就可以省去Phase2 持锁的时间,整体提高效率。

?这样会不会出现线程安全问题啊

9.2 快速开始

模拟电商中的下单和扣库存的过程,我们通过订单微服务执行下单操作,然后由订单微服务调用商品微服务扣除库存。

1、新建OrderController3


@RestController
@Slf4j
public class OrderController3 {

    @Autowired
    private OrderDao orderDao;

    @Autowired
    private ProductClient productClient;


    //下单--fegin
    @RequestMapping("/order3/prod/{pid}")
    public Order order(@PathVariable("pid") Integer pid) {
        log.info("接收到{}号商品的下单请求,接下来调用商品微服务查询此商品信息", pid);

        //调用商品微服务,查询商品信息
        Product product = productClient.findByPid(pid);

        if (product.getPid() == -100) {
            Order order = new Order();
            order.setOid(-100L);
            order.setPname("下单失败");
            return order;
        }

        log.info("查询到{}号商品的信息,内容是:{}", pid, JSON.toJSONString(product));

        //下单(创建订单)
        Order order = new Order();
        order.setUid(1);
        order.setUsername("测试用户");
        order.setPid(pid);
        order.setPname(product.getPname());
        order.setPprice(product.getPprice());
        order.setNumber(1);

        orderDao.save(order);

        log.info("创建订单成功,订单信息为{}", JSON.toJSONString(order));

        //扣库存
        productClient.reduceInventory(pid, order.getNumber());

        return order;
    }
}

2、在ProductClient中添加扣减库存

//value用于指定调用nacos下哪个微服务
@FeignClient(value = "service-product"/*, fallbackFactory = ProductServiceFallbackFactory.class*/)
public interface ProductClient {
    //@FeignClient的value +  @RequestMapping的value值  其实就是完成的请求地址  "http://service-product/product/" + pid
    //指定请求的URI部分
    @RequestMapping("/product/{pid}")
    Product findByPid(@PathVariable("pid") Integer pid);

    // 扣减库存
    @RequestMapping("/product/reduceInventory")
    void reduceInventory(@RequestParam("pid") Integer pid, @RequestParam("num") Integer num);
}

3、在商品微服务中添加扣减库存方法

@Transactional
@RequestMapping("/product/reduceInventory")
public void reduceInventory(@RequestParam("pid") Integer pid, @RequestParam("num") Integer num) {
    Product product = productDao.findById(pid).get();
    product.setStock(product.getStock() - num);
    productDao.save(product);
}

4、启动项目测试一下 http://127.0.0.1:8091/order3/prod/1

5、下载Seata服务端,修改conf/registry.conf

这是注册中心和配置中心的配置

registry {
    type = "nacos"
    nacos {
        serverAddr = "localhost"
        namespace = "public"
        cluster = "default"
    }
}

config {
    type = "nacos"
    nacos {
        serverAddr = "localhost"
        namespace = "public"
        cluster = "default"
    }
}

6、删除nacos中的所有配置,将商品微服务的配置改回之前的

7、打开nacos-config.txt,添加

service.vgroup_mapping.service-product=default 
service.vgroup_mapping.service-order=default

这里的语法为:service.vgroup_mapping.{your-service-gruop}=default ,中间的{your-service-gruop} 为自己定义的服务组名称, 这里需要我们在程序的配置文件中配置。

7、初始化seata在nacos的配置

./nacos-config.sh 127.0.0.1

执行成功后可以打开Nacos的控制台,在配置列表中,可以看到初始化了很多Group为SEATA_GROUP的配置。

微服务总结(下)插图19

8、启动seata服务

cd ../bin
./seata-server.sh -p 9000 -m file

启动后在 Nacos 的服务列表下面可以看到一个名为 serverAddr 的服务。

微服务总结(下)插图20

9、在我们的数据库(每个业务库都要有)中加入一张undo_log表,这是Seata记录事务日志要用到的表

CREATE TABLE `undo_log` (
  `id`            BIGINT(20)   NOT NULL AUTO_INCREMENT,
  `branch_id`     BIGINT(20)   NOT NULL,
  `xid`           VARCHAR(100) NOT NULL,
  `context`       VARCHAR(128) NOT NULL,
  `rollback_info` LONGBLOB     NOT NULL,
  `log_status`    INT(11)      NOT NULL,
  `log_created`   DATETIME     NOT NULL,
  `log_modified`  DATETIME     NOT NULL,
  `ext`           VARCHAR(100)          DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
)
  ENGINE = InnoDB
  AUTO_INCREMENT = 1
  DEFAULT CHARSET = utf8;

10、在商品和order微服务中添加以下依赖

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>

11、在商品和order微服务中添加DataSourceProxyConfig

Seata 是通过代理数据源实现事务分支的,所以需要配置 io.seata.rm.datasource.DataSourceProxy的Bean,且是@Primary默认的数据源,否则事务不会回滚,无法实现分布式事务

@Configuration
public class DataSourceProxyConfig {
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource druidDataSource() {
        return new DruidDataSource();
    }

    @Primary
    @Bean
    public DataSourceProxy dataSource(DruidDataSource druidDataSource) {
        return new DataSourceProxy(druidDataSource);
    }
}

12、在resources下添加Seata的配置文件 registry.conf

和上面的那个一样

registry {
    type = "nacos"
    nacos {
        serverAddr = "localhost"
        namespace = "public"
        cluster = "default"
    }
}

config {
    type = "nacos"
    nacos {
        serverAddr = "localhost"
        namespace = "public"
        cluster = "default"
    }
}

13、修改bootstarp.yml,添加如下配置

spring:
  cloud:
    nacos:
      config:
        server-addr: localhost:8848 # nacos的服务端地址
        namespace: public
        group: SEATA_GROUP
    alibaba:
      seata:
        tx-service-group: service-order # 与第7步是对应的

14、开启全局事务

@GlobalTransactional
@RequestMapping("/order3/prod/{pid}")
public Order order(@PathVariable("pid") Integer pid) {

15、添加异常

@Transactional
@RequestMapping("/product/reduceInventory")
public void reduceInventory(@RequestParam("pid") Integer pid, @RequestParam("num") Integer num) {
    Product product = productDao.findById(pid).get();
    product.setStock(product.getStock() - num);
    productDao.save(product);

    int i = 1 / 0;
}

16、测试

注意:alibaba的版本一定要切换到2.1.0.RELEASE,见5.5.1。

9.3 seata运行流程分析

微服务总结(下)插图21

1、每个RM使用DataSourceProxy连接数据库,其目的是使用ConnectionProxy,使用数据源和数据连接代理的目的就是在第一阶段将undo_log和业务数据放在一个本地事务提交,这样就保存了只要有业务操作就一定有undo_log。

2、在第一阶段undo_log中存放了数据修改前和修改后的值,为事务回滚作好准备,所以第一阶段完成就已经将分支事务提交,也就释放了锁资源。

3、TM开启全局事务开始,将XID全局事务id放在事务上下文中,通过feign调用也将XID传入下游分支事务,每个分支事务将自己的Branch ID分支事务ID与XID关联。

4、第二阶段全局事务提交,TC会通知各各分支参与者提交分支事务,在第一阶段就已经提交了分支事务,这里各各参与者只需要删除undo_log即可,并且可以异步执行,第二阶段很快可以完成。

5、第二阶段全局事务回滚,TC会通知各各分支参与者回滚分支事务,通过 XID 和 Branch ID 找到相应的回滚日志,通过回滚日志生成反向的 SQL 并执行,以完成分支事务回滚到之前的状态,如果回滚失 败则会重试回滚操作。

赞(0) 打赏
未经允许不得转载:IDEA激活码 » 微服务总结(下)

一个分享Java & Python知识的社区