程序员社区

关系型数据库中的事务


title: 关系型数据库中的事务
date: 2019/03/28 17:38


引言

在做国土的辅助审查系统的时候,由于需求比较特殊,需要对模型计算的结果进行进一步处理才能入库,基本的操作如下:

if (数据库不存在) {
    插入数据 a = 0
} else {
    更新数据 a = a + 1
}

如果只是单个线程执行这个操作进行,显然是没有问题的,由于模型过多,采用了多线程对他们进行入库,从而引起以下几种情况:

  • 1、数据库中插入了2条数据(按照上面代码的逻辑,只会有一条数据)
  • 2、数据库中的字段a的值不正确

而导致了上述2个问题的原因就是线程并发问题(多个线程同时操作同一个全局变量),而在这个demo中,这个全局变量就是表中的某一行数据

大家可能会想到加上synchronized字段进行限制,但是在本系统还是有以下2种问题:

  • 1、在方法中调用了 xxxRepository.save() 方法,事务提交了,它并没有立刻进行保存操作(有可能会等待1s才会真正的进行保存操作);采用xxxRepository.saveAndFlush() 方法强迫事务提交。
关系型数据库中的事务插图
  • 2、由于本项目有一个大事务,代码如下:
@Transactional
public void func() {

    // 一些保存操作
    ...

    this.func2();
}

@Transactional
public synchronized void func2() {
    if (数据库不存在) {
        插入数据 a = 0
    } else {
        更新数据 a = a + 1
    }
}

上面代码虽然看着解决了线程并发问题,但是经过大量的测试,还是发现了一些问题,主要的原因是:因为当func2方法结束的时候,事务还未提交,但是此时第2个线程挤进来了,插入或更新了数据,从而把之前的修改给覆盖了。

  • 3、如果使用了集群,那么synchronized字段是无效的(本文不介绍 可以采用zookeeper解决)

虽然上面的案例说的是线程安全问题,但是我想先说一下事务

一、本地事务

1、事务的特性

事务是指由一组操作组成的一个工作单元,这个工作单元具有原子性(atomicity)、一致性(consistency)、隔离性(isolation)和持久性(durability)。

  • 原子性:执行单元中的操作要么全部执行成功,要么全部失败。如果有一部分成功一部分失败那么成功的操作要全部回滚到执行前的状态。

  • 一致性:执行一次事务会使用数据从一个正确的状态转换到另一个正确的状态,执行前后数据都是完整的。指的是逻辑上的一致性,即所有操作是符合现实当中的期望的。

  • 隔离性:在该事务执行的过程中,任何数据的改变只存在于该事务之中,对外界没有影响,事务与事务(并发事务)之间是完全的隔离的。只有事务提交后其它事务才可以查询到最新的数据。

  • 持久性:事务完成后对数据的改变会永久性的存储起来,即使发生断电宕机数据依然在。

本地事务就是用关系数据库来控制事务,关系数据库通常都具有ACID特性,传统的单体应用通常会将数据全部存储在一个数据库中,会借助关系数据库来完成事务控制。

2、并发事务可能带来的问题

当多个线程都开启事务操作数据库中的数据时,数据库系统要能进行隔离操作,以保证各个线程获取数据的准确性,在介绍数据库提供的各种隔离级别之前,我们先看看如果不考虑事务的隔离性,会发生的几种问题:

2.1 脏读

脏读是指在一个事务处理过程里读取了另一个未提交的事务中的数据

当一个事务正在多次修改某个数据,而在这个事务中这多次的修改都还未提交,这时一个并发的事务来访问该数据,就会造成两个事务得到的数据不一致。例如:用户A向用户B转账100元,对应SQL命令如下:

update account set money=money+100 where name=’B’;  (此时A通知B)
update account set money=money - 100 where name=’A’;

当只执行第一条SQL时,A通知B查看账户,B发现确实钱已到账(此时即发生了脏读),而之后无论第二条SQL是否执行,只要该事务不提交,则所有操作都将回滚,那么当B以后再次查看账户时就会发现钱其实并没有转。

2.2 不可重复读(针对一条记录)

不可重复读是指在对于数据库中的某个数据,一个事务范围内多次查询却返回了不同的数据值,这是由于在查询间隔,被另一个事务修改并提交了。

2.3 幻读(针对记录集)

在一个事务中一次查询之后,有另一个事务进行了插入,插入的内容是满足上面查询条件的,如果这个事务当中还有第二次同样查询条件的查询,那么就会导致了第二次查询得到的结果集增多。

事务A在读某些记录的期间,其他事务对这条记录无法进行修改和删除。但是其他事务B可以通过添加一条 满足A事务的查询条件 的新纪录,来使的事务A再次读取的时候发现记录集增多了。可重复读只是保证了同一事务中原来读到的那条记录不会消失,并不能保证读的记录集不会增多。

不可重复读重点在于update和delete,而幻读的重点在于insert;不可重复读针对一条记录,幻读针对一个结果集

参见本文的橡皮擦用户的回答。

3、MySQL提供的事务隔离级别

3.1 读未提交

最低级别,任何情况都无法保证

3.2 读已提交(Oracle默认级别)

事务成功提交后才可以被查询到。

3.3 可重复读(MySQL默认级别)

在事务执行期间会锁定该事务以任何方式引用的所有行,其他事务对这条记录无法进行修改和删除。

3.4 串行化

强制的进行排序,在每个读读数据行上添加共享锁。会导致大量超时现象和锁竞争。(把整个表锁住)

3.5 Oracle的事务隔离级别

Oracle数据库支持读已提交和串行化

引言中的那2个问题,第二个可以通过【可重复读】来解决,第一个可以通过【串行化】来解决。

我讨厌Oracle!

4、事务隔离级别与并发事务问题之间的关系

