主题
并发请求
概念
- 并发:并发是多个任务同时交替的执行(因为cpu执行指令的速度非常之快,它可以不必按顺序一段代码一段代码的执行,这样效率反而更加低下),这样看起来就是一起执行的,所以叫并发
- 并行:可以理解为多个物理cpu或者有分布式系统,是真正的
'同时'
执行 - 并发控制:意思是多个并发的任务,一旦有任务完成,就立刻开启下一个任务
- 切片控制:将并发任务切片的分配出来,比如10个任务,切成2个片,每片有5个任务,当
前一片的任务
执行完毕,再开始下一个片
的任务,这样明显效率没并发控制那么高
了
切片控制
假设现在有 20 个请求,如何 3 个为一组的进行并发请求,前 3 个请求完后再进行后 3 个的并发请求
typescript
async function sliceControl<T = any>(request: (() => Promise<T>)[], maxNum: number) {
// 先分片
const sections: (() => Promise<T>)[][] = []
for (let i = 0, len = request.length; i < len; i += maxNum) {
sections.push(request.slice(i, i + maxNum))
}
const result = []
// 利用 for await 让现在切片完成,再开始下一个切片的请求
for (const section of sections) {
const sliceResult = await Promise.allSettled(section.map((i) => i()))
result.push(...sliceResult)
}
return result
}
const url = new Array(20).fill('').map((_, i) => () => fetch(`http://127.0.0.1:3001/api/id/${i}`))
sliceControl(url, 3)
并发控制
- 要求最大并发数 maxNum
- 每当有一个请求返回,就留下一个空位,可以增加新的请求
- 所有请求完成后,结果按照 urls 里面的顺序依次打出(发送请求的函数可以直接使用fetch即可)
typescript
async function concurrencyControl<T = any>(requests: (() => Promise<T>)[], maxNum: number) {
// 维护一个 promise 队列,保证返回结果的顺序正确
const promises = []
// 当前的并发池,用 Set 结构方便删除
const pool = new Set()
// 开始并发执行所有的任务
for (const request of requests) {
// 开始执行前,先 await 判断当前的并发任务是否超过限制
if (pool.size >= maxNum) {
await Promise.race(pool).catch((e) => e)
}
const promise = request()
// 请求结束后,从 pool 里面移除
const cb = () => {
pool.delete(promise)
}
promise.then(cb, cb)
pool.add(promise)
promises.push(promise)
}
return await Promise.allSettled(promises)
}
const url = new Array(20).fill('').map((_, i) => () => fetch(`http://127.0.0.1:3001/api/id/${i}`))
concurrencyControl<Response>(url, 3)
增加报错重试
typescript/** * @param request 请求 * @param count 允许重试次数 */ function executeWithRetry<T>(request: () => Promise<T>, count: number) { return new Promise((resolve, reject) => { const exec = () => { request().then(resolve).catch((e) => { count-- if (count < 0) { reject(e) } else { exec() } }) } exec() }) } async function concurrencyControl<T = any>(requests: (() => Promise<T>)[], maxNum: number, retry: number) { const promises = [] const pool = new Set() for (const request of requests) { if (pool.size >= maxNum) { await Promise.race(pool).catch((e) => e) } const promise = executeWithRetry(request, retry) const cb = () => { pool.delete(promise) } promise.then(cb, cb) pool.add(promise) promises.push(promise) } return await Promise.allSettled(promises) }