RxJS

Programación reactiva

Es un paradigma que junto con el patrón Rx te ayudará a manejar mejor el flujo de información de una aplicación.

La programación reactiva se centra en flujos de información como secuencias ordenadas de datos.

La programación reactiva es un paradigma de programación que se centra en la creación de sistemas que responden automáticamente a los cambios en su entorno. En lugar de realizar tareas de forma secuencial, los programas reactivos están diseñados para responder rápidamente a eventos y cambios en el estado del sistema. Esto se logra mediante el uso de flujos de datos y programación asincrónica, lo que permite que los programas manejen múltiples entradas y salidas simultáneamente de manera eficiente.

Patrón Observador

En el contexto de la programación reactiva, un observador es un patrón de diseño que se utiliza para monitorear y responder a cambios en un flujo de datos.

  • Observable (Subject): Son colecciones de multiples valores de empuje (push) y evaluación (Lazy). Representan un flujo de datos que cambian en el tiempo y se comunican con los observadores usando los siguientes tres métodos:

    1. Next: Cuando envía un dato.

    2. Complete: Cuando deja de enviar datos.

    3. Error: Cuando algo falla.

  • Observador (Observer): Es uno o mas consumidores de valores que se suscriben a un observable, se pueden comunicar a través de los métodos:

    1. Subscribe: Para empezar a recibir valores.

    2. Unsubscribe: Para dejar de recibir valores.

Tipos de operadores.

Los operadores son funciones puras, crean un nuevo observable de salida sin modificar el observable de entrada.

  • Operadores creacionales, son los que crean observables

  • Operadores pipeables, son los que se pueden encadenar a otros observables

Operadores Creacionales

Crear un Observable

Es un flujo de datos que emite una secuencia de eventos a lo largo del tiempo, emite los eventos que tiene de manera independiente para cada observador que se suscribe. Son unicast, emite valores solo cuando se suscribe.

export class HomeComponent {
  // Flujo de datos
  observableAlfa$ = new Observable<number | string>((subscriber) => {
    try {
      subscriber.next(1); 
      subscriber.next(2);
      subscriber.complete();
      } catch(err) {
        subscriber.error(err);
      }
  });

  //Se va a crear un observador que se subscribirá al observable.
  observator = {
    next: (value: number | string) => {
      console.log(`The value is: ${value}`);
    },
    complete: () => {
      console.log(`Done`);
    },
    error: (error: Error) => {
      console.error(error.message);
    },
  };

  constructor() {
    // Cuando se suscribe comienza a recibir los datos del observable
    this.observableAlfa$.subscribe(this.observator);
  }
}

FromEvent

Es un método que permite crear un Observable que recibe eventos determinados de un elemento del DOM.

export class HomeComponent {
  // Se crea el observable que lanza un evento cuando se presione una tecla.
  onKey$ = fromEvent<KeyboardEvent>(document, 'keydown'); 

  constructor() {
    // Se crea el observador que indica como usará la data obtenida.
    const observatorMouse = {
      next: (event: KeyboardEvent) => {
        console.log('new event: ', event.key);
      },
      complete: () => {
        console.log('There is no more events');
      },
      error: (error: Error) => {
        console.log('Something went wrong: ', error.message);
      },
    };

    //Se subscribe el observador al observable.
    this.onKey$.subscribe(observatorMouse);   
  }
}

Subject

Es un tipo de Observable que nos permite enviar los mismos valores de una fuente a todos los observadores además también nos permite insertar valores desde afuera del observable. Es multicast porque comparten los mismos valores entre multiples suscriptores.

Este tipo de observable, se puede conectar a otros observables por medio de pipes.

export class HomeComponent {
  constructor() {
    const numbers$ = new Observable<number>(subscriber => {
      subscriber.next(Math.round(Math.random()*100));
    })

    const numbersRandom$ = new Subject<number>();

    const observador1 = {
      next: (value: number) => {
        console.log("the value is: ", value);
      }
    }

    const observador2 = {
      next: (value: number) => {
        console.log("the value is2: ", value);
      }
    }

    numbersRandom$.subscribe(observador1);
    numbersRandom$.subscribe(observador2);

    numbers$.subscribe(numbersRandom$); // Subject se puede conectar a otros observables
    numbersRandom$.next(Math.round(Math.random()*100));// Subject nos permite insertar valores desde afuera del observable
  }
}

BehaviorSubject

Es una variante de Subject que tiene una noción del valor actual que almacena y emite a todas las suscripciones nuevas. Este valor actual es el elemento emitido más recientemente por la fuente observable o un valor inicial/predeterminado si aún no se ha emitido ninguno. Dado que siempre debe haber un valor actual, BehaviorSubject requiere un valor inicial durante la inicialización. Subject y BehaviorSubject son observables únicos que actúan como observadores y observables a la vez.

  // store.service.ts
  ...
  private myShoppingCart: Product[] = [];
  private myCart = new BehaviorSubject<Product[]>([]);
  public myCart$ = this.myCart.asObservable();

  addProduct(product: Product): void {
    // El observable emitirá un nuevo valor con cada producto que se agregue al carrito.
    this.myShoppingCart.push(product);
    this.myCart.next(this.myShoppingCart);
  }
