Rxjs常用operators

更新日期: 2019-01-03阅读: 3k标签: rxjs

本文使用的是angular6内置的rxjs,版本号为6.3.3


concat

通过顺序地发出多个 Observables 的值将它们连接起来,一个接一个的。

参数:

名称类型属性描述
otherObservableInput等待被连接的 Observable。 可以接受多个输入 Observable。
schedulerScheduler可选的,默认值: null可选的调度器,控制每个输入 Observable 的订阅。
const timer1 = interval(1000).pipe(take(10));
    const timer2 = interval(2000).pipe(take(6));
    const timer3 = interval(500).pipe(take(10));
    const result = timer1.pipe(concat(timer2, timer3), toArray());
    result.subscribe(x => console.log(x));
    // 将Observable转换为数组之后的结果
    [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


count

计算源的发送数量,并当源完成时发出该数值。

当源完成的时候,告知总共发送了多少个值。  

参数:

名称类型属性描述
predicatefunction(value: T, i: number, source: Observable): boolean可选的A boolean 函数,用来选择哪些值会被计数。 参数如下:value: 来自源的值;index: 来自投射的 Observable 的值的 "index"(从0开始);source: 源 Observable 自身实例。
// 记录1到7之间有几个奇数
const numbers = range(1, 7);
numbers.pipe(count(i => i % 2 === 1))
.subscribe(x => console.log(x));
// 结果
// 4

// 返回数组的长度
const arr = [12, 9, 2, 21, 11];
from(arr).pipe(count(x => true))
.subscribe(x => console.log(x));
// 结果
// 5


delay

通过给定的超时或者直到一个给定的时间来延迟源 Observable 的发送。

每个数据项的发出时间都往后推移固定的毫秒数.  
如果延时参数是数字, 这个操作符会将源 Observable 的发出时间都往后推移固定的毫秒数。 保存值之间的相对时间间隔.  
如果延迟参数是日期类型, 这个操作符会延时Observable的执行直到到了给定的时间.  

参数:

名称类型属性描述
delaynumber或Date延迟时间(以毫秒为单位的数字)或 Date 对象(发送延迟到这个时间点)。
schedulerScheduler可选的,默认值: async调度器,用来管理处理每项时延的定时器。
// 每次点击延迟1秒
var clicks = Rx.Observable.fromEvent(document, 'click');
var delayedClicks = clicks.delay(1000); // each click emitted after 1 second
delayedClicks.subscribe(x => console.log(x));
// 延时所有的点击直到到达未来的时间点
var clicks = Rx.Observable.fromEvent(document, 'click');
var date = new Date('March 15, 2050 12:00:00'); // in the future
var delayedClicks = clicks.delay(date); // click emitted only after that date
delayedClicks.subscribe(x => console.log(x));


distinct

可以用来做数组去重

返回 Observable,它发出由源 Observable 所发出的所有与之前的项都不相同的项。

如果提供了 keySelector 函数,那么它会将源 Observable 的每个值都投射成一个新的值,这个值会用来检查是否与先前投射的值相等。如果没有提供 keySelector 函数,它会直接使用源 Observable 的每个值来检查是否与先前的值相等。

const arr = [1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1];
from(arr).pipe(distinct(), toArray())
  .subscribe(x => console.log(x));
  // 结果
  // [1, 2, 3, 4]
  
// 对象数组
const arr1 = [{ age: 4, name: 'Foo' },
{ age: 7, name: 'Bar' },
{ age: 5, name: 'Foo' }];
from(arr1).pipe(distinct(item => item.name), toArray())
  .subscribe(x => console.log(x));
  // 结果
  // [{ age: 4, name: 'Foo' },
{ age: 7, name: 'Bar' }]


every

功能类似于lodash的every

返回的 Observable 发出是否源 Observable 的每项都满足指定的条件。

参数:

名称类型属性描述
predicatefunction用来确定每一项是否满足指定条件的函数。
thisArgany可选的可选对象,作为回调函数中的 this 使用。
const users = [
  { 'user': 'barney', 'age': 36, 'active': false },
  { 'user': 'fred', 'age': 40, 'active': false }
];
from(users).pipe(
  every(item => !item.active)
)
  .subscribe(x => console.log(x));
  // 结果
  // true
  
