Rx IV – Linq: operaciones con observables

15 Feb 2012 · 7 mins. de lectura

Hemos llegado al cuarto artículo sobre reactive extensions en el que vamos a hablar de operaciones que se pueden realizar con observables. Pero, como hace unos días que no publicábamos nada al respecto, vamos a hacer primero una pequeña retrospectiva. Hasta ahora hemos tratado de explicar rx como una fórmula matemática:

Dentro de esta formula hemos intentado exponer que el componente básico son los sujetos (observables), y también las operaciones básicas de creación (linq). En este artículo explicaremos más a fondo el resto de operaciones y operadores que podemos encontrar.

Pero antes de entrar en materia vamos a conocer unos diagramas que nos ayudarán a explicar y entender más fácilmente cómo se comportan las operaciones:

Marble diagrams

Cuando hablamos de reactive extensions es muy común encontrarnos con unos diagramas que nos ayudan a entender las operaciones. En ellos se muestra la evolución de la tarea de observación con respecto el tiempo y tienen este formato:

Dentro de uno de estos diagramas podemos encontrarnos con 4 símbolos diferentes:

Representa una iteración o un valor en una secuencia observable. (IObserver<T>.OnNext(T);)
Cuando sucede una transformación en un valor.
x Si una excepción termina con la secuencia observable. (IObserver<T>.OnError(Exception);)
| Cuando una secuencia observable termina de forma correcta. (IObserver<T>.OnCompleted();)

 

Una vez tenemos claro cómo funciona, podemos ponernos a explicar los operadores especiales:

Merge

Cuando hacemos un Merge de dos objetos observables, estamos seleccionando todas las iteraciones (OnNext) de uno y otro operador.

var z = x.Merge(y);

En este ejemplo, el observable z terminará cuando se haya completado (OnComplete) tanto x como y, o cuando una de ellas tenga un error (OnError). Una vista del resultado en un diagrama sería:

Rx Merge

Concat

Otra operación es la concatenación. Donde el resultado no será exactamente el mismo que el de un merge.

var z = x.Concat(y);

Al concatenar el observable x con y, el resultado serán todas las iteraciones de x (OnNext), hasta que este se haya completado (OnComplete), y después toda las que ocurran en el observable y. Las iteraciones que ocurran en y mientras x no ha terminado serán ignoradas. Y si ocurre un error (OnError) en x se terminará z con el error. Es un concepto un tanto complejo, pero si echamos un vistazo a su diagrama se entiende rápidamente:

Rx Concat

En un lenguaje más vulgar, se recogen la iteraciones de x y cuando termina, las de y.

Catch

Al igual que cuando hablamos de un bloque try ... catch, este operador recogerá un error (OnError) que se produzca en el primer operando:

var z = x.Catch(y);

Al poner esta secuencia, el resultado será semejante a un Concat. Pero la diferencia es que la "señal" que se utiliza para cambiar de 
operando es un error (OnError). Podemos decir entonces que se recogen todas las iteraciones (OnNext) de x hasta que este falla y entonces se empiezan a recoger las de y.

Rx Catch

Si cualquiera de los dos operandos de la operación terminan (OnComplete) el resultado será que z también terminará.

OnErrorResumeNext

A la gente que ha programado en Visual basic, le sonará este comando. Como su buen nombre indica lo que hará es que si ocurre un error se pase al siguiente y si no seguirá con el proceso normal.

var z = x.OnErrorResumeNext(y);

Es decir, realizará la misma operación que un Concat, pero si encuentra un error se comportará como un Catch. O dicho con otras palabras, devuelve las iteraciones (OnNext) de x, hasta que ocurre un error (OnError) o es completado (OnComplete), momento en el cual empezará a devolver las iteraciones de y.

Rx OnErrorResumeNext

Zip

La operación Zip puede resultar un poco especial. Podríamos traducirla al castellano como "cremallera" y la operación que realiza se asemeja a lo que hace una cremallera al cerrarse. Si nos adentramos en este código:

var z = x.Zip(y, (oneX, oneY) => oneX + oneY);

El resultado esperará una iteración en alguno de los dos objetos observables. Cuando ocurra, esperará a que tenga una "respuesta" por parte del otro objeto observable. Es decir, va emparejando las iteraciones (OnNext) de uno y otro observable, aplicando la conversión que especifiquemos:

Rx Zip

CombineLatest

Una operación que se asemeja con Zip es CombineLatest. Como su nombre indica combinará las últimas iteraciones que se encuentra.

var z = x.CombineLatest(y, (oneX, oneY) => oneX + oneY);

Cuando nos encontremos este tipo de código, sabremos que cada vez que ocurre una iteración (OnNext) en alguno de los observables, la va a combinar con la última ocurrida en el otro observable, usando para ello la función que le hemos pasado como parámetro:

Rx CombineLatest

Repeat

Teniendo en cuenta que un objeto observable de reactive extensions es además enumerable, puede ser que necesitemos repetir la última secuencia que hemos estado observando. En este contexto podríamos usar un código semejante a este:

var z = x.Repeat(3);

Donde grabaremos en el objeto observable z todas las iteraciones (OnNext) en orden, que han ocurrido en x, y las repetiremos tantas veces como le indiquemos (en este caso 3). Gráficamente se representaría así:

