Angular 响应式编程与 RxJs

从事件循环机制开始

众所周知,由于 JavaScript 是以单线程的方式运行的,作为浏览器脚本语言,其需要对DOM进行操作,如果以多线程的方式运行,恰好又同时对同一个DOM进行操作,这个时候是会出现冲突的。那么又由于它的单线程运行方式,意味着每一个任务都需要排队等待上一个任务的结束,这个时候就有可能在代码执行过程中因为某一项任务需要消耗长时间等待,导致阻塞后面的代码执行。

为了能够不阻塞代码的执行,或者说不需要等待某些任务完成就能执行下一个任务,JavaScript 设计出了事件循环机制(event loop mechanism)来区分【同步任务】和【异步任务】:通俗来讲就是同步任务从头到尾一直不间断的执行,因为只有极短的的等待时间,JavaScript 引擎发现了【异步任务】之后不立马执行,而是让它们进入一个任务队列,等所有的【同步任务】都运行完成后,再一次执行异步任务队列里面的任务。

同时【异步任务】又区分成了【微任务】和【宏任务】,也有两个不同的任务队列,执行顺序是先微后宏。

什么是响应式编程

响应式编程是使用异步数据流进行编程

Event buses 或者 Click events 本质上就是异步事件流,你可以监听并处理这些事件。响应式编程的思路大概如下:你可以用包括 Click 和 Hover 事件在内的任何东西创建 Data stream。Stream 廉价且常见,任何东西都可以是一个 Stream:变量、用户输入、属性、Cache、数据结构等等。

点击事件监听:

const element = document.getElementById('myElement');

element.addEventListener('click', () => {
	//...
})

也就是说 myElement 上的点击事件就是一个事件流,这个按钮可以点击很多次,每一次点击行为都能被捕获到,然后在回调函数中做任何想做的事情。

其实,这并不是什么新东西,而是新瓶装旧酒,把编程中常用的发布-订阅模式发扬光大,然后形成一整套可供我们高效解决问题的编程范式。

发布-订阅模式是什么

发布-订阅模式(Pub/Sub)需要一个中介连接【发布者】和【订阅者】,且两者互相是互相独立的状态,【发布者】发送状态变更到中介,【订阅者】从中介订阅消息:当【发布者】发布状态改变时,所有【订阅者】都会收到通知。

Promise & async/await

前面事件循环机制中讲到了【异步任务】,它们被放在所有【同步任务】事件后执行,机制如此限制,但现实中的 web 行为却不能够完全按照这个顺序进行下去:例如某一个需要被展示在页面的数字是根据从 http 请求返回的结果中的某一个值计算得来的。

那么就需要对这些【异步任务】作特殊处理:需要等待请求返回之后,再执行某些计算逻辑。

在 promise 和 async/await 出现之前,是通过调用回调函数来解决的,不过当【异步任务】足够多且回调函数中逻辑复杂时,就很容易出现回调函数的层层嵌套,也就是回调地狱。

Promise 能帮什么忙

一个 Promise 是一个代理,它代表一个在创建 promise 时不一定已知的值。它允许你将处理程序与异步操作的最终成功值或失败原因关联起来。这使得异步方法可以像同步方法一样返回值:异步方法不会立即返回最终值,而是返回一个 promise,以便在将来的某个时间点提供该值。

Promise 总共有三种状态:pending, fulfilled*,* rejected 。通过调用其构造函数回调参数中的 resolve 和 reject 方法,可以手动改变 Promise 的状态。最主要的是可以通过 then 链式调用,以更清晰的方式处理复杂的逻辑,不再让回调地狱发生。

但 Promise 仍旧存在一些问题:

  • 一旦开始之后就没有办法取消
  • 一次只能返回一个值

async/await 又改变了什么

async 函数是使用async关键字声明的函数。async 函数是 [AsyncFunction](<https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/AsyncFunction>) 构造函数的实例,并且其中允许使用 await 关键字。asyncawait 关键字让我们可以用一种更简洁的方式写出基于 [Promise](<https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Promise>) 的异步行为,而无需刻意地链式调用 promise

async/await 从真正意义上的做到了串形同步写法,只要在函数中加上两个关键字,就能够直观的理解为代码是从上往下执行的。