from(users).pipe(
  every(item => item.user === 'barney' && item.active === false)
)
  .subscribe(x => console.log(x));
  // 结果
  // false


filter

通过只发送源 Observable 的中满足指定 predicate 函数的项来进行过滤。

类似于 Array.prototype.filter(), 它只会发出源 Observable 中符合标准函数的值。  

参数:

名称类型属性描述
predicatefunction(value: T, index: number): boolean评估源 Observable 所发出的每个值的函数。如果它返回 true,就发出值,如果是 false 则不会传给输出 Observable 。index 参数是自订阅开始后发送序列的索引,是从 0 开始的。
thisArgany可选的可选参数,用来决定 predicate 函数中的 this 的值。
const users = [
  { 'user': 'barney', 'age': 36, 'active': false },
  { 'user': 'fred', 'age': 40, 'active': true },
  { 'user': 'ckdf', 'age': 24, 'active': false },
  { 'user': 'gdfg', 'age': 31, 'active': false }
];
from(users).pipe(
  filter(item => item.active)
)
  .subscribe(x => console.log(x));
  // 结果
  // {user: "fred", age: 40, active: true}


find

只发出源 Observable 所发出的值中第一个满足条件的值。

找到第一个通过测试的值并将其发出。  

参数:

名称类型属性描述
predicatefunction(value: T, index: number, source: Observable): boolean使用每项来调用的函数,用于测试是否符合条件。
thisArgany可选的可选参数,用来决定 predicate 函数中的 this 的值。
const users = [
  { 'user': 'barney', 'age': 36, 'active': true },
  { 'user': 'fred', 'age': 40, 'active': true },
  { 'user': 'ckdf', 'age': 24, 'active': false },
  { 'user': 'gdfg', 'age': 31, 'active': false }
];

from(users).pipe(
  find(item => item.active)
)
  .subscribe(x => console.log(x));
  // 结果
  // {user: "barney", age: 36, active: true}


findIndex

只发出源 Observable 所发出的值中第一个满足条件的值的索引。

它很像 find , 但发出的是找到的值的索引, 而不是值本身。  

参数:

名称类型属性描述
predicatefunction(value: T, index: number, source: Observable): boolean使用每项来调用的函数,用于测试是否符合条件。
thisArgany可选的可选参数,用来决定 predicate 函数中的 this 的值。
const users = [
  { 'user': 'barney', 'age': 36, 'active': false },
  { 'user': 'fred', 'age': 40, 'active': true },
  { 'user': 'ckdf', 'age': 24, 'active': false },
  { 'user': 'gdfg', 'age': 31, 'active': false }
];

from(users).pipe(
  findIndex(item => item.active)
)
  .subscribe(x => console.log(x));
  // 结果
  // 1


first

只发出由源 Observable 所发出的值中第一个(或第一个满足条件的值)。

只发出第一个值。或者只发出第一个通过测试的值。  

参数:

名称类型属性描述
predicatefunction(value: T, index: number, source: Observable): boolean可选的使用每项来调用的函数,用于测试是否符合条件。
resultSelectorfunction(value: T, index: number): R可选的函数,它基于源 Observable 的值和索引来生成输出 Observable 的值。传给这个函数的参数有:value: 在源 Observable 上发出的值。index: 源值的索引。
defaultValueR可选的假如在源 Observable 上没有找到有效值,就会发出这个 默认值。
const users = [
  { 'user': 'barney', 'age': 36, 'active': false },
  { 'user': 'fred', 'age': 40, 'active': true },
  { 'user': 'ckdf', 'age': 24, 'active': false },
  { 'user': 'gdfg', 'age': 31, 'active': false }
];
from(users).pipe(
  first(item => item.active)
)
  .subscribe(x => console.log(x));
  // 结果
  // {user: "fred", age: 40, active: true}
  
from(users).pipe(
first()
)
.subscribe(x => console.log(x));
// 结果
// {user: "barney", age: 36, active: false}


isEmpty

如果源 Observable 是空的话,它返回一个发出 true 的 Observable,否则发出 false 。

