分布式锁的应用场景与分布式锁实现(二):基于Redis实现分布式锁
分布式锁的应用场景与分布式锁实现(一):传统锁处理并发及传统锁的问题
基于Redis实现分布式锁
所有代码已同步到GitCode:https://gitcode.net/ruozhuliufeng/distributed-project.git
基本实现
借助Redis中的命令setnx(key,value),key不存在就新增,存在就什么都不做。同时有多个客户端发送setnx命令,只有一个客户端可以成功,返回1(true);其他客户端返回0(false)。
- 多个客户端同时获取锁(setnx)
- 获取成功,执行业务逻辑,执行完成释放锁(del)
- 其他客户端等待重试
改造StockService方法:
/*** 减库存*/@Overridepublic void checkAndLock() {// 加锁setnxBoolean lock = this.redisTemplate.opsForValue().setIfAbsent("lock", "1");// 重试:递归调用if (!lock){try {Thread.sleep(50);this.deduct();} catch (InterruptedException e) {e.printStackTrace();}} else {try {// 1. 查询库存信息String stock = redisTemplate.opsForValue().get("stock").toString();// 2. 判断库存是否充足if (stock != null && stock.length() != 0) {Integer st = Integer.valueOf(stock);if (st > 0) {// 3.扣减库存redisTemplate.opsForValue().set("stock", String.valueOf(--st));}}} finally {// 解锁this.redisTemplate.delete("lock");}}}
其中,加锁也可以使用循环:
// 加锁,获取锁失败重试
while (!this.redisTemplate.opsForValue().setIfAbsent("lock", "1")){try {Thread.sleep(40);} catch (InterruptedException e) {e.printStackTrace();}
}
解锁:
this.redisTemplate.delete("lock");
重置缓存值,使用Jmeter进行压力测试,并获取库存余量:0
防死锁
代码均已上传至GitCode,可根据提交信息获取文件的更改内容
]
问题:setnx刚刚获取到锁,当前服务器宕机,导致del释放锁无法执行,进而导致锁无法释放(死锁)
解决:给锁设置过期时间,自动释放锁
设置过期时间的两种方式:
- 通过expire设置过期时间(缺乏原子性:如果setnx与expire之间出现异常,锁也无法释放)
- 通过set指令设置过期时间:set key value ex 3 nx(既达到setnx的效果,又设置了过期时间)
防误删
问题:可能会释放其他服务器的锁
场景:如果业务逻辑的执行时间是7s。执行流程如下:
- index1业务逻辑没执行完,3秒后锁被自动释放
- index2业务获取到锁,执行业务逻辑,3秒后锁被自动释放
- index3业务获取到锁,执行业务逻辑
- index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只执行1s就被别人释放
- 最终等于没锁的情况,仍会导致超卖现象发生
解决:setnx获取到锁时,设置一个指定的唯一值(例如:uuid),释放前获取这个值,判断是否是自己的锁。
实现如下:
问题:删除操作缺乏原子性
场景:
- index1执行删除时,查询到的lock值确实和uuid相同
- index1执行删除前,lock刚好过期时间已到,被redis自动释放
- index2获取到了lock
- index1执行了删除,此时会把index2的lock删除
解决方案:没有一个命令可以同时做到判断+删除,所以只能通过其他方式实现(lua脚本)
Redis中的lua脚本
lua脚本可以一次性发送多个指令给redis,由于Redis是单线程的,执行指令遵守one-by-one规则
现实问题
Redis采用单线程架构,可以保证单个命令的原子性,但是无法保证一组命令在高并发场景下的原子性。例如:
在串行场景下:A和B的值肯定都是3
在并发场景下:A和B的值可能在0-6之间。
极限情况下1:
则A的结果是0,B的结果是3
极限情况下2:
则A和B的结果都是6。
如果Redis客户端通过lua脚本把3个命令一次性发送给redis服务器,那么这三个指令就不会被其他客户端指令打断。Redis也保证脚本会以原子性(atomic)的方式执行:当某个脚本正在运行的时候,不会有其他脚本或Redis命令被执行。这和使用MULTI/EXEC包围的事务很类似。
但是MULTI/EXEC方法来使用事务功能,将一组命令打包执行,无法进行业务逻辑的操作。这期间有某一条命令执行报错(例如给字符串自增),其他的命令还是会执行,并不会回滚。
lua介绍
lua是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放,其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。
设计目的
其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。
lua特性
- 轻量级:它用标准C语言编写并以源代码形式开放,编译后仅仅一百余K,可以很方便的嵌入到别的程序里。
- 可扩展:Lua提供了非常易于使用的扩展接口和机制:由于宿主语言(通常是C或C++)提供这些功能,Lua可以使用它们,就像是本来就内置的功能一样。
- 其他特性:
- 支持面向过程(procedure-oriented)编程和函数式变成(functional programming);
- 自动内存管理;只提供了一种通用类型的表(table),用它可以实现数组、哈希表、集合、对象;
- 语言内置模式匹配;闭包(closure);函数也可以看做一个值;提供多线程(协同进程,并非操作系统所支持的线程)支持;
- 通过闭包和table可以很方便的支持面向对象变成所需要的一些关键机制,比如数据抽象、虚函数、继承和重载等。
lua基本语言
这里不做深究,感兴趣可以到官方教程或菜鸟教程,这里以Redis中可能会用到的部分语法作介绍。
变量:
a = 5 -- 全局变量
local b = 10 -- 局部变量,redis只支持局部变量
a,b = 10,2*x -- 等价于 a = 10; b = 2*x
流程控制:
if(布尔表达式 1)
then -- [ 在布尔表达式 1 为true时执行改语句块 ]
elseif(布尔表达式 2)
then -- [ 在布尔表达式 2 为true时执行改语句块 ]
else -- [ 在以上表达式都不为true时执行改语句块 ]
end
Redis执行lua脚本-EVAL指令
在Redis中需要通过eval命令执行lua脚本。
格式:
EVAL script numkeys key [key ...] arg [arg ...]
script: lua脚本字符串,这段lua脚本不需要(也不应该)定义函数
numkeys: lua脚本中keys数组的大小
key [key ...]: KEYS数组中的元素
arg [arg ...]: ARGV数组中的元素
- 案例1:基本案例
EVAL "return 10" 0
# 输出:10
- 案例2:动态传参
EVAL "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 5 10 20 30 40 50 60 70 80 90
# 输出:10 20 60 70
# 下标从1开始
EVAL "if KEYS[1] > ARGV[1] then return 1 else return 0 end" 1 10 20
# 输出:0
EVAL "if KEYS[1] > ARGV[1] then return 1 else return 0 end" 1 20 10
# 输出:1
传入了两个参数 10 和 20,KEYS的长度是1,所以KEYS中有一个元素10,剩下的一个20就是ARGV数组中的元素
redis.call()中的redis是redis中提供的lua脚本类库,仅在redis环境中使用该类库。
- 案例3:执行redis类库方法
set a 10 -- 设置一个a 值为10
EVAL "return redis.call('get','a')" 0
# 通过return把call方法返回给redis客户端,打印:10
注意:**脚本里使用的所有键都应该由KEYS数组来传递。**但并不是强制性的,代价是这样写出的脚本不能被Redis集群所兼容。
- 案例4:给Redis类库方法动态传参
EVAL "return redis.call('set',KEYS[1],ARGV[1])" 1 b 20
以上案例基本可以应付Redis分布式锁所需要的脚本知识了。
- 案例5:pcall函数的使用(了解)
-- 当call()在执行命令的过程中发生错误时,脚本会停止运行,并返回一个脚本错误,输出错误信息
EVAL "return redis.call('sets', KEYS[1], ARGV[1]),redis.call('set', KEYS[2], ARGV[2])" 2 c d 20 30
- pcall函数不影响后续指令的执行
EVAL "return redis.pcall('sets',KEYS[1],ARGV[1]),redis.pcall('set' ,KEYS[2],ARGV[2])" 2 c d 20 30
注意:set方法写成了sets,肯定会报错。
使用lua保证删除原子性
删除lua脚本:
if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end
更新代码:
package tech.msop.distributed.lock.service.impl;import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.context.annotation.ScopedProxyMode;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import tech.msop.distributed.lock.constants.StockConstant;
import tech.msop.distributed.lock.entity.StockEntity;
import tech.msop.distributed.lock.mapper.StockMapper;
import tech.msop.distributed.lock.service.IStockService;import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;/*** 库存服务实现类 <br/>*/
@Service
@Slf4j
public class StockServiceImpl extends ServiceImpl<StockMapper, StockEntity>implements IStockService {@Autowiredprivate StringRedisTemplate redisTemplate;/*** 减库存*/@Overridepublic void checkAndLock() {String uuid = UUID.randomUUID().toString();// 加锁 setnxwhile (!redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS)) {// 重试循环try {Thread.sleep(30);} catch (InterruptedException e) {throw new RuntimeException(e);}}try {// 1. 查询库存信息String stock = redisTemplate.opsForValue().get("stock");// 2. 判断库存是否充足if (stock != null && stock.length() != 0) {Integer st = Integer.valueOf(stock);if (st > 0) {// 3.更新到数据库redisTemplate.opsForValue().set("stock", String.valueOf(--st));}}} finally {String script = "if redis.call('get',KEYS[1]) == ARGV[1] " +"then " +" return redis.call('del',KEYS[1]) " +"else " +" return 0 " +"end";redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList("lock"),uuid);}}
}
进行压力测试并查询库存余量:0
可重入锁
以上加锁命令存在一个问题,由于加锁命令使用了SETNX,一旦键存在就无法再设置成功,这就导致后续同一线程内继续加锁,将会加锁失败。当一个线程执行一段代码成功获取锁之后,继续执行,又遇到加锁的子任务代码,可重入性就保证线程能继续执行,而不可重入就是需要等待锁释放之后,再次获取锁成功,才能继续往下执行。
用一段Java代码解释可重入:
public synchronized void a(){b();
}
public synchronized void b(){// pass
}
假设X线程在a方法获取锁之后,继续执行b方法,如果此时不可重入,线程就必须等待锁释放,再次争抢锁。
锁明明是X线程拥有,却还需要等待自己释放锁,然后再去抢锁,这看起来就很奇怪,我释放我自己。
而可重入性就可以解决这个尴尬的问题,当线程拥有锁之后,往后再遇到加锁方法,直接将加锁次数加1,然后在执行方法逻辑。退出加锁方法之后,加锁此处再减1,当加锁次数为0时,锁才被真正的释放。
可以看到可重入锁的最大特性就是计数,计算加锁的次数。所以当可重入锁需要在分布式环境实现时,我们也就需要统计加锁次数。
通过阅读JDK中的可重入锁源码,可知可重入锁ReentrantLock的加锁流程:ReentrantLock.lock) —> NonfairSync.lock() —> AQS.acquire(1) —> NonfairSync.tryAcquire(1) —> Sync.nonfairTryAcquire(1)
- CAS获取锁,如果没有线程占用锁 (state == 0),加锁成功并记录当前线程为有锁线程(两次)
- 如果state的值不为0,说明锁已经占用,则判断当前线程是否为有锁线程,若是,则重入(state + 1)
- 若否,加锁失败,入队等待
可重入锁的解锁流程:Reentrant.unlock() —> AQS.release(1) —> SyncRelease(1)
- 判断当前线程是否为有锁线程,不是则抛出异常
- 对state的值减1之后,判断state的值是否为0,为0则解锁成功,返回true
- 如果减1后的值不为0,返回false
确定解决方案:Redis+hash
加锁脚本
参照ReentrantLock中的非公平可重入锁实现分布式可重入锁:hash + lua脚本
Redis提供了Hash(哈希表)这种可以存储键值对数据结构。所以我们可以使用Redis Hash存储锁的重入次数,然后利用lua脚本判断逻辑。通过JDK的可重入锁分析,继续分析Redis中的加锁流程:
- 判断锁是否存在(exists),不存在则直接获取锁 hset key field value
- 如果锁存在则判断是否是自己的锁(hexists),如果是自己的锁则重入:hincrby key field increment
- 否则重试:递归/循环
- 锁lua脚本
if (redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1)
thenredis.call('hincrby', KEYS[1], ARGV[1], 1);redis.call('expire', KEYS[1], ARGV[2]);return 1;
elsereturn 0;
end-- key: lock
-- arg: uuid 30
假设值为:KEYS[lock],ARGV[uuid,expire]
如果锁不存在或者这是自己的锁,就通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁此处加1
解锁脚本
分析Redis的解锁流程:
- 判断自己的锁是否存在(hexists),不存在则返回nil
- 如果自己的锁存在,则减1(hincrby - 1),判断减一后的值是否为0,为0,则返回1
- 不为0,返回0
- 解锁lua脚本
-- 判断hash set 可重入key的值是否等于0
-- 如果为 nil 代表 自己的锁不存在,在尝试解其他线程的锁,解锁失败
-- 如果为 0 代表 可重入次数被减1
-- 如果为 1 代表 该可重入 key 解锁成功
if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then return nil;
elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then return 0;
else redis.call('del', KEYS[1]); return 1;
end-- key: lock
-- arg: uuid
代码实现
由于后续会有基于Zookeeper和基于MySQL实现的分布式锁,我们可以通过工厂类,获取不同类型的分布式锁。
DistributedLockClient工厂类具体实现:
@Component
public class DistributedLockClient {@Autowiredprivate StringRedisTemplate redisTemplate;private String uuid;public DistributedLockClient() {this.uuid = UUID.randomUUID().toString();}public DistributedRedisLock getRedisLock(String lockName){return new DistributedRedisLock(redisTemplate, lockName, uuid);}
}
DistributedRedisLock实现如下:
public class DistributedRedisLock implements Lock {private StringRedisTemplate redisTemplate;private String lockName;private String uuid;private long expire = 30;public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) {this.redisTemplate = redisTemplate;this.lockName = lockName;this.uuid = uuid;}@Overridepublic void lock() {this.tryLock();}@Overridepublic void lockInterruptibly() throws InterruptedException {}@Overridepublic boolean tryLock() {try {return this.tryLock(-1L, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}return false;}/*** 加锁方法* @param time* @param unit* @return* @throws InterruptedException*/@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {if (time != -1){this.expire = unit.toSeconds(time);}String script = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +"then " +" redis.call('hincrby', KEYS[1], ARGV[1], 1) " +" redis.call('expire', KEYS[1], ARGV[2]) " +" return 1 " +"else " +" return 0 " +"end";while (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), getId(), String.valueOf(expire))){Thread.sleep(50);}return true;}/*** 解锁方法*/@Overridepublic void unlock() {String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +"then " +" return nil " +"elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +"then " +" return redis.call('del', KEYS[1]) " +"else " +" return 0 " +"end";Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), getId());if (flag == null){throw new IllegalMonitorStateException("this lock doesn't belong to you!");}}@Overridepublic Condition newCondition() {return null;}/*** 给线程拼接唯一标识* @return*/String getId(){return uuid + ":" + Thread.currentThread().getId();}
}
使用及测试
在业务代码中使用:
package tech.msop.distributed.lock.service.impl;import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import tech.msop.distributed.lock.constants.StockConstant;
import tech.msop.distributed.lock.entity.StockEntity;
import tech.msop.distributed.lock.lock.DistributedLockClient;
import tech.msop.distributed.lock.lock.DistributedRedisLock;
import tech.msop.distributed.lock.mapper.StockMapper;
import tech.msop.distributed.lock.service.IStockService;/*** 库存服务实现类 <br/>*/
@Service
@Slf4j
public class StockServiceImpl extends ServiceImpl<StockMapper, StockEntity>implements IStockService {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate DistributedLockClient distributedLockClient;/*** 减库存*/@Overridepublic void checkAndLock() {DistributedRedisLock redisLock = this.distributedLockClient.getRedisLock("lock");redisLock.lock();try {// 1. 查询库存信息String stock = redisTemplate.opsForValue().get("stock").toString();// 2. 判断库存是否充足if (stock != null && stock.length() != 0) {Integer st = Integer.valueOf(stock);if (st > 0) {// 3.扣减库存redisTemplate.opsForValue().set("stock", String.valueOf(--st));}}} finally {redisLock.unlock();}}
}
使用Jmet测试并查询库存余量:0
测试可重入性:
自动续期
借助Timer定时器+lua脚本实现自动续期:
if (redis.call('hexists',KEYS[1],ARGV[1]) == 1)
then return redis.call('expire',KEYS[1],ARGV[2])
else return 0
end-- key: lock
-- arg: uuid 30
修改Redis分布式锁,实现锁自动续期:
package tech.msop.distributed.lock.lock;import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;import java.util.Arrays;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;/*** 基于Redis实现分布式锁*/
public class DistributedRedisLock implements Lock {private final StringRedisTemplate redisTemplate;private final String lockName;private final String uuid;private long expire = 30;public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName, String uuid) {this.redisTemplate = redisTemplate;this.lockName = lockName;this.uuid = uuid + ":" + Thread.currentThread().getId();}@Overridepublic void lock() {this.tryLock();}@Overridepublic void lockInterruptibly() throws InterruptedException {}@Overridepublic boolean tryLock() {try {return this.tryLock(-1L, TimeUnit.SECONDS);} catch (InterruptedException e) {e.printStackTrace();}return false;}/*** 加锁方法** @param time 超时时间* @param unit 时间单位* @return 加锁是否成功* @throws InterruptedException 异常*/@Overridepublic boolean tryLock(long time, TimeUnit unit) throws InterruptedException {if (time != -1) {this.expire = unit.toSeconds(time);}String script = "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +"then " +" redis.call('hincrby', KEYS[1], ARGV[1], 1) " +" redis.call('expire', KEYS[1], ARGV[2]) " +" return 1 " +"else " +" return 0 " +"end";while (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), uuid, String.valueOf(expire))) {Thread.sleep(50);}// 在加锁成功返回之前,开启定时器,自动续期renewExpire();return true;}/*** 解锁方法*/@Overridepublic void unlock() {String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " +"then " +" return nil " +"elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " +"then " +" return redis.call('del', KEYS[1]) " +"else " +" return 0 " +"end";Long flag = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class), Arrays.asList(lockName), uuid);if (flag == null) {throw new IllegalMonitorStateException("this lock doesn't belong to you!");}}@Overridepublic Condition newCondition() {return null;}/*** 给线程拼接唯一标识** @return 唯一标识*/
// String getId() {
// return uuid + ":" + Thread.currentThread().getId();
// }private void renewExpire(){String script = "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 " +"then " +" return redis.call('expire', KEYS[1], ARGV[2]) " +"else " +" return 0 " +"end";new Timer().schedule(new TimerTask() {@Overridepublic void run() {if (redisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class),Arrays.asList(lockName),uuid,String.valueOf(expire))){renewExpire();}}},this.expire * 1000 /3);}}
在tryLock方法中使用:
构造方法作如下修改:
解锁方法作如下修改:
手写分布式锁小结
特征
-
独占排他使用 setnx
-
防止死锁:设置锁的过期时间
- 如果Redis客户端程序从Redis服务中获取到锁突然宕机,无法执行后续操作并释放锁,则其他程序也无法获取到锁,并导致服务阻塞
- 解决:设置锁的过期时间,即使未手动释放锁,也会在一定时间后自动过期释放
- 不可重入:需要可重入
-
原子性
- 获取锁和设置过期时间必须具有原子性:set key value ex 3 nx
- 判断和释放锁之间,也需要原子性:借助lua脚本实现
-
防误删:解铃还须系铃人
- 先判断是否是自己的锁,再删除
-
可重入性:hash(key field value) + lua脚本
-
自动续期:Timer定时器 + lua脚本
- 程序执行时间过长,超过锁的过期时间,为了防止锁机制失效,需要判断程序是否执行完成,未完成则需要续期锁的过期时间
-
在集群情况下,导致锁机制失效:
- 客户端程序C1,从主服务器中获取锁
- 从服务器还没来得及同步数据,主服务器宕机
- 于是从服务器升级为主服务器
- 客户端程序C2就从新主服务器中获取到锁,导致锁机制失效
锁操作
加锁
- setnx:独占排他、死锁、不可重入、原子性
- set k v ex 30 nx:独占排他、死锁 不可重入
- hash + lua脚本:可重入锁
- 判断锁是否被占用(exists),如果没有被占用则直接获取锁(hset/hincrby),并设置过期时间(expire)
- 如果锁被占用,则判断是否当前线程占用的(hexists),如果是则重入(hincrby)并重置过期时间(expire)
- 否则获取锁失败,在代码中重试
- Time定时器 + lua脚本:实现锁的自动续期
- 判断锁是否是自己的锁(hexists==1),如果是自己的锁,则执行expire重置过期时间
解锁
- del:可能导致误删
- 先判断在删除同时保证原子性:lua脚本
- hash + lua脚本:可重入
- 判断当前线程的锁是否存在,不存在则返回nil,将来在代码中排除异常
- 存在则直接减1(hincrby - 1),判断减1后的值是否为0,为0则释放锁(del),并返回1
- 不为0,则返回0
获取锁重试
- 递归获取锁
- 循环获取锁
红锁算法
Redis集群状态下的问题:
- 客户端A从master获取到锁
- 在master将锁同步到slave之前,master宕机了
- slave节点被晋级到master节点
- 客户端B取得了同一个资源被客户端A已经获取到的另一个锁
安全失效!
解决集群下锁失效,参照redis官方网站针对redlock文档:https://redis.io/topics/distlock
在算法的分布式版本中,我们假设有N个Redis服务器。这些节点是完全独立的,因此我们不使用复制或任何其他隐式协调系统。在前面已经描述了如何在单个实例中安全地获取和释放锁,在分布式锁算法中,将使用相同的方法在单个实例中获取和释放锁。 将N设置为5是一个合理的值,因此需要在不同的计算机或虚拟机上运行5个Redis主从服务器,确保它们以独立的方式发生故障。
为了获取锁,客户端执行以下操作:
- 1、客户端以毫秒为单位获取当前时间的时间戳,作为起始时间。
- 2、客户端尝试在所有N个实例中顺序使用相同的键名、相同的随机值来获取锁定。每个实例尝试获取锁都需要时间,客户端应该设置一个远小于总锁定时间的超时时间。例如,如果自动释放锁时间为10秒,则尝试获取锁的超时时间可能在5到50毫秒之间。这样可以防止客户端长时间与处于故障状态的Redis节点进行通信:如果某个示例不可用,尽快尝试与下一个实例进行通信。
- 3、客户端获取当前时间 减去 步骤1 中获取的起始时间,来计算获取锁所花费的时间。当且仅当客户端能够在大多数实例(至少3个)中获取锁时,并且获取锁所花费的总时间小于锁有效时间,则认为已获取锁。
- 4、如果获取了锁,则将锁有效时间减去获取锁花费的时间,如步骤3中所计算
- 5、如果客户端由于某种原因(无法锁定N / 2 + 1个实例或有效时间为负)而未能获得该锁,它将尝试解锁所有实例(即使没有锁定成功的实例)。
每台计算机都有一个本地时钟,我们通常可以依靠不同的计算机产生很小的时钟漂移。只有在拥有锁的客户端将在锁的有效时间内(如步骤3中获得的)减去一段时间(仅几毫秒)的情况下终止工作,才能保证这一点。以补偿进程之间的时钟漂移。
当客户端无法获取锁时,它应该在随机延迟后重试,以避免同时获取同一资源的多个客户端之间不同步(这可能会导致脑裂的情况:没人胜)。同样,客户端在大多数Redis实例中尝试获取锁的速度越快,出现裂脑情况(以及需要重试)的窗口就越小,因此在理想情况下,客户端应尝试将SET命令发送到N个实例同时使用多路复用。
值得强调的是,对于未能获得大多数锁的客户端,尽快释放(部分)获得的锁有多么重要,这样就不必等待锁定期满才能再次获得锁(但是,如果发生了网络分区,并且客户端不再能够与Redis实例进行通信,则在等待密钥到期时需要付出可用性损失)。
Redisson中的分布式锁
Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet、Set、Multimap、SortedSet、Map、List、Queue、BlockingQueue、Semaphore、Lock、AtomicLong、CountDownLatch、Publish/Subscribe、Bloom filter、Remote Service、Spring cache、Executor Service、Live Object Service、Scheduler Service)。Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
官方文档地址:https://github.com/redisson/redisson/wiki
可重入锁(Reentrant Lock)
基于Redis的Redisson分布式可重入锁RLock
Java对象实现了java.util.concurrent.locks.Lock
接口。
众所周知,如果负责存储这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson示例被关闭前,不断的延长锁的有效期。默认情况下,看门狗检查锁的超时时间是30秒钟,也可以通过吸怪Config.lockWatchdogTimeout
来另行指定。
RLock
对象完全符合Java的Lock规范。也就是说只有锁的进程才能解锁,其他进程则会抛出IllegalMonitorStateException
错误。
另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间锁便自动解开了。
RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock();// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {try {...} finally {lock.unlock();}
}
- 1、引入Redisson依赖
<!-- Redisson -->
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.21.3</version>
</dependency>
- 2、添加配置
package tech.msop.distributed.lock.config;import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** Redisson 配置*/
@Configuration
public class RedissonConfig {/*** Redisson 客户端配置** @return Redisson客户端*/@Beanpublic RedissonClient redissonClient() {// 初始化配置对象Config config = new Config();// 单机Redis服务config.useSingleServer().setAddress("redis://127.0.0.1:6379") // redis服务地址,必须 redis://ip:port
// .setDatabase(0) // 指定Redis数据库编号
// .setUsername("") // redis 用户名
// .setPassword("")// redis 密码
// .setConnectionMinimumIdleSize(10)// 连接池最小空闲连接数
// .setConnectionPoolSize(50) // 连接池最大线程数
// .setIdleConnectionTimeout(60000) // 线程超时时间
// .setConnectTimeout() // 客户端获取redis 链接的超时时间
// .setTimeout()// 响应超时时间;return Redisson.create(config);}
}
- 3、代码中使用
package tech.msop.distributed.lock.service.impl;import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import tech.msop.distributed.lock.constants.StockConstant;
import tech.msop.distributed.lock.entity.StockEntity;
import tech.msop.distributed.lock.lock.DistributedLockClient;
import tech.msop.distributed.lock.lock.DistributedRedisLock;
import tech.msop.distributed.lock.mapper.StockMapper;
import tech.msop.distributed.lock.service.IStockService;/*** 库存服务实现类 <br/>*/
@Service
@Slf4j
public class StockServiceImpl extends ServiceImpl<StockMapper, StockEntity>implements IStockService {@Autowiredprivate StringRedisTemplate redisTemplate;@Autowiredprivate DistributedLockClient distributedLockClient;@Autowiredprivate RedissonClient redissonClient;/*** 减库存*/@Overridepublic void checkAndLock() {RLock lock = redissonClient.getLock("lock");lock.lock();try {// 1. 查询库存信息String stock = redisTemplate.opsForValue().get("stock").toString();// 2. 判断库存是否充足if (stock != null && stock.length() != 0) {Integer st = Integer.valueOf(stock);if (st > 0) {// 3.扣减库存redisTemplate.opsForValue().set("stock", String.valueOf(--st));}}} finally {lock.unlock();}}
}
- 4、使用jmeter进行压力测试并获取库存余量:0
公平锁(Fair Lock)
基于Redis的Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock
接口的一种RLock
对象。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会在等待5秒后继续下一个线程,也就是说如果前面5个线程都处于等待状态,那么后面的线程会等待至少25秒。
RLock fairLock = redissonClient.getFairLock("anyLock");
// 最常见的使用方法
fairLock.lock();// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
fairLock.lock(10, TimeUnit.SECONDS);// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
fairLock.unlock();
联锁(MultiLock)(了解)
基于Redis的Redisson分布式联锁RedissonMultiLock
对象可以将多个RLock
对象关联为一个联锁,每个RLock
对象实例可以来自于不同的Redisson实例。
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 所有的锁都上锁成功才算成功。
lock.lock();
...
lock.unlock();
红锁(RedLock)(了解)
基于Redis的Redisson红锁RedissonRedLock
对象实现了Redlock介绍的加锁算法。该对象也可以用来将多个RLock
对象关联为一个红锁,每个RLock
对象实例可以来自于不同的Redisson实例。
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();
读写锁(ReadWriteLock)
基于Redis的Redisson分布式可重入读写锁RReadWriteLock
Java对象实现了java.util.concurrent.locks.ReadWriteLock
接口。其中读锁和写锁都继承了RLock接口。
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
添加StockController方法:
@GetMapping("test/read")
public String testRead(){String msg = stockService.testRead();return "测试读";
}@GetMapping("test/write")
public String testWrite(){String msg = stockService.testWrite();return "测试写";
}
添加StockService方法:
public String testRead() {RReadWriteLock rwLock = this.redissonClient.getReadWriteLock("rwLock");rwLock.readLock().lock(10, TimeUnit.SECONDS);System.out.println("测试读锁。。。。");// rwLock.readLock().unlock();return null;
}public String testWrite() {RReadWriteLock rwLock = this.redissonClient.getReadWriteLock("rwLock");rwLock.writeLock().lock(10, TimeUnit.SECONDS);System.out.println("测试写锁。。。。");// rwLock.writeLock().unlock();return null;
}
打开开两个浏览器窗口测试:
- 同时访问写:一个写完之后,等待一会儿(约10s),另一个写开始
- 同时访问读:不用等待
- 先写后读:读要等待(约10s)写完成
- 先读后写:写要等待(约10s)读完成
信号量(Semaphore)
基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore
采用了与java.util.concurrent.Semaphore
相似的接口和用法。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。
RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.trySetPermits(3);
semaphore.acquire();
semaphore.release();
在StockController添加方法:
@GetMapping("test/semaphore")
public String testSemaphore(){this.stockService.testSemaphore();return "测试信号量";
}
在StockService添加方法:
public void testSemaphore() {RSemaphore semaphore = this.redissonClient.getSemaphore("semaphore");semaphore.trySetPermits(3);try {semaphore.acquire();TimeUnit.SECONDS.sleep(5);System.out.println(System.currentTimeMillis());semaphore.release();} catch (InterruptedException e) {e.printStackTrace();}
}
添加测试用例:并发10次,循环一次
控制台效果:
控制台1:
1606960790234
1606960800337
1606960800443
1606960805248控制台2:
1606960790328
1606960795332
1606960800245控制台3:
1606960790433
1606960795238
1606960795437
由此可知:
1606960790秒有3次请求进来:每个控制台各1次
1606960795秒有3次请求进来:控制台2有1次,控制台3有2次
1606960800秒有3次请求进来:控制台1有2次,控制台2有1次
1606960805秒有1次请求进来:控制台1有1次
闭锁(CountDownLatch)
基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch
采用了与java.util.concurrent.CountDownLatch
相似的接口和用法。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
需要两个方法:一个等待,一个计数countDown
给StockController添加测试方法:
@GetMapping("test/latch")
public String testLatch(){stockService.testLatch();return "班长锁门。。。";
}@GetMapping("test/countdown")
public String testCountDown(){stockService.testCountDown();return "出来了一位同学";
}
给StockService添加测试方法:
public void testLatch() {RCountDownLatch cdl = this.redissonClient.getCountDownLatch("cdl");cdl.trySetCount(6);try {cdl.await();} catch (InterruptedException e) {e.printStackTrace();}
}public void testCountDown() {RCountDownLatch cdl = this.redissonClient.getCountDownLatch("cdl");cdl.countDown();
}
重启测试,打开两个页面:当第二个请求执行6次之后,第一个请求才会执行。
相关文章:

分布式锁的应用场景与分布式锁实现(二):基于Redis实现分布式锁
分布式锁的应用场景与分布式锁实现(一):传统锁处理并发及传统锁的问题 基于Redis实现分布式锁 所有代码已同步到GitCode:https://gitcode.net/ruozhuliufeng/distributed-project.git 基本实现 借助Redis中的命令setnx(key&a…...

【JavaSE】Java基础语法(四十二):NIO
文章目录 1. 概述2. NIO与BIO的区别3. NIO三大模块4. NIO创建缓冲区对象【应用】5. NIO缓冲区添加数据【应用】6. NIO缓冲区获取数据【应用】7. 小结 1. 概述 BIO Blocking IO,阻塞型IONIO No Blocking IO,非阻塞型IO阻塞IO的弊端 在等待的过程中,什么事也做不了非阻塞IO的好处…...

Linux---systemctl
1. systemctl命令 Linux系统很多软件(内置或第三方)均支持使用systemctl命令控制:启动、停止、开机自启。 能够被systemctl管理的软件,一般也称之为:服务 语法:systemctl start | stop | status | enabl…...

零钱兑换,凑零钱问题,从暴力递归到动态规划(java)
凑零钱问题,从暴力递归到动态规划 leetcode 322 题 零钱兑换暴力递归(这个会超时,leetcode 跑不过去)递归缓存动态规划优化暴力递归动态规划专题 leetcode 322 题 零钱兑换 322 零钱兑换 - 可以打开链接测试 给你一个整数数组 c…...

Vue登录界面精美模板分享
文章目录 🐒个人主页🏅Vue项目常用组件模板仓库📖前言:🎀源码如下: 🐒个人主页 🏅Vue项目常用组件模板仓库 📖前言: 本篇博客主要提供vue组件之登陆组件源码…...

Linux设备驱动程序(二)——建立和运行模块
文章目录 前言一、设置测试系统二、Hello World 模块1、代码详解2、执行效果 三、内核模块相比于应用程序1、用户空间和内核空间2、内核的并发3、当前进程4、几个别的细节 四、编译和加载1、编译模块2、加载和卸载模块3、版本依赖 五、内核符号表六、预备知识七、初始化和关停1…...

【算法】单调栈问题
文章目录 题目思路分析代码实现 题目 给定一个不含有重复值的数组arr,找到每一个i位置左边和右边离i位置最近且值比arr[i]小的位置,返回所有位置相应的消息。 比如arr{3,4,1,5,6,2,…...

Hack The Box - 关卡Dancing
SMB(全称是Server Message Block)是一个协议名,可用于在计算机间共享文件、打印机、串口等,电脑上的网上邻居就是靠它实现的。 SMB 是一种客户机/服务器、请求/响应协议。通过 SMB 协议,客户端应用程序可以在各种网络环境下读、写服务器上的…...

【软件设计与体系结构】 软件体系结构风格
软件体系结构(Software Architecture) 软件体系结构(Software Architecture)包括构成系统的设计元素的描述、 设计元素 之间的交互、 设计元素的组合模式以及在这些模式中的约束。 定义 软件体系结构表示系统的框架结构…...

detectron2 使用教程
本范例演示使用非常有名的目标检测框架detectron2 🤗🤗 在自己的数据集(balloon数据)上训练实例分割模型MaskRCNN的方法。 detectron2框架的设计有以下一些优点: 1,强大:提供了包括目标检测、实例分割、全景分割等非常广泛的视觉任务模型库。 2,灵活:可以通过注册机…...

哈希表常用数据结构
哈希表常用数据结构 查询一个元素是否出现过,或者一个元素是否在集合里的时候,就要第一时间想到哈希法。 哈希法也是空间换时间,因为我们要使用额外的数组,set或者是map来存放数据,才能实现快速的查找。 集合底层实现…...

Java字节流
4 字节流 字节流抽象基类 InputStream:这个抽象类是表示字节输入流的所有类的超类OutputStream:这个抽象类是表示字节输出流的所有类的超类子类名特点:子类名称都是以其父类名作为子类名的后缀4.1 IO流概述和分类 IO流概述: IO: 输入/输出(Input/Output)流:是一种抽象概念…...

arm3399主板-使用ubuntu20.04搭建LVS-DR(netplan)
目录 一、规划 1、网络拓扑 2、检查 二、配置设备 1、配置LVS 1.配置IP转发 2.清除防火墙 3.安装ipvsadm工具 4.配置VIP 5.netplan与NetworkManager介绍 6.添加LVS规则 1.清除防火墙 2.添加伪装IP 3.安装web服务 4. 修改内核参数,防止IP冲突 3、配置w…...

Go中同/异步与锁的应用~~sync包
Go中锁的实现~~sync包 go中sync包中提供了互斥锁; 在前面Go中channel文章中我们使用了time.Sleep()函数使得main函数的Goroutine阻塞至所有协程Goroutine结束,但这并不是一个很好的办法,因为我们实际应用中并不能准确知道协程什么时候结束(这里面要考虑服务器的性能,网络波动以…...

Flask知识点2
1、flash() get_flashed_messages() : 用来消耗flash方法中存储的消息 使用flash存储消息时,需要设置SECRET_KEY flash 内部消息存储依赖了session 2、CSRF(Cross Site Request Forgery) 跨站请求伪造,指攻击者盗用你的身份发送恶意请求 CSRFProt…...

R语言生物群落(生态)数据统计分析与绘图(从数据整理到分析结果展示)
R 语言作的开源、自由、免费等特点使其广泛应用于生物群落数据统计分析。生物群落数据多样而复杂,涉及众多统计分析方法。以生物群落数据分析中的最常用的统计方法回归和混合效应模型、多元统计分析技术及结构方程等数量分析方法为主线,通过多个来自经典…...

代码随想录训练营Day58| 739. 每日温度 496.下一个更大元素 I
目录 学习目标 学习内容 739. 每日温度 496.下一个更大元素 I 学习目标 739. 每日温度 496.下一个更大元素 I 学习内容 739. 每日温度 739. 每日温度 - 力扣(LeetCode)https://leetcode.cn/problems/daily-temperatures/ class Solution:def da…...

设计模式-命令模式
命令模式 问题背景命令模式基本介绍UML类图 解决方案UML类图代码示例 问题背景 1)随着现在科技越来越先进,我们在家庭中对物品的开关都不需要亲自走过去来进行了。我们只需要通过手机APP中的按键来远程执行这个命令。 2)其实这就是命令模式&…...

软考——下午题部分,例题一,二,三,六
例题一 11年上半年 病人,护理人员,医生 D 生命体征范围文件 日志文件 病历文件 治疗意见文件 14年上 E1 巴士司机,2 机械师,3 会计,4 主管,5 库存管理系统 D 巴士列表文件 维修记录文件 部件清单 人事档案 14年下 1 客户 2 供应商 D 销售订单表 库存…...

关于render: h => h(App)的解释
当我们第一次安装完脚手架,打开 的时候,我相信,一定有小伙伴和我一样,看到main.js里面的render: h > h(App),感觉懵懵的。 因为,在刚开始接触vue的时候,我们这里是这样写的: 而使用了脚手…...

flask实现简易图书管理系统
项目结构 技术选型 flask 做后端, 提供数据和渲染html 暂时没有提供mysql, 后续会更新操作mysql和样式美化的版本 起一个flask服务 flask是python的一个web框架, 下面演示如何提供http接口, 并返回json数据 main.py # flask创建http接口 from flask import Flask, request, jso…...

2021 年全国大学生物联网设计竞赛(华为杯)全国总决赛获奖名单
由全国高等学校计算机教育研究会主办,上海交通大学承办,华为技术有限 公司协办,中国电信天翼物联、中国移动中移物联网、霍尼韦尔 Tridium、CSA 联盟、新大陆、德州仪器 (TI)、百度、机械工业出版社华章公司联合支持的 2021 全国大学生物联网…...

操作系统复习2.3.5-管程
引入管程 PV操作困难,容易书写出错,引入管程,作为一种高级同步机制 组成 局限于管程的共享数据结构说明对该数据结构进行操作的一组过程对局部于管程的共享数据结构设置初始值的语句管程有一个名字 基本特征 局限于管程的数据只能被局限…...

List Set Map Queue Deque 之间的区别是什么?
List Set Map Queue Deque 之间的区别是什么? 1. Java 集合框架有那些接口?2. List Set Map Queue Deque 之间的区别是什么? 1. Java 集合框架有那些接口? List、Set、Map、Queue、Deque 2. List Set Map Queue Deque 之间的区别…...

unity行为决策树实战详解
一、行为决策树的概念 行为决策树是一种用于游戏AI的决策模型,它将游戏AI的行为分解为一系列的决策节点,并通过节点之间的连接关系来描述游戏AI的行为逻辑。在行为决策树中,每个节点都代表一个行为或决策,例如移动、攻击、逃跑等…...

Spring学习记录
目录 bean的单例与多例 设置 工厂模式的三种形态 简单工厂模式 代码: 运行结果: 总结: 工厂模式 代码: 运行结果: 总结: 抽象工厂模式 代码: 运行结果: 总结: …...

模板方法-
定义:又叫模板模式,是指定义一个算法骨架,并允许子类为其中的一个或多个步骤提供实现。 适用场景: 1、一次性实现一个算法不变的部分,并将可变的行为留给子类来实现 2、各子类中公共的行为被提取出来并集中到一个公共的父类中,从而避免代码重复 优点…...

[Kubernetes] - RabbitMQ学习
1.消息队列 消息: 在应用间传送的数据队列,先进先出 1.2. 作用 好处:解耦, 容错,削峰坏处:降低系统可用性,系统复杂度提高,一致性问题; RabbitMQ组成部分:…...

swagger页面 doc.html出不来,swagger-ui/index.html能出来
swagger页面 doc.html出不来,swagger-ui/index.html能出来。前前后后折腾了很久,jar包冲突,jar包版本,添加路径啥的都弄了,就是出不来。 后来全局搜索“doc.html”页面发现能出来的项目能搜到这个页面: 定…...

IEEE802.3和IEEE802.11的分类(仅为分类)
IEEE802.3标准 IEEE802.3:10兆以太网 ●10Base-5 使用粗同轴电缆,最大网段长度为500m,基带传输方法; ●10Base-2 使用细同轴电缆,最大网段长度为185m,基带传输方法; ●10Base&am…...