隔离级别\并发事务问题 脏读 不可重复读 幻读
读未提交 可能 可能 可能
读已提交 不可能 可能 可能
可重复读 不可能 不可能 可能
串行化 不可能 不可能 不可能

5、锁

5.1 锁的概念

锁是网络数据库中的一个非常重要的概念,当多个用户同时对数据库并发操作时,会带来数据不一致的问题,所以,锁主要用于并发环境下保证数据库完整性和一致性

锁(LOCKING)是最常用的并发控制机构。是对其他事务访问指定的资源进行控制、实现并发控制的一种主要手段。锁是事务对某个数据库中的资源(如表和记录)存取前,先向系统提出请求,封锁该资源,事务获得锁后,即取得对数据的控制权,在事务释放它的锁之前,其他事务不能更新此数据。当事务撤消后,释放被 锁定的资源。
当一个用户锁住数据库中的某个对象时,其他用户就不能再访问该对象

5.2 锁的分类

从数据库系统角度分为三种:排他锁、共享锁、更新锁。
从程序员角度分为两种:一种是悲观锁,一种乐观锁。

5.3 悲观锁(Pessimistic Lock)

每次去拿数据的时候都认为别人会修改,所以每次在拿数据的时候都会上锁,这样别人拿这个数据就会block(阻塞),直到它拿锁。

在关系数据库管理系统里,悲观并发控制(又名“悲观锁”,Pessimistic Concurrency Control,缩写“PCC”)是一种并发控制的方法。它可以阻止一个事务以影响其他用户的方式来修改数据。如果一个事务执行的操作读某行数据应用了锁,那只有当这个事务把锁释放,其他事务才能够执行与该锁冲突的操作。

悲观并发控制主要用于数据争用激烈的环境,以及发生并发冲突时使用锁保护数据的成本要低于回滚事务的成本的环境中。 —— Wiki

5.3.1 按使用性质划分

5.3.1.1 共享锁(Share Lock)

S锁,也叫读锁,用于所有的只读数据操作。共享锁是非独占的,允许多个并发事务读取其锁定的资源**。

性质:

    1. 多个事务可封锁同一个共享页;
    1. 任何事务都不能修改该页;
    1. 通常是该页被读取完毕,S锁立即被释放。

通俗的说就是多个事务可以读同一页数据,但是都不能进行修改这一页的数据,直到这一页的S锁被释放。

在SQL Server中,默认情况下,数据被读取后,立即释放共享锁。
例如,执行查询语句“SELECT * FROM my_table”时,首先锁定第一页,读取之后,释放对第一页的锁定,然后锁定第二页。这样,就允许在读操作过程中,修改未被锁定的第一页。
例如,语句“SELECT * FROM my_table HOLDLOCK”就要求在整个查询过程中,保持对表的锁定,直到查询完成才释放锁定。

5.3.1.2 排他锁(Exclusive Lock)

X锁,也叫写锁,表示对数据进行写操作。如果一个事务对对象加了排他锁,其他事务就不能再给它加任何锁了。(某个顾客把试衣间从里面反锁了,其他顾客想要使用这个试衣间,就只有等待锁从里面打开了。)

性质:

    1. 仅允许一个事务封锁此页;
    1. 其他任何事务必须等到X锁被释放才能对该页进行访问;
    1. X锁一直到事务结束才能被释放。

仅允许一个事务对这一页进行操作,直到这一页被操作完才能进行访问。

产生排他锁的SQL语句如下:select * from ad_plan for update;

5.3.1.3 更新锁(Update Lock)

U锁,在修改操作的初始化阶段用来锁定可能要被修改的资源,这样可以避免使用共享锁造成的死锁现象。

因为当使用共享锁时,修改数据的操作分为两步:

    1. 首先获得一个共享锁,读取数据。
    1. 然后将共享锁升级为排他锁,再执行修改操作。

这样如果有两个或多个事务同时对一个事务申请了共享锁,在修改数据时,这些事务都要将共享锁升级为排他锁。这时,这些事务都不会释放共享锁,而是一直等待对方释放,这样就造成了死锁。
如果一个数据在修改前直接申请更新锁,在数据修改时再升级为排他锁,就可以避免死锁。

性质:

    1. 用来预定要对此页施加X锁,它允许其他事务读,但不允许再施加U锁或X锁
    1. 当被读取的页要被更新时,则升级为X锁;
    1. U锁一直到事务结束时才能被释放。

避免了从共享锁升级为排他锁时造成的死锁问题,预先加锁,并不允许其它事务加锁。

5.3.2 按作用范围划分

  • 行锁:锁的作用范围是行级别。

  • 表锁:锁的作用范围是整张表。

数据库能够确定那些行需要锁的情况下使用行锁,如果不知道会影响哪些行的时候就会使用表锁。

举个例子,一个用户表user,有主键id和用户生日birthday。

当你使用update … where id=?这样的语句时,数据库明确知道会影响哪一行,它就会使用行锁;

当你使用update … where birthday=?这样的的语句时,因为事先不知道会影响哪些行就可能会使用表锁。

5.3.3 优点

悲观并发控制(悲观锁)实际上是“先取锁再访问”的保守策略,为数据处理的安全提供了保证。 —— Wiki

5.3.4 缺点

在效率方面,处理加锁的机制会让数据库产生额外的开销,还有增加产生死锁的机会;另外,在只读型事务处理中由于不会产生冲突,也没必要使用锁,这样做只能增加系统负载;还有会降低了并行性,一个事务如果锁定了某行数据,其他事务就必须等待该事务处理完才可以处理那行数据。 —— Wiki

5.4 乐观锁(Optimistic Lock)

每次去拿数据的时候都认为别人不会修改,所以,不会上锁。但是在更新的时候会判断一下在此期间别人有没有更新这个数据,可以使用版本号等机制实现

