[functional JS ES6+] 비동기/동시성 프로그래밍 2 : 지연평가와 Promise의 결합, 병렬적 평가

작성:    

업데이트:

카테고리:

태그: , ,

본 포스트는 인프런의 함수형 프로그래밍과 JavaScript ES6+ 강의(링크)를 듣고 정리한 내용입니다.


지연평가 + Promise

filter, take : Kleisli Composition

go([1, 2, 3, 4, 5, 6],
  L.map(a => Promise.resolve(a * a)),
  L.filter(a => a % 2),
  L.map(a => a * a),
  take(2),
  log);

// Promise reject에 같이 넣어보낼 symbol
const nop = Symbol('nop');

// L.filter의 Promise 대응
L.filter = curry(function *(f, iter) {
  for (const a of iter) {
    const b = go1(a, f)
    if (b instanceof Promise) yield b.then(b => b ? a : Promise.reject(nop));
    else if (b) yield a;
  }
})

// take의 Promise 대응
const take = curry((l, iter) => {
  let res = [];
  iter = iter[Symbol.iterator]();
  return function recur() {
    let cur;
    while(!(cur = iter.next()).done) {
      const a = cur.value;
      if (a instanceof Promise) {
        return a
          .then(a => (res.push(a), res).length == l ? res : recur())
          .catch(e => e == nop ? recur() : Promise.reject(e));
      }
      res.push(a);
      if (res.length == l) return res;
    }
    return res;
  } ();
})
  • 재귀적 유명함수를 return에 사용해 길이가 l이 될 때까지 res 배열에 a를 푸시
  • 그런데 만약 Promise의 인스턴스여서 return된 a가 reject라면 내용을 확인
  • 만약 nop이면 위의 filter에서 이후를 더 진행해야 한다는 의미의 reject이므로 recur()을 재호출
  • nop이 아니라면 그 외의 상황으로 오류가 난 것이므로 Promise.reject(e)를 그대로 반환하여 이후 함수 진행 중단
  • 함수열 중간에 Promise 객체가 reject가 반환되면 이후의 .then 함수열은 실행하지 않고 바로 .catch문으로 진입해 코드 실행


reduce에서 nop 지원

// 기존 코드
const reduce = curry((f, acc, iter) => {
  if (!iter) {
    iter = acc[Symbol.iterator]();
    acc = iter.next().value;
  } else {
    iter = iter[Symbol.iterator]();
  }
  return go1(acc, function recur(acc) {
    let cur;
    while (!(cur = iter.next()).done) {
      const a = cur.value;
      acc = f(acc, a);
      if (acc instanceof Promise) return acc.then(recur);
    }
    return acc;
  });
});
// Promise 대응 reduce

const reduceF = (acc, a, f) => {
  a instanceof Promise ? 
  // a가 Promise instance인 경우 then으로 풀어 값으로 평가한 뒤 f(acc, a)
  a.then(a => f(acc, a), e => e == nop ? acc : Promise.reject(e)) :
  // a가 Promise instance가 아닌 경우 값이므로 바로 f(acc, a)
  f(acc, a)
}

const head = iter => go1(take(1, iter), ([h]) => h);

const reduce = curry((f, acc, iter) => {
  if (!iter) return reduce(f, head(iter = acc[Symbol.iterator]()), iter);
  iter = iter[Symbol.iterator]();
  return go1(acc, function recur(acc) {
    let cur;
    while (!(cur = iter.next()).done) {
      acc = reduceF(acc, cur.value, f);
      if (acc instanceof Promise) return acc.then(recur);
    }
    return acc;
  });
});
  • recur 함수 내부에서 a와 acc를 처리할 때, 비동기 처리가 필요
  • 해당 부분의 함수가 길어지므로 별도의 함수(reduceF)로 작성한 뒤 호출
  • a가 Promise의 instance인지 판단하여 지연성을 두고 다르게 처리하는 것은 이전의 refactoring과 동일
  • then(a => f1(a), e => f2(e))
    • f1: 이전의 로직의 반환값을 인자로 다음 로직 진행
    • f2: 이전이 만약 reject를 반환했다면, 해당 reject에 대한 로직 진행
    • catch 문을 then 문에 합성하는 개념
  • 첫 값이 없는 경우 꺼내는 함수(head)를 작성해 로직 간소화 및 비동기 처리
  • 첫 값을 뽑아 다시 reduce에 인자로 전달하며 진행


