# RxJS

## Programación reactiva&#x20;

Es un paradigma que junto con el *patrón Rx* te ayudará a manejar mejor el flujo de información de una aplicación.

La programación reactiva se centra en **flujos de información** como secuencias ordenadas de **datos**.

La programación reactiva es un paradigma de programación que se centra en la creación de sistemas que responden automáticamente a los cambios en su entorno. En lugar de realizar tareas de forma secuencial, los programas reactivos están diseñados para responder rápidamente a eventos y cambios en el estado del sistema. Esto se logra mediante el uso de flujos de datos y programación asincrónica, lo que permite que los programas manejen múltiples entradas y salidas simultáneamente de manera eficiente.

## Patrón Observador

En el contexto de la programación reactiva, un observador es un patrón de diseño que se utiliza para monitorear y responder a cambios en un flujo de datos.

* Observable (*Subject*): Son colecciones de multiples valores de empuje (*push*) y evaluación (*Lazy*). Representan un flujo de datos que cambian en el tiempo y se comunican con los observadores usando los siguientes tres métodos:
  1. Next: Cuando envía un dato.
  2. Complete: Cuando deja de enviar datos.
  3. Error: Cuando algo falla.
* Observador (*Observer*): Es uno o mas consumidores de valores que se suscriben a un observable, se pueden comunicar a través de los métodos:
  1. Subscribe: Para empezar a recibir valores.
  2. Unsubscribe: Para dejar de recibir valores.

<figure><img src="https://624742151-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FFRKN8coRWwuuSr24p99X%2Fuploads%2FVA4GczXJXKgsGeSn9i3B%2Fimage.png?alt=media&#x26;token=205a83fa-5cdd-4f0a-b85a-5e7f5ef0fb7f" alt=""><figcaption></figcaption></figure>

## &#x20;Tipos de operadores.

Los operadores son funciones puras, crean un nuevo observable de salida sin modificar el observable de entrada.

* **Operadores creacionales**, son los que crean observables
* **Operadores pipeables**, son los que se pueden encadenar a otros observables

## **Operadores Creacionales**

### Crear un Observable

Es un flujo de datos que emite una secuencia de eventos a lo largo del tiempo, emite los  eventos que tiene de manera independiente para cada observador que se suscribe. Son *unicast*, emite valores solo cuando se suscribe.

<figure><img src="https://624742151-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FFRKN8coRWwuuSr24p99X%2Fuploads%2F0QbaeXjptNYzFeRlzEF2%2Fimage.png?alt=media&#x26;token=7955bb1a-4f0e-4af8-af3b-394ad408c2a0" alt=""><figcaption></figcaption></figure>

```typescript
export class HomeComponent {
  // Flujo de datos
  observableAlfa$ = new Observable<number | string>((subscriber) => {
    try {
      subscriber.next(1); 
      subscriber.next(2);
      subscriber.complete();
      } catch(err) {
        subscriber.error(err);
      }
  });

  //Se va a crear un observador que se subscribirá al observable.
  observator = {
    next: (value: number | string) => {
      console.log(`The value is: ${value}`);
    },
    complete: () => {
      console.log(`Done`);
    },
    error: (error: Error) => {
      console.error(error.message);
    },
  };

  constructor() {
    // Cuando se suscribe comienza a recibir los datos del observable
    this.observableAlfa$.subscribe(this.observator);
  }
}
```

### FromEvent

Es un método que permite crear un *Observable* que recibe eventos determinados de un elemento del DOM.

```typescript
export class HomeComponent {
  // Se crea el observable que lanza un evento cuando se presione una tecla.
  onKey$ = fromEvent<KeyboardEvent>(document, 'keydown'); 

  constructor() {
    // Se crea el observador que indica como usará la data obtenida.
    const observatorMouse = {
      next: (event: KeyboardEvent) => {
        console.log('new event: ', event.key);
      },
      complete: () => {
        console.log('There is no more events');
      },
      error: (error: Error) => {
        console.log('Something went wrong: ', error.message);
      },
    };

    //Se subscribe el observador al observable.
    this.onKey$.subscribe(observatorMouse);   
  }
}
```

### Subject&#x20;

Es un tipo de Observable que **nos permite enviar los mismos valores** de una fuente a todos los observadores además también **nos permite insertar valores desde afuera del observable**. Es mult*icast* porque comparten los mismos valores entre multiples suscriptores.