乐观并发控制(又名“乐观锁”,Optimistic Concurrency Control,缩写“OCC”)是一种并发控制的方法。它假设多用户并发的事务在处理时不会彼此互相影响,各事务能够在不产生锁的情况下处理各自影响的那部分数据。在提交数据更新之前,每个事务会先检查在该事务读取数据后,有没有其他事务又修改了该数据。如果其他事务有更新的话,正在提交的事务会进行回滚。

乐观并发控制多数用于数据争用不大、冲突较少的环境中,这种环境中,偶尔回滚事务的成本会低于读取数据时锁定数据的成本,因此可以获得比其他并发控制方法更高的吞吐量。 —— Wiki

乐观锁( Optimistic Locking ):相对悲观锁而言,乐观锁机制采取了更加宽松的加锁机制。

悲观锁大多数情况下依靠数据库的锁机制实现,以保证操作最大程度的独占性。但随之而来的就是数据库性能的大量开销,特别是对长事务而言,这样的开销往往无法承受。而乐观锁机制在一定程度上解决了这个问题。

乐观锁,大多是基于数据版本(Version)记录机制实现。

  • 数据版本:为数据表中增加一个版本标识,在基于数据库表的版本解决方案中,一般是通过为数据库表增加一个 “version” 字段来实现。读取出数据时,将此版本号一同读出,之后更新时,对此版本号加1。此时,将提交数据的版本数据与数据库表对应记录的当前版本信息进行比对,如果提交的数据版本号大于数据库表当前版本号,则予以更新,否则认为是过期数据。

乐观锁就是使用了低成本的回滚来解决并发事务问题(适用于秒杀系统)

5.4.1 乐观锁的事务阶段

  • 读取:事务将数据读入缓存,这时系统会给事务分派一个时间戳。
  • 校验:事务执行完毕后,进行提交。这时同步校验所有事务,如果事务所读取的数据在读取之后又被其他事务修改,则产生冲突,事务被中断回滚
  • 写入:通过校验阶段后,将更新的数据写入数据库。

5.4.2 乐观锁的实现方式

5.4.2.1 数据版本号(version)

版本号(记为version):就是给数据增加一个版本标识,在数据库上就是表中增加一个version字段,每次更新把这个字段加1,读取数据的时候把version读出来,更新的时候比较version,如果还是开始读取的version就可以更新了,如果现在的version比老的version大,说明有其他事务更新了该数据,并增加了版本号,这时候得到一个无法更新的通知,用户自行根据这个通知来决定怎么处理,比如重新开始一遍。这里的关键是判断version和更新两个动作需要作为一个原子单元执行,否则在你判断可以更新以后正式更新之前有别的事务修改了version,这个时候你再去更新就可能会覆盖前一个事务做的更新,造成第二类丢失更新,所以你可以使用update … where … and version=”old version”这样的语句,根据返回结果是0还是非0来得到通知,如果是0说明更新没有成功,因为version被改了,如果返回非0说明更新成功。

5.4.2.2 时间戳(使用数据库服务器的时间戳)

和版本号基本一样,只是通过时间戳来判断而已,注意时间戳要使用数据库服务器的时间戳不能是业务系统的时间。

5.4.2.3 待更新字段

和版本号方式相似,只是不增加额外字段,直接使用有效数据字段做版本控制信息,因为有时候我们可能无法改变旧系统的数据库表结构。假设有个待更新字段叫count,先去读取这个count,更新的时候去比较数据库中count的值是不是我期望的值(即开始读的值),如果是就把我修改的count的值更新到该字段,否则更新失败。java的基本类型的原子类型对象如AtomicInteger就是这种思想。

5.4.2.4 所有字段

和待更新字段类似,只是使用所有字段做版本控制信息,只有所有字段都没变化才会执行更新。

5.4.2.5 几种方式的选用

新系统设计可以使用version方式和timestamp方式,需要增加字段,应用范围是整条数据,不论那个字段修改都会更新version,也就是说两个事务更新同一条记录的两个不相关字段也是互斥的,不能同步进行。旧系统不能修改数据库表结构的时候使用数据字段作为版本控制信息,不需要新增字段,待更新字段方式只要其他事务修改的字段和当前事务修改的字段没有重叠就可以同步进行,并发性更高。

5.4.3 优点与不足

乐观并发控制相信事务之间的数据竞争(data race)的概率是比较小的,因此尽可能直接做下去,直到提交的时候才去锁定,所以不会产生任何锁和死锁。但如果直接简单这么做,还是有可能会遇到不可预期的结果,例如两个事务都读取了数据库的某一行,经过修改以后写回数据库,这时就遇到了问题(事务回滚)。 —— Wiki

5.5 在高并发事务下采用悲观锁会造成的问题

5.5.1 活锁

指的是T1封锁了数据R,T2同时也请求封锁数据R,T3也请求封锁数据R,当T1释放了锁之后,T3会锁住R,T4也请求封锁R,则T2就会一直等待下去。

解决方法:采用“先来先服务”策略可以避免。

5.5.2 死锁

就是我等你,你又等我,双方就会一直等待下去。比如:T1封锁了数据R1,正请求对R2封锁,而T2封住了R2,正请求封锁R1,这样就会导致死锁,死锁这种没有完全解决的方法,只能尽量预防。

预防方法:

    1. 一次封锁法,指的是一次性把所需要的数据全部封锁住,但是这样会扩大了封锁的范围,降低系统的并发度;
    1. 顺序封锁法,指的是事先对数据对象指定一个封锁顺序,要对数据进行封锁,只能按照规定的顺序来封锁,但是这个一般不大可能的。

系统判定死锁的方法:

  • 超时法:如果某个事物的等待时间超过指定时限,则判定为出现死锁;
  • 等待图法:如果事务等待图中出现了回路,则判断出现了死锁。

