限流是微服务架构下保障系统的主要手段之一。通常来说系统的吞吐量是可以提前预测的,当请求量超过预期的伐值时可以采取一些限制措施来保障系统的稳定运行,比如延迟处理、拒绝服务等。
// AtomicLong实现
try {
if (atomic.incrementAndGet() > limit) {
// 限流处理
}
// 处理请求
} finally {
atomic.decrementAndGet();
}
// Semaphore实现
boolean allowed = false;
try {
allowed = semaphore.tryAcquire();
if (!allowed) {
// 限流处理
}
// 处理请求
} finally {
semaphore.release(1);
}
算法实现上,可以使用一个BlockingQueue表示漏桶,请求进来时放入这个BlockingQueue中。另起一个线程以固定的速率从BlockingQueue中取出请求,再提交给业务线程池处理。漏桶算法有个弊端:无法应对短时间的突发流量。
算法实现上,可以用一个BlockingQueue表示桶,起一个线程以固定的速率向桶中添加令牌。请求到达时需要先从桶中获取令牌,否则拒绝请求或等待。此外,Google开源的guava包也提供了很完善的令牌算法。
Redis + Lua实现基于时间窗口的计算器限流
static String lua = "local key = KEYS[1] " // 限流key
+ "local limit = tonumber(ARGV[1]) "// 限流伐值
+ "local current = tonumber(redis.call('get', key) or '0') "
+ "if (current + 1) > limit then "
+ " return 0 "
+ "elseif current == 0 then "
+ " redis.call('incrby', key, '1') "
+ " redis.call('expire', key, '2') " // 设置2秒过期时间
+ " return 1 "
+ "else "
+ " redis.call('incrby', key, '1') "
+ " return 1 "
+ "end";
public boolean isLimit() {
Jedis jedis = null;
try {
jedis = pool.getResource();
// 每秒限流1000次
String key = "Limit.Key." + System.currentTimeMillis() / 1000;
String limit = "1000";
Object rlt = jedis.eval(lua, 1, key, limit);
return Long.valueOf(0) == rlt;
} finally {
if (null != jedis) {
jedis.close();
}
}
}
public class TokenBucket {
private final static int DEFAULT_SIZE = 100;
private final static int DEFAULT_BATCH = 1;
private final static int DEFAULT_FREQ = 1;
private final static TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;
private AtomicInteger a = new AtomicInteger(0);
private final int size;
public TokenBucket() {
this(DEFAULT_SIZE, DEFAULT_BATCH, DEFAULT_FREQ, DEFAULT_TIME_UNIT);
}
public TokenBucket(int size, int batch, int freq, TimeUnit unit) {
this.size = size;
new Thread(new TokenProducer(batch, freq, unit)).start();
}
public boolean acquire() {
while (true) {
int current = a.get();
if (current == 0) {
return false;
}
if (a.compareAndSet(current, current - 1)) {
return true;
}
}
}
public void release(int n) {
while (true) {
int current = a.get();
int next = current + n;
if (next > size) {
next = size;
}
if (a.compareAndSet(current, next)) {
return;
}
}
}
public int current() {
return a.get();
}
class TokenProducer implements Runnable {
private int batch;
private int freq;
private TimeUnit unit;
public TokenProducer(int batch, int freq, TimeUnit unit) {
this.batch = batch;
this.freq = freq;
this.unit = unit;
}
@Override
public void run() {
while (true) {
try {
TokenBucket.this.release(this.batch);
try {
unit.sleep(freq);
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
// ignore
}
}
}
}
}