Skip to Content
JavaScript대규모 비동기 작업 안전하게 처리하기

대규모 비동기 작업 안전하게 처리하기

개요

자바스크립트로 프론트, 백엔드(서버리스), 자동화 스크립트 등 여러가지 목적으로 작업하다 보면 가끔 보면 외부 API 대량 호출, 트랜잭션 처리 등 수십 개 이상의 비동기 작업을 처리해야 하는 상황이 종종 발생합니다.

이 때 모든 비동기 작업을 동시에 실행하면 네트워크 병목, API의 Rate Limit 초과 (429), 처리되지 않은 에러가 발생해 전체 작업이 중단되는 등 예상치 못한 문제가 발생하곤 합니다.

이러한 문제를 피하기 위해, 대규모 비동기 작업을 어떻게 제어하고 안정적으로 실행할 수 있는지에 대해 정리해보았습니다.

병렬 vs 동시성

이 글에서는 네트워크, I/O와 같은 작업을 중심으로 Promise를 사용해 대량의 비동기를 처리하는 방법을 설명합니다.

비슷한 맥락에서 병렬성(parallelism)과 동시성(concurrency)이 자주 혼용되곤 하는데요,

  • 병렬성: 여러 작업이 실제로 동시에 실행되는 구조입니다. CPU 연산이 많은 이미지 처리, 대규모 데이터 변환, 압축 등의 작업은 병렬 처리가 필요하며, JavaScript 환경에서는 웹 워커(브라우저)나 워커 스레드(Node.js)를 통해 구현할 수 있습니다.
  • 동시성: 하나의 실행 흐름에서 여러 작업을 겹쳐 처리하는 방식입니다. 비동기 네트워크 요청이나 파일 I/O와 같은 작업은 실제로는 단일 스레드 내에서 동시적으로 처리됩니다.

아래 내용에서는 동시성을 다루는 방법을 설명합니다. 만약 병렬성이 필요한 작업이라면 웹 워커나, 워커 스레드를 활용하실 수 있습니다.

Promise 실행 제어하기

대량의 비동기 작업을 처리할 때 Promise를 어떤 방식으로 실행하느냐에 따라 결과가 크게 달라질 수 있습니다.

비동기 작업을 순차적으로 처리할 수도 있고, 한꺼번에 모두 실행할 수도 있으며, 일정 개수씩 나눠 실행하거나 동시에 실행되는 작업 수를 제한하는 방식도 사용할 수 있습니다.

제약 조건이나 작업의 규모 등 상황에 따라 적합한 조건이 달라지기 때문에 알맞은 방식을 선택해 사용하면 됩니다.

Promise 순차적으로 실행하기

순차 실행 방식 시각화

가장 단순한 방식은 작업을 하나씩 순서대로 실행하는 것입니다. 동시에 여러 작업을 처리하지 않고, 항상 하나가 끝난 뒤 다음 작업을 시작합니다. 작업 간에 순서가 중요하거나, 한번에 한 작업만 실행되도록 하거나, 외부 시스템에 과도한 부하를 주지 않도록 해야 할 때 사용합니다.

Array.forEachArray.map 같은 고차 함수는 비동기 함수와 함께 사용할 경우 순차 실행을 보장하지 않습니다. 아래는 NodeList.prototype.forEach에 대한 MDN의 폴리필 코드인데요, 내부적으로 await를 지원하지 않기 때문에 순차적으로 실행이 불가능합니다.

if (window.NodeList && !NodeList.prototype.forEach) { NodeList.prototype.forEach = function (callback, thisArg) { thisArg = thisArg || window; for (var i = 0; i < this.length; i++) { callback.call(thisArg, this[i], i, this); } }; }

순차 실행이 필요한 경우에는 for 루프 또는 for await...of 구문을 사용해야 합니다.

for (const task of tasks) { await task(); }

다만 비동기 방식의 이점을 살릴 수 없고, 작업 간에 순서를 보장할 필요가 없고 외부 리소스에 부담을 줄 이유도 없다면 굳이 순차적으로 불필요한 시간을 대기하는 것 보다는 여러 작업을 동시에 실행하는게 좋습니다.

모든 Promise를 동시에 실행하기

모든 작업 동시 실행

Promise.all을 사용하면 모든 비동기 작업을 동시에 시작할 수 있습니다. 작업 수가 너무 많지 않고, 외부 시스템에 과도한 부하를 주지 않는다면 가장 일반적으로 사용되는 방식입니다.

await Promise.all(tasks.map((task) => task()));

모든 작업이 동시에 실행되며, 하나라도 실패하면 전체가 reject됩니다. 따라서 실패 가능성이 낮거나, 비동기 함수에 재시도 처리 등을 해둔경, 전체 작업의 성공이 모두 보장되어야 하는 경우에 적합합니다.

