RxJS を学ぼう #2 - よく使う ( と思う ) オペレータ15選

RxJS 5.x系には約90ものオペレータがありますが1)4.x系の頃は約140あったので、これでも相当少なくなった方なのです…、いきなりその全てを習得するのは無理がありますし必ずしも全てを知る必要はありません。頻繁に利用するのはそのうちのごく一部であり、あくまで所感ですが二十も覚えれば大概のアプリは作れることでしょう。

今回はいま現在実際に開発している弊社プロダクトのコードから特に使用頻度の高いオペレータ15個をご紹介します。

Converting events to Operators

Observable.of

引数に渡された値をそのまま流して終了するだけの Observable を生成します。可変長引数なのでカンマ区切りで複数指定することが出来、その場合は前から順に一つずつ出力します。全て出力し終わると最後にcompleteイベントが発火します。

単一の値
Rx.Observable.of(1).subscribe(
  (x) => console.log(x),
  (error: Error) => console.log(error),
  () => console.log('-------- completed!!! --------')
);
1
"-------- completed!!! --------"
可変長引数
const numbers = Rx.Observable.of(10, 20, 30);
numbers.subscribe(
  (x) => console.log(x),
  (error: Error) => console.log(error),
  () => console.log('-------- completed!!! --------')
);
10
20
30
"-------- completed!!! --------"
複数の Observable を concat して値を流す例
const numbers = Rx.Observable.of(10, 20, 30);
const letters = Rx.Observable.of('foo', 'bar', 'baz');
numbers.concat(letters).subscribe(
  (x) => console.log(x),
  (error: Error) => console.log(error),
  () => console.log('-------- completed!!! --------')
);
10
20
30
"foo"
"bar"
"baz"
"-------- completed!!! --------"

明示した値を流すだけのシンプルな Observable が欲しい時や開発途中のちょっとした動作確認などでよく使います。

Observable.from

配列といった Iterable な引数をとり、それらを順番に流す Observable を生成します。

Rx.Observable.from(['a','b','c']).subscribe(
  (x) => console.log(x),
  (error: Error) => console.log(error),
  () => console.log('-------- completed!!! --------')
);
"a"
"b"
"c"
"-------- completed!!! --------"

from の引数に文字列を渡すと、一文字ずつ Iterate する StringIterator に変換されてから出力されます。

Rx.Observable.from('hello').subscribe(
  (x) => console.log(x),
  (error: Error) => console.log(error),
  () => console.log('-------- completed!!! --------')
);
"h"
"e"
"l"
"l"
"o"
"-------- completed!!! --------"

Time-based Operators

Obervable.interval

setInterval のように指定された間隔ごとに値を流す Observable を生成します。流れてくる値は0, 1, 2... といった整数のシーケンスです。crearIntervalのようにシーケンスを止めるには、Subscription.unsubscribe メソッドを呼び出します。この場合 complete イベントは発火しません。

const intervalSource = Rx.Observable.interval(1000);
const subscription = intervalSource.subscribe(
  (x) => console.log(x),
  (error: Error) => console.log(error),
  () => console.log('-------- completed!!! --------')
);
setTimeout(() => subscription.unsubscribe(), 5000);
0
1
2
3
4

Transforming Operators

Observable.map

Array.prototype.map のような挙動をイメージしていただければ OK。任意の関数を Observable ソースによって生成され流れてくる値に適用し、その戻り値を再び Observable として返します。

Rx.Observable
  .from([1, 2, 3])
  .map(value => value * 10)
  .subscribe(
    (x) => console.log(x),
    (error: Error) => console.log(error),
    () => console.log('-------- completed!!! --------')
  );
10
20
30
"-------- completed!!! --------"

最も使用頻度が高いオペレータの一つです。

Observable.mapTo

Observable.map と似ていますが、こちらは流れてくる値を毎回同じ固定値にして返します。👍ボタンを押すと +1👎ボタンを押すと -1を返したいといったシチュエーションで使えます。

Rx.Observable
  .fromEvent(document.querySelector('#like'), 'click')
  .mapTo(1)
  .subscribe(
    (x) => console.log(x)
  );