...
// some.component.ts
...
  constructor(private storeService: StoreService) {}
 
  totalProducts = 0;
  total = 0;

  ngOnInit(): void {
    this.storeService.myCart$.subscribe((products) => {
      // Cada vez que el observable emita un valor, se ejecutará este código
      this.totalProducts = products.length;
      this.total = this.storeService.getTotal();
    });
  }
...

Observables: from y of

Los Observables from y of nos permiten generar observables a través de una serie de datos ya definidos.

  • of genera un Observable a través de sus parámetros.

  • from genera un Observable a través de un arreglo. asyncScheduler como segundo argumento de from envía los valores del observable al event loop queue.

Ejemplo para convertir una Promesa a Observable

Observables: interval y time

...

const sequenceNumbers$ = interval(200); // Emite valores numéricos cada 200 milisegundos.
const delayedTimer$ = timer(5000); // Retrasa un valor numérico por 5000 milisegundos.

sequenceNumbers$.subscribe(console.log); // console.log, Para evitar enviar todo un objeto Observador
delayedTimer$.subscribe(console.log);

Operadores Pipeables

  • pipe(): todos los operadores creacionales tienen este método, genera una cadena de operadores que se pueden enlazar uno tras otro.

  • map(): itera sobre los valores que obtenemos del observable transformándolos.

  • filter(): filtra los valores de un observable dada una condición.

  • reduce(): combina todos los valores emitidos por un observable a través de una función acumuladora.

// Operador map
const numbers1$ = from([1, 2, 3, 4, 5, 6, 7, 8]).pipe(
    map(number => number * 2),
    map(number => number * 10) // Se pueden ir encadenando mas y mas operadores
);
numbers1$.subscribe(console.log);

// Operador reduce
const numbers2$ = from([1, 2, 3, 4, 5, 6, 7, 8]).pipe(
    reduce((acc, val) => acc + val, 0)
);
numbers2$.subscribe(console.log);

// Operador filter
const numbers3$ = from([1, 2, 3, 4, 5, 6, 7, 8]).pipe(
    filter(number => number > 6)
);
numbers3$.subscribe(console.log);

Operadores de distinción

La operación distinct de RxJS devuelve un flujo de datos que emite sólo los valores distintos que recibe, eliminando duplicados.

Por ejemplo, si tenemos un flujo de números:

const numeros$ = from([1, 1, 2, 2, 3, 4, 5, 5]);
numeros$.pipe(
  distinct()
).subscribe(
  valor => console.log(valor)
);

La salida en la consola sería:

1
2
3
4
5

La operación distinctUntilChanged también devuelve un flujo de datos con valores únicos, pero sólo emite un valor cuando cambia el valor anterior recibido.

Por ejemplo, si tenemos un flujo de números:

const numeros$ = from([1, 2, 2, 1, 3]);
numeros$.pipe(
  distinctUntilChanged()
).subscribe(
  valor => console.log(valor)
);

La salida en la consola sería:

1
2
1
3

Como podemos ver, los números repetidos no se emiten, y sólo se emite el valor cuando cambia respecto al anterior.

Operadores de tiempo

Los operadores de tiempo nos ayudan a gestionar cómo y con qué frecuencia entregamos valores.

Hay muchos operadores de esta clase en RxJS, en esta clase te introduzco 4 de ellos:

¿Por qué son importantes los operadores de tiempo? Imagina que necesitamos consultar a una API por unos valores que tenemos. Si son muchos valores corremos el riesgo de utilizar muchos recursos o generar muchas peticiones y llegar a un límite establecido.

Pero a través de un operador como sampleTime, sólo hacemos una petición a una API cada 1 minuto, o cada 10 minutos por los valores emitidos, disminuimos la frecuencia de peticiones y el riesgo de llegar a un límite.

Por ejemplo, si tenemos un flujo de eventos de teclado que emite una cadena de texto cada vez que el usuario escribe en un campo de entrada de texto, podríamos usar debounceTime para retrasar la emisión de valores hasta que el usuario ha dejado de escribir por un tiempo determinado:

...
const onKey$ = fromEvent<KeyboardEvent>(document, 'keydown');
onKey$.pipe(
      debounceTime(500)
).subscribe(
      value => console.log(value.key)
 );

En este ejemplo, el flujo de eventos input$ emite un evento de teclado cada vez que el usuario escribe en el campo de entrada de texto. La operación debounceTime retrasa la emisión de valores durante 500 milisegundos antes de enviar la cadena de texto a la salida del flujo de datos. Esto significa que si el usuario sigue escribiendo durante ese tiempo, el valor no se emitirá hasta que se haya detenido la escritura durante al menos 500 milisegundos.

De esta forma, podemos reducir la cantidad de actualizaciones innecesarias en nuestra aplicación y mejorar la experiencia del usuario.

Operadores: mergeAll, mergeMap

Son operadores para fucionar a partir de uno o más observables.

