Cómo describir vívidamente el mecanismo de control de flujo y contrapresión en RxJava
En RxJava, puede formar una cadena de llamadas llamando continuamente a varios operadores en Observable, en la que los datos se pasan de aguas arriba a aguas abajo. Cuando la velocidad a la que el flujo ascendente envía datos es mayor que la velocidad a la que el flujo descendente procesa los datos, se requiere control de flujo.
Esto es como el problema de matemáticas de la escuela primaria: una piscina tiene un tubo de entrada y un tubo de salida. Si el flujo de agua del tubo de entrada es mayor, la piscina se llenará (se desbordará) después de un tiempo. Este es el resultado de no tener Control de Flujo.
¿Cuáles son las ideas para el Control de Flujo? Hay aproximadamente cuatro tipos:
(1) Contrapresión.
(2) Estrangulamiento.
(3) Procesamiento de envases.
(4) Bloqueo de pila de llamadas.
A continuación se presenta en detalle.
Nota: Actualmente coexisten dos versiones de RxJava, 1.x y 2.x. La 2.x tiene cambios importantes en la interfaz en comparación con la 1.x, incluida la parte de Contrapresión. Sin embargo, todos los conceptos relacionados en el mecanismo de control de flujo que se analizarán aquí son aplicables.
Varias ideas para el control de flujo
Contrapresión
La contrapresión, también conocida como tracción reactiva, es cuánta se necesita aguas abajo (específicamente a través de la solicitud aguas abajo se especifica cuánto es necesario), el upstream enviará tanto como necesite. Esto es algo similar al control de flujo en TCP. El receptor controla la velocidad de recepción de acuerdo con su propia ventana de recepción y controla la velocidad de envío del remitente a través de paquetes ACK inversos.
Esta solución sólo es válida para los llamados Observables fríos. Cold Observable se refiere a aquellas fuentes de transmisión que permiten reducir la velocidad. Por ejemplo, dos máquinas transmiten un archivo. La velocidad puede ser grande o pequeña. Incluso si se reduce a unos pocos bytes por segundo, aún se puede completar. siempre y cuando el tiempo sea suficiente. El ejemplo opuesto es la transmisión en vivo de audio y video. Si la velocidad de datos es inferior a un cierto valor, toda la función queda inutilizable (este es un Observable activo).
Acelerar
Acelerar, para decirlo sin rodeos, significa descartar. Si no puedes consumirlo, desecha una parte y tira el resto. Tomemos el ejemplo de la transmisión en vivo de audio y video. Cuando el procesamiento descendente no se puede procesar, el paquete de datos debe descartarse.
Existen diferentes estrategias sobre qué datos procesar y cuáles descartar. Hay tres estrategias principales:
muestra (también llamada throttleLast)
throttleFirst
antirrebote (también llamado throttleWithTimeout)
Explica en detalle ellos por separado.
muestra, muestreo. Por analogía con el muestreo de audio, el audio de 8 kHz toma un valor cada 125 microsegundos. La muestra se puede configurar para, por ejemplo, muestrear un valor cada 100 milisegundos, pero muchos valores pueden provenir del flujo ascendente dentro de 100 milisegundos. El valor a elegir es el último. Por eso también se llama throttleLast.
throttleFirst es similar a sample. Por ejemplo, todavía muestrea un valor cada 100 milisegundos, pero selecciona el primer valor dentro de esos 100 milisegundos. En el desarrollo de Android, throttleFirst a veces se puede utilizar como procesamiento antivibración para eventos de clic, porque puede procesar el primer evento de clic (es decir, muestrear el primer valor) dentro de un período de tiempo específico, pero descartar los eventos de clic posteriores.
Debounce, también llamado throttleWithTimeout, contiene un ejemplo en su nombre. Por ejemplo, un programa de red mantiene una conexión TCP y envía y recibe datos continuamente, pero cuando no hay datos para enviar o recibir en el medio, habrá un intervalo. Este tiempo intermitente se puede llamar tiempo de inactividad. Cuando el tiempo de inactividad excede un valor preestablecido, se considera que se ha agotado el tiempo de espera (tiempo de espera) y es posible que sea necesario desconectar la conexión en este momento. De hecho, algunos programas de red del lado del servidor funcionan de esta manera. Después de enviar y recibir cada paquete de datos, se inicia un temporizador y espera un tiempo de inactividad. Si los paquetes de datos se envían y reciben antes de que expire el temporizador, el temporizador se reinicia y espera un nuevo tiempo de inactividad; si el temporizador expira, se agota el tiempo y se puede cerrar la conexión; El comportamiento del rebote es muy similar a este. Puede usarlo para encontrar eventos de tiempo de inactividad después de eventos de envío y recepción consecutivos. En otras palabras, el rebote puede encontrar grandes espacios entre eventos consecutivos.
Empaquetado
El empaquetado consiste en dividir paquetes pequeños desde el nivel ascendente en paquetes grandes y distribuirlos en el nivel descendente. Esto reduce la cantidad de paquetes que deben procesarse posteriormente. RxJava proporciona dos tipos de mecanismos de este tipo: búfer y ventana.
Las funciones de buffer y ventana son básicamente las mismas, pero el formato de salida es diferente: el paquete de buffer está representado por una Lista y el paquete de ventana está representado por un Observable.
Bloqueo de pila de llamadas
Este es un caso especial que bloquea toda la pila de llamadas (bloqueo de pila de llamadas). La razón por la que este es un caso especial es que este método solo es aplicable cuando toda la cadena de llamadas se ejecuta sincrónicamente en un subproceso, lo que requiere que ninguno de los operadores en el medio pueda iniciar nuevos subprocesos. Esto debería ser relativamente raro en el uso diario, porque a menudo usamos subscribeOn u observeOn para cambiar los subprocesos de ejecución, y algunos operadores complejos iniciarán nuevos subprocesos internamente para su procesamiento. Además, si realmente ocurre una cadena de llamadas completamente sincrónica, las otras tres ideas de control de flujo mencionadas anteriormente aún pueden ser aplicables, pero este método de bloqueo es más simple y no requiere soporte adicional.
Aquí hay un ejemplo para comparar el bloqueo de la pila de llamadas con la contrapresión anterior. El "bloqueo de la pila de llamadas" equivale a que muchos automóviles conduzcan por una sinuosa carretera de montaña, y la carretera tiene un solo carril. Entonces el primer coche de la fila bloquea toda la carretera y los coches de detrás sólo pueden alinearse detrás. La "contrapresión" equivale a llamar a un número en la ventanilla cuando se hacen negocios en un banco. La ventanilla llama activamente a un determinado número (equivalente a una solicitud) y esa persona se acerca para atenderlo.
¿Cómo hacer contrapresión de soporte observable?
En RxJava 1.x, algunos Observables admiten Backpression, mientras que otros no. Sin embargo, un Observable que no admite Contrapresión se puede convertir en un Observable que admita Contrapresión a través de algunos operadores. Estos operadores incluyen:
onBackpressionBuffer
onBackpressionDrop
onBackpressionLatest
onBackpressionBlock (expirado)
Se convierten en Cada uno de los Observables tiene diferentes estrategias de contrapresión.
En RxJava 2.x, Observable ya no admite la contrapresión, sino que utiliza Flowable para admitir específicamente la contrapresión. Los primeros tres de los cuatro operadores mencionados anteriormente corresponden a las tres estrategias de Contrapresión de Flowable:
Back PressureStrategy.BUFFER
Back PressureStrategy.DROP
Back PressureStrategy.LATEST
onBackpressionBuffer es un método de procesamiento que no descarta datos. Almacene en caché todo lo recibido del flujo ascendente y espere la solicitud del flujo descendente antes de enviarla al flujo descendente. Equivale a un embalse. Pero si se va río arriba demasiado rápido, el depósito (amortiguador) se desbordará.
onBackpressionDrop y onBackpressionLatest son similares en que ambos descartan datos. Estas dos estrategias son equivalentes a un mecanismo de token (o mecanismo de cuota). El flujo descendente genera tokens (cuotas) al flujo ascendente a través de solicitudes. Cuantos tokens reciba el flujo ascendente, enviará tantos datos al flujo descendente. Cuando el número de tokens se consume hasta 0, el flujo ascendente comienza a descartar datos. Pero hay una diferencia sutil entre las dos estrategias cuando el número de tokens es 0: onBackpressionDrop descarta directamente los datos sin almacenar en caché ningún dato; mientras que onBackpressionLatest almacena en caché los datos más recientes, de modo que cuando el upstream reciba un nuevo token, simplemente lo envía. los últimos datos "más recientes" almacenados en caché al flujo descendente primero. Se puede entender combinando las dos imágenes siguientes.
onBack PressureBlock verifica si hay demanda descendente. Si hay demanda, se enviará a la descendente. Si no hay demanda descendente, no se descartará, pero intentará hacerlo. bloquea la entrada ascendente (si realmente se puede bloquear depende de la situación ascendente), no se almacena en caché. Esta estrategia ha caído en desuso.