博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Fescar - RM 全局事务提交回滚流程
阅读量:6251 次
发布时间:2019-06-22

本文共 9782 字,大约阅读时间需要 32 分钟。

开篇

 这篇文章的目的主要是讲解RM在接收TC的请求后执行全局分支事务提交(doBranchCommit)和全局分支事务回滚(doBranchRollback)的流程。

 全局的分支事务提交过程和回滚过程也算RM处理流程中核心的一环,了解以后并结合之前讲解的本地事务提交流程就能够较好的理解整个过程了。

全局事务操作流程

整体流程

public class RMHandlerAT extends AbstractRMHandlerAT implements                    RMInboundHandler, TransactionMessageHandler {    private DataSourceManager dataSourceManager = DataSourceManager.get();    @Override    protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)         throws TransactionException {        String xid = request.getXid();        long branchId = request.getBranchId();        String resourceId = request.getResourceId();        String applicationData = request.getApplicationData();        LOGGER.info("AT Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);        BranchStatus status = dataSourceManager.branchCommit(xid, branchId, resourceId, applicationData);        response.setBranchStatus(status);        LOGGER.info("AT Branch commit result: " + status);    }    @Override    protected void doBranchRollback(BranchRollbackRequest request, BranchRollbackResponse response)         throws TransactionException {        String xid = request.getXid();        long branchId = request.getBranchId();        String resourceId = request.getResourceId();        String applicationData = request.getApplicationData();        LOGGER.info("AT Branch rolling back: " + xid + " " + branchId + " " + resourceId);        BranchStatus status = dataSourceManager.branchRollback(xid, branchId, resourceId, applicationData);        response.setBranchStatus(status);        LOGGER.info("AT Branch rollback result: " + status);    }}

说明:

  • doBranchCommit()通过dataSourceManager.branchCommit()去执行分支事务提交。
  • doBranchRollback()通过dataSourceManager.branchRollback()去执行分支事务回滚。
  • dataSourceManager是DataSourceManager对象。

doBranchCommit流程

public class DataSourceManager implements ResourceManager {    private ResourceManagerInbound asyncWorker;    public void setAsyncWorker(ResourceManagerInbound asyncWorker) {        this.asyncWorker = asyncWorker;    }    public BranchStatus branchCommit(String xid, long branchId, String resourceId, String applicationData)         throws TransactionException {        return asyncWorker.branchCommit(xid, branchId, resourceId, applicationData);    }}public class AsyncWorker implements ResourceManagerInbound {    public BranchStatus branchCommit(String xid, long branchId, String resourceId, String applicationData)      throws TransactionException {        if (ASYNC_COMMIT_BUFFER.size() < ASYNC_COMMIT_BUFFER_LIMIT) {            ASYNC_COMMIT_BUFFER.add(new Phase2Context(xid, branchId, resourceId, applicationData));        } else {            LOGGER.warn("Async commit buffer is FULL.             Rejected branch [" + branchId + "/" + xid + "] will be handled by housekeeping later.");        }        return BranchStatus.PhaseTwo_Committed;    }    public synchronized void init() {        LOGGER.info("Async Commit Buffer Limit: " + ASYNC_COMMIT_BUFFER_LIMIT);        timerExecutor = new ScheduledThreadPoolExecutor(1,            new NamedThreadFactory("AsyncWorker", 1, true));        timerExecutor.scheduleAtFixedRate(new Runnable() {            @Override            public void run() {                try {                    doBranchCommits();                } catch (Throwable e) {                    LOGGER.info("Failed at async committing ... " + e.getMessage());                }            }        }, 10, 1000 * 1, TimeUnit.MILLISECONDS);    }    private void doBranchCommits() {        if (ASYNC_COMMIT_BUFFER.size() == 0) {            return;        }        Map
> mappedContexts = new HashMap<>(); Iterator
iterator = ASYNC_COMMIT_BUFFER.iterator(); while (iterator.hasNext()) { Phase2Context commitContext = iterator.next(); List
contextsGroupedByResourceId = mappedContexts.get(commitContext.resourceId); if (contextsGroupedByResourceId == null) { contextsGroupedByResourceId = new ArrayList<>(); mappedContexts.put(commitContext.resourceId, contextsGroupedByResourceId); } contextsGroupedByResourceId.add(commitContext); iterator.remove(); } for (String resourceId : mappedContexts.keySet()) { Connection conn = null; try { try { DataSourceProxy dataSourceProxy = DataSourceManager.get().get(resourceId); conn = dataSourceProxy.getPlainConnection(); } catch (SQLException sqle) { LOGGER.warn("Failed to get connection for async committing on " + resourceId, sqle); continue; } List
contextsGroupedByResourceId = mappedContexts.get(resourceId); for (Phase2Context commitContext : contextsGroupedByResourceId) { try { UndoLogManager.deleteUndoLog(commitContext.xid, commitContext.branchId, conn); } catch (Exception ex) { LOGGER.warn("Failed to delete undo log [" + commitContext.branchId + "/" + commitContext.xid + "]", ex); } } } finally { if (conn != null) { try { conn.close(); } catch (SQLException closeEx) { LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx); } } } } }}

说明:

