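MySQL only fixed its XA bugs in version 5.7.7, bringing the implementation in line with the Open Group standard "Distributed Transaction Processing: The XA Specification". MySQL has two kinds of XA transactions: internal XA transactions, used mainly to coordinate the storage engine with the binary log, and external XA transactions, which let MySQL take part in a distributed transaction driven from outside (for example, one that spans several databases). The XA syntax is as follows: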
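XA {START|BEGIN} xid [JOIN|RESUME]   // start an XA transaction branch (the local transaction)
XA END xid                           // end the work of the local transaction
XA PREPARE xid                       // put the branch into the PREPARED state
XA COMMIT xid [ONE PHASE]            // commit
XA ROLLBACK xid                      // roll back
XA RECOVER [CONVERT XID]             // list branches that are PREPARED but not yet committed or rolled back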
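The statements above must be issued in a fixed order on each connection. As a rough illustration, the following Go sketch models the documented state transitions of a single branch (NONE, ACTIVE, IDLE, PREPARED). The type and function names (`State`, `Next`, `Run`) are made up for this example, and the model is deliberately simplified: it leaves out JOIN, RESUME and the rollback paths MySQL may accept outside the PREPARED state.

```go
package main

import "fmt"

// State models the state of one XA branch on a single connection.
type State string

const (
	None     State = "NONE"     // no XA transaction on this connection
	Active   State = "ACTIVE"   // after XA START
	Idle     State = "IDLE"     // after XA END
	Prepared State = "PREPARED" // after XA PREPARE
)

// transitions lists, per state, which statement is accepted and where it leads.
// Simplified model for illustration only.
var transitions = map[State]map[string]State{
	None:     {"XA START": Active},
	Active:   {"XA END": Idle},
	Idle:     {"XA PREPARE": Prepared, "XA COMMIT ONE PHASE": None},
	Prepared: {"XA COMMIT": None, "XA ROLLBACK": None},
}

// Next returns the state reached by running stmt in state s,
// or an error if the statement is not accepted in that state.
func Next(s State, stmt string) (State, error) {
	next, ok := transitions[s][stmt]
	if !ok {
		return s, fmt.Errorf("%s is not allowed in state %s", stmt, s)
	}
	return next, nil
}

// Run applies the statements in order, starting from the NONE state.
func Run(stmts ...string) (State, error) {
	s := None
	for _, stmt := range stmts {
		next, err := Next(s, stmt)
		if err != nil {
			return s, err
		}
		s = next
	}
	return s, nil
}

func main() {
	s, err := Run("XA START", "XA END", "XA PREPARE")
	fmt.Println(s, err) // PREPARED <nil>

	_, err = Run("XA START", "XA PREPARE")
	fmt.Println(err) // XA PREPARE is not allowed in state ACTIVE
}
```

Skipping XA END before XA PREPARE is the mistake this model catches; the real server rejects it as well.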
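XA gives up availability to guarantee strong consistency, because the MySQL transaction isolation level has to be SERIALIZABLE. Let's try it in practice. First start two MySQL instances, listening on ports 3306 and 3307: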
version: "3.1"
services:
  mysql:
    image: mysql:5.7
    container_name: mysql
    environment:
      - MYSQL_ROOT_PASSWORD=12345678
    command: --default-authentication-plugin=mysql_native_password --default-time-zone='+08:00'
    volumes:
      - /learn/mysql:/docker-entrypoint-initdb.d
      - /learn/mysql/mysqld.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf
    ports:
      - "3306:3306"
    extra_hosts:
      - host.docker.internal:host-gateway
  mysql2:
    image: mysql:5.7
    container_name: mysql2
    environment:
      - MYSQL_ROOT_PASSWORD=12345678
    command: --default-authentication-plugin=mysql_native_password --default-time-zone='+08:00'
    volumes:
      - /learn/mysql:/docker-entrypoint-initdb.d
      - /learn/mysql/mysqld.cnf:/etc/mysql/mysql.conf.d/mysqld.cnf
    ports:
      - "3307:3306"
    extra_hosts:
      - host.docker.internal:host-gateway
Connect to each instance, confirm that it supports the XA protocol, and change the transaction isolation level:
% mysql -uroot -p12345678 -h127.0.0.1
% mysql -uroot -p12345678 -h127.0.0.1 -P3307

mysql> show variables like 'innodb_support_xa';
+-------------------+-------+
| Variable_name     | Value |
+-------------------+-------+
| innodb_support_xa | ON    |
+-------------------+-------+
1 row in set (0.01 sec)

mysql> set global tx_isolation=serializable;
Query OK, 0 rows affected, 1 warning (0.01 sec)

mysql> show variables like '%tx_isolation%';
+---------------+--------------+
| Variable_name | Value        |
+---------------+--------------+
| tx_isolation  | SERIALIZABLE |
+---------------+--------------+
1 row in set (0.00 sec)
Next, create the database and tables.
On the instance at port 3306 we have a user table:
create database orders;
use orders;
create table user (id int, name varchar(10), score int);
insert into user values (1, "foo", 10);
On the instance at port 3307 we have a wallet table:
use orders;
create table wallet (id int, money float);
insert into wallet values (1, 10.1);
Now we can implement a distributed transaction over the XA protocol:
package main

import (
	"database/sql"
	"fmt"
	"log"
	"strconv"
	"time"

	_ "github.com/go-sql-driver/mysql"
	"github.com/pkg/errors"
)

func main() {
	var err error

	// Connection to db1.
	db1, err := sql.Open("mysql", "root:12345678@tcp(127.0.0.1:3306)/orders")
	if err != nil {
		panic(err.Error())
	}
	defer db1.Close()
	// database/sql is a connection pool, but all XA statements for one xid
	// must run on the same connection, so cap the pool at one connection.
	db1.SetMaxOpenConns(1)

	// Connection to db2.
	db2, err := sql.Open("mysql", "root:12345678@tcp(127.0.0.1:3307)/orders")
	if err != nil {
		panic(err.Error())
	}
	defer db2.Close()
	db2.SetMaxOpenConns(1)

	// Show the values before we start.
	var score int
	db1.QueryRow("select score from user where id = 1").Scan(&score)
	fmt.Println("user1 score:", score)
	var money float64
	db2.QueryRow("select money from wallet where id = 1").Scan(&money)
	fmt.Println("wallet1 money:", money)

	// Generate the xid.
	xid := strconv.FormatInt(time.Now().Unix(), 10)
	fmt.Println("=== xid:" + xid + " ====")

	defer func() {
		if err := recover(); err != nil {
			fmt.Printf("%+v\n", err)
			fmt.Println("=== call rollback ====")
			// Rollback is left disabled on purpose so the interrupted
			// state can be inspected with XA RECOVER.
			// db1.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
			// db2.Exec(fmt.Sprintf("XA ROLLBACK '%s'", xid))
		}
		db1.QueryRow("select score from user where id = 1").Scan(&score)
		fmt.Println("user1 score:", score)
		db2.QueryRow("select money from wallet where id = 1").Scan(&money)
		fmt.Println("wallet1 money:", money)
	}()

	// XA START
	fmt.Println("=== call start ====")
	if _, err = db1.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
		panic(errors.WithStack(err))
	}
	if _, err = db2.Exec(fmt.Sprintf("XA START '%s'", xid)); err != nil {
		panic(errors.WithStack(err))
	}

	// DML
	if _, err = db1.Exec("update user set score=score+2 where id =1"); err != nil {
		panic(errors.WithStack(err))
	}
	if _, err = db2.Exec("update wallet set money=money+1.2 where id=1"); err != nil {
		panic(errors.WithStack(err))
	}
	// Failure point 1:
	// panic(errors.WithStack(err))
	/*
		xa recover;
		Empty set (0.01 sec)
	*/

	// XA END
	fmt.Println("=== call end ====")
	if _, err = db1.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
		panic(errors.WithStack(err))
	}
	if _, err = db2.Exec(fmt.Sprintf("XA END '%s'", xid)); err != nil {
		panic(errors.WithStack(err))
	}

	// XA PREPARE
	fmt.Println("=== call prepare ====")
	if _, err = db1.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
		panic(errors.WithStack(err))
	}
	// Failure point 2:
	// panic(errors.New("db1 prepare error"))
	/*
		db1
		xa recover;
		+----------+--------------+--------------+------------+
		| formatID | gtrid_length | bqual_length | data       |
		+----------+--------------+--------------+------------+
		|        1 |           10 |            0 | 1671287200 |
		+----------+--------------+--------------+------------+
		1 row in set (0.00 sec)

		db2
		xa recover;
		Empty set (0.00 sec)

		// A new transaction blocks on the lock held by the prepared one:
		=== xid:1671287283 ====
		=== call start ====
		Error 1205: Lock wait timeout exceeded; try restarting transaction
		main.main

		mysql> xa commit '1671287200';
		Query OK, 0 rows affected (0.00 sec)
	*/
	if _, err = db2.Exec(fmt.Sprintf("XA PREPARE '%s'", xid)); err != nil {
		panic(errors.WithStack(err))
	}
	// Failure point 3:
	// panic(errors.New("db2 prepare error"))
	/*
		db1
		mysql> xa recover;
		+----------+--------------+--------------+------------+
		| formatID | gtrid_length | bqual_length | data       |
		+----------+--------------+--------------+------------+
		|        1 |           10 |            0 | 1671287434 |
		+----------+--------------+--------------+------------+
		1 row in set (0.00 sec)

		db2
		xa recover;
		+----------+--------------+--------------+------------+
		| formatID | gtrid_length | bqual_length | data       |
		+----------+--------------+--------------+------------+
		|        1 |           10 |            0 | 1671287434 |
		+----------+--------------+--------------+------------+
		1 row in set (0.00 sec)
	*/

	// XA COMMIT
	fmt.Println("=== call commit ====")
	if _, err = db1.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
		// TODO: retry the COMMIT.
		// TODO: if it still fails, record the xid and enter the data recovery
		// path, then commit again once the database is back.
		log.Println("xid:" + xid)
	}
	// Failure point 4:
	// panic(errors.New("db2 commit error"))
	if _, err = db2.Exec(fmt.Sprintf("XA COMMIT '%s'", xid)); err != nil {
		log.Println("xid:" + xid)
	}

	db1.QueryRow("select score from user where id = 1").Scan(&score)
	fmt.Println("user1 score:", score)
	db2.QueryRow("select money from wallet where id = 1").Scan(&money)
	fmt.Println("wallet1 money:", money)
}
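The program above derives the xid from the Unix timestamp in seconds, which is enough for a one-off experiment but would collide if two runs started within the same second. One possible alternative, sketched here as an assumption rather than as part of the original program, is a random identifier. The helper name `newXID` is hypothetical; the 32-character result stays within the 64-byte limit MySQL places on the gtrid part, and because it contains only hex digits it can be placed between single quotes without escaping.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// newXID returns a random 32-character hex string suitable for use as the
// gtrid part of an xid. Hypothetical helper for illustration.
func newXID() (string, error) {
	buf := make([]byte, 16) // 16 random bytes encode to 32 hex characters
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return hex.EncodeToString(buf), nil
}

func main() {
	xid, err := newXID()
	if err != nil {
		panic(err)
	}
	// The same xid would be sent to both instances.
	fmt.Println("XA START '" + xid + "'")
}
```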
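To understand distributed transactions properly through experiment, we panic at several specific points to interrupt the transaction:
1. Before either db instance has prepared
2. After db1 has prepared, before db2 has prepared
3. After both dbs have prepared, before db1 commits
4. After db1 has committed, before db2 commits
Scenario 1: neither branch has prepared, so the global transaction is not visible. After the interruption the local transactions are rolled back, and XA RECOVER shows the same (empty) result on both instances: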
xa recover;
Empty set (0.00 sec)
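Scenario 2: db1 has already prepared, so the global transaction is recorded there and its local transaction is not rolled back automatically. db2 has not prepared, so its local transaction is rolled back automatically: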
db1
xa recover;
+----------+--------------+--------------+------------+
| formatID | gtrid_length | bqual_length | data       |
+----------+--------------+--------------+------------+
|        1 |           10 |            0 | 1671287200 |
+----------+--------------+--------------+------------+
1 row in set (0.00 sec)

db2
xa recover;
Empty set (0.00 sec)
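In the XA RECOVER output, the data column is the gtrid followed by the bqual, and the two length columns say where one ends and the other begins. The following Go sketch shows how a row could be split back into its parts; the `RecoverRow` type and its `Split` method are names invented for this illustration, not part of any driver.

```go
package main

import "fmt"

// RecoverRow mirrors one row of XA RECOVER output.
// Hypothetical type for illustration.
type RecoverRow struct {
	FormatID    int
	GtridLength int
	BqualLength int
	Data        string
}

// Split cuts the data column into its gtrid and bqual parts using the
// two length columns, and rejects rows whose lengths do not add up.
func (r RecoverRow) Split() (gtrid, bqual string, err error) {
	if r.GtridLength < 0 || r.BqualLength < 0 ||
		r.GtridLength+r.BqualLength != len(r.Data) {
		return "", "", fmt.Errorf("lengths %d+%d do not match data of %d bytes",
			r.GtridLength, r.BqualLength, len(r.Data))
	}
	return r.Data[:r.GtridLength], r.Data[r.GtridLength:], nil
}

func main() {
	// The row reported by db1 in scenario 2.
	row := RecoverRow{FormatID: 1, GtridLength: 10, BqualLength: 0, Data: "1671287200"}
	gtrid, bqual, err := row.Split()
	if err != nil {
		panic(err)
	}
	fmt.Printf("gtrid=%q bqual=%q\n", gtrid, bqual) // gtrid="1671287200" bqual=""
}
```

Here the whole value is the gtrid, since the experiment never sets a branch qualifier.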
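If a new transaction is started at this point, it waits for the unfinished transaction to release its locks and eventually times out: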
=== xid:1671287283 ====
=== call start ====
Error 1205: Lock wait timeout exceeded; try restarting transaction
main.main
Scenario 3: both branches are in the PREPARED state, waiting to be resolved:
db1
mysql> xa recover;
+----------+--------------+--------------+------------+
| formatID | gtrid_length | bqual_length | data       |
+----------+--------------+--------------+------------+
|        1 |           10 |            0 | 1671287434 |
+----------+--------------+--------------+------------+
1 row in set (0.00 sec)

db2
xa recover;
+----------+--------------+--------------+------------+
| formatID | gtrid_length | bqual_length | data       |
+----------+--------------+--------------+------------+
|        1 |           10 |            0 | 1671287434 |
+----------+--------------+--------------+------------+
1 row in set (0.00 sec)
Scenario 4: db1 has already committed, so XA RECOVER returns nothing there, while db2 is still waiting to be committed:
db1
mysql> xa recover;
Empty set (0.00 sec)

db2
xa recover;
+----------+--------------+--------------+------------+
| formatID | gtrid_length | bqual_length | data       |
+----------+--------------+--------------+------------+
|        1 |           10 |            0 | 1671289143 |
+----------+--------------+--------------+------------+
1 row in set (0.00 sec)
At this point db2 can still be rolled back or committed, but db1 has already committed and can no longer be rolled back:
db1
xa rollback '1671289143';
ERROR 1397 (XAE04): XAER_NOTA: Unknown XID

db2
xa rollback '1671289143';
Query OK, 0 rows affected (0.01 sec)
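So inconsistency can still occur under XA, and someone has to step in to roll back or commit in order to reach eventual consistency.
To sum up: there is no silver bullet. XA does its best to guarantee consistency, but when a partial commit happens, manual intervention is still needed to reach eventual consistency.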
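Part of that manual step could be automated if the coordinator durably recorded its commit decision for an xid before sending the first XA COMMIT. The Go sketch below assumes such a record exists. The `commitLog` map and the `Plan` function are hypothetical and not part of the program above. For every xid that XA RECOVER still reports on an instance, it picks XA COMMIT when a commit decision was recorded and XA ROLLBACK otherwise.

```go
package main

import "fmt"

// Plan returns the statement to run for each xid that XA RECOVER still lists
// as prepared on one instance. commitLog[xid] is true when the coordinator
// recorded a commit decision for that xid before sending any XA COMMIT
// (a hypothetical log, assumed for this sketch). Without such a record the
// branch is rolled back.
func Plan(prepared []string, commitLog map[string]bool) []string {
	stmts := make([]string, 0, len(prepared))
	for _, xid := range prepared {
		if commitLog[xid] {
			stmts = append(stmts, "XA COMMIT '"+xid+"'")
		} else {
			stmts = append(stmts, "XA ROLLBACK '"+xid+"'")
		}
	}
	return stmts
}

func main() {
	// Scenario 4: the commit had started, so db2 must commit as well.
	// Scenario 2: nothing was decided yet, so db1 can roll back.
	commitLog := map[string]bool{"1671289143": true}
	for _, stmt := range Plan([]string{"1671289143", "1671287200"}, commitLog) {
		fmt.Println(stmt)
	}
}
```

The key constraint is the one seen in scenario 4: once any branch has committed, the remaining prepared branches can only be committed, never rolled back, if the data is to end up consistent.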