首先编写一个initDataSourceList方法,并利用Spring的PostConstruct注解初始化一个DataSource 列表 。相关的DB配置从db.properties读取 。
DLOCK_NUM=2DLOCK_USER_0="user1"DLOCK_PASS_0="pass1"DLOCK_INIT_SIZE_0=2DLOCK_MAX_SIZE_0=10DLOCK_URL_0="jdbc:mysql://localhost:3306/test1"DLOCK_USER_1="user1"DLOCK_PASS_1="pass1"DLOCK_INIT_SIZE_1=2DLOCK_MAX_SIZE_1=10DLOCK_URL_1="jdbc:mysql://localhost:3306/test2"
DataSource使用阿里的DruidDataSource 。
接着最重要的一个实现getConnection(String transactionId)方法 。实现原理很简单,获取transactionId的hashcode,并对DataSource的长度取模即可 。
连接池列表设计好后,就可以实现往distributed_lock表插入数据了 。
package dlock;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.sql.*;@Componentpublic class DistributedLock {@Autowiredprivate DataSourcePool dataSourcePool;/*** 根据transactionId创建锁资源*/public String createLock(String transactionId) throws Exception{if (transactionId == null) {throw new RuntimeException("transactionId是必须的");}Connection connection = null;Statement statement = null;try {connection = dataSourcePool.getConnection(transactionId);connection.setAutoCommit(false);statement = connection.createStatement();statement.executeUpdate("INSERT INTO distributed_lock(transaction_id) VALUES ('" + transactionId + "')");connection.commit();return transactionId;}catch (SQLIntegrityConstraintViolationException icv) {//说明已经生成过了 。if (connection != null) {connection.rollback();}return transactionId;}catch (Exception e) {if (connection != null) {connection.rollback();}throwe;}finally {if (statement != null) {statement.close();}if (connection != null) {connection.close();}}}}
# 根据transactionId锁住线程
接下来利用DB的select for update特性来锁住线程 。当多个线程根据相同的transactionId并发同时操作select for update的时候,只有一个线程能成功,其他线程都block住,直到select for update成功的线程使用commit操作后,block住的所有线程的其中一个线程才能开始干活 。我们在上面的DistributedLock类中创建一个lock方法 。
public boolean lock(String transactionId) throws Exception {Connection connection = null;PreparedStatement preparedStatement = null;ResultSet resultSet = null;try {connection = dataSourcePool.getConnection(transactionId);preparedStatement = connection.prepareStatement("SELECT * FROM distributed_lock WHERE transaction_id = ? FOR UPDATE ");preparedStatement.setString(1,transactionId);resultSet = preparedStatement.executeQuery();if (!resultSet.next()) {connection.rollback();return false;}return true;} catch (Exception e) {if (connection != null) {connection.rollback();}throwe;}finally {if (preparedStatement != null) {preparedStatement.close();}if (resultSet != null) {resultSet.close();}if (connection != null) {connection.close();}}}
# 实现解锁操作
当线程执行完任务后,必须手动的执行解锁操作,之前被锁住的线程才能继续干活 。在我们上面的实现中,其实就是获取到当时select for update成功的线程对应的Connection,并实行commit操作即可 。
那么如何获取到呢?我们可以利用ThreadLocal 。首先在DistributedLock类中定义
private ThreadLocal<Connection> threadLocalConn = new ThreadLocal<>();
每次调用lock方法的时候,把Connection放置到ThreadLocal里面 。我们修改lock方法 。
public boolean lock(String transactionId) throws Exception {Connection connection = null;PreparedStatement preparedStatement = null;ResultSet resultSet = null;try {connection = dataSourcePool.getConnection(transactionId);threadLocalConn.set(connection);preparedStatement = connection.prepareStatement("SELECT * FROM distributed_lock WHERE transaction_id = ? FOR UPDATE ");preparedStatement.setString(1,transactionId);resultSet = preparedStatement.executeQuery();if (!resultSet.next()) {connection.rollback();threadLocalConn.remove();return false;}return true;} catch (Exception e) {if (connection != null) {connection.rollback();threadLocalConn.remove();}throwe;}finally {if (preparedStatement != null) {preparedStatement.close();}if (resultSet != null) {resultSet.close();}if (connection != null) {connection.close();}}}
这样子,当获取到Connection后,将其设置到ThreadLocal中,如果lock方法出现异常,则将其从ThreadLocal中移除掉 。
有了这几步后,我们可以来实现解锁操作了 。我们在DistributedLock添加一个unlock方法 。
public void unlock() throws Exception {Connection connection = null;try {connection = threadLocalConn.get();if (!connection.isClosed()) {connection.commit();connection.close();threadLocalConn.remove();}} catch (Exception e) {if (connection != null) {connection.rollback();connection.close();}threadLocalConn.remove();throw e;}}
推荐阅读
- 牛膝山楂泡水功效与作用,山楂干泡水喝的功效与作用
- 黄山白菊花是苦的吗,黄山贡菊的功效与作用是什么
- 大凉山苦荞茶的功效,苦荞茶的功效与作用
- 数据库,MySQL,实战,优化,多表联合查询排序问题优化
- 一篇文章教会你使用JS+CSS实现一个简单加载进度条的效果
- H3C交换机VXLAN二层互通的典型配置
- 白帽黑客如何使用Baidu搜索引擎高级搜索技巧
- MySQL高级SQL语句
- 荣耀升级鸿蒙后怎么样?
- SpringBoot通过JdbcTemplate操作MySQL数据库