async/await 还有缺点吗?

  • 在处理没有依赖关系的异步请求时,由于执行会等待其完成,因此可能会造成性能的下降
  • async/await 本身不能并行的处理多个异步请求(可以通过for循环的方式做到并行)

RxJs 是什么

Think of RxJS as Lodash for events.

对于 RxJs 来说,虽然 Observable 是其核心属性,但操作符(operators)才是真正发挥其优势的关键,就如同 Lodash 中的方法一样,操作符本身就是一系列的函数,用来处理事件流中的数据,或者创建/修改事件流。

一个不恰当的比喻:事件流就像是工厂的流水线(Observable),每一个数据就好比包裹,随着传动带的运行(时间的流逝),包裹逐渐流向某一个位置(订阅者),在这个过程当中会有多个包裹经过(多个值被返回)。

RxJs 提供了一个核心类型:observable,以及其附属类型:observer,subscription,subject,scheduler。

Observable

Observables are lazy Push collections of multiple values.They fill the missing spot in the following table:

SINGLEMULTIPLE
PULLFunctionIterator
PUSHPromiseObservable

Observable 的字面意思是「可观察的」,从官网的解释来看 Observable 可以表示一个多值的异步数据流。

与「发布-订阅模式」相比,两者的设计模式非常类似。但因为订阅者可以自定义其响应行为,这种模式中的状态管理可能需要额外的逻辑来确保数据的一致性和可靠性;而 Observable 提供了更加结构化的方式来处理数据流,提供了内置的方法和操作符来管理状态、错误处理和数据转换。

在 Observable 中使用 pipe ,将数据流使用所需要的操作符和处理逻辑函数处理,可以更加直观便捷的达到我们需要的目的。

Observer

Observer 是 Observable 的回调函数,包含三个回调参数:next/error/complete ,均为可选参数,默认为 next ,当 subscribe 中什么参数也不传的时候 observable 也可以执行。

需要注意的是,当执行到 errorcomplete 时,当前的执行就会结束。

const observer = {
  next: x => {},
  error: err => {}
  complete: () => {},
};
observable.subscribe(observer);

observable.subscribe(() => {}); // 默认为next

Subscription

Subscription 很简单,它表示 Observable 的执行,subscription = observable.subscribe()

同时它有一个很重要的方法: subscription.unsubscibe() ,用于取消订阅。无论是从节省资源和性能角度来看,任何时候都应该取消不必要的 Observable。

Subject

An RxJS Subject is a special type of Observable that allows values to be multicasted to many Observers.

Subjects 是将任意 Observable 执行共享给多个观察者的唯一方式

维护多个订阅者的注册表,当有新值发出时,通过调用 next() ,多播到每一个订阅者,同样需要 subscribe() 订阅,才能执行。

  • BehaviorSubject:保留最后一个抛出的值,当被订阅时,立即抛出这个值
  • ReplaySubject:根据参数保留多个抛出的值, new ReplaySubject(3) (保留三个),当被订阅时,立即抛出这些值
  • AsyncSubject:只有当执行到 complete() 时,才抛出最后一个值(类似 last() 操作符)
  • Void subject:如果不在乎抛出值的类型,或者没有值抛出时,可以用 new Subject<void>()

利用 Subject 能做到多播的特性,可以在复杂系统中统一的管理数据的收发状态。

let _subject = new Subject();
let _subscriptions = new Map<string, Subscription>();

// 添加新的订阅
addSubjectListener(key: string, type: string, cb: (evt) => void) {
	// 每个订阅的唯一标识:key,如果有重复添加的情况,先取消上一个订阅
  if (this._subscriptions.has(key)) {
    this._subscriptions.get(key)?.unsubscribe();
  }
  const subscription: Subscription = this._subject.subscribe((e) => {
    if (type.includes(e.type)) {
      cb(e);
    }
  });
  this._subscriptions.set(key, subscription);
}

// 移除对应的订阅
removeSubjectListener(key: string) {
  this._subscriptions.get(key)?.unsubscribe();
  this._subscriptions.delete(key);
}

// 清空所有的订阅
removeAllSubjectListener() {
  this._subscriptions.forEach((sub) => {
    sub.unsubscribe();
  });
  this._subscriptions.clear();
}

