ZHANGYU.dev

October 14, 2023

RxJS学习笔记(三)合并类操作符

JavaScript5.5 min to read

合并类操作符

如果把Observable数据流比做一条河流,两条河流汇聚的时候将会合并成一条大江,但是在RxJS中,合并并不是这么简单,不同的Observable数据流合并有不同的规则,有些是相互交错合并,有些是像排队一样,一条数据流走完,另一条才能跟在后面

concat

Javascript中,数组有一个concat方法,这个方法可以把其他数组的值按照参数顺序合并到当前数组

const a = [1, 2, 3];
const b = [4, 5, 6];
const c = [7, 8, 9];

const d = [].concat(a, b, c);
// 1 2 3 4 5 6 7 8 9

RxJS中,concat操作符也有类似的功能,可以把多个Observable对象依次合并

import { of, concat } from "rxjs";

const source$ = concat(
    of(1, 2, 3),
    of(4, 5, 6),
    of(7, 8, 9)
);

source$.subscribe(value => console.log(value));
// 1 2 3 4 5 6 7 8 9

concat操作符的合并,必须等上一个Observable对象完结后,才会继续合并后面的Observable对象,如果一个Observable对象永不完结,那么concat操作符不会返回任何数据,这一点在合并异步数据时就能体现

合并异步Observable对象

const source$ = concat(
  timer(2000).pipe(mapTo("a")),
  timer(1000).pipe(mapTo("b"))
);

source$.subscribe(value => console.log(value));
// 2秒后输出 a
// 1秒后输出 b

这里涉及到一个操作符mapTo,可以理解这个数据流只会返回传入参数的数据

merge

merge操作符在合并同步数据上,是和concat操作符一样的,但是在合并异步数据流时,就能展现出差异

import { merge, timer } from "rxjs";
import { mapTo } from "rxjs/operators";

const source$ = merge(
  timer(2000).pipe(mapTo("a")),
  timer(1000).pipe(mapTo("b"))
);

source$.subscribe(value => console.log(value));
// 1秒后输出 b
// 2秒后输出 a

可以看出,merge操作符在合并异步数据流时,是不会按照参数顺序依次等待的,而是先完结,先合并

merge操作符还可以有一个number类型可选参数concurrent,这个参数的作用将限制merge的同时合并

不使用concurrent参数

import { interval, merge } from "rxjs";
import { mapTo } from "rxjs/operators";

const source$ = merge(
  interval(1000).pipe(mapTo("a")),
  interval(1000).pipe(mapTo("b")),
  interval(1000).pipe(mapTo("c"))
);

source$.subscribe(value => console.log(value));
// 1秒后输出 a b c
// 再1秒后输出 a b c
// ...

使用concurrent参数

const source$ = merge(
  interval(1000).pipe(mapTo("a")),
  interval(1000).pipe(mapTo("b")),
  interval(1000).pipe(mapTo("c")),
  2
);

source$.subscribe(value => console.log(value));
// 1秒后输出 a b
// 再1秒后输出 a b
// ,,,

使用concurrent参数后,同时进行的Observable数据流只会有限制的个数,后面限制住的Observable对象会在前面的Observable对象完结后,再加入merge合并

zip

zip的意思是拉链,拉链的链齿是一一对应的,zip就能将数据一一对应起来

import { zip, of } from "rxjs";

const source$ = zip(
    of(1, 2, 3),
    of("a", "b", "c")
);

source$.subscribe(value => console.log(value));
// [ 1, "a" ]
// [ 2, "b" ]
// [ 3, "c" ]