对于解决死锁的方法,只能是撤销一个处理死锁代价最小的事务,释放此事务持有的所有锁,同时对撤销的事务所执行的数据修改操作必须加以恢复。

5.6 事务隔离级别与悲观锁的关系

事务隔离级别的定义及实现都是基于悲观锁的概念,即对修改持保守态度,在事务期间加锁禁止其他事务对数据的修改

事务隔离级别是一个规范(接口),而悲观锁是实现这个规范的一种方式(实现类)

目前大部分主流数据库都采用乐观锁的思想来保证性能,并通过其他的机制来避免出现脏读、不可重复读以及幻读的问题,如:MVCC(多版本并发控制)。

6、@Transactional注解中的参数

参数名称 功能描述
readOnly 该属性用于设置当前事务是否为只读事务,设置为true表示只读,false则表示可读写,默认值为false。例如:@Transactional(readOnly=true)
rollbackFor 该属性用于设置需要进行回滚的异常类数组,当方法中抛出指定异常数组中的异常时,则进行事务回滚。例如:指定单一异常类:@Transactional(rollbackFor=RuntimeException.class)指定多个异常类:@Transactional(rollbackFor={RuntimeException.class, Exception.class})
rollbackForClassName 该属性用于设置需要进行回滚的异常类名称数组,当方法中抛出指定异常名称数组中的异常时,则进行事务回滚。例如:指定单一异常类名称:@Transactional(rollbackForClassName="RuntimeException")指定多个异常类名称:@Transactional(rollbackForClassName={"RuntimeException","Exception"})
noRollbackFor 该属性用于设置不需要进行回滚的异常类数组,当方法中抛出指定异常数组中的异常时,不进行事务回滚。例如:指定单一异常类:@Transactional(noRollbackFor=RuntimeException.class)指定多个异常类:@Transactional(noRollbackFor={RuntimeException.class, Exception.class})
noRollbackForClassName 该属性用于设置不需要进行回滚的异常类名称数组,当方法中抛出指定异常名称数组中的异常时,不进行事务回滚。例如:指定单一异常类名称:@Transactional(noRollbackForClassName="RuntimeException")指定多个异常类名称:@Transactional(noRollbackForClassName={"RuntimeException","Exception"})
propagation 该属性用于设置事务的传播行为。例如:@Transactional(propagation=Propagation.NOT_SUPPORTED,readOnly=true)
isolation 该属性用于设置底层数据库的事务隔离级别,事务隔离级别用于处理多事务并发的情况,通常使用数据库的默认隔离级别即可,基本不需要进行设置
timeout 该属性用于设置事务的超时秒数,默认值为-1表示永不超时

事物传播行为介绍:

  • @Transactional(propagation=Propagation.REQUIRED) :如果有事务, 那么加入事务, 没有的话新建一个(默认情况下)
  • @Transactional(propagation=Propagation.NOT_SUPPORTED) :以非事务方式执行操作,如果当前存在事务,就把当前事务挂起
  • @Transactional(propagation=Propagation.REQUIRES_NEW) :不管是否存在事务,都创建一个新的事务,原来的挂起,新的执行完毕,继续执行老的事务(它拥有自己的隔离范围,自己的锁,Commit和rollback不受外部事物影响。)
  • @Transactional(propagation=Propagation.MANDATORY) :如果没有事务,抛出异常
  • @Transactional(propagation=Propagation.NEVER) :如果有事务,则抛出异常
  • @Transactional(propagation=Propagation.SUPPORTS) :如果有事务, 那么加入事务, 没有的话就不用事务。
  • @Transactional(propagation=Propagation.NESTED) :如果有事务在运行,当前的方法就应该在这个事务的嵌套事务内运行。否则,就启动一个新的事务,并在它自己的事务内运行。

REQUIRES_NEW启动的新事务不依赖于外部事务,是完全独立的,这意味着事务commit和rollback操作都是独立的,不受外部事务commit或者rollback影响。 NESTED是依赖于外部事务的子事务,只有当外部事务commit时,子事务才能commit;外部事务发生异常rollback,子事务也要回滚。 —— unascribed

事务隔离级别:

  • @Transactional(isolation = Isolation.READ_UNCOMMITTED):读取未提交数据(会出现脏读, 不可重复读) 基本不使用
  • @Transactional(isolation = Isolation.READ_COMMITTED):读取已提交数据(会出现不可重复读和幻读)
  • @Transactional(isolation = Isolation.REPEATABLE_READ):可重复读(会出现幻读)
  • @Transactional(isolation = Isolation.SERIALIZABLE):串行化

MYSQL: 默认为REPEATABLE_READ级别

注意点:

  • @Transactional 只能被应用到public方法上, 对于其它非public的方法,如果标记了@Transactional也不会报错,但方法没有事务功能。(和this.xxx调用@Async注释的方法不生效 同理,原因还不知道)
  • 默认只回滚RuntimeException
  • Spring团队的建议是你在具体的类(或类的方法)上使用 @Transactional 注解,而不要使用在类所要实现的任何接口上。你当然可以在接口上使用 @Transactional 注解,但是这将只能当你设置了基于接口的代理时它才生效。因为注解是不能继承的,这就意味着如果你正在使用基于类的代理时,那么事务的设置将不能被基于类的代理所识别,而且对象也将不会被事务代理所包装(将被确认为严重的)。因此,请接受Spring团队的建议并且在具体的类上使用 @Transactional 注解。

二、分布式事务

1、CAP理论

CAP理论指出对于一个分布式计算系统来说,不可能同时满足以下三点:

  • 一致性(Consistency) (等同于所有节点访问同一份最新的数据副本)
  • 可用性(Availability)(每次请求都能获取到非错的响应,但是不保证获取的数据为最新数据)
  • 分区容错性(Partition tolerance)(以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择。)