Rx.Observable
  .fromEvent(document.querySelector('#unlike'), 'click')
  .mapTo(-1)
  .subscribe(
    (x) => console.log(x)
  );

See the Pen Observable.mapTo - DEMO by wakamsha (@wakamsha) on CodePen.

Observable.scan

非常に重要度の高いオペレータです。公式ドキュメントでは以下のように解説されています。

Observableソースにアキュムレータ関数を適用し、オプションのシード値を使用して各中間結果を返します。
reduce関数に似ていますが、ソースが値を流すたびに現在の累積を出力します。

Observable ソースから流れてきた値に何かしらの処理 ( アキュムレータ関数 ) を適用してその結果を Observable として返します。これだけだと map と同じですが、scanは前回流れてきた値、すなわち前回までの累積結果 ( 中間結果 ) も一緒に流れてきます

const clicks = Rx.Observable.fromEvent(document.querySelector('#btn'), 'click');
const ones = clicks.mapTo(1);
const seed = 0;
const count = ones.scan((acc, one) => acc + one, seed);
count.subscribe(
  (x) => console.log(x)
);
1
2
3
4

以下のデモにて実際の動きをご覧いただけます。

See the Pen Observable.scan - DEMO by wakamsha (@wakamsha) on CodePen.

ボタンをクリックする度に1という値が流れ、前回までの値に その1を足した結果を返します。前回までの値とボタンクリックから来る1は、それぞれaccone という引数に格納されています。

ここでいうところのアキュムレータ ( Accumulator ) とは、累積結果と思っていただければひとまず OK です。

Observable.switchMap

個人的に理解に一番苦労したオペレータです。

Observable ソースから値が流れ出す ( A ) のを起点として別の Observable ( B ) を流すことが出来ます。また、その際に起点として流れてきた値 [ A ] を Observable [ B ] は引数として受け取る事ができます。

const ones = Rx.Observable
  .fromEvent(document.querySelector('#btn'), 'click')
  .mapTo(1);
const seed = 0;
const increase = ones.scan((acc, one) => acc + one, seed);
const interval = Rx.Observable.interval(1000);
increase
  .switchMap(x => interval.take(5).map(index => index * x))
  .subscribe(
    (x) => console.log(x)
  );

文章だけだとややこしいだけなので以下のデモを見てみましょう。

See the Pen Observable.switchMap - DEMO by wakamsha (@wakamsha) on CodePen.

ボタンをクリックする度に値 1 が流れます。流れる値は scan によって累積されるので、ボタンをクリックする度に1, 2, 3...と流れる値が増えていきます。ここまでが Observable [ A ] です。

この [ A ] から来る値を受け取ってマップするのが [ B ] です。[ B ] は [ A ] から受け取った値xと inverval() からくる整数を掛け合わせる処理を5回繰り返します。

[ B ] の処理は [ A ] から値が流れて来るタイミングで開始されますが、[ B ] の処理が完了する前に次の [ A ] の値が流れてくると、それまで実行中だった処理が中断され、新しく流れてきた値と共にまた最初から処理を開始します。キー入力する度に検索結果を返すインクリメンタルサーチなどは不要な非同期処理が発生しがちなので、それらを適宜キャンセルしたいときなどで活用できます。

Filtering Operators

Observable.filter

Array.prototype.filter のような挙動をイメージしていただければ OK。任意のフィルタリング関数を流れてくる値に適用して true となるものだけを Observable として返します。

Rx.Observable.from([1, 2, 3, 4, 5, 6])
  .filter(x => x % 2 === 0)
  .subscribe(
    (x) => console.log(x),
    (error: Error) => console.log(error),
    () => console.log('-------- completed!!! --------')
  );
2
4
6
"-------- completed!!! --------"

Observable.skip

その名の通り Observable ソースから流れてくる値のうち、指定した数 ( 回数 ) をスキップします。

Observable.take

Observable ソースから流れてくる値のうち、指定した数 ( 回数 ) だけ受け取ります。全て受け取ると complete イベントが発火します。