const users = [
  { 'user': 'barney', 'age': 36, 'active': false },
  { 'user': 'fred', 'age': 40, 'active': true },
  { 'user': 'ckdf', 'age': 24, 'active': false },
  { 'user': 'gdfg', 'age': 31, 'active': false }
];
from(users).pipe(
  isEmpty()
)
  .subscribe(x => console.log(x));
  // 结果
  // false


last

返回的 Observable 只发出由源 Observable 发出的最后一个值。它可以接收一个可选的 predicate 函数作为 参数,如果传入 predicate 的话则发送的不是源 Observable 的最后一项,而是发出源 Observable 中 满足 predicate 函数的最后一项。

const users = [
  { 'user': 'barney', 'age': 36, 'active': false },
  { 'user': 'fred', 'age': 40, 'active': true },
  { 'user': 'ckdf', 'age': 24, 'active': true },
  { 'user': 'gdfg', 'age': 31, 'active': false }
];
from(users).pipe(
  last(item => item.active)
)
  .subscribe(x => console.log(x));
  // 结果
  // {user: "ckdf", age: 24, active: true}


map

将给定的 project 函数应用于源 Observable 发出的每个值,并将结果值作为 Observable 发出。

类似于 Array.prototype.map(), 它把每个源值传递给转化函数以获得相应的输出值。  

参数:

名称类型属性描述
projectfunction(value: T, index: number): R应用于由源 Observable 所发出的每个值的函数。index 参数是自订阅开始后发送序列的索引,是从 0 开始的。
thisArgany可选的可选参数,定义在 project 函数中的 this 是什么。
const users = [
  { 'user': 'barney', 'age': 36, 'active': false },
  { 'user': 'fred', 'age': 40, 'active': true },
  { 'user': 'ckdf', 'age': 24, 'active': true },
  { 'user': 'gdfg', 'age': 31, 'active': false }
];
from(users).pipe(
  map(item => 'hello' + item.user),
  toArray()
)
  .subscribe(x => console.log(x));
  // 结果
  // ["hellobarney", "hellofred", "hellockdf", "hellogdfg"]


mapTo

每次源 Observble 发出值时,都在输出 Observable 上发出给定的常量值。

类似于 map,但它每一次都把源值映射成同一个输出值。  

参数:

名称类型属性描述
valueany将每个源值映射成的值。
const users = [
  { 'user': 'barney', 'age': 36, 'active': false },
  { 'user': 'fred', 'age': 40, 'active': true },
  { 'user': 'ckdf', 'age': 24, 'active': true },
  { 'user': 'gdfg', 'age': 31, 'active': false }
];
from(users).pipe(
  mapTo('hello'),
  toArray()
)
  .subscribe(x => console.log(x));
  // 结果
  // ["hello", "hello", "hello", "hello"]


max

max 操作符操作的 Observable 发出数字(或可以与提供的函数进行比较的项)并且当源 Observable 完成时它发出单一项:最大值的项。

参数:

名称类型属性描述
comparerFunction可选的可选的比较函数,用它来替代默认值来比较两项的值。
const users = [
  { 'user': 'barney', 'age': 36, 'active': false },
  { 'user': 'fred', 'age': 40, 'active': true },
  { 'user': 'ckdf', 'age': 24, 'active': true },
  { 'user': 'gdfg', 'age': 31, 'active': false }
];
from(users).pipe(
  max((a, b) => a.age < b.age ? -1 : 1)
)
  .subscribe(x => console.log(x));
  // 结果
  // {user: "fred", age: 40, active: true}


merge

创建一个输出 Observable ,它可以同时发出每个给定的输入 Observable 中的所有值。

通过把多个 Observables 的值混合到一个 Observable 中 来将其打平。  
const users = [
  { 'user': 'barney', 'age': 36, 'active': false },
  { 'user': 'fred', 'age': 40, 'active': true },
  { 'user': 'ckdf', 'age': 24, 'active': true },
  { 'user': 'gdfg', 'age': 31, 'active': false }
];
from(users).pipe(
  merge(interval(1000)),
  take(10),
  toArray()
)
  .subscribe(x => console.log(x));
  // 结果
  // [
      { 'user': 'barney', 'age': 36, 'active': false },
      { 'user': 'fred', 'age': 40, 'active': true },
      { 'user': 'ckdf', 'age': 24, 'active': true },
      { 'user': 'gdfg', 'age': 31, 'active': false },
      0,
      1,
      2,
      3,
      4,
      5
    ];