작업 수가 너무 많거나 외부 API의 처리 능력이 제한적인 상황에서는 네트워크 병목이나 Rate Limit 초과로 이어질 수 있어 작업의 수를 나눠야 할 수도 있습니다.

Chunk 단위 동시 실행

chunk promise all

작업을 일정 개수의 묶음(Chunk)으로 나누고, 각 묶음을 병렬로 실행한 뒤 다음 묶음으로 넘어가는 방식입니다. 예를 들어 한 번에 10개씩 실행하고, 모두 완료되면 다음 10개를 처리하는 식입니다.

const chunk = (input, size) => { return input.reduce((arr, item, idx) => { return idx % size === 0 ? [...arr, [item]] : [...arr.slice(0, -1), [...arr.slice(-1)[0], item]]; }, []); }; for (const group of chunk(tasks, 10)) { await Promise.all(group.map((task) => task())); }

모든 작업을 동시에 실행시키기는 너무 많아서 나눠서 처리해야 할 때 유용한 절충안입니다.

Promise Pool

promise pool

동시에 실행할 수 있는 작업 수를 고정하고, 그 수만큼만 비동기 작업을 실행하도록 처리하는 방식입니다. 일정한 수의 작업만 동시에 실행되기 때문에, 외부 API나 시스템 자원에 과도한 부하를 주지 않고 안정적인 실행 흐름을 유지할 수 있습니다.

앞서 설명한 Chunk 방식은 일정한 개수의 작업을 동시에 실행한 뒤, 다음 묶음을 기다리는 구조입니다. 반면 Promise Pool은 실행 중인 작업이 끝나는 즉시 다음 작업을 바로 투입하는 구조로, 실행 편차가 큰 작업이 섞여 있을 때 전체 처리 시간을 더 줄일 수 있는 장점이 있습니다.

chunk 방식과 pool 비교

직접 구현한다면 아래처럼 N개의 함수를 Promise.all로 동시에 실행되게 하면서, 각 함수에서는 처리중인 작업이 완료되면 작업 목록에 있는 작업을 가져와 처리하는 식으로 구현할 수 있습니다.

type Task<T> = () => Promise<T>; const promisePool = async <T>( tasks: Task<T>[], concurrency: number ): Promise<T[]> => { const results: T[] = new Array(tasks.length); const indexedTasks = tasks.map((task, index) => ({ task, index })); const taskQueue = [...indexedTasks]; const worker = async () => { while (taskQueue.length > 0) { const item = taskQueue.shift(); if (!item) break; const { task, index } = item; const result = await task(); results[index] = result; } }; await Promise.all(Array.from({ length: concurrency }, () => worker())); return results; }; async function main() { const consoleTime = () => { const now = new Date(); const hh = String(now.getHours()).padStart(2, "0"); const mm = String(now.getMinutes()).padStart(2, "0"); const ss = String(now.getSeconds()).padStart(2, "0"); return `${hh}:${mm}:${ss}`; }; const sleep = (s: number) => new Promise<void>((resolve) => setTimeout(resolve, s * 1000)); const createApiTask = (payload: number): (() => Promise<number>) => { return async () => { console.log(`${consoleTime()} ${payload}초 소요되는 작업 시작`); await sleep(payload); console.log(`${consoleTime()} ${payload}초 소요되는 작업 완료`); return payload; }; }; const inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; const tasks = inputs.map(createApiTask); const result = await promisePool(tasks, 3); console.log("모든 작업 완료:", result); } main();

이미 구현된 라이브러리로는 @supercharge/promise-pool을 사용할 수 있습니다. 빌더 패턴으로 동시에 실행 가능한 작업의 수, 에러 핸들링 등 더 자세한 처리가 가능합니다.

import { PromisePool } from "@supercharge/promise-pool"; const { results, errors } = await PromisePool.withConcurrency(3) .for(inputs) .process((payload) => createApiTask(payload));

구현된 코드를 읽어보면 작업을 순회하면서, 한번에 최대 작업 가능한 숫자만큼만 작업이 실행될 수 있게 조절하면서 비동기 작업을 실행시켜줍니다.

