RxJs Observables

What is Observable

Observable利用观察者模式,建立了发布者publisher和订阅者subscriber之间的联系。联系本身不会修改发布的信息,只是定义声明了订阅者对发布信息的处理方式。

使用目的:

  1. 流式处理本身是延迟执行的,即在需要数据的一刻进行处理,并且不改变数据本身而是生成一个新的流。多流处理的支持。

  2. 异步回调保证了流式处理结果的实时渲染,因此Observable是高性能前端的重要概念。可以将同步方法转换成异步方法。

  3. 支持事件处理,动态注册事件及句柄。

创建Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import { Observable} from 'rxjs';
// subscriber/observer is callback functions for this observable.
// {next, error, complete}
const observable = new Observable(subscriber => {
// observable can decide how frequency next/error/complete callback is called.
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);

setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);

return {unsubscribe() {}};
})

传入Observable回调函数/创建一个subscriber

1
2
3
4
5
observable.subscribe({
x => console.log('got value ' + x),
err => console.error('got error ' + err),
() => console.log('done')
})

创建multicast Observable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
function multicastSequenceSubscriber(){
const observers = [];

let timeoutId;

return (observer) => {
observers.push(observer);
// start the sequence when firstly subscribed.

if( observers.length === 1) {
timeoutId = doSequence({
next(val) {
// 将所有observer封装成一个observer
observers.forEach(obs => obs.next(val));
},

complete() {
observers.slice(0).forEach( obs => obs.complete());
}
}, seq, 0)
}

return {
unsubscribe() {
//remove observer
observers.splice(observers.indexOf(observer), 1);
// cancel subscription if no observers.
if(observers.length === 0){
clearTimeout(timeoutId);
}

}
}
}
}

const multicastObservable = new Observerble(multicastSequenceSubscriber())

// subscribe to this observable
multicastObservable.subscribe({
next(num) { console.log(num)}
complete(){}
})

multicastObservable.subscribe({
next(num) { console.log('second: ' + num)}
complete(){}
})

Observable关系操作符

Area Operator
Creation from, fromEvent, of
Comibination combineLatest,concat, merge, startWith, withLatestFrom, zip
Filtering debounceTime, distinctUtilChanged, filter, take, takeUtil
Transformation bufferTime, concatMap, map, mergeMap, scan, switchMap
Utility tap
Multicasting share

Observable的关系操作符,是通过.pipe()引入的,更多的操作符,可以参考rxjs官方API文档here

Using observables in Angular

  • EventEmitter
  • HTTP模块处理AJAX requests
  • Router, Forms 模块监听/响应用户输入

Observable v.s. Promise

  • 共同点
方面 细节
异步处理 subscribe/then callback
使用语言 支持typescript, javascript
支持HTTP 通过Angular httpClient支持, observable.toPromise()
  • 不同点
方面 细节
使用场景 O: 事件处理句柄, 流式处理; P: 链式处理
操作符复杂度 O:多样化; P:单一

Reactive Programming

使用异步数据流进行编程,通过代码来忠实反映业务之间的关系。