// 发送事件
sendSubjectEvent(type: string, data?: any) {
  const body: MaterialPackageSubjectEvent = {
    type: type,
    data: data,
  };
  this._subject.next(body);
}

Scheduler

// todo

Angular 与 RxJs 的关系

如果你使用 Angular CLI 创建过项目,你就会发现当项目创建好之后,你的 package.json 中就已经存在最新版本的 rxjs 了,Angular 官网也有 RxJs 的引用介绍。

Angular 中深度集成了 RxJs,很多地方都能够看到它的身影,同时这些方法/属性能在大多数场景下帮住我们在开发过程中简易的实现想要的效果,而不是绞尽脑汁寻找各种奇怪的组合拳。

  • EventEmitter:Subject 的扩展,通过 @Output() 定义,并调用 emit() ,在组件之间发送值,调用 emit() 方法时,所发送的值会传给订阅者的 next() 。 也就是下面代码中的事件绑定 (myEmitter)
// 子组件(app-child) ts 
@Output() myEmitter = new EventEmitter<boolean>();

emitToFather() {
	this.myEmitter.emit(true);
}

// 父组件 html
<app-child (myEmitter)="myFunc($event)"></app-child>

// 父组件 ts
myFunc(value: boolean) {
  //...
}
  • Reactive Form:通过订阅 FormControlvalueChangesstatusChanges 可观察对象,订阅到表单的变化:
myForm!: FormGroup;

OnInit() {
	this.myForm.valueChange.subscribe((res) => {
		//...
	})

	this.myForm.statusChange.subscribe((res) => {
		//...
	})

}
  • Router:Router.events ActivatedRoute.url(params/queryParams/fragment/data/paramMap/queryParamMap) 包含路由信息的可观察对象,当路由发生变化时,可以获取到所有路由相关的路由信息:
import {  Router } from '@angular/router';
//...

constructor(private router: Router) {}

this.router.events.subscribe((e: any) => {
	//...
});
  • async pipe:async pipe 订阅一个 Observable 或者 Promise,当其订阅的 Observable 有返回新值时,会将这个组件标记为需要进行变更检测,刷新页面。当订阅对象是 Observable 时,只会触发一次 subscribe,并且事件完成后会自动取消订阅:
<div *ngIf="myObserver | async as myData">
  <!-- 这里应该展示 1 -->
	{{myData}}
</div>

<!-- 或者这种写法 -->
<div>
	{{myData | async}}
</div>
import { Observer, of } from 'rxjs';
//...

myObserver!: Observer<any>

OnInit() {
	this.myObserver = of(1);
}

// promise 版
// myPromise!: Promise<any>
// OnInit() {
//	this.myPromise = new Promise((reslove) => resolve(1))
// }
  • HTTP:HttpClient 的调用方法: get/put/post/delete 会返回 Observable 对象,默认http 的订阅在请求完成之后会自动关闭,通常情况下不需要手动取消订阅。(但在某些场景下还是建议取消,比如当请求还在 pending 状态时切换到了另一个页面):
import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs';

constructor(private http: HttpClient) { }

myRequst(): Observable<any> {
    return this.http.get('url');
 }

RxJs 有啥光环 & 最后的总结

RxJs 不仅语法简洁明了,并且为异步编程带来了很多的以前无法实现或者需要通过改造才能达到的效果:

  • 只要Observable存在,就能够一直收到数据流
  • 使用管道操作符能够对数据流进行任何操作
  • 通过 forkJoin 等操作符能实现多个异步请求的并行处理
  • 能通过简单的方式随时取消对 Observable 的订阅,优化不必要再进行的请求
  • 通过 Subject 多播能做到每个模块间订阅同一个Obsevable但又能根据需求处理不同的数据

引用

https://github.com/JChehe/blog/blob/master/posts/关于JavaScript单线程的一些事.md
https://coffe1891.gitbook.io/frontend-hard-mode-interview/5/5.3.1
https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Global_Objects/Promise
https://developer.mozilla.org/zh-CN/docs/Web/JavaScript/Reference/Statements/async_function
https://gist.github.com/staltz/868e7e9bc2a7b8c1f754
https://rxjs.dev/guide/observable
https://juejin.cn/post/6910943445569765384
https://angular.io/guide/observables-in-angular

Leave a Reply

Your email address will not be published. Required fields are marked *