// http://github.com/supercharge/promise-pool/blob/main/src/promise-pool-executor.ts#L397 async process (): Promise<ReturnValue<T, R>> { let index = 0 // 입력으로 받은 items(Iterable)를 순차적으로 비동기 반복 for await (const item of this.items()) { // 중간에 stop() 호출된 경우 반복 중단 if (this.isStopped()) { break } // 순서 보존 옵션이 활성화된 경우, 결과 배열에 'notRun' 심볼로 자리 확보 if (this.shouldUseCorrespondingResults()) { this.results()[index] = PromisePool.notRun } // 비동기 작업 실행: // - createTaskFor로 Promise 생성 // - then: 결과 저장 및 작업 제거 // - catch: 에러 처리 // - finally: 처리 완료 항목 기록 및 콜백 호출 // 실행되는 작업 목록(this.tasks)에 등록 this.startProcessing(item, index) index += 1 // 실행 슬롯 확보 대기: // 현재 실행 중인 작업 수가 설정된 concurrency 이상이면, Promise.race(this.tasks())로 하나가 끝날 때까지 대기 await this.waitForProcessingSlot() } // 모든 입력 아이템 반복이 끝나면 남아 있는 실행 중 작업들이 완료될 때까지 기다리고, 최종 결과와 에러 리스트를 반환 return await this.drained() }

Promise Queue

promise queue

Promise 기반의 작업을 메시지 큐처럼 큐에 쌓아두고, 설정한 최대 동시 실행 수만큼만 순차적으로 실행하는 방식입니다. 작업 요청이 들어오는 시점이 제각각이거나, 동시에 실행되는 작업 수를 제한해야 하는 경우, 또는 작업별 우선순위를 고려해 높은 우선순위의 작업부터 실행해야 할 때 사용할 수 있습니다.

개인적으로는 토이 프로젝트에서 외부에서 요청이 들어올 때 + 특정 시간마다 작업을 실행해야 하는 케이스를 구현하기 구현하기 위해 사용했습니다. 브라우저를 사용하는 작업이라 충돌이나 메모리 이슈를 방지하기 위해, 기존에 실행 중인 작업이 있다면 그 작업이 끝난 후에 다음 작업이 실행되도록 처리하기 위해 큐에 넣어두고 순차적으로 실행시키도록 구현했습니다.

작업을 큐에 쌓아두고 우선순위대로 처리하거나, 한번에 N개씩 작업을 실행시키는 경우 유용하게 사용할 수 있을 것 같습니다.

실제 프로덕션 환경에서 비동기 처리를 제어한다면 bullmq 같은 메시지 큐 기반 아키텍처로 옮겨가는 것이 일반적이지만, 그보다는 소규모 서버리스 환경이나 스크립트 작업에서 유용하게 활용할 수 있는 방식입니다.

구현된 라이브러리로는 p-queue가 있습니다. 아래처럼 concurrency로 최대 실행 가능한 작업 수를 설정하고 프로미스 작업을 .add 메소드로 추가할 수 있습니다.

import PQueue from "p-queue"; const queue = new PQueue({ concurrency: 3 }); // 한번에 3개의 작업만 실행 const inputs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; const tasks = inputs.map((n) => queue.add(createApiTask(n))); const result = await Promise.all(tasks);

작업 추가 시 priority를 설정해 우선 순위가 높은 작업을 먼저 실행시킬 수도 있습니다.

속도 제한 제어하기

Queue 방식에서는 동시에 실행되는 작업 수(concurrency)를 제한할 수 있지만, 앞서 개요에서 언급한 것처럼 외부 API의 Rate Limit(예: 1분에 N건, 1시간에 M건) 같은 속도 제한 정책까지 정확히 제어하기에는 부족합니다.

이런 케이스를 대응하는 경우에는 호출 간 시간 간격, 분당/시간당 요청 수 를 제어할 수 있어야 합니다. 대표적으로 bottleneck 같은 라이브러리를 사용하면, 호출 속도, 간격, 동시 실행 수를 모두 세밀하게 조절할 수 있습니다.

아래 예시 코드 10초에 최대 2건까지만 실행하고, 동시에 최대 2건까지 실행할 수 있도록 설정한 코드입니다. 실행 결과를 보면 43초에 최초 요청을 2건 넣고, 2건의 작업이 끝나도 새로운 요청을 보낼 수 있을 때 까지 대기 후, 2건의 토큰이 리필된 10초 후에 추가적인 요청을 보내는 것을 볼 수 있습니다.

