本文共 9782 字,大约阅读时间需要 32 分钟。
这篇文章的目的主要是讲解RM在接收TC的请求后执行全局分支事务提交(doBranchCommit)和全局分支事务回滚(doBranchRollback)的流程。
全局分支事务的提交过程和回滚过程是 RM 处理流程中的核心一环,了解之后再结合之前讲解的本地事务提交流程,就能够较好地理解整个过程了。
public class RMHandlerAT extends AbstractRMHandlerAT implements RMInboundHandler, TransactionMessageHandler { private DataSourceManager dataSourceManager = DataSourceManager.get(); @Override protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException { String xid = request.getXid(); long branchId = request.getBranchId(); String resourceId = request.getResourceId(); String applicationData = request.getApplicationData(); LOGGER.info("AT Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData); BranchStatus status = dataSourceManager.branchCommit(xid, branchId, resourceId, applicationData); response.setBranchStatus(status); LOGGER.info("AT Branch commit result: " + status); } @Override protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException { String xid = request.getXid(); long branchId = request.getBranchId(); String resourceId = request.getResourceId(); String applicationData = request.getApplicationData(); LOGGER.info("AT Branch rolling back: " + xid + " " + branchId + " " + resourceId); BranchStatus status = dataSourceManager.branchRollback(xid, branchId, resourceId, applicationData); response.setBranchStatus(status); LOGGER.info("AT Branch rollback result: " + status); }}
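说明:RMHandlerAT 的 doBranchCommit 和 doBranchRollback 从请求中取出 xid、branchId、resourceId、applicationData,分别委托给 DataSourceManager 的 branchCommit 和 branchRollback 执行,再把返回的 BranchStatus 写入响应。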
public class DataSourceManager implements ResourceManager { private ResourceManagerInbound asyncWorker; public void setAsyncWorker(ResourceManagerInbound asyncWorker) { this.asyncWorker = asyncWorker; } public BranchStatus branchCommit(String xid, long branchId, String resourceId, String applicationData) throws TransactionException { return asyncWorker.branchCommit(xid, branchId, resourceId, applicationData); }}public class AsyncWorker implements ResourceManagerInbound { public BranchStatus branchCommit(String xid, long branchId, String resourceId, String applicationData) throws TransactionException { if (ASYNC_COMMIT_BUFFER.size() < ASYNC_COMMIT_BUFFER_LIMIT) { ASYNC_COMMIT_BUFFER.add(new Phase2Context(xid, branchId, resourceId, applicationData)); } else { LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid + "] will be handled by housekeeping later."); } return BranchStatus.PhaseTwo_Committed; } public synchronized void init() { LOGGER.info("Async Commit Buffer Limit: " + ASYNC_COMMIT_BUFFER_LIMIT); timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncWorker", 1, true)); timerExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { doBranchCommits(); } catch (Throwable e) { LOGGER.info("Failed at async committing ... 
" + e.getMessage()); } } }, 10, 1000 * 1, TimeUnit.MILLISECONDS); } private void doBranchCommits() { if (ASYNC_COMMIT_BUFFER.size() == 0) { return; } Map> mappedContexts = new HashMap<>(); Iterator iterator = ASYNC_COMMIT_BUFFER.iterator(); while (iterator.hasNext()) { Phase2Context commitContext = iterator.next(); List contextsGroupedByResourceId = mappedContexts.get(commitContext.resourceId); if (contextsGroupedByResourceId == null) { contextsGroupedByResourceId = new ArrayList<>(); mappedContexts.put(commitContext.resourceId, contextsGroupedByResourceId); } contextsGroupedByResourceId.add(commitContext); iterator.remove(); } for (String resourceId : mappedContexts.keySet()) { Connection conn = null; try { try { DataSourceProxy dataSourceProxy = DataSourceManager.get().get(resourceId); conn = dataSourceProxy.getPlainConnection(); } catch (SQLException sqle) { LOGGER.warn("Failed to get connection for async committing on " + resourceId, sqle); continue; } List contextsGroupedByResourceId = mappedContexts.get(resourceId); for (Phase2Context commitContext : contextsGroupedByResourceId) { try { UndoLogManager.deleteUndoLog(commitContext.xid, commitContext.branchId, conn); } catch (Exception ex) { LOGGER.warn("Failed to delete undo log [" + commitContext.branchId + "/" + commitContext.xid + "]", ex); } } } finally { if (conn != null) { try { conn.close(); } catch (SQLException closeEx) { LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx); } } } } }}
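说明:DataSourceManager.branchCommit 直接委托给 AsyncWorker;AsyncWorker.branchCommit 只在缓冲区未满时把 Phase2Context 放入 ASYNC_COMMIT_BUFFER,然后立即返回 PhaseTwo_Committed。真正的清理由定时任务 doBranchCommits 完成(首次延迟 10 毫秒,之后每 1000 毫秒执行一次):先按 resourceId 对缓冲区中的上下文分组,再为每个资源获取一个连接,逐条删除对应的 undo log。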
public class DataSourceManager implements ResourceManager { public BranchStatus branchRollback(String xid, long branchId, String resourceId, String applicationData) throws TransactionException { DataSourceProxy dataSourceProxy = get(resourceId); if (dataSourceProxy == null) { throw new ShouldNeverHappenException(); } try { // 执行回滚操作 UndoLogManager.undo(dataSourceProxy, xid, branchId); } catch (TransactionException te) { if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) { return BranchStatus.PhaseTwo_RollbackFailed_Unretryable; } else { return BranchStatus.PhaseTwo_RollbackFailed_Retryable; } } return BranchStatus.PhaseTwo_Rollbacked; }}public final class UndoLogManager { private static String SELECT_UNDO_LOG_SQL = "SELECT * FROM " + UNDO_LOG_TABLE_NAME + " WHERE log_status = 0 AND branch_id = ? AND xid = ? FOR UPDATE"; public static void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException { assertDbSupport(dataSourceProxy.getTargetDataSource().getDbType()); Connection conn = null; ResultSet rs = null; PreparedStatement selectPST = null; try { conn = dataSourceProxy.getPlainConnection(); // The entire undo process should run in a local transaction. 
conn.setAutoCommit(false); // Find UNDO LOG selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL); selectPST.setLong(1, branchId); selectPST.setString(2, xid); rs = selectPST.executeQuery(); // 遍历所有回滚日志 while (rs.next()) { Blob b = rs.getBlob("rollback_info"); String rollbackInfo = StringUtils.blob2string(b); BranchUndoLog branchUndoLog = UndoLogParserFactory.getInstance().decode(rollbackInfo); for (SQLUndoLog sqlUndoLog : branchUndoLog.getSqlUndoLogs()) { TableMeta tableMeta = TableMetaCache.getTableMeta(dataSourceProxy, sqlUndoLog.getTableName()); sqlUndoLog.setTableMeta(tableMeta); AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor( dataSourceProxy.getDbType(), sqlUndoLog); undoExecutor.executeOn(conn); } } deleteUndoLog(xid, branchId, conn); conn.commit(); } catch (Throwable e) { if (conn != null) { try { conn.rollback(); } catch (SQLException rollbackEx) { LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx); } } throw new TransactionException(BranchRollbackFailed_Retriable, String.format("%s/%s", branchId, xid), e); } finally { try { if (rs != null) { rs.close(); } if (selectPST != null) { selectPST.close(); } if (conn != null) { conn.close(); } } catch (SQLException closeEx) { LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx); } } }}
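说明:DataSourceManager.branchRollback 调用 UndoLogManager.undo 执行回滚,并根据异常码返回可重试或不可重试的失败状态。undo 在一个本地事务中完成:先用 SELECT ... FOR UPDATE 查出该分支的 undo log,解析为 BranchUndoLog 后,对其中每条 SQLUndoLog 取得对应的 undoExecutor 并调用 executeOn(conn),最后删除 undo log 并提交;过程中出现任何异常都会回滚本地事务,并抛出可重试的 TransactionException。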
下篇文章会针对undoExecutor作具体的介绍。
转载地址:http://affsa.baihongyu.com/