04_Implementing distributed transactions with JTA + Druid multiple data sources + Atomikos

1. How to use JTA+Druid+Atomikos

1.1 Introduce dependencies

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>

1.2 Add configuration

Add the following configuration to application.yml. db1 and db2 are the multiple data sources we configured.

db1:
  datasource:
    type: com.alibaba.druid.pool.xa.DruidXADataSource
db2:
  datasource:
    type: com.alibaba.druid.pool.xa.DruidXADataSource
jta:
  log-dir: classpath:tx-logs
  transaction-manager-id: txManager

1.3 Druid data source configuration

  1. Remove the DataSourceTransactionManager used for the single-database setup.
  2. Use JtaTransactionManager instead, as in the following code:
@Bean(name = "myJtm")
@Primary
public JtaTransactionManager activityTransactionManager() {
	// Atomikos implementations of the JTA TransactionManager and UserTransaction
	UserTransactionManager userTransactionManager = new UserTransactionManager();
	UserTransaction userTransaction = new UserTransactionImp();
	// Spring's JtaTransactionManager delegates to the two Atomikos objects
	return new JtaTransactionManager(userTransaction, userTransactionManager);
}

We may configure multiple data sources, but the JTA transaction manager (JtaTransactionManager) only needs to be declared once, in the configuration of any one of the data sources.

  3. Create each DataSource as in the following code:
@Primary
@Bean(name = "db1Datasource")
public DataSource activityDatasource() {
	DruidXADataSource datasource = new DruidXADataSource();
	datasource.setUrl(this.dbUrl);
	datasource.setUsername(username);
	datasource.setPassword(password);
	datasource.setDriverClassName(driverClassName);
	datasource.setInitialSize(initialSize);
	datasource.setMinIdle(minIdle);
	datasource.setMaxActive(maxActive);
	datasource.setMaxWait(maxWait);
	datasource.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
	datasource.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
	datasource.setValidationQuery(validationQuery);
	datasource.setTestWhileIdle(testWhileIdle);
	datasource.setTestOnBorrow(testOnBorrow);
	datasource.setTestOnReturn(testOnReturn);
	datasource.setPoolPreparedStatements(poolPreparedStatements);
	datasource.setMaxPoolPreparedStatementPerConnectionSize(maxPoolPreparedStatementPerConnectionSize);

	try {
		datasource.setFilters(filters);
	} catch (SQLException e) {
		e.printStackTrace();
	}

	datasource.setConnectionProperties(connectionProperties);

	AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
	atomikosDataSourceBean.setXaDataSource(datasource);

	return atomikosDataSourceBean;
}

1.4 Add the transaction annotation

@Transactional(transactionManager = "myJtm", rollbackFor = Exception.class)

For transactionManager, fill in the name of the transaction manager bean we just defined (myJtm).

2. Source code analysis of JTA+Atomikos distributed transaction

2.1 Create a distributed transaction

In principle, this is just JTA's distributed transaction API, with the Atomikos distributed transaction framework supplying the TM (transaction manager).

The entry point for studying the source code is still TransactionInterceptor #invoke(). We first need to see how a distributed transaction is created, so we focus on TransactionAspectSupport #createTransactionIfNecessary().

For single-database transactions, the PlatformTransactionManager in use was a single-database implementation. Now it is the JtaTransactionManager that we configured together with the Druid data source.

Next the call reaches the doBegin() method of JtaTransactionManager (for single-database transactions it was JpaTransactionManager), which executes JtaTransactionManager #doJtaBegin() and then UserTransactionImp #begin(). This is where the path differs from single-database transactions: there, the bottom layer relies on the TransactionImpl provided by Hibernate, whereas JTA uses the UserTransactionImp provided by Atomikos.

From there the call reaches TransactionManagerImp #begin(), and all subsequent operations are based on CompositeTransaction. CompositeTransaction is the distributed transaction we are looking for.

The Atomikos JTA implementation keeps a Map that holds one stack per thread, and each stack can hold multiple CompositeTransaction objects.

So the question is: why does it maintain a stack for each thread?

This is because a CompositeTransaction represents a distributed transaction, and a single request on one thread may involve several distributed transactions. Those transactions are pushed onto the stack that is maintained for that thread:

private Map<Thread, Stack<CompositeTransaction>> threadtotxmap_ = null;

When a request is received for the first time, the stack for the current thread is empty, so the CompositeTransaction taken from the stack is null and has to be initialized. The initialization of CompositeTransaction is done by BaseTransactionManager, as follows:

First, BaseTransactionManager fetches the stack for the current thread from the Map and takes the CompositeTransaction at the top of the stack. If that CompositeTransaction is null, it creates a new CompositeTransaction, pushes it onto the stack, and finally stores the stack in the Map.
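
The bookkeeping described above can be sketched as follows. This is a minimal model under stated assumptions, not the Atomikos source: SketchTransaction and ThreadTxRegistry are hypothetical names, and only the per-thread stack handling is modeled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Stack;

// Hypothetical stand-in for CompositeTransaction; only an id is modeled.
final class SketchTransaction {
    final String tid;
    SketchTransaction(String tid) { this.tid = tid; }
}

// Simplified model of a Map<Thread, Stack<...>> registry (hypothetical class).
final class ThreadTxRegistry {
    private final Map<Thread, Stack<SketchTransaction>> threadToTx = new HashMap<>();
    private int counter = 0;

    // Returns the transaction on top of the calling thread's stack, or null if none.
    synchronized SketchTransaction current() {
        Stack<SketchTransaction> stack = threadToTx.get(Thread.currentThread());
        return (stack == null || stack.isEmpty()) ? null : stack.peek();
    }

    // Creates a transaction, pushes it onto the calling thread's stack,
    // and stores the stack in the map if it was not there yet.
    synchronized SketchTransaction begin() {
        Stack<SketchTransaction> stack =
                threadToTx.computeIfAbsent(Thread.currentThread(), t -> new Stack<>());
        SketchTransaction tx = new SketchTransaction("tx-" + (++counter));
        stack.push(tx);
        return tx;
    }

    // Pops the finished transaction; removes the map entry once the stack is empty.
    synchronized void end() {
        Thread thread = Thread.currentThread();
        Stack<SketchTransaction> stack = threadToTx.get(thread);
        if (stack == null || stack.isEmpty()) {
            return;
        }
        stack.pop();
        if (stack.isEmpty()) {
            threadToTx.remove(thread);
        }
    }
}
```

On a fresh thread, current() returns null, which corresponds to the "first request" case above. After begin(), the new transaction is the one that current() returns until end() is called.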

2.2 Create Datasource and Connection

After the code in section 2.1 has run, that is, after the CompositeTransaction has been created, one thing is puzzling: where is the code that obtains a Connection and sets parameters such as turning off auto-commit? In fact, the JTA API backed by the Atomikos framework works slightly differently from the single-database path implemented through Hibernate.

Unfortunately, tracing TransactionAspectSupport #invokeWithinTransaction() directly did not reveal where the DataSource and Connection are created.

Since the code cannot be found by tracing, let's reason about it. We are doing distributed transactions across multiple databases, and the code that operates on the database must be some XXXMapper. When a Mapper interface method is executed, MyBatis obtains the DataSource through the SqlSessionFactory. The DataSource itself cannot send commands to the database, so it must go on to the database connection pool and obtain a connection (Connection).

Remember that in section 1.3 we defined the AtomikosDataSourceBean, which is a DataSource based on the Atomikos framework. So next, let's see whether AtomikosDataSourceBean contains code that obtains a database connection.

When Spring Boot integrates with Atomikos, it provides its own AtomikosDataSourceBean, which seems only to adapt the class to Spring and add the methods needed for the Spring bean life cycle. The real methods are all defined in com.atomikos.jdbc.AtomikosDataSourceBean. Combined with the DataSource creation code in section 1.3, it is easy to see that the underlying AtomikosDataSourceBean depends on DruidXADataSource.

So now let's look at how AtomikosDataSourceBean obtains a Connection. AbstractDataSourceBean is the parent class of AtomikosDataSourceBean.

The connectionPool used by getConnection() there is the connection pool provided by Atomikos. The request goes to ConnectionPool #retrieveFirstAvailableConnection(), which, judging by its name, obtains the first available connection.

Further down, the request reaches AtomikosConnectionProxy #newInstance().

In newInstance(), Connection c is the database connection provided by DruidXADataSource, and its type is java.sql.Connection. But the Atomikos framework does not use this Connection directly. It wraps the Connection in AtomikosConnectionProxy, which is special in that it implements InvocationHandler. InvocationHandler immediately suggests a JDK dynamic proxy, and indeed newInstance() goes on to create a dynamic proxy whose proxied interfaces include java.sql.Connection.

It is worth noting that the type of this dynamic proxy object is ConnectionProxyImpl, which is provided by Druid. This shows that Druid deliberately leaves an extension point so that other frameworks can add functions to its Connection that Druid itself does not provide. For example, Atomikos adds distributed transactions, while Druid focuses on the connection pool, the database connection, and related functions.

Summary:

When Java's JTA API uses the Atomikos framework to implement distributed transactions, it first uses the DruidXADataSource to obtain an available database connection from the Druid connection pool, and then creates a dynamic proxy object (Connection) around that connection. Through this proxy, Atomikos enhances the connection with distributed transaction behavior.

What exactly is added, and how it implements distributed transactions, is covered in section 2.3.

2.3 Send XA START

I wrote about native XA distributed transactions before. Their execution process is simply:

  1. Generate a txid for the transaction.
  2. Wrap the business code to be executed with the XA START txid and XA END txid instructions. It is the XAResource that issues the start and end commands.
  3. Send XA PREPARE txid to every database involved and collect the results.
  4. If all databases return OK, send XA COMMIT txid to all databases to commit the transaction.
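
The four steps can be sketched against the standard javax.transaction.xa.XAResource interface. This is a simplified illustration, not how Atomikos is written: XaFlowSketch and its helper methods are hypothetical, the XAResource is a recording fake rather than a real database resource, and read-only votes and heuristic outcomes are ignored.

```java
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

final class XaFlowSketch {

    // Minimal Xid for illustration; a real transaction manager generates unique ids.
    static Xid xid(final String globalId, final String branchId) {
        return new Xid() {
            @Override public int getFormatId() { return 1; }
            @Override public byte[] getGlobalTransactionId() { return globalId.getBytes(); }
            @Override public byte[] getBranchQualifier() { return branchId.getBytes(); }
        };
    }

    // Fake XAResource that records each XA call instead of talking to a database.
    static XAResource recording(final String name, final List<String> log) {
        return (XAResource) Proxy.newProxyInstance(
                XaFlowSketch.class.getClassLoader(),
                new Class<?>[] { XAResource.class },
                (proxy, method, args) -> {
                    log.add(name + "." + method.getName());
                    Class<?> type = method.getReturnType();
                    if (type == int.class) return XAResource.XA_OK; // e.g. prepare() votes OK
                    if (type == boolean.class) return false;
                    return null;
                });
    }

    // Drives one global transaction across all resources; true means committed.
    static boolean run(List<XAResource> resources) {
        List<Xid> xids = new ArrayList<>();
        try {
            for (int i = 0; i < resources.size(); i++) {
                Xid id = xid("global-1", "branch-" + i);            // step 1: txid
                xids.add(id);
                resources.get(i).start(id, XAResource.TMNOFLAGS);   // step 2: XA START
                // ... the business SQL for this database would run here ...
                resources.get(i).end(id, XAResource.TMSUCCESS);     // step 2: XA END
            }
            for (int i = 0; i < resources.size(); i++) {
                resources.get(i).prepare(xids.get(i));              // step 3: XA PREPARE
            }
            for (int i = 0; i < resources.size(); i++) {
                resources.get(i).commit(xids.get(i), false);        // step 4: XA COMMIT
            }
            return true;
        } catch (XAException e) {
            // Simplification: roll back every branch that was started.
            for (int i = 0; i < xids.size(); i++) {
                try {
                    resources.get(i).rollback(xids.get(i));
                } catch (XAException ignored) {
                    // a real TM would log this and retry during recovery
                }
            }
            return false;
        }
    }
}
```

Calling run() with two recording resources named db1 and db2 records start and end for each database first, then prepare for both, then commit for both, which is the order of the list above.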

From the analysis so far, the distributed transaction has been created. So where are the XA START txid and XA END txid instructions executed?

First, when a request arrives and SQL is about to be executed, the call is intercepted by AtomikosConnectionProxy #invoke().

On the first interception, the method being invoked is getAutoCommit(), and it returns true.

On the second interception, the method being invoked is prepareStatement(). From earlier experience with plain JDBC connection code, prepareStatement() is where the SQL to be executed is supplied. The request goes to AtomikosConnectionProxy #enlist().

Following enlist() further, the core code turns out to be in NotInBranchStateHandler #checkEnlistBeforeUse().

The second parameter of the highlighted call is the XAResource, which is the class used to perform the start and end operations.

In BranchEnlistedStateHandler, I found two variables:

  1. CompositeTransaction represents a distributed transaction
  2. XAResourceTransaction represents a sub-transaction of a database operation in a distributed transaction.

As mentioned before, a distributed transaction may involve operations on multiple databases. Each database operation creates a sub-transaction with its own transaction id, and these sub-transactions are added to the distributed transaction. The sub-transactions are stored in a Vector, declared in CoordinatorImp:

private Vector<Participant> participants_ = new Vector<Participant>();

Next, look at the branch.resume() method. Internally it calls start() on the XAResource, whose implementation class is MysqlXAConnection.

The start() method of MysqlXAConnection builds the instruction XA START xxxxxxxxxxxxxxxxxx and sends it to MySQL, where xxxxxxxxxxxxxxxxxx is the id of the sub-transaction.

Execution then returns to AtomikosConnectionProxy #invoke() and runs method.invoke(delegate, args). Setting a breakpoint there shows that:

  1. method is Connection.prepareStatement()
  2. delegate is the Connection object
  3. args is the SQL statement and input parameters that need to be executed this time

This calls prepareStatement() on the native Connection and puts the result into a List. If the sub-transaction runs several SQL statements against the database, a PreparedStatement is created for each one and added to the List, so the List ends up holding all the SQL to be executed in this sub-transaction.

Within the same distributed transaction, when the same database is operated on a second time, the existing sub-transaction for that database is reused (no new XAResourceTransaction is created), so the XA START instruction is executed only when the XAResourceTransaction is first created. This is because AtomikosConnectionProxy #invoke() calls isEnlistedInGlobalTransaction() to determine whether the database being operated on already has a sub-transaction that has been added to the global transaction. If not, it creates a sub-transaction and executes the XA START instruction. Otherwise nothing new is created.

After XA START has been executed on all databases, prepareStatement() has been executed in turn, and the SQL to be executed has been collected, the next step must be to execute the XA END instruction on each database and send the XA PREPARE instruction to complete the prepare stage of 2PC.

Summary:

Java's database operations are ultimately carried out by java.sql.Connection. The Atomikos framework creates a dynamic proxy object for interfaces such as Connection, and all calls on the Connection are forwarded to AtomikosConnectionProxy #invoke(). When Atomikos receives the first SQL execution request for a given database, it creates a sub-transaction for that database, executes the XA START instruction, loads the SQL into a PreparedStatement, and puts it into the List. Later operations on the same database reuse the existing sub-transaction and simply add their SQL.
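
The "enlist on first use" idea in this summary can be sketched with a plain JDK dynamic proxy. This is an illustration only: EnlistingHandler is a hypothetical class loosely modeled on the role of AtomikosConnectionProxy, the underlying Connection is a do-nothing stub rather than a Druid connection, and the XA commands are recorded as strings instead of being sent to a database. The XA END on close() anticipates what section 2.4 describes.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical handler: starts a branch on the first prepareStatement(), ends it on close().
final class EnlistingHandler implements InvocationHandler {
    private final Connection delegate;  // the wrapped connection (a stub in this sketch)
    private final String branchId;      // id of the sub-transaction
    private final List<String> xaLog;   // XA commands that would be sent
    private boolean enlisted = false;

    EnlistingHandler(Connection delegate, String branchId, List<String> xaLog) {
        this.delegate = delegate;
        this.branchId = branchId;
        this.xaLog = xaLog;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String name = method.getName();
        if (name.equals("prepareStatement") && !enlisted) {
            xaLog.add("XA START " + branchId);   // first SQL on this database
            enlisted = true;
        } else if (name.equals("close") && enlisted) {
            xaLog.add("XA END " + branchId);     // branch ends; commit comes later
            enlisted = false;
            return null;                         // do not close the real connection yet
        }
        return method.invoke(delegate, args);    // forward to the wrapped connection
    }

    // Wraps a connection in a dynamic proxy that implements java.sql.Connection.
    static Connection wrap(Connection delegate, String branchId, List<String> xaLog) {
        return (Connection) Proxy.newProxyInstance(
                EnlistingHandler.class.getClassLoader(),
                new Class<?>[] { Connection.class },
                new EnlistingHandler(delegate, branchId, xaLog));
    }

    // Stub Connection whose methods do nothing; stands in for a pooled connection.
    static Connection stubConnection() {
        return (Connection) Proxy.newProxyInstance(
                EnlistingHandler.class.getClassLoader(),
                new Class<?>[] { Connection.class },
                (p, m, a) -> {
                    Class<?> type = m.getReturnType();
                    if (type == boolean.class) return false;
                    if (type == int.class) return 0;
                    return null;
                });
    }

    // Usage: two statements on the same database produce a single XA START.
    static List<String> demo() {
        List<String> xaLog = new ArrayList<>();
        Connection connection = wrap(stubConnection(), "branch-db1", xaLog);
        try {
            connection.prepareStatement("INSERT INTO t1 VALUES (1)");
            connection.prepareStatement("INSERT INTO t1 VALUES (2)");
            connection.close();
        } catch (SQLException e) {
            throw new IllegalStateException(e);
        }
        return xaLog;
    }
}
```

In demo(), the second prepareStatement() finds the branch already enlisted, so the log contains one XA START followed by one XA END.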

2.4 Send XA END instruction

Back in TransactionInterceptor #invoke(), inside the classic TransactionAspectSupport #invokeWithinTransaction(), we have now created a distributed transaction, executed the business code, created the PreparedStatement objects for the underlying database operations, and stored them in a sub-transaction.

Recall the flow of the transaction interceptor provided by Spring: it creates the transaction, invokes the business method, and then either commits or handles the exception.

Since SQL has not been executed at this time, there are two scenarios here:

  1. The business code itself executes normally and prepares to execute commitTransactionAfterReturning() of TransactionAspectSupport.
  2. The business code itself reports an error and prepares to execute completeTransactionAfterThrowing().

In both scenarios XAResource.end() is triggered. What follows it is XAResource.commit() in the first scenario and XAResource.rollback() in the second. Let's take the first scenario as an example and see how the XAResource.end() command is issued.

First, the processCommit() method of AbstractPlatformTransactionManager is called.

This method executes triggerBeforeCompletion(status), which ends up calling close() on the Connection.

Since this is an operation on the Connection, it is of course intercepted by AtomikosConnectionProxy and handed to its invoke().

The call passes through many methods here, but only the following steps matter to us:

  1. BranchEnlistedStateHandler #sessionClosed() calls the constructor of BranchEndedStateHandler, where the sub-transaction of this request to the database is passed in as an input parameter.
  2. The sub-transaction XAResourceTransaction executes the suspend() method. The core code in this method is as follows, where xaresource_ is MysqlXAConnection.
xaresource_.end ( xid_, XAResource.TMSUCCESS );

Next, doCommit() of JtaTransactionManager will be executed to commit the transaction.

2.5 Send XA COMMIT command

In the doCommit() method of JtaTransactionManager, UserTransactionImp #commit() is executed first, followed by the commit() of CompositeTransactionImp.

Since CompositeTransaction appears here, we can conclude that what is about to be committed is the entire distributed transaction, not just one sub-transaction.

Reading further down leads to CoordinatorImp's terminate(), a method that looks very much like XA's 2PC protocol.

From prepare() to commit() to rollback(), it has everything that 2PC requires.

First look at prepare(). The code traces to the prepare() method of ActiveStateHandler. This method iterates over all sub-transactions in the current distributed transaction and, at the bottom layer, calls the prepare() method of each XAResourceTransaction. In other words, it sends XA PREPARE to each database in turn.
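
The overall shape of such a terminate() method can be sketched as below. This is a simplified model and not the CoordinatorImp source: SketchParticipant and SketchCoordinator are hypothetical names, a participant votes with a boolean, and timeouts, read-only votes, and recovery are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical participant; in the text this role is played by XAResourceTransaction.
interface SketchParticipant {
    boolean prepare();   // true means "vote to commit"
    void commit();
    void rollback();
}

// Hypothetical coordinator that holds the sub-transactions of one distributed transaction.
final class SketchCoordinator {
    private final List<SketchParticipant> participants = new ArrayList<>();

    void addParticipant(SketchParticipant participant) {
        participants.add(participant);
    }

    // Returns true if the transaction was committed, false if it was rolled back.
    boolean terminate(boolean commit) {
        if (!commit) {
            rollbackAll();
            return false;
        }
        // Phase 1: ask every participant to prepare and collect all votes.
        boolean allVotedYes = true;
        for (SketchParticipant participant : participants) {
            if (!participant.prepare()) {
                allVotedYes = false;
            }
        }
        if (!allVotedYes) {
            rollbackAll();
            return false;
        }
        // Phase 2: every participant voted yes, so commit everywhere.
        for (SketchParticipant participant : participants) {
            participant.commit();
        }
        return true;
    }

    private void rollbackAll() {
        for (SketchParticipant participant : participants) {
            participant.rollback();
        }
    }

    // Test helper: a participant that records its calls and returns a fixed vote.
    static SketchParticipant recording(final String name, final boolean vote,
                                       final List<String> log) {
        return new SketchParticipant() {
            @Override public boolean prepare() { log.add(name + ".prepare"); return vote; }
            @Override public void commit() { log.add(name + ".commit"); }
            @Override public void rollback() { log.add(name + ".rollback"); }
        };
    }
}
```

With two participants that both vote yes, terminate(true) records prepare on both and then commit on both. If either votes no, both are rolled back and no commit is sent.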