Rx Repeat

Retry

Cuando lo que queremos es repetir la secuencia solo en el caso de que obtengamos un error (OnError), usaremos Retry.

var z = x.Retry(3);

Esta operación volverá a repetir la secuencia de iteraciones si se produce un error. Y lo intentará tantas veces como le indiquemos. Por supuesto que si no obtiene ningún error no se repetirá nunca la secuencia.

Rx Retry

Join

Hablar del método Join es lo mismo que hablar de unión entre dos objetos. Hasta este punto hemos visto dos formas de realizar "uniones" con objetos: Zip y CombineLatest. La diferencia fundamental de esta función es que no solo combina parejas o los últimos, si no más bien todas las posibles combinaciones hasta el momento de ocurrir.

var z = x.Join(y, v => x, v => y, (oneX, oneY) => oneX + oneY);

El formato en este caso es la unión entre x e y, mientras por un lado dure x (v => x) y por el otro dure y. Y la función de unión es la especificada en el último parámetro.

Con el código que acabamos de ver, cada vez que ocurra una iteración (OnNext) buscará todas las iteraciones que han ocurrido en el otro observador y combinará esta con todas las anteriores. Un concepto que queda mucho más claro al ver el diagrama:

Rx Join

Buffer

Uno de los métodos más útiles dentro de las rx, es el de Buffer, que nos permite crear bloques de observación en forma de listas genéricas, en dependencia del tiempo o de un número de iteraciones. Por ejemplo, imaginemos un escenario donde cada segundo recibimos una iteración, pero queremos agruparlas cada 3 segundos con el fin de que el usuario no vea refrescada constantemente la interfaz gráfica. En este caso podríamos usar este código:

var z = x.Buffer(TimeSpan.FromSeconds(3));

Y su resultado sería un objeto observable de este tipo: IObservable<IList<T>>. O lo que es lo mismo, cada iteración de este nuevo objeto observable contendrá una lista de los objetos que se han almacenado en forma de buffer:
Rx Buffer (time)

Como vemos en el ejemplo anterior, aunque en un segundo no se produzca una iteración, se guardará un buffer con respecto al tiempo. Si lo que deseamos es un resultado semejante a este:

Rx buffer (count)

Lo que tendremos que usar es también la función Buffer, pero esta vez con respecto a un contador (y no al tiempo):

var z = x.Buffer(3);

Window

Muchos encontrarán que Window y Buffer tienen un formato semejante. En el caso de Buffer creamos una secuencia de paquetes en forma de lista genérica. Sin embargo, cuando usamos Window, en lugar de listas, empaquetamos en objetos observables.

var z = x.Window(3);

En este caso el diagrama de canicas sería algo así:

Rx Window (Counter)

Por lo que el tipo de datos que devuelve esta función es IObservable<IObservable<T>>. Y podemos usarlo con contador o con respecto el tiempo también:

var k = x.Window(TimeSpan.FromSeconds(3));



Switch

Para terminar de describir métodos nuevos de las reactive extensions, hablaremos de Switch, que es complementario a Window, ya que realiza la operación contraria. Es decir, convierte un objeto IObservable<IObservable<T>> en un simple IObservable<T>.

IObservable<int> x;
/* ... */

var k = x.Window(TimeSpan.FromSeconds(5));

var w = k.Switch();


En este código, haríamos que el objeto observable x fuera exactamente igual a w. Podríamos decir entonces que Switch es el equivalente al deshacer de un comando Window.

Aplicando Rx

La mejor de las virtudes de las reactive extensions, es su versatilidad. Puede aplicarse en muy diferentes escenarios y solo hay que dejar volar un poco la imaginación para darnos cuenta de que podemos usarlas para problemas comunes del día a día. Por ejemplo, hace unos meses, Pablo Nuñez (@pablonete) propuso vía twitter un ejercicio simple de Linq que seguro nos hubiera ocupado unas cuantas líneas y gracias a rx podemos reducirlo a una:

Ejercicio LINQ: Trocear una lista { "a", "1", "b", "2", ...} en sublistas { { "a", "1" }, { "b", "2"}, {...}...}

La solución que podríamos deducir con lo estudiado hasta hora podría ser:

var list = new List<string> { "1", "a", "2", "b", "3", "c", "4", "d", ... };
var result = new List<List<string>>();

list.ToObservable().Buffer(2).ObserveOn(Scheduler.Immediate).Subscribe(result.Add);

Esto no quiere decir que esta sea la mejor solución, solo una más. Lo que queremos dejar claro en este artículo es que reactive extensions son unas nuevas herramientas aplicables a la mayor parte de los escenarios que manejamos hoy en día. Nos aportan nuevas operaciones y por lo tanto más versatilidad al lenguaje y la plataforma de desarrollo que usamos.

 

 

Antes de terminar, quisiera instar al lector a experimentar más métodos dentro de las extensiones de rx. Ya que aunque en este artículo hemos tratado muchos de ellos, no son los únicos. Y solo conociéndolos, descubriremos todas sus aplicaciones.

Por hoy me despido y os invito a seguir esta línea de artículos, y más aún el siguiente donde de verdad la cosa se pone entretenida con la última variable de la función que nos definen las reactive extensions: Schedulers.

buy me a beer