ZHANGYU.dev

October 14, 2023

RxJS学习笔记(一)Observable的创建

JavaScript4.9 min to read

题外话

为什么想要学习RxJS,虽然知道RxJS在异步编程中很强大,但是在现在不管是Promise还是async/await,对于熟悉javascript的程序员来说,都是很容易使用的,异步代码的执行越来越简单了

但是我认为,学习不止是使用,更重要的是思想,相信能在Rxjs的学习中,对函数式,响应式编程能有更深的理解

其实还有一点是之前看见有人发leetcode的招聘,技术栈是typescript + react + graphQL + rxjs,这简直是太棒了,梦想一般的技术栈,然而人家肯定看不上现在的我,所以我要学习,成长!

学习Rxjs,我是通过官方文档 + 程墨出版的《深入浅出RxJS》,在书中,我看见了他的一段话

当然,我们学习RxJS,并不是因为RxJS是一项炫酷的技术,也不是因为RxJS是一个最新的技术。在技术的道路上,如果只是追逐“炫酷”和“最新”,肯定是要吃苦头的,因为这是舍本逐末。

程墨. 深入浅出RxJS (实战) (Chinese Edition) (Kindle 位置 202-203). Kindle 版本.

我觉得这段话简直是醍醐灌顶,在我的学习道路上,有很一部分是「最新」,我喜欢最新的技术,「炫酷」也有一小部分

今后的学习,一定得脚踏实地,稳步前行,争取在被"退休"前能够进大厂吧~

创造Observable

import { Observable } from "rxjs";

// 这个函数决定了Observable对象的行为,接收一个名为subscriber的参数
// 函数体内,调用参数subscriber的next函数,将数据“推”给订阅者
// 调用参数subscriber的next方法创建新值,error方法来抛出错误,completed方法来通知完成
const onSubscribe = subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
};

// 调用Observable构造函数创建一个名为source$的数据流对象(发布者)
const source$ = new Observable(onSubscribe);

// 创建观察者
// 观察者的要求是必须有一个名为next的函数,用来接收被“推”过来的数据
const theObserver = {
  next: value => console.log(value)
};

// 调用数据流对象(发布者)的subscribe方法来订阅
// 参数为观察者
source$.subscribe(theObserver);
  1. 定义onSubscribe方法,这个函数会作为参数传递给Observable构造函数

  2. 调用Observable构造函数创建一个数据流对象source$,这时候onSubscribe函数并没有被调用,它在等待观察者调用source$subscribe方法

  3. 创建``对象(观察者),这时候观察者和发布者还没有关系

  4. 调用source$.subscribe,将theObserver对象作为参数传入,在subscribe函数被调用的时候,onSubscribe被调用,这时onSubscribe函数的参数subscriber所代表的就是观察者theObserver,但并不是theObserver本身,RxJS会对观察者做一个封装,可以把subscriber简单的理解为观察者的一个代理,对subscriber对所有函数调用都会代理到观察者theObserver对同名函数上

  5. onSubscribe函数中,调用了subscriber.next函数3次,实际上也就是调用了theObservernext函数3次

永无止境的Observable

Observable可以产生无限多的数据,并且可以跨越时间

const onSubscribe = subscriber => {
  let number = 1;
  setInterval(() => subscriber.next(number++), 1000);
};

这段代码,每隔一秒会将新数据推给观察者,直到我们强行终止之前,是不会终止的

因为Observable这样的时间特性,使得异步操作十分容易,因为观察者只需要被动接受推过来的数据,而不用关心数据是何时产生的

假如我们不终止这段代码,也不会消耗更多的内存,因为每次Observable对象只吐出一个数据,然后就被观察者Observe消化处理掉了,这种操作方式和把数据堆积到一个数组中是不一样的,如果使用数组,内存的消耗就会随着数组大小的改变而增加

Observable的完结