min

min 操作符操作的 Observable 发出数字(或可以使用提供函数进行比较的项)并且当源 Observable 完成时它发出单一项:最小值的项。

参数:

名称类型属性描述
comparerFunction可选的可选的比较函数,用它来替代默认值来比较两项的值。
const users = [
  { 'user': 'barney', 'age': 36, 'active': false },
  { 'user': 'fred', 'age': 40, 'active': true },
  { 'user': 'ckdf', 'age': 24, 'active': true },
  { 'user': 'gdfg', 'age': 31, 'active': false }
];
from(users).pipe(
  min((a, b) => a.age < b.age ? -1 : 1)
)
  .subscribe(x => console.log(x));
  // 结果
  // {user: "ckdf", age: 24, active: true}


reduce

在源 Observalbe 上应用 accumulator (累加器) 函数,然后当源 Observable 完成时,返回 累加的结果,可以提供一个可选的 seed 值。

使用 accumulator (累加器) 函数将源 Observable 所发出的所有值归并在一起, 该函数知道如何将新的源值纳入到过往的累加结果中。  

参数:

名称类型属性描述
accumulatorfunction(acc: R, value: T, index: number): R调用每个 源值的累加器函数。
seedR可选的初始累加值。
const arr = [1, 2, 3, 4];
from(arr).pipe(
  reduce((acc, curr) => acc + curr, 0)
)
  .subscribe(x => console.log(x));
  // 结果
  // 10


repeat

返回的 Observable 重复由源 Observable 所发出的项的流,最多可以重复 count 次。

参数:

名称类型属性描述
countnumber可选的源 Observable 项重复的次数,如果 count 为0则产生一个空的 Observable 。
const arr = [1, 2, 3, 4];
from(arr).pipe(
  repeat(3),
  toArray()
)
  .subscribe(x => console.log(x));
  // 结果
  // [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4]


startWith

返回的 Observable 会先发出作为参数指定的项,然后再发出由源 Observable 所发出的项。

参数:

名称类型属性描述
values...T你希望修改过的 Observable 可以先发出的项。
schedulerScheduler可选的用于调度 next 通知发送的 IScheduler 。
const users = [
  { 'user': 'barney', 'age': 36, 'active': false },
  { 'user': 'fred', 'age': 40, 'active': true },
  { 'user': 'ckdf', 'age': 24, 'active': true },
  { 'user': 'gdfg', 'age': 31, 'active': false }
];
from(users).pipe(
  startWith({ 'user': 'jokl', 'age': 28, 'active': false }),
  toArray()
)
  .subscribe(x => console.log(x));
  // 结果
  //[
      { 'user': 'jokl', 'age': 28, 'active': false },
      { 'user': 'barney', 'age': 36, 'active': false },
      { 'user': 'fred', 'age': 40, 'active': true },
      { 'user': 'ckdf', 'age': 24, 'active': true },
      { 'user': 'gdfg', 'age': 31, 'active': false }
    ]


take

只发出源 Observable 最初发出的的N个值 (N = count)。

接收源 Observable 最初的N个值 (N = count),然后完成。  

参数:

名称类型属性描述
countnumber发出 next 通知的最大次数。
const users = [
  { 'user': 'barney', 'age': 36, 'active': false },
  { 'user': 'fred', 'age': 40, 'active': true },
  { 'user': 'ckdf', 'age': 24, 'active': true },
  { 'user': 'gdfg', 'age': 31, 'active': false }
];
from(users).pipe(
  startWith({ 'user': 'jokl', 'age': 28, 'active': false }),
  take(3),
  toArray()
)
  .subscribe(x => console.log(x));
  // 结果
  //[
      { 'user': 'jokl', 'age': 28, 'active': false },
      { 'user': 'barney', 'age': 36, 'active': false },
      { 'user': 'fred', 'age': 40, 'active': true }
    ]


