简单注解实现集群同步锁 简单注解实现集群同步锁(spring+redis+注解)
partner4java 人气:0互联网面试的时候,是不是面试官常问一个问题如何保证集群环境下数据操作并发问题,常用的synchronized肯定是无法满足了,或许你可以借助for update对数据加锁。本文的最终解决方式你只要在方法上加一个@P4jSyn注解就能保证集群环境下同synchronized的效果,且锁的key可以任意指定。本注解还支持了锁的超时机制。
本文需要对Redis、spring和spring-data-redis有一定的了解。当然你可以借助本文的思路对通过注解对方法返回数据进行缓存,类似com.google.code.simple-spring-memcached的@ReadThroughSingleCache。
第一步: 介绍两个自定义注解P4jSyn、P4jSynKey
P4jSyn:必选项,标记在方法上,表示需要对该方法加集群同步锁;
P4jSynKey:可选项,加在方法参数上,表示以方法某个参数作为锁的key,用来保证更多的坑,P4jSynKey并不是强制要添加的,当没有P4jSynKey标记的情况下只会以P4jSyn的synKey作为锁key。
package com.yaoguoyin.redis.lock; import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * <b>同步锁:</b><br/> * 主要作用是在服务器集群环境下保证方法的synchronize;<br/> * 标记在方法上,使该方法的执行具有互斥性,并不保证并发执行方法的先后顺序;<br/> * 如果原有“A任务”获取锁后任务执行时间超过最大允许持锁时间,且锁被“B任务”获取到,在“B任务”成功货物锁会并不会终止“A任务”的执行;<br/> * <br/> * <b>注意:</b><br/> * 使用过程中需要注意keepMills、toWait、sleepMills、maxSleepMills等参数的场景使用;<br/> * 需要安装redis,并使用spring和spring-data-redis等,借助redis NX等方法实现。 * * @see com.yaoguoyin.redis.lock.P4jSynKey * @see com.yaoguoyin.redis.lock.RedisLockAspect * * @author partner4java * */ @Target({ ElementType.METHOD }) @Retention(RetentionPolicy.RUNTIME) @Inherited public @interface P4jSyn { /** * 锁的key<br/> * 如果想增加坑的个数添加非固定锁,可以在参数上添加@P4jSynKey注解,但是本参数是必写选项<br/> * redis key的拼写规则为 "RedisSyn+" + synKey + @P4jSynKey<br/> * */ String synKey(); /** * 持锁时间,超时时间,持锁超过此时间自动丢弃锁<br/> * 单位毫秒,默认20秒<br/> * 如果为0表示永远不释放锁,在设置为0的情况下toWait为true是没有意义的<br/> * 但是没有比较强的业务要求下,不建议设置为0 */ long keepMills() default 20 * 1000; /** * 当获取锁失败,是继续等待还是放弃<br/> * 默认为继续等待 */ boolean toWait() default true; /** * 没有获取到锁的情况下且toWait()为继续等待,睡眠指定毫秒数继续获取锁,也就是轮训获取锁的时间<br/> * 默认为10毫秒 * * @return */ long sleepMills() default 10; /** * 锁获取超时时间:<br/> * 没有获取到锁的情况下且toWait()为true继续等待,最大等待时间,如果超时抛出 * {@link java.util.concurrent.TimeoutException.TimeoutException} * ,可捕获此异常做相应业务处理;<br/> * 单位毫秒,默认一分钟,如果设置为0即为没有超时时间,一直获取下去; * * @return */ long maxSleepMills() default 60 * 1000; }
package com.yaoguoyin.redis.lock; import java.lang.annotation.ElementType; import java.lang.annotation.Inherited; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * <b>同步锁 key</b><br/> * 加在方法的参数上,指定的参数会作为锁的key的一部分 * * @author partner4java * */ @Target({ ElementType.PARAMETER }) @Retention(RetentionPolicy.RUNTIME) @Inherited public @interface P4jSynKey { /** * key的拼接顺序 * * @return */ int index() default 0; }
这里就不再对两个注解进行使用上的解释了,因为注释已经说明的很详细了。
使用示例:
package com.yaoguoyin.redis.lock; import org.springframework.stereotype.Component; @Component public class SysTest { private static int i = 0; @P4jSyn(synKey = "12345") public void add(@P4jSynKey(index = 1) String key, @P4jSynKey(index = 0) int key1) { i++; System.out.println("i=-===========" + i); } }
第二步:切面编程
在不影响原有代码的前提下,保证执行同步,目前最直接的方式就是使用切面编程
package com.yaoguoyin.redis.lock; import java.lang.annotation.Annotation; import java.lang.reflect.Method; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.core.BoundValueOperations; import org.springframework.data.redis.core.RedisTemplate; /** * 锁的切面编程<br/> * 针对添加@RedisLock 注解的方法进行加锁 * * @see com.yaoguoyin.redis.lock.P4jSyn * * @author partner4java * */ @Aspect public class RedisLockAspect { @Autowired @Qualifier("redisTemplate") private RedisTemplate<String, Long> redisTemplate; @Around("execution(* com.yaoguoyin..*(..)) && @annotation(com.yaoguoyin.redis.lock.P4jSyn)") public Object lock(ProceedingJoinPoint pjp) throws Throwable { P4jSyn lockInfo = getLockInfo(pjp); if (lockInfo == null) { throw new IllegalArgumentException("配置参数错误"); } String synKey = getSynKey(pjp, lockInfo.synKey()); if (synKey == null || "".equals(synKey)) { throw new IllegalArgumentException("配置参数synKey错误"); } boolean lock = false; Object obj = null; try { // 超时时间 long maxSleepMills = System.currentTimeMillis() + lockInfo.maxSleepMills(); while (!lock) { long keepMills = System.currentTimeMillis() + lockInfo.keepMills(); lock = setIfAbsent(synKey, keepMills); // 得到锁,没有人加过相同的锁 if (lock) { obj = pjp.proceed(); } // 锁设置了没有超时时间 else if (lockInfo.keepMills() <= 0) { // 继续等待获取锁 if (lockInfo.toWait()) { // 如果超过最大等待时间抛出异常 if (lockInfo.maxSleepMills() > 0 && System.currentTimeMillis() > maxSleepMills) { throw new TimeoutException("获取锁资源等待超时"); } TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills()); } else { break; } } // 已过期,并且getAndSet后旧的时间戳依然是过期的,可以认为获取到了锁 else if (System.currentTimeMillis() > getLock(synKey) && (System.currentTimeMillis() > getSet(synKey, keepMills))) { lock = true; obj = pjp.proceed(); } // 没有得到任何锁 else { // 继续等待获取锁 if (lockInfo.toWait()) { // 如果超过最大等待时间抛出异常 if (lockInfo.maxSleepMills() > 0 && System.currentTimeMillis() > maxSleepMills) { throw new TimeoutException("获取锁资源等待超时"); } TimeUnit.MILLISECONDS.sleep(lockInfo.sleepMills()); } // 放弃等待 else { break; } } } } catch (Exception e) { e.printStackTrace(); throw e; } finally { // 如果获取到了锁,释放锁 if (lock) { releaseLock(synKey); } } return obj; } /** * 获取包括方法参数上的key<br/> * redis key的拼写规则为 "RedisSyn+" + synKey + @P4jSynKey * */ private String getSynKey(ProceedingJoinPoint pjp, String synKey) { try { synKey = "RedisSyn+" + synKey; Object[] args = pjp.getArgs(); if (args != null && args.length > 0) { MethodSignature methodSignature = (MethodSignature) pjp.getSignature(); Annotation[][] paramAnnotationArrays = methodSignature.getMethod().getParameterAnnotations(); SortedMap<Integer, String> keys = new TreeMap<Integer, String>(); for (int ix = 0; ix < paramAnnotationArrays.length; ix++) { P4jSynKey p4jSynKey = getAnnotation(P4jSynKey.class, paramAnnotationArrays[ix]); if (p4jSynKey != null) { Object arg = args[ix]; if (arg != null) { keys.put(p4jSynKey.index(), arg.toString()); } } } if (keys != null && keys.size() > 0) { for (String key : keys.values()) { synKey = synKey + key; } } } return synKey; } catch (Exception e) { e.printStackTrace(); } return null; } @SuppressWarnings("unchecked") private static <T extends Annotation> T getAnnotation(final Class<T> annotationClass, final Annotation[] annotations) { if (annotations != null && annotations.length > 0) { for (final Annotation annotation : annotations) { if (annotationClass.equals(annotation.annotationType())) { return (T) annotation; } } } return null; } /** * 获取RedisLock注解信息 */ private P4jSyn getLockInfo(ProceedingJoinPoint pjp) { try { MethodSignature methodSignature = (MethodSignature) pjp.getSignature(); Method method = methodSignature.getMethod(); P4jSyn lockInfo = method.getAnnotation(P4jSyn.class); return lockInfo; } catch (Exception e) { e.printStackTrace(); } return null; } public BoundValueOperations<String, Long> getOperations(String key) { return redisTemplate.boundValueOps(key); } /** * Set {@code value} for {@code key}, only if {@code key} does not exist. * <p> * See http://redis.io/commands/setnx * * @param key * must not be {@literal null}. * @param value * must not be {@literal null}. * @return */ public boolean setIfAbsent(String key, Long value) { return getOperations(key).setIfAbsent(value); } public long getLock(String key) { Long time = getOperations(key).get(); if (time == null) { return 0; } return time; } public long getSet(String key, Long value) { Long time = getOperations(key).getAndSet(value); if (time == null) { return 0; } return time; } public void releaseLock(String key) { redisTemplate.delete(key); } }
RedisLockAspect会对添加注解的方法进行特殊处理,具体可看lock方法。
大致思路就是:
1、首选借助redis本身支持对应的setIfAbsent方法,该方法的特点是如果redis中已有该数据不保存返回false,不存该数据保存返回true;
2、如果setIfAbsent返回true标识拿到同步锁,可进行操作,操作后并释放锁;
3、如果没有通过setIfAbsent拿到数据,判断是否对锁设置了超时机制,没有设置判断是否需要继续等待;
4、判断是否锁已经过期,需要对(System.currentTimeMillis() > getLock(synKey) && (System.currentTimeMillis() > getSet(synKey, keepMills)))进行细细的揣摩一下,getSet可能会改变了其他人拥有锁的超时时间,但是几乎可以忽略;
5、没有得到任何锁,判断继续等待还是退出。
第三步:spring的基本配置
#*****************jedis连接参数设置*********************# #redis服务器ip # redis.hostName=127.0.0.1 #redis服务器端口号# redis.port=6379 #redis服务器外部访问密码 redis.password=XXXXXXXXXX #************************jedis池参数设置*******************# #jedis的最大分配对象# jedis.pool.maxActive=1000 jedis.pool.minIdle=100 #jedis最大保存idel状态对象数 # jedis.pool.maxIdle=1000 #jedis池没有对象返回时,最大等待时间 # jedis.pool.maxWait=5000 #jedis调用borrowObject方法时,是否进行有效检查# jedis.pool.testOnBorrow=true #jedis调用returnObject方法时,是否进行有效检查 # jedis.pool.testOnReturn=true
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context" xmlns:jee="http://www.springframework.org/schema/jee"xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"xmlns:redis="http://www.springframework.org/schema/redis" xmlns:cache="http://www.springframework.org/schema/cache" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis.xsd http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd"> <!-- 开启注解 --> <aop:aspectj-autoproxy /> <bean class="com.yaoguoyin.redis.lock.RedisLockAspect" /> <!-- 扫描注解包范围 --> <context:component-scan base-package="com.yaoguoyin" /> <!-- 引入redis配置 --> <context:property-placeholder location="classpath:config.properties" /> <!-- 连接池 --> <bean id="poolConfig" class="redis.clients.jedis.JedisPoolConfig"> <property name="minIdle" value="${jedis.pool.minIdle}" /> <property name="maxIdle" value="${jedis.pool.maxIdle}" /> <property name="maxWaitMillis" value="${jedis.pool.maxWait}" /> </bean> <!-- p:password="${redis.pass}" --> <bean id="redisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" p:host-name="${redis.hostName}" p:port="${redis.port}" p:password="${redis.password}" p:pool-config-ref="poolConfig" /> <!-- 类似于jdbcTemplate --> <bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate" p:connection-factory-ref="redisConnectionFactory" /> </beans>
redis的安装本文就不再说明。
测试
package com.yaoguoyin.redis; import org.junit.runner.RunWith; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; @RunWith(SpringJUnit4ClassRunner.class) @ContextConfiguration(locations = { "classpath:META-INF/spring/redis.xml" }) public class BaseTest extends AbstractJUnit4SpringContextTests { }
package com.yaoguoyin.redis.lock; import java.util.concurrent.TimeUnit; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import com.yaoguoyin.redis.BaseTest; public class RedisTest extends BaseTest { @Autowired private SysTest sysTest; @Test public void testHello() throws InterruptedException { for (int i = 0; i < 100; i++) { new Thread(new Runnable() { @Override public void run() { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } sysTest.add("xxxxx", 111111); } }).start(); } TimeUnit.SECONDS.sleep(20); } @Test public void testHello2() throws InterruptedException{ sysTest.add("xxxxx", 111111); TimeUnit.SECONDS.sleep(10); } }
你可以对
void com.yaoguoyin.redis.lock.SysTest.add(@P4jSynKey(index=1) String key, @P4jSynKey(index=0) int key1)
去除注解@P4jSyn进行测试对比。
ps:本demo的执行性能取决于redis和Java交互距离;成千山万单锁并发建议不要使用这种形式,直接通过redis等解决,本demo只解决小并发不想耦合代码的形式。
以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,同时也希望多多支持!
加载全部内容