Observable中,next表达的是正要推送的数据,为了让Observable告诉Observer已经没有数据了,需要使用另一种通信机制,就是Observercomplete函数

修改「创建Observable」中的theObserver

const theObserver = {
  next: value => console.log(value),
  complete:()=>console.log("观察结束"),
};

运行程序,并没有发生什么,是因为Observable对象并没有调用观察者Observecomplete方法

修改onSubscribe

const onSubscribe = subscriber => {
  let number = 1;
  const timer = setInterval(() => {
    subscriber.next(number++);
    if (number > 3) {
      clearInterval(timer);
      subscriber.complete();
    }
  }, 1000);
};

在调用clearInterval后,立刻调用了complete函数,和next函数一样,对subscribercomplete函数调用最终会触发观察者theObservercomplete函数

输出结果

1
2
3
观察结束

如果Observable不主动调用complete方法,即便观察者Observer准备了complete函数,也不会发生任何事情

generator里,每次获取数据是通过next函数,并且数据中会有标示迭代结束变量isDone,Observable对象也是如此,完结信号也是由Observable推给Observer`的

Observable的出错处理

除了用next传递数据,complete表示观察结束,还需要一种「出错了」的方式,和complete同样,需要使用Observererror函数

示例代码如下

import { Observable } from "rxjs";

const onSubscribe = subscriber => {
  subscriber.next(1);
  subscriber.error("出错了");
  subscriber.complete();
};

const source$ = new Observable(onSubscribe);

const theObserver = {
  next: value => console.log(value),
  complete: () => console.log("观察结束"),
  error: error => console.error(error)
};

source$.subscribe(theObserver);

执行结果如下

1
出错了

这里值得注意的是,在调用subscribererror函数后,输出「出错了」,但是并没有输入「观察结束」,说明并没有调用接下来的complete函数

这是因为在RxJS中,一个Observable只有一种终结状态,要么完结complete,要么错误error,一旦进入出错状态,对这个Observable对象的观察也就结束了,再也不会调用next函数,也不会调用complete函数和其他的error函数

Observer的简单形式

如果不创建观察者Observer对象,subscribe是可以直接接受函数作为参数的,第一个参数被认为是next,第二个参数被认为是error,第三个参数被认为是complete

source$.subscribe(
  value => console.log(value),
  error => console.error(error),
  () => console.log("完结")
);

退订Observable

以上的都是ObservableObserver如何建立关系,因为Observable的执行可能是无限的,假如有这样一个需求,在订阅Observable三秒后,取消对Observable对订阅,应该怎么办呢?

Observable中,有一个退订unsubscribe的概念

在执行source$.subscribe后,会返回一个正在执行的Subscription,可以调用Subscriptionunsubscribe方法来退订

import { Observable } from "rxjs";

const onSubscribe = subscriber => {
  let number = 1;
  const timer = setInterval(() => {
    console.log("interval");
    subscriber.next(number++);
  }, 1000);
};

const source$ = new Observable(onSubscribe);

const subscription = source$.subscribe(value => console.log(value));

setTimeout(() => subscription.unsubscribe(), 3000);

但是,在执行后,next方法并没有接着被调用了,但是却一直在输出interval,这是因为只是对Observable取消订阅,但是Observale内部可能依旧在执行某些代码

在我们创建Observable时,必须指定Observable应该怎么执行,即是onSubscribe函数,在此函数中,可以返回自定义subscribe函数来实现unsubscribe

import { Observable } from "rxjs";

const onSubscribe = subscriber => {
  let number = 1;
  const timer = setInterval(() => {
    console.log("interval");
    subscriber.next(number++);
  }, 1000);
  return () => clearInterval(timer);
};

const source$ = new Observable(onSubscribe);

const subscription = source$.subscribe(value => console.log(value));

setTimeout(() => subscription.unsubscribe(), 3000);

可以看见控制台没有任何输出,说明内部setInterval被正确claer掉了