Rx.Observable.interval(1000)
  .skip(1)
  .take(5)
  .subscribe(
    (x) => console.log(x),
    (error: Error) => console.log(error),
    () => console.log('-------- completed!!! --------')
  );
1
2
3
4
5
"-------- completed!!! --------"

Observable はよく川の流れ ( Stream ) と比喩されるように記述したオペレータの順に処理が行われます。上記のデモは『最初の1回をスキップし、その後値を5回受け取る』という処理内容でしたが、skip(1)take(5) の順序を入れ替えると『5回流すうちの最初の1回をスキップする』という結果に変わります。

1
2
3
4
"-------- completed!!! --------"

画面遷移リンクやフォームの送信など1回だけしか実行したくない ( 連打させたくない ) ときは take(1) とするだけで実装出来てしまいます。

Observable.distinctUntilChanged

Observable ソースから来る全ての値のうち、前回の値から変化したものだけを流します。例えばfrom([1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4]) という Obserbable ソースの場合、distinctUntilChanged を通って流れる値は 1, 2, 1, 2, 3, 4 となります。つまり二番目の1、三・四番目の2、十番目の3はそれら一つ前の値と被っているのでスキップされます。

Rx.Observable
  .from([1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4])
  .distinctUntilChanged()
  .subscribe(
    (x) => console.log(x),
    (error: Error) => console.log(error),
    () => console.log('-------- completed!!! --------')
  );
1
2
1
2
3
4
"-------- completed!!! --------"

数値や文字列といったプリミティブな値であればdistinctUntilChanged()と記述するだけで前回の値との比較が出来ますが、比較対象がオブジェクト形式の場合は filter のときと同じような任意の比較関数を自前で用意して判定します。比較関数は前回の値と今回の値の二つを引数として受け取ります。

type Person = {
  age: number;
  name: string;
}
Rx.Observable.of<Person>(
  { age: 4, name: 'Foo'},
  { age: 7, name: 'Bar'},
  { age: 5, name: 'Foo'},
  { age: 6, name: 'Foo'}
)
  .distinctUntilChanged((prev: Person, current: Person) => prev.name === current.name)
  .subscribe(x => console.log(x));
Object { age: 4, name: "Foo" }
Object { age: 7, name: "Bar" }
Object { age: 5, name: "Foo" }

Combining Operators

Observable.merge

可変長引数で複数の Observable を受取り、各 Observable に値が流れてくる度にそれをそのまま流します。名前の通り、複数ある川が一本に合流するといったイメージです。

const click = Rx.Observable.fromEvent(document.querySelector('#btn'), 'click');
const interval = Rx.Observable.interval(2000);
Rx.Observable.merge(
  click.mapTo('clicked!'),
  interval.take(10)
).subscribe(
  (x => console.log(x))
);

See the Pen Observable.merge - DEMO by wakamsha (@wakamsha) on CodePen.

こちらのデモではボタンをクリックすると interval(1000) が発動して1秒毎に整数値がログ出力されます。それ以降にボタンをクリックするとClickという文字列が整数値に紛れ込んで流れます。

map 同様、非常に使用頻度の高いオペレータです。

Observable.startWith

指定した引数の値を Observable ソースにから来る値より先に流します。Array.prototype.unshift のような挙動をイメージしていただければ OK です。

Rx.Observable.from([1, 2, 3])
  .startWith(4)
  .subscribe(
    (x) => console.log(x),
    (error: Error) => console.log(error),
    () => console.log('-------- completed!!! --------')
  );
4
1
2
3
"-------- completed!!! --------"

初期値など必ず値を流したいといった場面で使います。

Observable.combineLatest

switchMap に次いで理解に苦労したオペレータです。merge 同様、可変長引数で複数の Observable を受け取ります。ここまでは merge と同じですが、merge が各 Observable に値が流れる度にその値だけを流すのに対して、combineLatest は他の Observable の直近の値も一緒に流します

const numbers = Rx.Observable
  .interval(1000)
  .take(5);
const letters = Rx.Observable
  .interval(1500)
  .take(5)
  .map((index: number) => 'ABCDEFG'[index]);