takeLast

只发出源 Observable 最后发出的的N个值 (N = count)。与take类似

记住源 Observable 的最后N个值 (N = count),然后只有当 它完成时发出这些值。  

参数:

名称类型属性描述
countnumber从源 Observable 的值序列的末尾处,要发出的值的最大数量。
const users = [
  { 'user': 'barney', 'age': 36, 'active': false },
  { 'user': 'fred', 'age': 40, 'active': true },
  { 'user': 'ckdf', 'age': 24, 'active': true },
  { 'user': 'gdfg', 'age': 31, 'active': false }
];
from(users).pipe(
  startWith({ 'user': 'jokl', 'age': 28, 'active': false }),
  takeLast(3),
  toArray()
)
  .subscribe(x => console.log(x));
  // 结果
  //[
        { 'user': 'fred', 'age': 40, 'active': true },
        { 'user': 'ckdf', 'age': 24, 'active': true },
        { 'user': 'gdfg', 'age': 31, 'active': false }
    ]


toArray

将源 Observable转换为数组

const arr = [1, 2, 3, 4];
from(arr).pipe(
  toArray()
)
  .subscribe(x => console.log(x));
  // 结果
  // [1, 2, 3, 4]


toPromise

将 Observable 序列转换为符合 ES2015 标准的 Promise 。

const users = [
  { 'user': 'barney', 'age': 36, 'active': false },
  { 'user': 'fred', 'age': 40, 'active': true },
  { 'user': 'ckdf', 'age': 24, 'active': true },
  { 'user': 'gdfg', 'age': 31, 'active': false }
];
from(users).pipe(
  startWith({ 'user': 'jokl', 'age': 28, 'active': false }),
  takeLast(3),
  toArray()
)
  .toPromise()
  .then(x => console.log(x))
  .catch(err => console.log(err));
  // 结果
  //[
        { 'user': 'fred', 'age': 40, 'active': true },
        { 'user': 'ckdf', 'age': 24, 'active': true },
        { 'user': 'gdfg', 'age': 31, 'active': false }
    ]

来自:https://www.cnblogs.com/xzsty/archive/2019/01/02/10207504.html


链接: https://fly63.com/article/detial/1748

rxjs入门指南

在复杂的,频繁的异步请求场景,使用rxjs。在依赖的多个异步数据,决定渲染的情景,使用rxjs。总之:在前台频繁的、大量的、和后台数据交互的复杂项目里面,使用rxjs(web端,iOS,android端等,客户端都可考虑使用)

RxJS主题(Subject)

什么是主题?RxJS 主题就是一个特性类型的 Observable 对象,它允许值多路广播给观察者(Observers)。当一个简单的 Observable 是单播的(每个订阅的观察者它们自己都依赖 Observable 的执行)时候,主题(Subjects)就是多播的。

RxJS中Operators

RxJS 的操作符(operators)是最有用的,尽管 Observable 是最基本的。操作符最基本的部分(pieces)就是以申明的方式允许复杂的异步代码组合简化。操作符是函数。这里有两种操作符:

RxJS调度器(Scheduler)

调度是数据结构。它知道怎样在优先级或其他标准去存储和排队运行的任务,调度器是一个执行上下文。它表示任务在何时何地执行(例如,立即或是在回调机制中如 setTimeout 或 process.nextTick,又或是动画框架)

80 行代码实现简易 RxJS

RxJS 是一个响应式的库,它接收从事件源发出的一个个事件,经过处理管道的层层处理之后,传入最终的接收者,这个处理管道是由操作符组成的,开发者只需要选择和组合操作符就能完成各种异步逻辑,极大简化了异步编程

我每天都在使用的 10 个 RxJS 运算符

作为一名 Angular 开发人员,您可能会发现以下 RxJS 运算符在您的日常开发中很有用:map():此运算符用于转换可观察对象发出的值。它以一个函数作为参数,它接收发出的值作为输入并返回转换后的输出

内容以共享、参考、研究为目的,不存在任何商业目的。其版权属原作者所有,如有侵权或违规,请与小编联系!情况属实本人将予以删除!