题外话
为什么想要学习RxJS
,虽然知道RxJS
在异步编程中很强大,但是在现在不管是Promise
还是async/await
,对于熟悉javascript
的程序员来说,都是很容易使用的,异步代码的执行越来越简单了
但是我认为,学习不止是使用,更重要的是思想,相信能在Rxjs
的学习中,对函数式,响应式编程能有更深的理解
其实还有一点是之前看见有人发leetcode
的招聘,技术栈是typescript + react + graphQL + rxjs
,这简直是太棒了,梦想一般的技术栈,然而人家肯定看不上现在的我,所以我要学习,成长!
学习Rxjs
,我是通过官方文档 + 程墨出版的《深入浅出RxJS》
,在书中,我看见了他的一段话
当然,我们学习RxJS,并不是因为RxJS是一项炫酷的技术,也不是因为RxJS是一个最新的技术。在技术的道路上,如果只是追逐“炫酷”和“最新”,肯定是要吃苦头的,因为这是舍本逐末。
程墨. 深入浅出RxJS (实战) (Chinese Edition) (Kindle 位置 202-203). Kindle 版本.
我觉得这段话简直是醍醐灌顶,在我的学习道路上,有很一部分是「最新」,我喜欢最新的技术,「炫酷」也有一小部分
今后的学习,一定得脚踏实地,稳步前行,争取在被"退休"前能够进大厂吧~
创造Observable
import { Observable } from "rxjs";
// 这个函数决定了Observable对象的行为,接收一个名为subscriber的参数
// 函数体内,调用参数subscriber的next函数,将数据“推”给订阅者
// 调用参数subscriber的next方法创建新值,error方法来抛出错误,completed方法来通知完成
const onSubscribe = subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
};
// 调用Observable构造函数创建一个名为source$的数据流对象(发布者)
const source$ = new Observable(onSubscribe);
// 创建观察者
// 观察者的要求是必须有一个名为next的函数,用来接收被“推”过来的数据
const theObserver = {
next: value => console.log(value)
};
// 调用数据流对象(发布者)的subscribe方法来订阅
// 参数为观察者
source$.subscribe(theObserver);
-
定义
onSubscribe
方法,这个函数会作为参数传递给Observable
构造函数 -
调用
Observable
构造函数创建一个数据流对象source$
,这时候onSubscribe
函数并没有被调用,它在等待观察者调用source$
的subscribe
方法 -
创建``对象(观察者),这时候观察者和发布者还没有关系
-
调用
source$.subscribe
,将theObserver
对象作为参数传入,在subscribe
函数被调用的时候,onSubscribe
被调用,这时onSubscribe
函数的参数subscriber
所代表的就是观察者theObserver
,但并不是theObserver
本身,RxJS
会对观察者做一个封装,可以把subscriber
简单的理解为观察者的一个代理,对subscriber
对所有函数调用都会代理到观察者theObserver
对同名函数上 -
在
onSubscribe
函数中,调用了subscriber.next
函数3次,实际上也就是调用了theObserver
的next
函数3次
永无止境的Observable
Observable
可以产生无限多的数据,并且可以跨越时间
const onSubscribe = subscriber => {
let number = 1;
setInterval(() => subscriber.next(number++), 1000);
};
这段代码,每隔一秒会将新数据推给观察者,直到我们强行终止之前,是不会终止的
因为Observable
这样的时间特性,使得异步操作十分容易,因为观察者只需要被动接受推过来的数据,而不用关心数据是何时产生的
假如我们不终止这段代码,也不会消耗更多的内存,因为每次Observable
对象只吐出一个数据,然后就被观察者Observe
消化处理掉了,这种操作方式和把数据堆积到一个数组中是不一样的,如果使用数组,内存的消耗就会随着数组大小的改变而增加
Observable的完结
在Observable
中,next
表达的是正要推送的数据,为了让Observable
告诉Observer
已经没有数据了,需要使用另一种通信机制,就是Observer
的complete
函数
修改「创建Observable
」中的theObserver
const theObserver = {
next: value => console.log(value),
complete:()=>console.log("观察结束"),
};
运行程序,并没有发生什么,是因为Observable
对象并没有调用观察者Observe
的complete
方法
修改onSubscribe
const onSubscribe = subscriber => {
let number = 1;
const timer = setInterval(() => {
subscriber.next(number++);
if (number > 3) {
clearInterval(timer);
subscriber.complete();
}
}, 1000);
};
在调用clearInterval
后,立刻调用了complete
函数,和next
函数一样,对subscriber
的complete
函数调用最终会触发观察者theObserver
的complete
函数
输出结果
1
2
3
观察结束
如果Observable
不主动调用complete
方法,即便观察者Observer
准备了complete
函数,也不会发生任何事情
在generator
里,每次获取数据是通过next
函数,并且数据中会有标示迭代结束变量isDone
,Observable对象也是如此,完结信号也是由
Observable推给
Observer`的
Observable的出错处理
除了用next
传递数据,complete
表示观察结束,还需要一种「出错了」的方式,和complete
同样,需要使用Observer
的error
函数
示例代码如下
import { Observable } from "rxjs";
const onSubscribe = subscriber => {
subscriber.next(1);
subscriber.error("出错了");
subscriber.complete();
};
const source$ = new Observable(onSubscribe);
const theObserver = {
next: value => console.log(value),
complete: () => console.log("观察结束"),
error: error => console.error(error)
};
source$.subscribe(theObserver);
执行结果如下
1
出错了
这里值得注意的是,在调用subscriber
的error
函数后,输出「出错了」,但是并没有输入「观察结束」,说明并没有调用接下来的complete
函数
这是因为在RxJS
中,一个Observable
只有一种终结状态,要么完结complete
,要么错误error
,一旦进入出错状态,对这个Observable
对象的观察也就结束了,再也不会调用next
函数,也不会调用complete
函数和其他的error
函数
Observer的简单形式
如果不创建观察者Observer
对象,subscribe
是可以直接接受函数作为参数的,第一个参数被认为是next
,第二个参数被认为是error
,第三个参数被认为是complete
source$.subscribe(
value => console.log(value),
error => console.error(error),
() => console.log("完结")
);
退订Observable
以上的都是Observable
和Observer
如何建立关系,因为Observable
的执行可能是无限的,假如有这样一个需求,在订阅Observable
三秒后,取消对Observable
对订阅,应该怎么办呢?
在Observable
中,有一个退订unsubscribe
的概念
在执行source$.subscribe
后,会返回一个正在执行的Subscription
,可以调用Subscription
的unsubscribe
方法来退订
import { Observable } from "rxjs";
const onSubscribe = subscriber => {
let number = 1;
const timer = setInterval(() => {
console.log("interval");
subscriber.next(number++);
}, 1000);
};
const source$ = new Observable(onSubscribe);
const subscription = source$.subscribe(value => console.log(value));
setTimeout(() => subscription.unsubscribe(), 3000);
但是,在执行后,next
方法并没有接着被调用了,但是却一直在输出interval
,这是因为只是对Observable
取消订阅,但是Observale
内部可能依旧在执行某些代码
在我们创建Observable
时,必须指定Observable
应该怎么执行,即是onSubscribe
函数,在此函数中,可以返回自定义subscribe
函数来实现unsubscribe
import { Observable } from "rxjs";
const onSubscribe = subscriber => {
let number = 1;
const timer = setInterval(() => {
console.log("interval");
subscriber.next(number++);
}, 1000);
return () => clearInterval(timer);
};
const source$ = new Observable(onSubscribe);
const subscription = source$.subscribe(value => console.log(value));
setTimeout(() => subscription.unsubscribe(), 3000);
可以看见控制台没有任何输出,说明内部setInterval
被正确claer
掉了