请求合并简述

在高并发场景下,往往会出现大量请求同时访问同一个接口的情况。 当然,可以通过缓存、消息队列等机制来缓解被请求方压力。 不过缓存是通过将结果存在访问速度更快的内存中,而消息队列则以降低并发量来减轻压力。 除此之外还有一种策略可以缓解这种情况:请求合并。

业界已经有的请求合并方案: Hystrix

PS:Redis也有pipeline技术来合并请求

本文主要是通过一个简单地例子,来完成一个请求合并的demo,帮助理解这一概念。

没有合并的请求

在合并之前,先看一个简单地无合并的接口,以查询数据库中的记录为例:

1
2
3
4
5
6
7
// 数据库中fip表的接口
private LocalFipMapper localFipMapper;

// 根据fip的id查询fip记录
public ResBizFip queryFipById(String localId){
    return localFipMapper.getFipById(localId);
}

每来一个请求就访问一次数据库,请求量大的情况下数据库将不堪重负。

请求合并

构建一个请求类

1
2
3
4
5
6
7
@Data
public class BatchRequest {
    // fip 的id
    private String localId;
    // 存放结果
    private CompletableFuture<LocalFipEntity> completedFuture;
}

请求合并服务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Service
public class CollapsingService {
    // 阻塞队列
    private BlockingQueue<BatchRequest> requestQueue = new LinkedBlockingQueue<>();
    @Autowired
    private LocalFipMapper localFipMapper;
    private volatile int count;

    @PostConstruct
    public void init() {
        // 定时任务线程池
        ScheduledExecutorService scheduledExecutor = new ScheduledThreadPoolExecutor(1);
        
        scheduledExecutor.scheduleAtFixedRate(() -> {
            List<BatchRequest> requests = new ArrayList<>();
            List<String> localIds = new ArrayList<>();
            while ((request = requestQueue.poll()) != null) {
                requests.add(request);
                localIds.add(request.getLocalId());
            }
            if (!localIds.isEmpty()) {
                try {
                    List<LocalFipEntity> fips = localFipMapper.listFip(localIds);
                    Map<String, LocalFipEntity> fipMap = fips.stream()
                            .collect(Collectors.toMap(LocalFipEntity::getId, f -> f));
                    // 将查询结果放入CompletedFuture
                    requests.forEach(f ->
                            f.getCompletedFuture().complete(fipMap.get(f.getLocalId()))
                    );
                } catch (Exception e) {
                    e.printStackTrace();
                    requests.forEach(f ->
                            f.getCompletedFuture().obtrudeException(e)
                    );
                }
                count += localIds.size();
                System.out.println("Batch Num: " + localIds.size()+". Total Num: " + count);
            }
            // 每隔 10ms 执行一次
        }, 0, 10, TimeUnit.MILLISECONDS);
    }

    public LocalFipEntity getFipById(String localId) {
        BatchRequest request = new BatchRequest();
        request.setLocalId(localId);
        request.setCompletedFuture(new CompletableFuture<>());
        requestQueue.offer(request);
        try {
            LocalFipEntity fip = request.getCompletedFuture().get(10, TimeUnit.SECONDS);
            return fip;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }
}

测试

测试发送1000个请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Test
public void batchQuery() throws InterruptedException {
    String[] idArray = ids.split(",");
    Random random = new Random();
    int len = idArray.length;
    int queryTime = 1000;
    CountDownLatch queryLatch = new CountDownLatch(queryTime);
    CountDownLatch mainLatch = new CountDownLatch(1);

    Executor executor = new ThreadPoolExecutor(
            32,
            100,
            60,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(queryTime));
    long start = System.currentTimeMillis();
    for (int i = 0; i < queryTime; i++) {
        executor.execute(() -> {
                    try {
                        mainLatch.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    collapsingService.getFipById(idArray[random.nextInt(len)]);
                    queryLatch.countDown();
                }
        );
    }
    mainLatch.countDown();
    queryLatch.await();
    System.out.println((System.currentTimeMillis() - start) + "ms.");
}

测试结果:

1
2
3
4
5
6
7
Batch Num: 32. Total Num: 32
Batch Num: 16. Total Num: 48
Batch Num: 16. Total Num: 64
...
Batch Num: 32. Total Num: 976
Batch Num: 24. Total Num: 1000
450ms.

关于时间

请求合并除了能降低被请求方的访问压力,在一些情况下还能提升效率。 在测试中,通过调整线程池的核心线程数来控制同时请求数,得出一些数据,仅供参考:

线程数:32 时间:400~600ms

线程数:100 时间:150~200ms

但是对于某一个请求来说,由于批量处理的定时任务周期执行,其响应时间会变成(周期+原响应时间)