本文共 9782 字,大约阅读时间需要 32 分钟。
public class RMHandlerAT extends AbstractRMHandlerAT implements RMInboundHandler, TransactionMessageHandler { private DataSourceManager dataSourceManager = DataSourceManager.get(); @Override protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException { String xid = request.getXid(); long branchId = request.getBranchId(); String resourceId = request.getResourceId(); String applicationData = request.getApplicationData(); LOGGER.info("AT Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData); BranchStatus status = dataSourceManager.branchCommit(xid, branchId, resourceId, applicationData); response.setBranchStatus(status); LOGGER.info("AT Branch commit result: " + status); } @Override protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response) throws TransactionException { String xid = request.getXid(); long branchId = request.getBranchId(); String resourceId = request.getResourceId(); String applicationData = request.getApplicationData(); LOGGER.info("AT Branch rolling back: " + xid + " " + branchId + " " + resourceId); BranchStatus status = dataSourceManager.branchRollback(xid, branchId, resourceId, applicationData); response.setBranchStatus(status); LOGGER.info("AT Branch rollback result: " + status); }}
public class DataSourceManager implements ResourceManager { private ResourceManagerInbound asyncWorker; public void setAsyncWorker(ResourceManagerInbound asyncWorker) { this.asyncWorker = asyncWorker; } public BranchStatus branchCommit(String xid, long branchId, String resourceId, String applicationData) throws TransactionException { return asyncWorker.branchCommit(xid, branchId, resourceId, applicationData); }}public class AsyncWorker implements ResourceManagerInbound { public BranchStatus branchCommit(String xid, long branchId, String resourceId, String applicationData) throws TransactionException { if (ASYNC_COMMIT_BUFFER.size() < ASYNC_COMMIT_BUFFER_LIMIT) { ASYNC_COMMIT_BUFFER.add(new Phase2Context(xid, branchId, resourceId, applicationData)); } else { LOGGER.warn("Async commit buffer is FULL. Rejected branch [" + branchId + "/" + xid + "] will be handled by housekeeping later."); } return BranchStatus.PhaseTwo_Committed; } public synchronized void init() { LOGGER.info("Async Commit Buffer Limit: " + ASYNC_COMMIT_BUFFER_LIMIT); timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncWorker", 1, true)); timerExecutor.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { doBranchCommits(); } catch (Throwable e) { LOGGER.info("Failed at async committing ... " + e.getMessage()); } } }, 10, 1000 * 1, TimeUnit.MILLISECONDS); } private void doBranchCommits() { if (ASYNC_COMMIT_BUFFER.size() == 0) { return; } Map> mappedContexts = new HashMap<>(); Iterator iterator = ASYNC_COMMIT_BUFFER.iterator(); while (iterator.hasNext()) { Phase2Context commitContext = iterator.next(); List contextsGroupedByResourceId = mappedContexts.get(commitContext.resourceId); if (contextsGroupedByResourceId == null) { contextsGroupedByResourceId = new ArrayList<>(); mappedContexts.put(commitContext.resourceId, contextsGroupedByResourceId); } contextsGroupedByResourceId.add(commitContext); iterator.remove(); } for (String resourceId : mappedContexts.keySet()) { Connection conn = null; try { try { DataSourceProxy dataSourceProxy = DataSourceManager.get().get(resourceId); conn = dataSourceProxy.getPlainConnection(); } catch (SQLException sqle) { LOGGER.warn("Failed to get connection for async committing on " + resourceId, sqle); continue; } List contextsGroupedByResourceId = mappedContexts.get(resourceId); for (Phase2Context commitContext : contextsGroupedByResourceId) { try { UndoLogManager.deleteUndoLog(commitContext.xid, commitContext.branchId, conn); } catch (Exception ex) { LOGGER.warn("Failed to delete undo log [" + commitContext.branchId + "/" + commitContext.xid + "]", ex); } } } finally { if (conn != null) { try { conn.close(); } catch (SQLException closeEx) { LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx); } } } } }}
public class DataSourceManager implements ResourceManager { public BranchStatus branchRollback(String xid, long branchId, String resourceId, String applicationData) throws TransactionException { DataSourceProxy dataSourceProxy = get(resourceId); if (dataSourceProxy == null) { throw new ShouldNeverHappenException(); } try { // 执行回滚操作 UndoLogManager.undo(dataSourceProxy, xid, branchId); } catch (TransactionException te) { if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) { return BranchStatus.PhaseTwo_RollbackFailed_Unretryable; } else { return BranchStatus.PhaseTwo_RollbackFailed_Retryable; } } return BranchStatus.PhaseTwo_Rollbacked; }}public final class UndoLogManager { private static String SELECT_UNDO_LOG_SQL = "SELECT * FROM " + UNDO_LOG_TABLE_NAME + " WHERE log_status = 0 AND branch_id = ? AND xid = ? FOR UPDATE"; public static void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException { assertDbSupport(dataSourceProxy.getTargetDataSource().getDbType()); Connection conn = null; ResultSet rs = null; PreparedStatement selectPST = null; try { conn = dataSourceProxy.getPlainConnection(); // The entire undo process should run in a local transaction. conn.setAutoCommit(false); // Find UNDO LOG selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL); selectPST.setLong(1, branchId); selectPST.setString(2, xid); rs = selectPST.executeQuery(); // 遍历所有回滚日志 while (rs.next()) { Blob b = rs.getBlob("rollback_info"); String rollbackInfo = StringUtils.blob2string(b); BranchUndoLog branchUndoLog = UndoLogParserFactory.getInstance().decode(rollbackInfo); for (SQLUndoLog sqlUndoLog : branchUndoLog.getSqlUndoLogs()) { TableMeta tableMeta = TableMetaCache.getTableMeta(dataSourceProxy, sqlUndoLog.getTableName()); sqlUndoLog.setTableMeta(tableMeta); AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor( dataSourceProxy.getDbType(), sqlUndoLog); undoExecutor.executeOn(conn); } } deleteUndoLog(xid, branchId, conn); conn.commit(); } catch (Throwable e) { if (conn != null) { try { conn.rollback(); } catch (SQLException rollbackEx) { LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx); } } throw new TransactionException(BranchRollbackFailed_Retriable, String.format("%s/%s", branchId, xid), e); } finally { try { if (rs != null) { rs.close(); } if (selectPST != null) { selectPST.close(); } if (conn != null) { conn.close(); } } catch (SQLException closeEx) { LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx); } } }}