理解CAP理论的最简单方式是想象两个节点分处分区两侧。允许至少一个节点更新状态会导致数据不一致,即丧失了C性质。如果为了保证数据一致性,将分区一侧的节点设置为不可用,那么又丧失了A性质。除非两个节点可以互相通信,才能既保证C又保证A,这又会导致丧失P性质。 —— Wiki

1.1 概念

关系型数据库中的事务插图1

一致性(Consistency):服务A、B、C三个结点都存储了用户数据, 三个结点的数据需要保持同一时刻数据一致性。

可用性(Availability):服务A、B、C三个结点,其中一个结点宕机不影响整个集群对外提供服务,如果只有服务A结点,当服务A宕机整个系统将无法提供服务,增加服务B、C是为了保证系统的可用性。

分区容忍性(Partition Tolerance):分区容忍性就是允许系统通过网络协同工作,分区容忍性要解决由于网络分区导致数据的不完整及无法访问等问题。换句话说就是允许在一定时限内达成数据的一致性

  • 分布式系统不可避免的出现了多个系统通过网络协同工作的场景,结点之间难免会出现网络中断、网延延迟等现象,这种现象一旦出现就导致数据被分散在不同的结点上,这就是网络分区。

1.2 分布式系统能否兼顾CAP

如果在保证分区容错性(P)下:

  • 如果要满足一致性(C),在同步数据的时候遇到了网络阻塞,由于数据还没有还没有同步过去,为了满足C,就会让节点阻塞住,从而违背了可用性(A)
  • 如果要满足可用性(A),在同步数据的时候遇到了网络阻塞,由于数据还没有还没有同步过去,导致各个节点的数据不同,违背了一致性(C)

所以3者不能共存。

1.3 CAP有哪些组合方式

1、CA:放弃分区容忍性,加强一致性和可用性,关系数据库按照CA进行设计。我觉得满足了CA,那么他就不是分布式系统了。

2、AP:放弃一致性,加强可用性和分区容忍性,追求最终一致性,很多NoSQL数据库按照AP进行设计。

  • 说明:这里放弃一致性是指放弃强一致性,强一致性就是写入成功立刻要查询出最新数据。追求最终一致性是指允许暂时的数据不一致,只要最终在用户接受的时间内数据一致即可。

3、CP:放弃可用性,加强一致性和分区容忍性,一些强一致性要求的系统按CP进行设计,比如跨行转账,一次转账请求要等待双方银行系统都完成整个事务才算完成。

  • 说明:由于网络问题的存在CP系统可能会出现待等待超时,如果没有处理超时问题则整理系统会出现阻塞。

在分布式系统设计中AP的应用较多,即保证分区容忍性和可用性,牺牲数据的强一致性(写操作后立刻读取到最新数据),保证数据最终一致性。比如:订单退款,今日退款成功,明日账户到账,只要在预定的用户可以接受的时间内退款事务走完即可。

1.4 Base理论

CAP理论告诉我们一个悲惨但不得不接受的事实——我们只能在C、A、P中选择两个条件。而对于业务系统而言,我们往往选择牺牲一致性来换取系统的可用性和分区容错性。不过这里要指出的是,所谓的“牺牲一致性”并不是完全放弃数据一致性,而是牺牲强一致性换取弱一致性。下面来介绍下BASE理论。

1)BA:Basic Available 基本可用

什么是基本可用呢?假设系统,出现了不可预知的故障,但还是能用,相比较正常的系统而言:

  • 响应时间上的损失:正常情况下的搜索引擎0.5秒即返回给用户结果,而基本可用的搜索引擎可以在2秒作用返回结果。
  • 功能上的损失:在一个电商网站上,正常情况下,用户可以顺利完成每一笔订单。但是到了大促期间,为了保护购物系统的稳定性,部分消费者可能会被引导到一个降级页面。

2)S:Soft State:软状态

什么是软状态呢?相对于原子性而言,要求多个节点的数据副本都是一致的,这是一种“硬状态”。

软状态指的是:允许系统中的数据存在中间状态,并认为该状态不影响系统的整体可用性,即允许系统在多个不同节点的数据副本存在数据延时。

3)E:Eventual Consisstency:最终一致性

同一数据的不同副本的状态,可以不需要实时一致,但一定要保证经过一定时间后仍然是一致的。

4)酸碱平衡

ACID能够保证事务的强一致性,即数据是实时一致的。这在本地事务中是没有问题的,在分布式事务中,强一致性会极大影响分布式系统的性能,因此分布式系统中遵循BASE理论即可。但分布式系统的不同业务场景对一致性的要求也不同。如交易场景下,就要求强一致性,此时就需要遵循ACID理论,而在注册成功后发送短信验证码等场景下,并不需要实时一致,因此遵循BASE理论即可。因此要根据具体业务场景,在ACID和BASE之间寻求平衡。

2、什么是分布式事务

在分布式系统中一次操作由多个系统协同完成,这种一次事务操作涉及多个系统通过网络协同完成的过程称为分布式事务。这里强调的是多个系统通过网络协同完成一个事务的过程,并不强调多个系统访问了不同的数据库,即使多个系统访问的是同一个数据库也是分布式事务,如下图:

关系型数据库中的事务插图2

另外一种分布式事务的表现是,一个应用程序使用了多个数据源连接了不同的数据库,当一次事务需要操作多个数据源,此时也属于分布式事务,当系统作了数据库拆分后会出现此种情况。

关系型数据库中的事务插图3

3、分布式事务解决方案

3.1 两阶段提交协议(2PC)

3.1.0 XA与DTP与2PC

XA标准是由X/Open于1991年发布的规范,用于分布式事务处理(DTP)