  • doBranchCommit()操作的核心实现通过AsyncWorker完成,AsyncWorker类其实是一个生成消费模型。
  • doBranchCommit()把需要提交的任务添加到AsyncWorker的ASYNC_COMMIT_BUFFER队列当中。
  • AsyncWorker内部timerExecutor负责启动执行commit动作线程执行doBranchCommits()动作。
  • doBranchCommits动作内部负责删除多余的UndoLog, UndoLogManager.deleteUndoLog。
  • doBranchCommit()的本质任务就是删除备份的回滚日志而已。

doBranchRollback流程

public class DataSourceManager implements ResourceManager {    public BranchStatus branchRollback(String xid, long branchId, String resourceId, String applicationData)        throws TransactionException {        DataSourceProxy dataSourceProxy = get(resourceId);        if (dataSourceProxy == null) {            throw new ShouldNeverHappenException();        }        try {            // 执行回滚操作            UndoLogManager.undo(dataSourceProxy, xid, branchId);        } catch (TransactionException te) {            if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {                return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;            } else {                return BranchStatus.PhaseTwo_RollbackFailed_Retryable;            }        }        return BranchStatus.PhaseTwo_Rollbacked;    }}public final class UndoLogManager {    private static String SELECT_UNDO_LOG_SQL =     "SELECT * FROM " + UNDO_LOG_TABLE_NAME + " WHERE log_status = 0 AND branch_id = ? AND xid = ? FOR UPDATE";    public static void undo(DataSourceProxy dataSourceProxy, String xid, long branchId)        throws TransactionException {        assertDbSupport(dataSourceProxy.getTargetDataSource().getDbType());        Connection conn = null;        ResultSet rs = null;        PreparedStatement selectPST = null;        try {            conn = dataSourceProxy.getPlainConnection();            // The entire undo process should run in a local transaction.            conn.setAutoCommit(false);            // Find UNDO LOG            selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);            selectPST.setLong(1, branchId);            selectPST.setString(2, xid);            rs = selectPST.executeQuery();            // 遍历所有回滚日志            while (rs.next()) {                Blob b = rs.getBlob("rollback_info");                String rollbackInfo = StringUtils.blob2string(b);                BranchUndoLog branchUndoLog = UndoLogParserFactory.getInstance().decode(rollbackInfo);                for (SQLUndoLog sqlUndoLog : branchUndoLog.getSqlUndoLogs()) {                    TableMeta tableMeta = TableMetaCache.getTableMeta(dataSourceProxy, sqlUndoLog.getTableName());                    sqlUndoLog.setTableMeta(tableMeta);                    AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(                                                      dataSourceProxy.getDbType(), sqlUndoLog);                    undoExecutor.executeOn(conn);                }            }            deleteUndoLog(xid, branchId, conn);            conn.commit();        } catch (Throwable e) {            if (conn != null) {                try {                    conn.rollback();                } catch (SQLException rollbackEx) {                    LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);                }            }            throw new TransactionException(BranchRollbackFailed_Retriable, String.format("%s/%s", branchId, xid), e);        } finally {            try {                if (rs != null) {                    rs.close();                }                if (selectPST != null) {                    selectPST.close();                }                if (conn != null) {                    conn.close();                }            } catch (SQLException closeEx) {                LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);            }        }    }}

说明:

  • doBranchRollback操作的核心实现通过UndoLogManager完成,UndoLogManager.undo()负责执行回滚。
  • undo()操作的核心是通过SELECT_UNDO_LOG_SQL日志去获取回滚日志内容。
  • 根据undoLog对象通过UndoExecutorFactory.getUndoExecutor获取回滚的执行者Executor对象。
  • undoExecutor.executeOn(conn)执行回滚操作,不同的回滚操作对象不同的undoExecutor
  • deleteUndoLog(xid, branchId, conn)执行日志删除操作。

期待

 下篇文章会针对undoExecutor作具体的介绍。

Fescar源码分析连载

转载地址:http://affsa.baihongyu.com/

你可能感兴趣的文章
Nginx的第三方模块ngx-fancyindex安装
查看>>
TCP有限状态机
查看>>
XenServer常用Debug问题的命令介绍
查看>>
算法分析-快速排序QUICK-SORT
查看>>
Web服务基础六之编译安装配置RHEL+Apache+MySQL+PHP+ZendOptimize
查看>>
log4net 使用
查看>>
通过bat文件运行jar包程序
查看>>
关于hive RegexSerDe的源码分析
查看>>
V$INSTANCE视图
查看>>
OpenCart之侧边浮动联系我们表单(Side Contact Us Form)
查看>>
PureWhite OpenCart 商城自适应主题模板 ABC-0009
查看>>
docker整理文档
查看>>
zabbix安装配置
查看>>
Awk练习笔记
查看>>
RAID级别详解,如何在Linux下实现软RAID图文解析。
查看>>
CentOS 配置***客户端
查看>>
线上应用故障排查之二:高内存占用
查看>>
书写「简历」时,需要规避的错误
查看>>
我的友情链接
查看>>
老毛桃 win7
查看>>