import Bottleneck from "bottleneck"; const limiter = new Bottleneck({ maxConcurrent: 2, // 동시에 최대 2개까지 실행 reservoir: 2, // 10초 동안 최대 2개의 작업 허용 reservoirRefreshAmount: 2, // 리필 주기만큼 기다리고 리필할 개수 reservoirRefreshInterval: 10_000, // 리필 주기 (10초) }); async function main() { const inputs = [1, 2, 3, 4, 5, 6]; const promises = inputs.map((n) => limiter.schedule(createTask(n))); const results = await Promise.all(promises); console.log("모든 작업 완료:", results); } // 실행 결과 // [오후 9:36:43] 1초 작업 시작 // [오후 9:36:43] 2초 작업 시작 // [오후 9:36:44] 1초 작업 완료 // [오후 9:36:45] 2초 작업 완료 // [오후 9:36:53] 3초 작업 시작 // 10초가 지나서 토큰 2개 리필, 새로운 요청 보내기 // [오후 9:36:53] 4초 작업 시작 // [오후 9:36:56] 3초 작업 완료 // [오후 9:36:57] 4초 작업 완료 // [오후 9:37:03] 5초 작업 시작 // 10초가 지나서 토큰 2개 리필, 새로운 요청 보내기 // [오후 9:37:03] 6초 작업 시작 // [오후 9:37:08] 5초 작업 완료 // [오후 9:37:09] 6초 작업 완료

에러 핸들링 하기

all sellted

Promise.all은 언급한것 처럼 전체 작업 중 하나라도 실패 시 전체가 reject 되기 때문에, 실패한 작업이 있어도 전체를 중단시키지 않고 결과를 모두 수집하고 싶다면 Promise.allSettled를 사용해야 합니다.

interface PromiseFulfilledResult<T> { status: "fulfilled"; value: T; } interface PromiseRejectedResult { status: "rejected"; reason: any; } type PromiseSettledResult<T> = PromiseFulfilledResult<T> | PromiseRejectedResult; allSettled<T>(values: Iterable<T | PromiseLike<T>>): Promise<PromiseSettledResult<Awaited<T>>[]>;

allSettled의 타입을 보면 성공한 요청은 status가 fulfilled, 실패한 작업은 rejected로 구분해주고 있습니다.

const tasks = [1, 2, 3, 4].map((i) => i % 2 === 0 ? Promise.resolve(i) : Promise.reject(new Error(`${i} failed`)) ); const results = await Promise.allSettled(tasks); for (const result of results) { if (result.status === "fulfilled") { console.log("성공:", result.value); } else { console.error("실패:", result.reason); } } // 실패: Error: 1 failed // at ... // 성공: 2 // 실패: Error: 3 failed // at ... // 성공: 4

재시도 처리하기

모든 에러가 재시도로 해결되는건 아니지만, 일시적인 네트워크 오류나 서버 과부하 등의 이유로 API 호출이나 비동기 작업이 실패하는 경우, 동일한 작업을 몇 차례 반복 시도하면 성공할 수 도 있습니다.

이런 케이스에서는 비동기 함수에 재시도(retry) 로직을 래핑해서 간결한 코드로 안정성을 높일 수 있습니다.

하지만 고정된 시간마다 비동기 작업을 실행하는건 외부 시스템에 일정한 압력을 계속 가하게 되어, 오히려 상태를 악화시키거나 이미 서버가 다운되어 큰 의미가 없을 수 있습니다.

이럴 때는 재시도 횟수에 따라 대기 시간을 기하급수적으로 늘려서 점점 요청 빈도를 줄이는 지수 백오프(Exponential Backoff) 전략을 사용할 수 있습니다.

Tanstack Query나 SWR 등 프론트엔드 캐시 관리 라이브러리를 사용하셨다면 익숙하셨을 것 같은데요, 구현을 위해서는 딜레이 되는 시간 * multiplier ^ 재시도 수 를 계산해서 처리할 수 있습니다.

await new Promise((res) => setTimeout(res, delayMs * 2 ** i));

이미지 출처: Understanding Exponential Retries: A Key Strategy in Resilient Systems

이 외 딜레이에 무작위성을 추가하여 여러 클라이언트에서 동시에 재시도 하는 것을 방지하기 위한 Jitter 방법도 있으니 상황에 따라 사용하면 될 것 같습니다.

  • Full Jitter delay = random(0, baseDelay * 2^i)

  • Equal Jitter (AWS) delay = (baseDelay * 2^i) / 2 + random(0, (baseDelay * 2^i) / 2)

  • Decorrelated Jitter delay = min(maxDelay, random(baseDelay, previousDelay * 3))

타임 아웃 처리하기

특정 작업에서 오류 등으로 응답이 장기간 오지 않을 경우, 전체 시스템이 장시간 대기하게 될 수도 있습니다. 이런 케이스를 방지하기 위해 특정 시간을 넘길 경우 타임아웃 에러를 추가할 수 있습니다.

타임아웃은 보통 Promise.race를 사용해 구현합니다. 여러 프로미스 중 가장 먼저 완료된 프로미스를 반환하는 특성을 사용해서 N초 뒤 에러를 반환하는 프로미스와 작업을 Promise.race로 실행시키는 래핑 함수를 만들어 타임아웃을 구현할 수 있습니다.

const withTimeout = <T>(promise: Promise<T>, timeoutMs: number): Promise<T> => { const timeout = new Promise<never>((_, reject) => setTimeout(() => reject(new Error("Timeout exceeded")), timeoutMs) ); return Promise.race([promise, timeout]); };
Last updated on