XA的目标是保证跨异构后端数据存储(例如数据库,应用程序服务器,消息队列,事务缓存等)执行的“全局事务”中的原子性。为了保证原子性,XA使用两阶段提交(2PC)来确保所有资源一致地提交或回滚任何特定事务。 —— Wiki

3.1.1 简介

在分布式系统中,每个节点虽然可以知晓自己的操作时成功或者失败,却无法知道其他节点的操作的成功或失败。当一个事务跨越多个节点时,为了保持事务的ACID特性,需要引入一个作为协调者的组件来统一掌控所有节点(称作参与者)的操作结果并最终指示这些节点是否要把操作结果进行真正的提交(比如将更新后的数据写入磁盘等等)。因此,二阶段提交的算法思路可以概括为:参与者将操作成败通知协调者,再由协调者根据所有参与者的反馈情报决定各参与者是否要提交操作还是中止操作。 —— Wiki

    协调者                                              参与者
                              QUERY TO COMMIT
                -------------------------------->
                              VOTE YES/NO           prepare*/abort*
                <-------------------------------
commit*/abort*                COMMIT/ROLLBACK
                -------------------------------->
                              ACKNOWLEDGMENT        commit*/abort*
                <--------------------------------  
end


第一阶段:准备阶段(prepare)
    1、协调者通知参与者准备提交订单,参与者开始投票。
    2、协调者完成准备工作向协调者回应Yes。

第二阶段:提交(commit)/回滚(rollback)阶段
    1、协调者根据参与者的投票结果发起最终的提交指令。
    2、如果有参与者没有准备好则发起回滚指令

3.1.2 以下单减库存为例

关系型数据库中的事务插图4
  • 1、应用程序连接两个数据源。
  • 2、应用程序通过事务协调器向两个库发起prepare,两个数据库收到消息分别执行本地事务(记录日志),但不提交,如果执行成功则回复yes,否则回复no。
  • 3、事务协调器收到回复,只要有一方回复no则分别向参与者发起回滚事务,参与者开始回滚事务。
  • 4、事务协调器收到回复,全部回复yes,此时向参与者发起提交事务。如果参与者有一方提交事务失败则由事务协调器发起回滚事务。

3.1.3 优缺点以及实现协议的解决方案

优点:实现强一致性,部分关系数据库支持(Oracle、MySQL等)。

缺点:整个事务的执行需要由协调者在多个节点之间去协调,增加了事务的执行时间,性能低下。在投票阶段如果某个节点不回复,会造成长时间的等待。

解决方案有:springboot+Atomikos or Bitronix

3PC主要是解决协调者与参与者通信阻塞问题而产生的,它比2PC传递的消息还要多,性能不高。

想想2pc实现了CAP的哪两个?适用于那种场景?

CA

3.2 事务补偿(TCC)

3.2.1 简介

TCC事务补偿是基于2PC实现的业务层事务控制方案,它是Try、Confirm和Cancel三个单词的首字母,含义如下:

  • 1、Try 检查及预留业务资源:完成提交事务前的检查,并预留好资源。
  • 2、Confirm 确定执行业务操作:对try阶段预留的资源正式执行。
  • 3、Cancel 取消执行业务操作:对try阶段预留的资源释放。

3.2.2 以下单减库存为例

关系型数据库中的事务插图5

1、Try

下单业务由订单服务和库存服务协同完成,在try阶段订单服务和库存服务完成检查和预留资源。

订单服务检查当前是否满足提交订单的条件(比如:当前存在未完成订单的不允许提交新订单)。

库存服务检查当前是否有充足的库存,并锁定资源。

2、Confirm

订单服务和库存服务成功完成Try后开始正式执行资源操作。

订单服务向订单写一条订单信息。

库存服务减去库存。

3、Cancel

如果订单服务和库存服务有一方出现失败则全部取消操作。

订单服务需要删除新增的订单信息。

库存服务将减去的库存再还原。

3.2.3 优缺点

优点:最终保证数据的一致性,在业务层实现事务控制,灵活性好。
缺点:开发成本高,每个事务操作每个参与者都需要实现try/confirm/cancel三个接口。

注意:TCC的try/confirm/cancel接口都要实现幂等性,在为在try、confirm、cancel失败后要不断重试。

幂等性是指同一个操作无论请求多少次,其结果都相同。

幂等操作实现方式有:

  • 1、操作之前在业务方法进行判断如果执行过了就不再执行。
  • 2、缓存所有请求和处理的结果,已经处理的请求则直接返回结果。
  • 3、在数据库表中加一个状态字段(未处理,已处理),数据操作时判断未处理时再处理。

3.3 基于可靠消息服务实现最终一致

这种实现分布式事务的方式需要通过消息中间件来实现。假设有A和B两个系统,分别可以处理任务A和任务B。此时系统A中存在一个业务流程,需要将任务A和任务B在同一个事务中处理。下面来介绍基于消息中间件来实现这种分布式事务。

3.3.1 基本流程

关系型数据库中的事务插图6

1、在系统A处理任务A前,首先向消息中间件发送一条消息,消息中间件收到后将该条消息持久化,但并不投递。此时下游系统B仍然不知道该条消息的存在。

2、消息中间件持久化成功后,便向系统A返回一个确认应答;系统A收到确认应答后,则可以开始处理任务A;任务A处理完成后,向消息中间件发送Commit请求。该请求发送完成后,对系统A而言,该事务的处理过程就结束了,此时它可以处理别的任务了。但commit消息可能会在传输途中丢失,从而消息中间件并不会向系统B投递这条消息,从而系统就会出现不一致性。这个问题由消息中间件的事务回查机制完成,下文会介绍。

3、消息中间件收到Commit指令后,便向系统B投递该消息,从而触发任务B的执行;当任务B执行完成后,系统B向消息中间件返回一个确认应答,告诉消息中间件该消息已经成功消费,此时,这个分布式事务完成。