지연된 함수열의 병렬적 평가

  • 자바스크립트는 싱글스레드이기 때문에 동시에 여러 작업은 불가능하고 비동기 IO로 처리
  • 하나의 스레드에서도 CPU를 점유하는 작업을 보다 효율적으로 처리
  • 한 node 환경이 아니라, 네트워크나 브라우저, 기타 IO 등 외부 환경으로 작업을 보내 한 번에 처리한 뒤, 이 결과를 받음


C.reduce

C : Concurrency 병행성

const C = {};
C.reduce = curry((f, acc, iter) => iter ?
  reduce(f, acc, [...iter]) :
  reduce(f, [...acc]));
  • iterator가 있는 경우 [...iter]를, 없는 경우 [...acc](이것이 곧 iter)를 reduce에 인자로 전달
  • 비동기성을 고려하지 않고, 모든 배열을 평가해 reduce로 넘긴다는 것


console.time('');
go([1, 2, 3, 4, 5],
  L.map(a => delay1000(a * a)),
  L.filter(a => a % 2),
  C.reduce(add),
  log,
  _ => console.timeEnd('')); // 1005.9492...ms
  • L.map과 L.filter를 사용하므로 go함수 내부에서 세로(함수별로 하나씩) 방향으로 작업을 처리
  • 때문에 기존 reduce를 사용하면 매번 delay1000 함수 실행
  • C.reduce를 활용하면 배열 내 요소 각각마다 delay1000 함수를 실행하는 것이 아니라, 배열을 펼쳐 한 번에 평가 후 다음 함수로 전달


Promise.reject의 평가

  • 함수를 진행하다보면 이후에 비동기적으로 Symbol(nop)을 인식해 이후 작업을 처리해준다.
  • 하지만 이후에 이를 처리하더라도 일단 reject가 넘어갔으니 console에 error가 찍히게 된다.
  • 이를 방지하기 위해 “뒤에서 비동기적으로 처리할거야”라는 메시지를 call stack에 전달하는 방법을 알아보자.
C.reduce = curry((f, acc, iter) => iter ?
  reduce(f, acc, catchNoop([...iter])) :
  reduce(f, catchNoop([...acc])));
  • 임시로 catch를 해두는 방식으로 해결


주의할 것 : catch된 것을 보내면 이후에 catch 불가능

C.reduce = curry((f, acc, iter) => {
  let iter2 = iter ? [...iter] : [...acc];
  iter2 = iter2.map(a => a.catch(function () {})); 🔆

  ...
});
  • catch가 이미 된 iterator는 이후에 또 다시 catch 불가능
  • **오류 기록을 위한 명색뿐인 catch를 하는 것일 뿐, catch 이전의 Promise를 그대로 전달하는 것


정리

const C = {};
function noop() {}
const catchNoop = arr => 
  (arr.forEach(a => a instanceof Promise ? a.catch(noop) : a), arr);

C.reduce = curry((f, acc, iter) => iter ?
  reduce(f, acc, catchNoop([...iter])) :
  reduce(f, catchNoop([...acc])));
  • noop : 아무 일도 하지 않는 함수
  • catchNoop : 배열을 받아 배열 요소 각각 확인하여 Promise instance이면 a.catch(noop)한 것들의 배열을 받는 함수
  • 이를 iter 유무에 따라 즉시 평가한 배열들을 인자로 넣어 반환


C.take

C.take = curry((l, iter) => take(l, catchNoop([...iter])));
  • 배열의 최대 길이와 배열을 인자로 전달받는다.
  • 이를 take 함수 취한 것을 반환
  • l은 그 자체로 최대 길이, 배열은 병렬 처리를 위해 모두 spread하여 즉시 평가 후 catchNoop에 전달
  • 이 반환값을 다시 반환


C.map, C.filter

즉시 병렬적으로 평가

특정 함수 라인에서만 병렬적으로 실시, 나머지는 동기적으로 할 때


C.takeAll = C.take(Infinity);
C.map = curry(pipe(L.map, C.takeAll));
C.filter = curry(pipe(L.filter, C.takeAll));


C.map(a => delay1000(a * a), [1, 2, 3, 4]).then(log) // 1초 뒤에 [1, 4, 9, 16]
C.filter(a => delay1000(a % 2), [1, 2, 3, 4]).then(log) // 1초 뒤에 [1, 4, 9, 16]

댓글남기기