Rx.Observable.combineLatest(
  numbers,
  letters,
  (num: number, letter: string): string => num + letter
).subscribe(
  (x) => console.log(x),
  (error: Error) => console.log(error),
  () => console.log('-------- completed!!! --------')
);
"0A"
"1A"
"2A"
"2B"
"3B"
"3C"
"4C"
"4D"
"4E"
"-------- completed!!! --------"

初めての方は何を言ってるのかサッパリかもしれませんので、デモを見てみましょう。

See the Pen Observable.combineLatest - DEMO by wakamsha (@wakamsha) on CodePen.

1秒毎に整数を 5回流すものと 1.5秒ごとにアルファベットを 5回流すという二つの Observable を用意しました。combineLatest はいずれかの値が流れるともう片方の直近の値も一緒に流します。それらの値は3つ目 ( 最後の ) 引数である関数で受け取ることが出来、この戻り値を再び Observable として返します。引数は可変長となっており、その順序は combineLatest の引数に渡した順序と同じです。

注意したいのは、全ての Observable が最初の値を流しはじめるまで結果値が流れてこないということです。上の例で言うと、開始から1秒後に numbers の最初の値が流れ始めますが、letters が流れ始めるのは開始から1.5秒後なので、それまでどちらの値も受け取ることが出来ません。ここをしっかり理解しておかないと期待するタイミングで値を受け取れないといった落とし穴にハマりやすくなります。

こちらの Marble Diagram が理解の手助けになると思うので、併せて参考いただければと思います。

ちなみに combineLatest の最後の引数に関数を指定せず、その直後の map オペレータに渡す Iterator ( 関数 ) の引数として受け取ることもできます。ただし、map に渡す Iterator は引数を一つしか受け取れないので配列形式で受け取ることになります

// 本来の書き方
Rx.Observable.combineLatest(
  numbers,
  letters,
  (num: number, letter: string): string => num + letter
);
// こっちでもOK
Rx.Observable.combineLatest(
  numbers,
  letters
).map(([num, letter]: [number, string]) => num + letter);

Observable.withLatestFrom

ある Observable ソースからの値にもう一方の Observable ソースの最新値を合成するオペレータです。combineLatest と似ていますが、combineLatest が各 Observable いずれかの値が流れるタイミングでイベント発火するのに対して withLatestFrom は最初に起点となる Observable の値が流れるタイミングの時だけイベント発火します。以下はそのデモです。

const number$ = Rx.Observable.interval(1000)
  .take(5);
const letter$ = Rx.Observable.interval(2000)
  .take(5)
  .map(index => 'ABCDEFG'[index]);
number$
  .withLatestFrom(
    letter$,
    (num, letter) => num + letter
  )
  .subscribe(
    (x) => console.log(x),
    (error: Error) => console.log(error),
    () => console.log('-------- completed!!! --------')
  );
"1A"
"2A"
"3B"
"4B"
"-------- completed!!! --------"

それぞれの値は combineLatest と同じく、オペレータに渡す最後の引数の関数で受け取ることが出来ます。最後に関数を渡さなければ、直後の map オペレータに渡す Iterator の引数として受け取ることも出来ます。これも combineLatest と同様ですね。

試しに numbers.withLatestFrom(letters)letters.withLatestFrom(numbers) と書き換えみると、よりその仕組が理解出来るのではないでしょうか。結果は以下のように変わります。

letter$
  .withLatestFrom(
    number$,
    (letter, num) => num + letter
  )
  .subscribe(
    (x) => console.log(x),
    (error: Error) => console.log(error),
    () => console.log('-------- completed!!! --------')
  );
"A1"
"B3"
"C4"
"D4"
"E4"
"-------- completed!!! --------"

締め

map や filter というメソッド名から察する限り、基本的には lodash.js に見られる文字列・配列操作と似たイメージかと思われます。個人的には Node.js の Stream にも通ずるものを感じます。徐々に慣れていきましょう。

脚注

脚注
1 4.x系の頃は約140あったので、これでも相当少なくなった方なのです…