在这个流程中:

  • 消息中间件扮演者分布式事务协调者的角色
  • 系统A完成任务A后,到任务B执行完成之间,会存在一定的时间差。在这个时间差内,整个系统处于数据不一致的状态,但这短暂的不一致性是可以接受的,因为经过短暂的时间后,系统又可以保持数据一致性,满足BASE理论。

3.3.2 如果A系统执行失败

关系型数据库中的事务插图7

若系统A在处理任务A时失败,那么就会向消息中间件发送Rollback请求。和发送Commit请求一样,系统A发完之后便可以认为回滚已经完成,它便可以去做其他的事情。

消息中间件收到回滚请求后,直接将该消息丢弃,而不投递给系统B,从而不会触发系统B的任务B。

3.3.3 如果Commit和Rollback指令在传输途中丢失

关系型数据库中的事务插图8

系统A除了实现正常的业务流程外,还需提供一个事务询问的接口,供消息中间件调用。当消息中间件收到一条事务型消息后便开始计时,如果到了超时时间也没收到系统A发来的Commit或Rollback指令的话,就会主动调用系统A提供的事务询问接口询问该系统目前的状态。该接口会返回三种结果:

  • 提交: 若获得的状态是“提交”,则将该消息投递给系统B。
  • 回滚: 若获得的状态是“回滚”,则直接将条消息丢弃。
  • 处理中: 若获得的状态是“处理中”,则继续等待。

3.3.4 消息中间件向B(下游)系统投递消息的流程

关系型数据库中的事务插图9

消息中间件向下游系统投递完消息后便进入阻塞等待状态,下游系统便立即进行任务的处理,任务处理完成后便向消息中间件返回应答。消息中间件收到确认应答后便认为该事务处理完毕!
如果消息在投递过程中丢失,或消息的确认应答在返回途中丢失,那么消息中间件在等待确认应答超时之后就会重新投递,直到下游消费者返回消费成功响应为止。当然,一般消息中间件可以设置消息重试的次数和时间间隔,比如:当第一次投递失败后,每隔五分钟重试一次,一共重试3次。如果重试3次之后仍然投递失败,那么这条消息就需要人工干预。

3.3.5 为什么消息投递失败后为什么不回滚消息,而是不断尝试重新投递?

这就涉及到整套分布式事务系统的实现成本问题。

我们知道,当系统A将向消息中间件发送Commit指令后,它便去做别的事情了。如果此时消息投递失败,需要回滚的话,就需要让系统A事先提供回滚接口,这无疑增加了额外的开发成本,业务系统的复杂度也将提高。对于一个业务系统的设计目标是,在保证性能的前提下,最大限度地降低系统复杂度,从而能够降低系统的运维成本。

3.3.6 如果下游报错了怎么办

他们说,是MQ回滚,我也不懂什么意思。 看评论

3.4 分布式事务的选用

关系型数据库中的事务插图10
关系型数据库中的事务插图11

如果向上述的这种业务需求,建议选用强一致性的XA协议(Spring JTA有实现),如果向订单和库存的这种,库存什么时候减无所谓(库存永远充足),可以选用MQ的内种方式

本文参考

事务

1、mysql、oracle默认事务隔离级别

2、理解数据库的事务,ACID,CAP和一致性

3、并发编程(四):也谈谈数据库的锁机制

4、MySQL数据库锁介绍

5、数据库锁分类和总结(本文主要来源)

6、Innodb中的事务隔离级别和锁的关系(美团大佬)

7、MySQL事务隔离级别与锁的关系(MVVC)

分布式事务

1、Spring Cloud分布式事务终极解决方案探讨

2、常用的分布式事务解决方案(建议看)

3、分布式理论(二) - BASE理论(建议看下下面的相关链接中的内容)

4、REST微服务的分布式事务实现-基于消息中间件(JTA实操)

引言中的解决办法

由ConcurrentHashMap(1.8版)得到启发,ConcurrentHashMap是采用自旋锁+CAS操作来实现的。

我的解决方案同理,使用数据库乐观锁的方式当作CAS原子操作,结合自旋锁解决。

public synchronized void saveOrUpdate(ModelCalculationStatistics modelCalculationStatistics) {

    if (Objects.nonNull(modelCalculationStatistics)) {
        for (; ; ) {

            // 此处的查找要使用jdbcTemplate,如果使用JPA,由于会有hibernate一级缓存,导致在一个session中查出的结果相同(1)
            ModelCalculationStatistics result = this.findByReviewTaskIdAndReviewPointId(modelCalculationStatistics.getReviewTaskId(),modelCalculationStatistics.getReviewPointId());

            // 数据库中存在该审查要点的数据【更新操作】
            if (result != null) {
                result.setConflictNumber(result.getConflictNumber() + modelCalculationStatistics.getConflictNumber());

                // 更新模型计算状态表的信息(2)
                Integer status = modelCalculationStatisticsRepository.updateResultStatusAndConflictNumberAndVersion(result.getResultStatus(), result.getConflictNumber(), result.getVersion() + 1, result.getId());
                if (status == 1) {
                    break;
                }
            } else {
                // 【保存操作】如果出现数据库完整性冲突异常,则重试
                try {
                    modelCalculationStatisticsRepository.saveAndFlush(modelCalculationStatistics);
                    break;
                } catch (DataIntegrityViolationException e) {   // 如果出现唯一索引异常则会抛出这个异常;但是这样写是错误的,根本就捕获不到这个异常,因为这个异常是在事务提交时抛出的
                    e.printStackTrace();
                }
            }
        }
    }
}

(0)实体
/**
 * 模型计算结果统计表
 * 设置【reviewTaskId和reviewPointId唯一索引】
 *
 * @author yujx
 * @date 2019/03/04 13:27
 */
