Skip to content

并发请求

概念

  • 并发:并发是多个任务同时交替的执行(因为cpu执行指令的速度非常之快,它可以不必按顺序一段代码一段代码的执行,这样效率反而更加低下),这样看起来就是一起执行的,所以叫并发
  • 并行:可以理解为多个物理cpu或者有分布式系统,是真正的'同时'执行
  • 并发控制:意思是多个并发的任务,一旦有任务完成,就立刻开启下一个任务
  • 切片控制:将并发任务切片的分配出来,比如10个任务,切成2个片,每片有5个任务,当前一片的任务执行完毕,再开始下一个片的任务,这样明显效率没并发控制那么高

切片控制

假设现在有 20 个请求,如何 3 个为一组的进行并发请求,前 3 个请求完后再进行后 3 个的并发请求

typescript
async function sliceControl<T = any>(request: (() => Promise<T>)[], maxNum: number) {
  // 先分片
  const sections: (() => Promise<T>)[][] = []
  for (let i = 0, len = request.length; i < len; i += maxNum) {
    sections.push(request.slice(i, i + maxNum))
  }

  const result = []
  // 利用 for await 让现在切片完成,再开始下一个切片的请求
  for (const section of sections) {
    const sliceResult = await Promise.allSettled(section.map((i) => i()))
    result.push(...sliceResult)
  }

  return result
}

const url = new Array(20).fill('').map((_, i) => () => fetch(`http://127.0.0.1:3001/api/id/${i}`))
sliceControl(url, 3)

并发控制

  • 要求最大并发数 maxNum
  • 每当有一个请求返回,就留下一个空位,可以增加新的请求
  • 所有请求完成后,结果按照 urls 里面的顺序依次打出(发送请求的函数可以直接使用fetch即可)
typescript
async function concurrencyControl<T = any>(requests: (() => Promise<T>)[], maxNum: number) {
  // 维护一个 promise 队列,保证返回结果的顺序正确
  const promises = []
  // 当前的并发池,用 Set 结构方便删除
  const pool = new Set()
  // 开始并发执行所有的任务
  for (const request of requests) {
    // 开始执行前,先 await 判断当前的并发任务是否超过限制
    if (pool.size >= maxNum) {
      await Promise.race(pool).catch((e) => e)
    }
    const promise = request()
    // 请求结束后,从 pool 里面移除
    const cb = () => {
      pool.delete(promise)
    }
    promise.then(cb, cb)
    pool.add(promise)
    promises.push(promise)
  }

  return await Promise.allSettled(promises)
}

const url = new Array(20).fill('').map((_, i) => () => fetch(`http://127.0.0.1:3001/api/id/${i}`))
concurrencyControl<Response>(url, 3)

增加报错重试

typescript
/**
 * @param request 请求
 * @param count 允许重试次数
 */
function executeWithRetry<T>(request: () => Promise<T>, count: number) {
  return new Promise((resolve, reject) => {
    const exec = () => {
      request().then(resolve).catch((e) => {
        count--
        if (count < 0) {
          reject(e)
        } else {
          exec()
        }
      })
    }
    exec()
  })
}

async function concurrencyControl<T = any>(requests: (() => Promise<T>)[], maxNum: number, retry: number) {
  const promises = []
  const pool = new Set()
  for (const request of requests) {
    if (pool.size >= maxNum) {
      await Promise.race(pool).catch((e) => e)
    }
    const promise = executeWithRetry(request, retry)
    const cb = () => {
      pool.delete(promise)
    }
    promise.then(cb, cb)
    pool.add(promise)
    promises.push(promise)
  }

  return await Promise.allSettled(promises)
}