合并类操作符
如果把Observable
数据流比做一条河流,两条河流汇聚的时候将会合并成一条大江,但是在RxJS
中,合并并不是这么简单,不同的Observable
数据流合并有不同的规则,有些是相互交错合并,有些是像排队一样,一条数据流走完,另一条才能跟在后面
concat
在Javascript
中,数组有一个concat
方法,这个方法可以把其他数组的值按照参数顺序合并到当前数组
const a = [1, 2, 3];
const b = [4, 5, 6];
const c = [7, 8, 9];
const d = [].concat(a, b, c);
// 1 2 3 4 5 6 7 8 9
在RxJS
中,concat
操作符也有类似的功能,可以把多个Observable
对象依次合并
import { of, concat } from "rxjs";
const source$ = concat(
of(1, 2, 3),
of(4, 5, 6),
of(7, 8, 9)
);
source$.subscribe(value => console.log(value));
// 1 2 3 4 5 6 7 8 9
concat
操作符的合并,必须等上一个Observable
对象完结后,才会继续合并后面的Observable
对象,如果一个Observable
对象永不完结,那么concat
操作符不会返回任何数据,这一点在合并异步数据时就能体现
合并异步Observable
对象
const source$ = concat(
timer(2000).pipe(mapTo("a")),
timer(1000).pipe(mapTo("b"))
);
source$.subscribe(value => console.log(value));
// 2秒后输出 a
// 1秒后输出 b
这里涉及到一个操作符mapTo
,可以理解这个数据流只会返回传入参数的数据
merge
merge
操作符在合并同步数据上,是和concat
操作符一样的,但是在合并异步数据流时,就能展现出差异
import { merge, timer } from "rxjs";
import { mapTo } from "rxjs/operators";
const source$ = merge(
timer(2000).pipe(mapTo("a")),
timer(1000).pipe(mapTo("b"))
);
source$.subscribe(value => console.log(value));
// 1秒后输出 b
// 2秒后输出 a
可以看出,merge
操作符在合并异步数据流时,是不会按照参数顺序依次等待的,而是先完结,先合并
merge
操作符还可以有一个number
类型可选参数concurrent
,这个参数的作用将限制merge
的同时合并
不使用concurrent
参数
import { interval, merge } from "rxjs";
import { mapTo } from "rxjs/operators";
const source$ = merge(
interval(1000).pipe(mapTo("a")),
interval(1000).pipe(mapTo("b")),
interval(1000).pipe(mapTo("c"))
);
source$.subscribe(value => console.log(value));
// 1秒后输出 a b c
// 再1秒后输出 a b c
// ...
使用concurrent
参数
const source$ = merge(
interval(1000).pipe(mapTo("a")),
interval(1000).pipe(mapTo("b")),
interval(1000).pipe(mapTo("c")),
2
);
source$.subscribe(value => console.log(value));
// 1秒后输出 a b
// 再1秒后输出 a b
// ,,,
使用concurrent
参数后,同时进行的Observable
数据流只会有限制的个数,后面限制住的Observable
对象会在前面的Observable
对象完结后,再加入merge
合并
zip
zip
的意思是拉链,拉链的链齿是一一对应的,zip
就能将数据一一对应起来
import { zip, of } from "rxjs";
const source$ = zip(
of(1, 2, 3),
of("a", "b", "c")
);
source$.subscribe(value => console.log(value));
// [ 1, "a" ]
// [ 2, "b" ]
// [ 3, "c" ]
可以看见每次输出是一个数组,元素的值和Observable
数据流是一一对应`,这里是同步的数据
如果是异步的数据呢?
import { zip, of, interval } from "rxjs";
const source$ = zip(
of("a", "b", "c"),
interval(1000)
);
source$.subscribe({
next: value => console.log(value),
complete: () => console.log("complete")
});
// [ "a", 0 ]
// [ "b", 1 ]
// [ "c", 2 ]
// complete
如果是异步的数据,就算一个Observable
对象先吐出数据,zip
为了做到一一对应,会把这个数据暂存下来,等另一个Observable
对象吐出数据后,一一对应后再输出
zip
操作符会在任一Observable
对象完结后,退订所有的Observable
对象,所以在of("a", "b", "c")
完结后,输出了complete
不过zip
会有数据堆积的问题,如果一个Observable
对象吐出数据很快,另一个很慢,那zip
会把吐出快的数据堆积起来,等吐出慢点吐出数据后一一对应,这样会导致堆积的数据越来越多,内存也占用越多
上述的zip
操作符例子,都只有2个Observable
对象,其实zip
操作符是可以处理个Observable
对象的,数据也是一一对应,在理解了像拉链一一对应的两个Observable
对象后,多个也不难理解了
combineLatest
combineLatest
和zip
一样,也会将数据放入数组中输出,不同的是,只要有Observable
吐出数据,combineLatest
就会取当前数据流产生的最新数据输出,并且combineLatest
的参数需要放入一个数组中
import { combineLatest, of, interval } from "rxjs";
// 参数是一个数组
const source$ = combineLatest([
interval(2000),
interval(1000)
]);
source$.subscribe({
next: value => console.log(value),
complete: () => console.log("complete")
});
// 等待两秒后
// [ 0, 0 ] // 第一次输出
// [ 0, 1 ] // 一个产生新数据(1),会使用另一个最新的数据(0)
// [ 0, 2 ]
// [ 1, 2 ]
// [ 1, 3 ]
// [ 1, 4 ]
// ...
当interval(1000)
操作符产生数据时,interval(2000)
还没有产生数据,所以不会有输出
2秒后,interval(2000)
产生数据了,可以一一对应,所以输出[ 0, 0 ]
,同时interval(1000)
输出了新值1,这时候interval(2000)
没有产生新数据,所以会用之前产生的数据输出[ 0, 1 ]
combineLatest
如果输出,会取所有数据流的最新数据,即使有一个数据流完结,另一个没有完结,combineLatest
还是会使用完结的数据流的最新数据持续输出
combineLatest
的第一次输出顺序也值得研究
同步数据流
const source$ = combineLatest([
of("a", "b", "c"),
of(1, 2, 3),
]);
// ["c", 1]
// ["c", 2]
// ["c", 3]
同步、异步数据流
const source$ = combineLatest([
of(1, 2, 3),
interval(10000),
interval(5000),
]);
// 10秒后
// [3, 0, 0]
// ...
我的理解是,如果是同步数据流,会根据顺序最后一个数据流吐出数据后,combineLatest
开始输出,如果是异步数据流,则会等待最慢的数据流吐出数据后,combineLatest
再输出
withLatestFrom
withLatestFrom
的功能类似combineLatest
,不过写法不同
import { interval } from "rxjs";
import { withLatestFrom } from "rxjs/operators";
const source$ = interval(1000).pipe(
withLatestFrom(interval(2000), interval(500))
);
source$.subscribe({
next: value => console.log(value),
complete: () => console.log("complete")
});
// [1, 0, 3]
// [2, 0, 5]
// [3, 1, 7]
// ...
可以看到,输出是连接withLatestFrom
的Observable
对象主导的,只有interval(1000)
输出的时候,withLatestFrom
才会输出,这时候interval(500)
的值已经是3了
race
race
操作符就像是Javascript
中的Promise.race
只会输出最快完成的Promise
一样,race
操作符也只会输出最快的Observable
对象产生的值
import { interval, race } from "rxjs";
import { mapTo } from "rxjs/operators";
const source$ = race(
interval(500).pipe(mapTo("a")),
interval(1000).pipe(mapTo("b")),
interval(2000).pipe(mapTo("c"))
);
source$.subscribe({
next: value => console.log(value),
complete: () => console.log("complete")
});
// a
// a
// a
// ...
interval(500)
是最快输出的,race
会退订其他的Observable
对象,只输出interval(500)
的Observable
对象
forkJoin
race
像是Javascript
中的Promise.race
,forkJoin
则像是Javascript
中的Promise.all
,它会等待所有的Observable
对象完结后再输出
import { timer, forkJoin } from "rxjs";
const source$ = forkJoin([
timer(1000),
timer(2000),
timer(3000)
]);
source$.subscribe(value => console.log(value));
// 等待3秒
// [ 0, 0, 0 ]
需要注意的是,Observable
对象也得放在数组中
startWith
startWith
操作符会在Observable
对象被订阅的时候,先吐出若干条数据,相当于初始值或者是默认值
import { interval } from "rxjs";
import { startWith } from "rxjs/operators";
const source$ = interval(5000).pipe(startWith("a"));
source$.subscribe(value => console.log(value));
// a
// 5秒后 0
// 1
// ...
与之对应的还有一个endWith