可以看见每次输出是一个数组,元素的值和Observable数据流是一一对应`,这里是同步的数据

如果是异步的数据呢?

import { zip, of, interval } from "rxjs";

const source$ = zip(
    of("a", "b", "c"),
    interval(1000)
);

source$.subscribe({
  next: value => console.log(value),
  complete: () => console.log("complete")
});

// [ "a", 0 ]
// [ "b", 1 ]
// [ "c", 2 ]
// complete

如果是异步的数据,就算一个Observable对象先吐出数据,zip为了做到一一对应,会把这个数据暂存下来,等另一个Observable对象吐出数据后,一一对应后再输出

zip操作符会在任一Observable对象完结后,退订所有的Observable对象,所以在of("a", "b", "c")完结后,输出了complete

不过zip会有数据堆积的问题,如果一个Observable对象吐出数据很快,另一个很慢,那zip会把吐出快的数据堆积起来,等吐出慢点吐出数据后一一对应,这样会导致堆积的数据越来越多,内存也占用越多

上述的zip操作符例子,都只有2个Observable对象,其实zip操作符是可以处理个Observable对象的,数据也是一一对应,在理解了像拉链一一对应的两个Observable对象后,多个也不难理解了

combineLatest

combineLatestzip一样,也会将数据放入数组中输出,不同的是,只要有Observable吐出数据,combineLatest就会取当前数据流产生的最新数据输出,并且combineLatest的参数需要放入一个数组中

import { combineLatest, of, interval } from "rxjs";
// 参数是一个数组
const source$ = combineLatest([
  interval(2000),
  interval(1000)
]);

source$.subscribe({
  next: value => console.log(value),
  complete: () => console.log("complete")
});
// 等待两秒后
// [ 0, 0 ] // 第一次输出
// [ 0, 1 ] // 一个产生新数据(1),会使用另一个最新的数据(0)
// [ 0, 2 ]
// [ 1, 2 ]
// [ 1, 3 ]
// [ 1, 4 ]
// ...

interval(1000)操作符产生数据时,interval(2000)还没有产生数据,所以不会有输出

2秒后,interval(2000)产生数据了,可以一一对应,所以输出[ 0, 0 ],同时interval(1000)输出了新值1,这时候interval(2000)没有产生新数据,所以会用之前产生的数据输出[ 0, 1 ]

combineLatest如果输出,会取所有数据流的最新数据,即使有一个数据流完结,另一个没有完结,combineLatest还是会使用完结的数据流的最新数据持续输出

combineLatest的第一次输出顺序也值得研究

同步数据流

const source$ = combineLatest([
  of("a", "b", "c"),
  of(1, 2, 3),
]);
// ["c", 1]
// ["c", 2]
// ["c", 3]

同步、异步数据流

const source$ = combineLatest([
  of(1, 2, 3),
  interval(10000),
  interval(5000),
]);
// 10秒后
// [3, 0, 0]
// ...

我的理解是,如果是同步数据流,会根据顺序最后一个数据流吐出数据后,combineLatest开始输出,如果是异步数据流,则会等待最慢的数据流吐出数据后,combineLatest再输出

withLatestFrom

withLatestFrom的功能类似combineLatest,不过写法不同

import { interval } from "rxjs";
import { withLatestFrom } from "rxjs/operators";

const source$ = interval(1000).pipe(
  withLatestFrom(interval(2000), interval(500))
);

source$.subscribe({
  next: value => console.log(value),
  complete: () => console.log("complete")
});
// [1, 0, 3]
// [2, 0, 5]
// [3, 1, 7]
// ...

可以看到,输出是连接withLatestFromObservable对象主导的,只有interval(1000)输出的时候,withLatestFrom才会输出,这时候interval(500)的值已经是3了

race

race操作符就像是Javascript中的Promise.race只会输出最快完成的Promise一样,race操作符也只会输出最快的Observable对象产生的值

import { interval, race } from "rxjs";
import { mapTo } from "rxjs/operators";

const source$ = race(
  interval(500).pipe(mapTo("a")),
  interval(1000).pipe(mapTo("b")),
  interval(2000).pipe(mapTo("c"))
);
source$.subscribe({
  next: value => console.log(value),
  complete: () => console.log("complete")
});
// a
// a
// a
// ...

interval(500)是最快输出的,race会退订其他的Observable对象,只输出interval(500)Observable对象

forkJoin

race像是Javascript中的Promise.raceforkJoin则像是Javascript中的Promise.all,它会等待所有的Observable对象完结后再输出

import { timer, forkJoin } from "rxjs";

const source$ = forkJoin([
  timer(1000),
  timer(2000),
  timer(3000)
]);
source$.subscribe(value => console.log(value));
// 等待3秒
// [ 0, 0, 0 ]

需要注意的是,Observable对象也得放在数组中

startWith

startWith操作符会在Observable对象被订阅的时候,先吐出若干条数据,相当于初始值或者是默认值

import { interval } from "rxjs";
import { startWith } from "rxjs/operators";

const source$ = interval(5000).pipe(startWith("a"));
source$.subscribe(value => console.log(value));
// a
// 5秒后 0
// 1
// ...

与之对应的还有一个endWith