操作符
简单来讲,操作符就是解决某个具体应用问题的模式,在针对不同问题时,需要使用不同的操作符,如合并、过滤、异常捕获等
我的理解就是,如果把一个Observalbe
比做一个数组,那么操作符就是数组的方法,可以使用map
、filter
、forEach
来对这个数组进行操作
操作符大致分为以下几类
- 创建类
- 转化类
- 过滤类
- 合并类
- 多播类
- 错误处理类
- 辅助工具类
- 条件分支类
- 数学和合计类
创建类
如果使用new Observable
这样的方式来创建,定制性很高,如果只是使用一些简单的数据来创建Observable
,那么RxJS
中提供了专门用来创建Observable
的操作符
部分操作符接受一个可选参数**scheduler**
,不过我暂时没有学习,所以先忽略掉
of
of
操作符可以指定固定的数据来创建Observable
对象
import { of } from "rxjs";
const source$ = of(1, 2, 3);
source$.subscribe(value => console.log(value));
// 1 2 3
range
of
是指定数据,range
则是指定一个范围
import { range } from "rxjs";
const source$ = range(5, 10);
source$.subscribe(value => console.log(value));
// 5 6 7 8 ... 14
range
函数接受 2 个可选参数,第一个参数为起始值,默认值为 0,第二个参数为个数,默认值为undefined
后续的每一个数都会加 1,向上述例子,从 5 开始,每次加 1,知道个数达到 10 个,也就是到达数字 14 时结束
generate
range
可以连续的创建数字,如果想要每次输出相隔两个数字呢?
generate
就像是RxJS
中的for
循环
import { generate } from "rxjs";
const source$ = generate(0, x => x < 10, x => x + 1);
source$.subscribe(value => console.log(value));
// 0 1 2 3 4 5 6 7 8 9
generate
可接受 4 个参数,第一个参数为初始值,第二个参数为判断条件,第三个参数为值的递增,第四个参数可以是结果的处理函数
const source$ = generate(0, x => x < 10, x => x + 1, x => x * 10);
source$.subscribe(value => console.log(value));
// 0 10 20 30 40 50 60 70 80 90
增加第四个参数后,输入会根据处理函数的返回值
相较于for
循环来看,RxJS
中的generate
更纯了,函数式,比如for
循环,每次都会对条件变量做处理,所以不能用const
repeat
repeat
可以对上游的数据进行重复的输出,如果想要一个1,2,3,1,2,3,1,2,3
的数据,用of
当然是可以做,但是用repeat
更加清晰
import { of } from "rxjs";
import { repeat } from "rxjs/operators";
const source$ = of(1, 2, 3).pipe(repeat(3));
source$.subscribe(value => console.log(value));
// 1 2 3 1 2 3 1 2 3
重复 3 次of(1,2,3)
的数据,这里调用了一个函数叫pipe
,在RxJS v5
中,这里的调用方式应该是
of(1, 2, 3).repeat(3)
就是链式操作,但在RxJS v6
中,需要使用pipe
操作符来,pipe
就是一个管道,数据从管道进入,进过处理,再从管道出去
EMPTY、NEVER、throwError
EMPTY
是直接返回一个完成的Observable
,会直接调用观察者的completed
函数
NEVER
则是返回一个永不完结的Observable
,不会调用观察者的任何函数
throwError
是抛出一个错误,如果观察者有error
函数则调用,否则抛出一个错误
import { EMPTY, NEVER, throwError } from "rxjs";
const source1$ = EMPTY;
const source2$ = NEVER;
const source3$ = throwError("error");
source1$.subscribe({
complete() {
console.log("completed");
},
});
source2$.subscribe(value => console.log(value));
source3$.subscribe({
error(err) {
console.log(err);
},
});
// completed
// 什么都没有
// error
interval 和 timer
上面的操作符,都是产生同步数据的,最简单的产生异步数据的方法就是interval
和timer
,从名字可以看出,它们的功能和setInterval
和setTimeout
类似
interval
import { interval } from "rxjs";
const source$ = interval(1000);
source$.subscribe(value => console.log(value));
// 0 1 2 3 4 ...
interval可接受1个参数,如上每隔
1000ms`输出依次,从 0 开始,每次加 1
1 秒后输出 0,再一秒后输出 1,再一秒后输出 2...永不完结
timer
import { timer } from "rxjs";
const source$ = timer(1000);
source$.subscribe(value => console.log(value));
// 0
timer
可接受 2 个参数,不同的是第一个参数可以为数字,也可以为Date
对象,如果是数字,则代表毫秒,等待相应时间后输出 0
上例中,一秒后输出 0,完结
第二个参数则是周期,如果有第二个参数,timer
也能用不完结
const source$ = timer(1000, 1000);
source$.subscribe(value => console.log(value));
// 0 1 2 3 4 ...
这样的效果就和interval
一样了
from
from
可以把一个像Observable
的参数,转为真正的Observable
对象
一个数组像Observable
,一个伪数组对象也像,字符串也像,Promise
也,都可以通过from
转为真正的Observable
对象
数组
import { from } from "rxjs";
const source$ = from([1, 2, 3]);
source$.subscribe(value => console.log(value));
// 1 2 3
伪数组
const source$ = from({
0: 0,
1: 1,
2: 2,
length: 3,
});
source$.subscribe(value => console.log(value));
// 1 2 3
字符串
const source$ = from("hello RxJS");
source$.subscribe(value => console.log(value));
// h e l l o R x J S
Promise
const source$ = from(Promise.resolve("resolve"));
source$.subscribe(value => console.log(value));
// 输出resolve
这里的Promise
是resolve
的,如果是reject
,则会触发抛出错误或调用观察者的error
函数
const source$ = from(Promise.reject("reject"));
source$.subscribe(value => console.log(value));
// 抛出错误reject
fromEvent
fromEvent
可以把网页中的DOM
事件转化为Observalbe
对象
import { fromEvent } from "rxjs";
const source$ = fromEvent(document, "click");
source$.subscribe(value => console.log(value));
// MouseEvent
fromEvent
可以接受 3 个参数,第一个参数为事件的DOM
对象,第二个参数为事件的名称,第三个参数是EventListenerOptions
const source$ = fromEvent(document, "click", { once: true });
source$.subscribe(value => console.log(value));
如这个事件因为传入了第三个参数,只会触发一次
repeatWhen
repeat
能够重复订阅上游的Observable
,但是不能控制订阅时间,如等待 2 秒再重新订阅
repeatWhen
就可以满足上面的需求
import { of } from "rxjs";
import { delay, repeatWhen } from "rxjs/operators";
const source$ = of(1, 2, 3).pipe(
repeatWhen(notifications => notifications.pipe(delay(2000)))
);
source$.subscribe(value => console.log(value));
// 1 2 3 等待2秒 1 2 3
当上游Observable
completed
的时候,会传给repeatWhen
一个notifications
参数,这个参数也是Observable
对象
defer
defer
可以只在观察中订阅的时候才创建Observable
对象,通常用于Observable
工厂函数
import { defer, fromEvent, interval } from "rxjs";
const clicksOrInterval = defer(function() {
return Math.random() > 0.5 ? fromEvent(document, "click") : interval(1000);
});
clicksOrInterval.subscribe(x => console.log(x));
刷新页面后,可能输出MouseEvent
,也可能输出interval
的数字