mergeAll → Convierte un observable de orden superior en un observable de primer orden que entrega simultáneamente todos los valores que se emiten en los observables internos. Observable de orden superior: Observable que emite otros observables.

const onClick2$ = fromEvent(document, "click");
const ordenSuperior$ = onClick2$.pipe(
    map(() => interval(1000))
);

const primerOrden$ = ordenSuperior$.pipe(
    mergeAll()
);

primerOrden$.subscribe(console.log);

mergeMap → Proyecta cada valor de fuente a un observable que se fusiona en la salida del observable. Es una mezcla entre mapear un observador y luego mezclar todos los valores que han sido emitidos.

const letters$ = from(["A", "B", "C"]);
const result$ = letters$.pipe(
    // Anidando el observable letters$ con el observable interval.
    mergeMap(letter => interval(1000).pipe(
        // Anida una letra por cada segundo
        map(
            second => letter + second
        )
    ))
);

result$.subscribe(console.log);

Operador takeUntil

La operación takeUntil de RxJS se utiliza para cancelar la suscripción a un flujo de datos cuando se emite un valor en otro flujo de datos. En otras palabras, se utiliza para detener un flujo de datos cuando se cumple una cierta condición o evento.

La sintaxis básica de takeUntil es la siguiente:

const source$.pipe(
  takeUntil(notifier$)
)

Donde source$ es el flujo de datos que se quiere detener, y notifier$ es el flujo de datos que emite un valor para indicar que se debe detener source$.

Un caso de uso común para takeUntil es cuando se quiere cancelar la suscripción a un flujo de datos cuando el componente o servicio asociado se destruye. En este caso, se puede crear un flujo de datos que emita un valor cuando se destruye el componente o servicio, y usar takeUntil para cancelar la suscripción al flujo de datos principal.

Por ejemplo, si tenemos un flujo de intervalo que emite un valor cada segundo, pero queremos detenerlo cuando el componente se destruye, podríamos hacer lo siguiente:

import { interval, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

@Component({
  selector: 'app-ejemplo',
  templateUrl: './ejemplo.component.html'
})
export class EjemploComponent implements OnDestroy {
  private destroy$ = new Subject();
  private intervalo$ = interval(1000);

  constructor() {
    this.intervalo$.pipe(
      takeUntil(this.destroy$)
    ).subscribe(
      valor => console.log(valor)
    );
  }

  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

En este ejemplo, creamos un flujo de intervalo que emite un valor cada segundo. Usamos takeUntil para cancelar la suscripción a intervalo$ cuando se emite un valor en el flujo de destroy$, que se emite cuando se destruye el componente mediante el método ngOnDestroy().

De esta manera, aseguramos que el flujo de datos intervalo$ se detenga cuando se destruye el componente, evitando problemas de memoria y pérdidas de rendimiento en nuestra aplicación.

Otras formas para desuscribir un observable:

https://rafaelneto.dev/blog/desuscribir-observable-behaviorsubject-angular/

Operadores: startWith, endWith

Los operadores startWith y endWith de RxJS se utilizan para agregar valores iniciales o finales a un flujo de datos.

  • El operador startWith agrega un valor inicial al inicio del flujo de datos, antes de cualquier otro valor que ya esté siendo emitido por el flujo.

  • El operador endWith agrega un valor final al final del flujo de datos, después de cualquier otro valor que ya esté siendo emitido por el flujo.

La sintaxis básica de ambos operadores es la siguiente:

import { of } from 'rxjs';
import { startWith, endWith } from 'rxjs/operators';

const numeros$ = of(1, 2, 3);

numeros$.pipe(
  startWith(0),
  endWith(4)
).subscribe(
  valor => console.log(valor)
);

Manejo de errores en RxJS

RxJS nos provee de operadores para manejar errores que pueden ocurrir en nuestro código.

  • catchError: Captura errores en un observable retornando un nuevo observable o lanzando un error.

  • retry: Reintenta ejecutar el observable cuando sucede un error. El número de reintentos lo puedes especificar.

⚠️ Recuerda que el orden de las funciones (en este caso operadores) importa al momento de manejar errores: Si declaramos un operador como catchError antes que un retry, el operador retry no se ejecutará. Por el contrario si declaramos retry antes que catchError podrían ejecutarse los dos.

🔗 Ejemplo

import { of, catchError, map, retry } from "rxjs";

const letters$ = of("A", "B", "C", "D", "E").pipe(
  map((letter) => {
    if (letter === "D") {
      x = 4;
    }
    return letter;
  }),
  retry(2),
  catchError((error) => of(error.message))
);

letters$.subscribe(console.log);

BehaviorSubject

Es una variante de Subject que tiene una noción del valor actual que almacena y emite a todas las suscripciones nuevas. Este valor actual es el elemento emitido más recientemente por la fuente observable o un valor inicial/predeterminado si aún no se ha emitido ninguno. Dado que siempre debe haber un valor actual, BehaviorSubject requiere un valor inicial durante la inicialización. Subject y BehaviorSubject son observables únicos que actúan como observadores y observables a la vez.

Referencias

https://rxmarbles.com/

Last updated