<figure><img src="https://624742151-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2FFRKN8coRWwuuSr24p99X%2Fuploads%2FbN3tWTp9SmTN1Dd7uHkF%2Fimage.png?alt=media&#x26;token=14103296-4d9b-4262-b606-56a0624c8647" alt=""><figcaption></figcaption></figure>

Este tipo de observable, se puede conectar a otros observables por medio de *pipes.*

```typescript
export class HomeComponent {
  constructor() {
    const numbers$ = new Observable<number>(subscriber => {
      subscriber.next(Math.round(Math.random()*100));
    })

    const numbersRandom$ = new Subject<number>();

    const observador1 = {
      next: (value: number) => {
        console.log("the value is: ", value);
      }
    }

    const observador2 = {
      next: (value: number) => {
        console.log("the value is2: ", value);
      }
    }

    numbersRandom$.subscribe(observador1);
    numbersRandom$.subscribe(observador2);

    numbers$.subscribe(numbersRandom$); // Subject se puede conectar a otros observables
    numbersRandom$.next(Math.round(Math.random()*100));// Subject nos permite insertar valores desde afuera del observable
  }
}
```

## BehaviorSubject

Es una variante de [Subject](#subject) que tiene una noción del valor actual que almacena y emite a todas las suscripciones nuevas. Este valor actual es el elemento emitido más recientemente por la fuente observable o un valor inicial/predeterminado si aún no se ha emitido ninguno. Dado que siempre debe haber un valor actual, *BehaviorSubject* requiere un valor inicial durante la inicialización. [Subject](#subject) y *BehaviorSubject* son observables únicos que actúan como o**bservadores y observables** a la vez.

```typescript
  // store.service.ts
  ...
  private myShoppingCart: Product[] = [];
  private myCart = new BehaviorSubject<Product[]>([]);
  public myCart$ = this.myCart.asObservable();

  addProduct(product: Product): void {
    // El observable emitirá un nuevo valor con cada producto que se agregue al carrito.
    this.myShoppingCart.push(product);
    this.myCart.next(this.myShoppingCart);
  }
...
```

<pre class="language-typescript"><code class="lang-typescript"><strong>// some.component.ts
</strong><strong>...
</strong>  constructor(private storeService: StoreService) {}
 
  totalProducts = 0;
  total = 0;

  ngOnInit(): void {
    this.storeService.myCart$.subscribe((products) => {
      // Cada vez que el observable emita un valor, se ejecutará este código
      this.totalProducts = products.length;
      this.total = this.storeService.getTotal();
    });
  }
...
</code></pre>

### Observables: `from` y `of`

&#x20;Los Observables `from` y `of` nos permiten generar observables a través de una serie de datos ya definidos.

* [`of`](https://rxjs.dev/api/index/function/of) genera un Observable a través de sus parámetros.
* [`from`](https://rxjs.dev/api/index/function/from) genera un Observable a través de un arreglo. `asyncScheduler` como segundo argumento de `from` envía los valores del observable al event loop **queue**.

Ejemplo para convertir una [Promesa a Observable](https://shimozurdo.gitbook.io/frontend/fundamentos#promesa-a-observable)

### Observables: interval y time

```typescript
...

const sequenceNumbers$ = interval(200); // Emite valores numéricos cada 200 milisegundos.
const delayedTimer$ = timer(5000); // Retrasa un valor numérico por 5000 milisegundos.

sequenceNumbers$.subscribe(console.log); // console.log, Para evitar enviar todo un objeto Observador
delayedTimer$.subscribe(console.log);
```

## Operadores Pipeables

* **pipe()**: todos los operadores creacionales tienen este método, genera una cadena de operadores que se pueden enlazar uno tras otro.
* **map()**: itera sobre los valores que obtenemos del observable transformándolos.
* **filter()**: filtra los valores de un observable dada una condición.
* **reduce()**: combina todos los valores emitidos por un observable a través de una función acumuladora.

```typescript
// Operador map
const numbers1$ = from([1, 2, 3, 4, 5, 6, 7, 8]).pipe(
    map(number => number * 2),
    map(number => number * 10) // Se pueden ir encadenando mas y mas operadores
);
numbers1$.subscribe(console.log);

// Operador reduce
const numbers2$ = from([1, 2, 3, 4, 5, 6, 7, 8]).pipe(
    reduce((acc, val) => acc + val, 0)
);
numbers2$.subscribe(console.log);

// Operador filter
const numbers3$ = from([1, 2, 3, 4, 5, 6, 7, 8]).pipe(
    filter(number => number > 6)
);
numbers3$.subscribe(console.log);
```

## Operadores de distinción

La operación `distinct` de RxJS devuelve un flujo de datos que emite sólo los valores distintos que recibe, eliminando duplicados.

Por ejemplo, si tenemos un flujo de números:

```typescript
const numeros$ = from([1, 1, 2, 2, 3, 4, 5, 5]);
numeros$.pipe(
  distinct()
).subscribe(
  valor => console.log(valor)
);
```

La salida en la consola sería:

```
1
2
3
4
5
```

La operación `distinctUntilChanged` también devuelve un flujo de datos con valores únicos, pero sólo emite un valor cuando cambia el valor anterior recibido.

Por ejemplo, si tenemos un flujo de números:

```typescript
const numeros$ = from([1, 2, 2, 1, 3]);
numeros$.pipe(
  distinctUntilChanged()
).subscribe(
  valor => console.log(valor)
);
```

La salida en la consola sería:

```
1
2
1
3
```

Como podemos ver, los números repetidos no se emiten, y sólo se emite el valor cuando cambia respecto al anterior.

## Operadores de tiempo

Los operadores de tiempo nos ayudan a gestionar cómo y con qué frecuencia entregamos valores.

Hay muchos operadores de esta clase en RxJS, en esta clase te introduzco 4 de ellos:

* [throttleTime](https://rxjs.dev/api/operators/throttleTime)
* [sampleTime](https://rxjs.dev/api/operators/sampleTime)
* [auditTime](https://rxjs.dev/api/operators/auditTime)
* [debounceTime](https://rxjs.dev/api/operators/debounceTime)

**¿Por qué son importantes los operadores de tiempo?**\
Imagina que necesitamos consultar a una API por unos valores que tenemos. Si son muchos valores corremos el riesgo de utilizar muchos recursos o generar muchas peticiones y llegar a un límite establecido.

Pero a través de un operador como `sampleTime`, sólo hacemos una petición a una API cada 1 minuto, o cada 10 minutos por los valores emitidos, disminuimos la frecuencia de peticiones y el riesgo de llegar a un límite.

Por ejemplo, si tenemos un flujo de eventos de teclado que emite una cadena de texto cada vez que el usuario escribe en un campo de entrada de texto, podríamos usar `debounceTime` para retrasar la emisión de valores hasta que el usuario ha dejado de escribir por un tiempo determinado:

```javascript
...
const onKey$ = fromEvent<KeyboardEvent>(document, 'keydown');
onKey$.pipe(
      debounceTime(500)
).subscribe(
      value => console.log(value.key)
 );
```

En este ejemplo, el flujo de eventos `input$` emite un evento de teclado cada vez que el usuario escribe en el campo de entrada de texto. La operación `debounceTime` retrasa la emisión de valores durante 500 milisegundos antes de enviar la cadena de texto a la salida del flujo de datos. Esto significa que si el usuario sigue escribiendo durante ese tiempo, el valor no se emitirá hasta que se haya detenido la escritura durante al menos 500 milisegundos.

De esta forma, podemos reducir la cantidad de actualizaciones innecesarias en nuestra aplicación y mejorar la experiencia del usuario.

## Operadores: mergeAll, mergeMap

Son operadores para fucionar a partir de uno o más observables.

**`mergeAll`** → Convierte un observable de orden superior en un observable de primer orden que entrega simultáneamente todos los valores que se emiten en los observables internos.\
\
**Observable de orden superior:** Observable que emite otros observables.

```typescript
const onClick2$ = fromEvent(document, "click");
const ordenSuperior$ = onClick2$.pipe(
    map(() => interval(1000))
);

const primerOrden$ = ordenSuperior$.pipe(
    mergeAll()
);

primerOrden$.subscribe(console.log);
```

**`mergeMap`** → Proyecta cada valor de fuente a un observable que se fusiona en la salida del observable. Es una mezcla entre mapear un observador y luego mezclar todos los valores que han sido emitidos.

```typescript
const letters$ = from(["A", "B", "C"]);
const result$ = letters$.pipe(
    // Anidando el observable letters$ con el observable interval.
    mergeMap(letter => interval(1000).pipe(
        // Anida una letra por cada segundo
        map(
            second => letter + second
        )
    ))
);

result$.subscribe(console.log);
```

## Operador takeUntil

La operación `takeUntil` de RxJS se utiliza para cancelar la suscripción a un flujo de datos cuando se emite un valor en otro flujo de datos. En otras palabras, se utiliza para detener un flujo de datos cuando se cumple una cierta condición o evento.

La sintaxis básica de `takeUntil` es la siguiente:

```typescript
const source$.pipe(
  takeUntil(notifier$)
)
```

Donde `source$` es el flujo de datos que se quiere detener, y `notifier$` es el flujo de datos que emite un valor para indicar que se debe detener `source$`.

Un caso de uso común para `takeUntil` es cuando se quiere cancelar la suscripción a un flujo de datos cuando el componente o servicio asociado se destruye. En este caso, se puede crear un flujo de datos que emita un valor cuando se destruye el componente o servicio, y usar `takeUntil` para cancelar la suscripción al flujo de datos principal.

Por ejemplo, si tenemos un flujo de intervalo que emite un valor cada segundo, pero queremos detenerlo cuando el componente se destruye, podríamos hacer lo siguiente:

```javascript
import { interval, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

@Component({
  selector: 'app-ejemplo',
  templateUrl: './ejemplo.component.html'
})
export class EjemploComponent implements OnDestroy {
  private destroy$ = new Subject();
  private intervalo$ = interval(1000);

  constructor() {
    this.intervalo$.pipe(
      takeUntil(this.destroy$)
    ).subscribe(
      valor => console.log(valor)
    );
  }

  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}
```

En este ejemplo, creamos un flujo de intervalo que emite un valor cada segundo. Usamos `takeUntil` para cancelar la suscripción a `intervalo$` cuando se emite un valor en el flujo de `destroy$`, que se emite cuando se destruye el componente mediante el método `ngOnDestroy()`.

De esta manera, aseguramos que el flujo de datos `intervalo$` se detenga cuando se destruye el componente, evitando problemas de memoria y pérdidas de rendimiento en nuestra aplicación.

Otras formas para desuscribir un observable:

<https://rafaelneto.dev/blog/desuscribir-observable-behaviorsubject-angular/>

## Operadores: startWith, endWith

Los operadores `startWith` y `endWith` de RxJS se utilizan para agregar valores iniciales o finales a un flujo de datos.

* El operador `startWith` agrega un valor inicial al inicio del flujo de datos, antes de cualquier otro valor que ya esté siendo emitido por el flujo.
* El operador `endWith` agrega un valor final al final del flujo de datos, después de cualquier otro valor que ya esté siendo emitido por el flujo.

La sintaxis básica de ambos operadores es la siguiente:

```javascript
import { of } from 'rxjs';
import { startWith, endWith } from 'rxjs/operators';

const numeros$ = of(1, 2, 3);

numeros$.pipe(
  startWith(0),
  endWith(4)
).subscribe(
  valor => console.log(valor)
);
```

## Manejo de errores en RxJS

RxJS nos provee de operadores para manejar errores que pueden ocurrir en nuestro código.

* [`catchError`](https://rxjs.dev/api/operators/catchError): Captura errores en un observable retornando un nuevo observable o lanzando un error.&#x20;
* [`retry`](https://rxjs.dev/api/operators/retry): Reintenta ejecutar el observable cuando sucede un error. El número de reintentos lo puedes especificar.

⚠️ Recuerda que el orden de las funciones (en este caso **operadores**) importa al momento de manejar errores:\
Si declaramos un operador como `catchError` antes que un `retry`, el operador `retry` no se ejecutará. Por el contrario si declaramos `retry` antes que `catchError` podrían ejecutarse los dos.

[🔗 Ejemplo ](https://shimozurdo.gitbook.io/frontend/fundamentos#manejo-de-errores)

```javascript
import { of, catchError, map, retry } from "rxjs";

const letters$ = of("A", "B", "C", "D", "E").pipe(
  map((letter) => {
    if (letter === "D") {
      x = 4;
    }
    return letter;
  }),
  retry(2),
  catchError((error) => of(error.message))
);

letters$.subscribe(console.log);
```

## BehaviorSubject

Es una variante de [Subject](#subject) que tiene una noción del valor actual que almacena y emite a todas las suscripciones nuevas. Este valor actual es el elemento emitido más recientemente por la fuente observable o un valor inicial/predeterminado si aún no se ha emitido ninguno. Dado que siempre debe haber un valor actual, *BehaviorSubject* requiere un valor inicial durante la inicialización. [Subject](#subject) y *BehaviorSubject* son observables únicos que actúan como observadores y observables a la vez.

## Referencias

<https://rxmarbles.com/>
