forked from doocs/advanced-java
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
docs(from-readers): add an article about current limiting
- Loading branch information
Showing
2 changed files
with
363 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,362 @@ | ||
# 如何限流?在工作中是怎么做的?说一下具体的实现? | ||
- Author: [HuiFer](https://github.com/huifer) | ||
- Description: 该文简单介绍限流相关技术以及实现 | ||
|
||
## 什么是限流 | ||
> 限流可以认为服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。 | ||
## 限流方法 | ||
|
||
### 计数器 | ||
|
||
#### 实现方式 | ||
- 控制单位时间内的请求数量 | ||
```java | ||
|
||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
public class Counter { | ||
/** | ||
* 最大访问数量 | ||
*/ | ||
private final int limit = 10; | ||
/** | ||
* 访问时间差 | ||
*/ | ||
private final long timeout = 1000; | ||
/** | ||
* 请求时间 | ||
*/ | ||
private long time; | ||
/** | ||
* 当前计数器 | ||
*/ | ||
private AtomicInteger reqCount = new AtomicInteger(0); | ||
|
||
public boolean limit() { | ||
long now = System.currentTimeMillis(); | ||
if (now < time + timeout) { | ||
// 单位时间内 | ||
reqCount.addAndGet(1); | ||
return reqCount.get() <= limit; | ||
} else { | ||
// 超出单位时间 | ||
time = now; | ||
reqCount = new AtomicInteger(0); | ||
return true; | ||
} | ||
} | ||
} | ||
|
||
``` | ||
|
||
- 劣势 | ||
- 假设在 00:01 时发生一个请求,在 00:01-00:58 之间不在发送请求,在 00:59 时发送剩下的所有请求 `n-1` (n为限流请求数量),在下一分钟的 00:01 发送n个请求,这样在2秒钟内请求到达了 `2n - 1` 个. | ||
- 设每分钟请求数量为60个,每秒可以处理1个请求,用户在 00:59 发送 60 个请求,在 01:00 发送 60 个请求 此时2秒钟有120个请求(每秒60个请求),远远大于了每秒钟处理数量的阈值 | ||
|
||
|
||
### 滑动窗口 | ||
#### 实现方式 | ||
- 滑动窗口是对计数器方式的改进,增加一个时间粒度的度量单位 | ||
- 把一分钟分成若干等分(6份,每份10秒), 在每一份上设置独立计数器,在 00:00-00:09 之间发生请求计数器累加1.当等分数量越大限流统计就越详细 | ||
|
||
```java | ||
package com.example.demo1.service; | ||
|
||
import java.util.Iterator; | ||
import java.util.Random; | ||
import java.util.concurrent.ConcurrentLinkedQueue; | ||
import java.util.stream.IntStream; | ||
|
||
public class TimeWindow { | ||
private ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<Long>(); | ||
|
||
/** | ||
* 间隔秒数 | ||
*/ | ||
private int seconds; | ||
|
||
/** | ||
* 最大限流 | ||
*/ | ||
private int max; | ||
|
||
public TimeWindow(int max, int seconds) { | ||
this.seconds = seconds; | ||
this.max = max; | ||
|
||
/** | ||
* 永续线程执行清理queue 任务 | ||
*/ | ||
new Thread(() -> { | ||
while (true) { | ||
try { | ||
// 等待 间隔秒数-1 执行清理操作 | ||
Thread.sleep((seconds - 1) * 1000L); | ||
} catch (InterruptedException e) { | ||
e.printStackTrace(); | ||
} | ||
clean(); | ||
} | ||
}).start(); | ||
|
||
} | ||
|
||
public static void main(String[] args) throws Exception { | ||
|
||
final TimeWindow timeWindow = new TimeWindow(10, 1); | ||
|
||
// 测试3个线程 | ||
IntStream.range(0, 3).forEach((i) -> { | ||
new Thread(() -> { | ||
|
||
while (true) { | ||
|
||
try { | ||
Thread.sleep(new Random().nextInt(20) * 100); | ||
} catch (InterruptedException e) { | ||
e.printStackTrace(); | ||
} | ||
timeWindow.take(); | ||
} | ||
|
||
}).start(); | ||
|
||
}); | ||
|
||
} | ||
|
||
/** | ||
* 获取令牌,并且添加时间 | ||
*/ | ||
public void take() { | ||
|
||
|
||
long start = System.currentTimeMillis(); | ||
try { | ||
|
||
|
||
int size = sizeOfValid(); | ||
if (size > max) { | ||
System.err.println("超限"); | ||
|
||
} | ||
synchronized (queue) { | ||
if (sizeOfValid() > max) { | ||
System.err.println("超限"); | ||
System.err.println("queue中有 " + queue.size() + " 最大数量 " + max); | ||
} | ||
this.queue.offer(System.currentTimeMillis()); | ||
} | ||
System.out.println("queue中有 " + queue.size() + " 最大数量 " + max); | ||
|
||
} | ||
|
||
} | ||
|
||
|
||
public int sizeOfValid() { | ||
Iterator<Long> it = queue.iterator(); | ||
Long ms = System.currentTimeMillis() - seconds * 1000; | ||
int count = 0; | ||
while (it.hasNext()) { | ||
long t = it.next(); | ||
if (t > ms) { | ||
// 在当前的统计时间范围内 | ||
count++; | ||
} | ||
} | ||
|
||
return count; | ||
} | ||
|
||
|
||
/** | ||
* 清理过期的时间 | ||
*/ | ||
public void clean() { | ||
Long c = System.currentTimeMillis() - seconds * 1000; | ||
|
||
Long tl = null; | ||
while ((tl = queue.peek()) != null && tl < c) { | ||
System.out.println("清理数据"); | ||
queue.poll(); | ||
} | ||
} | ||
|
||
} | ||
|
||
``` | ||
|
||
### Leaky Bucket 漏桶 | ||
#### 实现方式 | ||
- 规定固定容量的桶,有水进入,有水流出. 对于流进的水我们无法估计进来的数量、速度,对于流出的水我们可以控制速度. | ||
```java | ||
public class LeakBucket { | ||
/** | ||
* 时间 | ||
*/ | ||
private long time; | ||
/** | ||
* 总量 | ||
*/ | ||
private Double total; | ||
/** | ||
* 水流出去的速度 | ||
*/ | ||
private Double rate; | ||
/** | ||
* 当前总量 | ||
*/ | ||
private Double nowSize; | ||
|
||
|
||
public boolean limit() { | ||
long now = System.currentTimeMillis(); | ||
nowSize = Math.max(0, (nowSize - (now - time) * rate)); | ||
time = now; | ||
if ((nowSize + 1) < total) { | ||
nowSize++; | ||
return true; | ||
} else { | ||
return false; | ||
} | ||
|
||
} | ||
} | ||
``` | ||
|
||
### Token Bucket 令牌桶 | ||
#### 实现方式 | ||
- 规定固定容量的桶,token 以固定速度往桶内填充,当桶满时 token 不会被继续放入,每过来一个请求把 token 从桶中移除,如果桶中没有 token 不能请求 | ||
|
||
```java | ||
public class TokenBucket { | ||
/** | ||
* 时间 | ||
*/ | ||
private long time; | ||
/** | ||
* 总量 | ||
*/ | ||
private Double total; | ||
/** | ||
* token 放入速度 | ||
*/ | ||
private Double rate; | ||
/** | ||
* 当前总量 | ||
*/ | ||
private Double nowSize; | ||
|
||
|
||
public boolean limit() { | ||
long now = System.currentTimeMillis(); | ||
nowSize = Math.min(total, nowSize + (now - time) * rate); | ||
time = now; | ||
if (nowSize < 1) { | ||
// 桶里没有token | ||
return false; | ||
} else { | ||
// 存在token | ||
nowSize -= 1; | ||
return true; | ||
} | ||
} | ||
|
||
|
||
|
||
} | ||
``` | ||
|
||
## 工作中的使用 | ||
### spring cloud gateway | ||
- spring cloud gateway 默认使用redis进行限流,笔者一般只是修改修改参数属于拿来即用.并没有去从头实现上述那些算法. | ||
```xml | ||
<dependency> | ||
<groupId>org.springframework.cloud</groupId> | ||
<artifactId>spring-cloud-starter-gateway</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.springframework.boot</groupId> | ||
<artifactId>spring-boot-starter-data-redis-reactive</artifactId> | ||
</dependency> | ||
``` | ||
|
||
```yaml | ||
spring: | ||
cloud: | ||
gateway: | ||
routes: | ||
- id: requestratelimiter_route | ||
uri: lb://pigx-upms | ||
order: 10000 | ||
predicates: | ||
- Path=/admin/** | ||
filters: | ||
- name: RequestRateLimiter | ||
args: | ||
redis-rate-limiter.replenishRate: 1 # 令牌桶的容积 | ||
redis-rate-limiter.burstCapacity: 3 # 流速 每秒 | ||
key-resolver: "#{@remoteAddrKeyResolver}" #SPEL表达式去的对应的bean | ||
- StripPrefix=1 | ||
``` | ||
```java | ||
@Bean | ||
KeyResolver remoteAddrKeyResolver() { | ||
return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName()); | ||
} | ||
``` | ||
|
||
|
||
### sentinel | ||
- 通过配置来控制每个url的流量 | ||
```xml | ||
<dependency> | ||
<groupId>com.alibaba.cloud</groupId> | ||
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId> | ||
</dependency> | ||
``` | ||
```yaml | ||
spring: | ||
cloud: | ||
nacos: | ||
discovery: | ||
server-addr: localhost:8848 | ||
sentinel: | ||
transport: | ||
dashboard: localhost:8080 | ||
port: 8720 | ||
datasource: | ||
ds: | ||
nacos: | ||
server-addr: localhost:8848 | ||
dataId: spring-cloud-sentinel-nacos | ||
groupId: DEFAULT_GROUP | ||
rule-type: flow | ||
namespace: xxxxxxxx | ||
``` | ||
- 配置内容在nacos上进行编辑 | ||
```json | ||
[ | ||
{ | ||
"resource": "/hello", | ||
"limitApp": "default", | ||
"grade": 1, | ||
"count": 1, | ||
"strategy": 0, | ||
"controlBehavior": 0, | ||
"clusterMode": false | ||
} | ||
] | ||
``` | ||
- resource:资源名,即限流规则的作用对象。 | ||
- limitApp:流控针对的调用来源,若为 default 则不区分调用来源。 | ||
- grade:限流阈值类型,QPS 或线程数模式,0代表根据并发数量来限流,1代表根据QPS来进行流量控制。 | ||
- count:限流阈值 | ||
- strategy:判断的根据是资源自身,还是根据其它关联资源 (refResource),还是根据链路入口 | ||
- controlBehavior:流控效果(直接拒绝 / 排队等待 / 慢启动模式) | ||
- clusterMode:是否为集群模式 | ||
### 总结 | ||
> sentinel和spring cloud gateway两个框架都是很好的限流框架,但是在我使用中还没有将[spring-cloud-alibaba](https://github.com/alibaba/spring-cloud-alibaba)接入到项目中进行使用,所以我会选择**spring cloud gateway**,当接入完整的或者接入Nacos项目使用setinel会有更加好的体验. |