RxJS を学ぼう #3 - 知ってると便利な ( かもしれない ) オペレータ 8選

RxJS を学ぼう #2 – よく使う ( と思う ) オペレータ15選 というエントリにて『頻出するオペレータ』をご紹介しました。RxJS 5.x系には約90ものオペレータがありますが、いきなりその全てを習得するのは無理がありますし必ずしも全てを知る必要はありません。

今回は前回ご紹介したものほど使う頻度は高くないけど、知っているとなにかと便利なオペレータをご紹介します。前回と今回のを合わせればそこそこのアプリケーションは作れてしまうのではないでしょうか。

Observable.do

Observable ソースから流れてくる全ての値に対して任意の処理 ( 副作用 ) を実行します。ただし、do の戻り値は ソースから流れてきた Observable そのままで、副作用の結果は流れません

const clicks = Rx.Observable.fromEvent(document.querySelector('#target'), 'click');
const positions = clicks
  .do((event: MouseEvent) => console.log(event.type))
  .map((event: MouseEvent) => event.clientX)
  .subscribe(
    (x) => console.log(x)

See the Pen Observable.do - DEMO by wakamsha (@wakamsha) on CodePen.

デバッグの際にその地点での値を確認するときによく使います。

[ Tips ] console.log 目的であれば do は不要

少々トリッキーですが、上記のコードをこのように変えても同様の結果が得られます。

const positions = clicks
  .map((event: MouseEvent) => console.log(event.type) || event.clientX)

ログ出力したい値が流れてくる直後に console.log を記述し、|| を挟んでの本来の処理内容が続くようにしています。console.log の戻り値は undefined すなわち false なので処理は必ず || 以降に続きます。しかし console.log もまた必ず実行されるので期待する両方の処理結果が得られるというわけです。

Observable.never

何の値も流さない Observable ソースです。つまり nextcomplete も発火しません。終了すら通知しないので一見すると何のためにあるのか謎なオペレータですが、主にテスト目的や他の Observable との合成する必要があるときなどで使うことがあります。必須の引数に値を渡したいけどまだ実装されていなかったり、ある特定の条件下では渡す値が無いといった場面でも仮の値として使われたりします。

const info = () => {
  console.log('Will not be called.');
};
Rx.Observable
  .never()
  .startWith('Start streaming.')
  .subscribe(
    (x) => console.log(x),
    (error: Error) => info(),
    () => info()
  );

See the Pen Observable.never - DEMO by wakamsha (@wakamsha) on CodePen.

類似するオペレータに Observable.empty がありますが、こちらは complete だけ発火させるという違いがあります。

Observable.delay

その名の通り Observable ソースから来る値が流れるのを遅らせます。引数は number型もしくは Date型を指定することができ、number ( 相対時間 ) だとミリ秒単位で流れるのを遅らせ、Date ( 絶対時間 ) だとその日時まで遅らせます。

Rx.Observable
  .fromEvent(document.querySelector('#btn'), 'click')
  .delay(1000)
  .mapTo(1)
  .scan((acc, val) => acc + val, 0)
  .subscribe(
    (x) => console.log(x)
  );

See the Pen Observable.delay - DEMO by wakamsha (@wakamsha) on CodePen.

Observable.switch

Observable の更に内部にある Observable ( Observable<Observable<T>> ) を flat にするオペレータです。これだけだと flatMap と同じですが、switch はその名の通りそれまで流れてたもの ( 内部 Observab ) をキャンセルして後から来る Observable へとスイッチしていきます。最終的には最新の値のみが subscribe されます。 『Observable を発行する Observable』とも言われます。

click  : ---------c-c------------------c--.. 
        map(() => Rx.Observable.interval(1000))
source : ---------o-o------------------o--..
                   \ \                  \----0----1--...
                    \ ----0----1----2----3----4--...
                     ----0----1----2----3----4--...
                     switch()
example: -----------------0----1----2--------0----1--...

図解するとこのような流れになります。クリックイベントが発火する度に新しい内部 Observable が作られ、それまで流れてた Observable ソースからそちらに切り替わることによって新しい内部 Observable から流れてくる値を受け取ります。

Rx.Observable
  .fromEvent(document.querySelector('#btn'), 'click')
  .mapTo(1)
  .scan((acc, val) => acc + val, 0)
  .map((val: number) => Rx.Observable.interval(1000).take(5).map(x => x + val))
  .switch()
  .subscribe(
    x => console.log(x),
    (error: Error) => console.log(error),
    () => console.log('-------- completed!!! --------')
  );
1  //=> ( click )
2
3
4
2  //=> ( click )
3
4
5

See the Pen Observable.switch - DEMO by wakamsha (@wakamsha) on CodePen.

Observable.throttle

throttle とは、最初のイベントを処理した後に続く同様のイベントを指定した時間は間引き、時間経過後に処理を実行するというアーキテクチャです。Rx 固有のものではなく、jQuery や lodash.js にも同様のメソッドが用意されています。

RxJS における throttle は、『指定した時間』を 別 ( 内部 ) の Observable のタイマーが有効になっている期間 として指定します。具体的にはObservable.interval(1000) の duration 期間などです。

Rx.Observable
  .fromEvent(document.querySelector('#target'), 'mousemove')
  .throttle(() => Rx.Observable.interval(1000))
  .subscribe(
    (event: MouseEvent) => console.log(`x: ${event.clientX}, y: ${event.clientY}`),
    (error: Error) => console.log(error),
    () => console.log('-------- completed!!! --------')
  );
"x: 93, y: 11"
"x: 180, y: 87"
"x: 303, y: 106"
"x: 174, y: 67"

See the Pen Observable.throttle - DEMO by wakamsha (@wakamsha) on CodePen.

このデモでは、ターゲット領域上でマウスカーソルを動かす度に値が流れますが、続く throttle() に渡した Observable.interval(1000) によって最初のイベント発生処理から1秒間は処理が間引かれており、1秒経過後に二回目のイベントを処理しています。

ウィンドウのリサイズなど大量のイベントが発生するようなシチュエーションにおいて適度に処理を間引いて負荷を下げるといった用途で使われます。

Observable.throttleTime

throttle が 引数に Observable を取ることによって間引き時間を指定したのに対し、throttleTime はシンプルに数値型 ( ミリ秒 ) を受け取って指定します。

// どちらも同じ
throttle(() => Rx.Observable.interval(1000));
throttleTime(1000);

jQuery や lodash.js のように使うのであれば throttleTime を使うのが良いでしょう。

RxJS 4.x 系までは throttle が数値型を引数に取っていましたが、5系からは Observable を受け取るようになり、新たに追加された throttleTime で数値型を受け取るようになりました。

Observable.debounce

throttle が指定した時間の間は同一イベントを処理しないのに対し、debounce は指定した時間内に同一イベントが発生すると処理せず、発生しなければ処理を実行するというアーキテクチャです。

Rx.Observable
  .fromEvent(document.querySelector('#target'), 'mousemove')
  .debounce(() => Rx.Observable.interval(1000))
  .subscribe(
    (event: MouseEvent) => console.log(`x: ${event.clientX}, y: ${event.clientY}`),
    (error: Error) => console.log(error),
    () => console.log('-------- completed!!! --------')
  );

See the Pen Observable.debounce - DEMO by wakamsha (@wakamsha) on CodePen.

RxJS における debounce も Observable を引数に取ることで時間を指定します。数値型 ( ミリ秒 ) を引数に取りたい場合は debounceTime オペレータを使います。

YouTube などのビデオプレイヤーには、マウスカーソルを一定時間動かさないと再生 / 停止ボタンやシークバーといったコントローラが非表示になるというインタラクションが良く見られますが、こういったものは debounce を使うことで簡単に実装出来ますね。

こちらも throttle と同じく jQuery, lodash.js にも同様のメソッドが用意されています。

Observable.partition

Observable ソースを二つに分割するオペレータで、使い方は filter オペレータと非常によく似ています。filter は任意のフィルタリング関数において true となる値だけを返すのに対して、partition は true となる値と false となる値の Observable を配列で返します。

[true を返す Observable, false を返す Observable];

1つ目は true、2つ目は false となる値の Observable です。

const [evens, odds]: [Rx.Observable<number>, Rx.Observable<number>] = Rx.Observable
  .from([1,2,3,4,5,6])
  .partition(val => val % 2 == 0);
Rx.Observable
  .merge(
    evens.map(val => `Even: ${val}`),
    odds.map(val => `Odd: ${val}`)
  )
  .subscribe(
    x => console.log(x),
    (error: Error) => console.log(error),
    () => console.log('-------- completed!!! --------')
  );
"Even: 2"
"Even: 4"
"Even: 6"
"Odd: 1"
"Odd: 3"
"Odd: 5"
"-------- completed!!! --------"

1から6までの整数を奇数と偶数に分配し、それぞれの値を返す Observable の配列を作るというものです。以下のデモから実際の動きをご覧いただけます。

See the Pen Observable.partition - DEMO 1 by wakamsha (@wakamsha) on CodePen.

他にも以下のように partition を使って Success と Error といった Observable ソースに分割することも可能です。

const [success, error] = Rx.Observable
  .from([1,2,3,4,5,6])
  .map(val => {
    if (val > 3) {
      throw `${val} greater than 3!!!`;
    }
    return { success: val };
  })
  .catch(val => Rx.Observable.of({error: val}))
  .partition(res => res.success);
"Success! 1"
"Success! 2"
"Success! 3"
"Error! 4 greater than 3!!!"
"-------- completed!!! --------"

map 内のフィルタリング関数で値が3より大きいかどうかを評価しています。大きいと throw 処理によって catch オペレータ行きとなり Error オペレータとなります。3 より小さいと成功とみなされて {success: val} 値が流れる success Observable を返します。以下のデモから実際の動きをご覧いただけます。

See the Pen Observable.partition - DEMO 2 by wakamsha (@wakamsha) on CodePen.

締め

頻繁に登場するものではないものの、どこかで役立ちそうなオペレータをご紹介しました。一つ一つは地味な機能で Angular や React といったフレームワーク / ライブラリのような華やかなものではありませんが、小さなものを少しずつ積み重ねていくことで中 ~ 大規模なアプリケーションも作れるようになることでしょう ( たぶん ) 。