@Entity
@Table(name = "ars_model_calc_statistics", uniqueConstraints = @UniqueConstraint(columnNames = {"REVIEWTASKID", "REVIEWPOINTID"}))
@SequenceGenerator(name = "ID_SEQ", sequenceName = "SEQ_ARS_HIBERNATE", allocationSize = 1)
public class ModelCalculationStatistics {

    @Id
    @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "ID_SEQ")
    private Long id;

    // 计算结果冲突数量
    private Integer conflictNumber;

    // 检测状态  @See StatusEnum.DetectionStatus
    @Column(length = 1)
    private Integer resultStatus;

    // 审查任务id   @See ReviewTask.id
    private Long reviewTaskId;

    // 审查要点id   @See ReviewPoint.id
    private Long reviewPointId;

    // 乐观锁的版本
    private Integer version;
}

(1)
/**
 * 采用JdbcTemplate进行查询模型计算状态【防止Hibernate的一级缓存】
 *
 * @param reviewTaskId
 * @param reviewPointId
 * @return
 */
private ModelCalculationStatistics findByReviewTaskIdAndReviewPointId(Long reviewTaskId, Long reviewPointId) {

    RowMapper<ModelCalculationStatistics> rowMapper = new BeanPropertyRowMapper<>(ModelCalculationStatistics.class);
    List<ModelCalculationStatistics> modelCalcStatisticsList = jdbcTemplate.query("SELECT * FROM ARS_MODEL_CALC_STATISTICS M WHERE  M.REVIEWTASKID = " + reviewTaskId + " AND M.REVIEWPOINTID = " + reviewPointId + "", rowMapper);
    if (ArrayUtil.isNonNullAndNonEmpty(modelCalcStatisticsList)) {
        return modelCalcStatisticsList.get(0);
    }
    return null;
}

(2)成功 返回1,失败返回0
@Modifying
@Query(value = "update ModelCalculationStatistics mcs set mcs.resultStatus = ?1,mcs.conflictNumber = ?2,mcs.version = ?3 where mcs.id=?4 and mcs.version < ?2")
Integer updateResultStatusAndConflictNumberAndVersion(Integer resultStatus, Integer conflictNumber, Integer version, Long id);

测试saveOrUpdate方法是否加锁时间

不加锁(synchronized)的情况下:121ms   179ms     374ms

[SimpleAsyncTaskExecutor-1] o = 1
[SimpleAsyncTaskExecutor-2] o = 0
[SimpleAsyncTaskExecutor-4] o = 0
[SimpleAsyncTaskExecutor-3] o = 0

[SimpleAsyncTaskExecutor-4] o = 1
[SimpleAsyncTaskExecutor-2] o = 0
[SimpleAsyncTaskExecutor-3] o = 0

[SimpleAsyncTaskExecutor-3] o = 1
[SimpleAsyncTaskExecutor-2] o = 0

[SimpleAsyncTaskExecutor-2] o = 1


加锁(synchronized)的情况下:116ms     135ms   279ms

[SimpleAsyncTaskExecutor-4] o = 1
[SimpleAsyncTaskExecutor-3] o = 0
[SimpleAsyncTaskExecutor-3] o = 1
[SimpleAsyncTaskExecutor-2] o = 1
[SimpleAsyncTaskExecutor-1] o = 1


由上面可知,不加锁的情况下需要访问数据库次数更多(与数据库交互的IO操作),这些IO操作导致的耗时比加锁的耗时更多,所以最终考虑自旋锁(此处的自旋锁的锁变量是数据库中的version字段)+synchronization

自旋锁是计算机科学用于多线程同步的一种锁,线程反复检查锁变量是否可用。 —— Wiki

(4)小插曲

当同时执行2个更新的代码会出现下面的情况:

// 线程1、2同时拿到version=1
2019-04-28 09:43:23.493 [SimpleAsyncTaskExecutor-1] WARN  c.d.c.s.ModelCalculationStatisticsServiceImpl - 1
2019-04-28 09:43:23.493 [SimpleAsyncTaskExecutor-2] WARN  c.d.c.s.ModelCalculationStatisticsServiceImpl - 1

// 线程2进行了更新操作
2019-04-28 09:43:23.540 [SimpleAsyncTaskExecutor-2] ERROR c.d.c.s.ModelCalculationStatisticsServiceImpl - o = 1
save5完成,睡了10s

// 线程1直到线程2事务提交结束才继续执行,执行结果返回0,重试
2019-04-28 09:43:33.553 [SimpleAsyncTaskExecutor-1] ERROR c.d.c.s.ModelCalculationStatisticsServiceImpl - o = 0
2019-04-28 09:43:33.555 [SimpleAsyncTaskExecutor-1] WARN  c.d.c.s.ModelCalculationStatisticsServiceImpl - 2

2019-04-28 09:43:33.558 [SimpleAsyncTaskExecutor-1] ERROR c.d.c.s.ModelCalculationStatisticsServiceImpl - o = 1
save4完成,睡了1s

原因是什么,为什么线程1会阻塞?

因为线程2执行更新语句的时候,数据库为那行加了行锁


sql条件插入,实在不行用jdbcTemplate

INSERT INTO ars_model_calc_statistics
(conflictNumber, resultStatus, reviewPointId, reviewTaskId, version, id) SELECT
1,
1,
1,
1,
0,
SEQ_ARS_HIBERNATE.nextval
FROM DUAL
WHERE NOT exists
(SELECT 1
FROM ARS_MODEL_CALC_STATISTICS mcs
WHERE mcs.REVIEWTASKID = 1 AND
mcs.REVIEWPOINTID = 1)

赞(0) 打赏
未经允许不得转载:IDEA激活码 » 关系型数据库中的事务

一个分享Java & Python知识的社区