Red de conocimiento informático - Aprendizaje de programación - Análisis en profundidad de BlockingQueue en subprocesos múltiples de Java (5)

Análisis en profundidad de BlockingQueue en subprocesos múltiples de Java (5)

Una descripción general

Como contenedor de subprocesos, BlockingQueue puede proporcionar una sólida garantía para la sincronización de subprocesos

Dos métodos comunes para definir BlockingQueue

Comúnmente Los métodos utilizados definidos por BlockingQueue son los siguientes

Lanzar valor especial de excepción Tiempo de espera de bloqueo

Insertar agregar(e) oferta(e) poner(e) oferta( e unidad de tiempo)

Eliminar eliminar(e) encuesta(e) tomar(e) encuesta(e) encuesta(e) encuesta(e) encuesta(e) encuesta(e) encuesta(e) unidad)

check element() peek() no disponible no disponible

) add(anObject) Agrega un objeto a BlockingQueue. es decir, devuelve verdadero si BlockingQueue puede acomodarlo; de lo contrario, genera una excepción

) oferta() unObjeto) significa que si unObjeto está disponible, no está disponible. Devuelve verdadero si BlockingQueue puede acomodar el objeto; de lo contrario, devuelve falso

) put(anObject) Si no hay espacio en BlockQueue, agrega el objeto a BlockingQueue; de ​​lo contrario, devuelve verdadero

) put(anObject) Si no hay espacio en BlockQueue, agregue el objeto a BlockingQueue. Si no hay espacio en BlockQueue, el hilo que llama a este método se bloqueará hasta que haya espacio en BlockingQueue

) poll(time) toma el primer objeto en BlockingQueue si no se puede sacar. inmediatamente, puede esperar el tiempo especificado por el parámetro de tiempo. Si no se puede sacar, devuelve un valor nulo

) take() toma el primer objeto en BlockingQueue. se elimina inmediatamente, devuelve verdadero; de lo contrario, devuelve falso

) take() toma el primer objeto en BlockingQueue. Si BlockingQueue está vacío, el bloqueo entrará en un estado de espera hasta que se agregue un nuevo objeto al Blocking

donde BlockingQueue no acepta elementos vacíos. Se generará una excepción NullPointerException cuando se implemente. Null se utiliza como valor de advertencia para indicar que la operación de sondeo falló

Tres notas sobre BlockingQueue

BlockingQueue puede tener una capacidad limitada en cualquier momento; time Tiempo, su capacidad de mantenimiento puede ser 1. Un BlockingQueue puede tener una capacidad limitada; puede tener una capacidad restante (remainCapacity) más allá de la cual no puede colocar elementos adicionales sin bloquear. Un BlockingQueue sin ningún límite de capacidad interna siempre informa la capacidad restante como el número entero MAX_VALUE.

La implementación de BlockingQueue se utiliza principalmente como una cola productor-usuario, pero también admite una interfaz de recopilación para que cualquier elemento pueda eliminarse de la cola usando remove(x).

La implementación de BlockingQueue es segura para subprocesos y todos los métodos de cola pueden lograr automáticamente su propósito mediante bloqueos internos u otras formas de control de concurrencia. Todos los métodos de cola pueden utilizar bloqueos internos u otras formas de control de concurrencia para lograr su propósito automáticamente; sin embargo, a menos que se especifique lo contrario en la implementación, no es necesario automatizar una gran cantidad de operaciones de recopilación (addAll containsAll, retainAll y removeAll), así, por ejemplo, addAll(c) puede fallar (lanzar una excepción) después de agregar solo unos pocos elementos de c.

La implementación de BlockingQueue es segura para subprocesos.

Básicamente, BlockingQueue no admite ningún tipo de operación de cierre o apagado para indicar que no se agregarán más elementos. La necesidad y el uso de esta funcionalidad dependen de la implementación. Por ejemplo, una estrategia común es insertar objetos especiales de final de flujo o envenenados para los productores y usarlos en función del tiempo que le toma al usuario obtener estos objetos. Por ejemplo, una estrategia común es insertar objetos especiales de final de flujo o veneno para los productores e interpretarlos en función de cuándo el usuario obtiene estos objetos.

Una breve descripción de cuatro implementaciones de clases de cola de bloqueo comúnmente utilizadas

p>

)ArrayBlockingQueue: una cola de bloqueo con un tamaño específico, su constructor debe aceptar un parámetro int. El constructor debe usar un parámetro int para especificar su tamaño, y los objetos que contiene se ordenan primero en entrar, primero en salir

) LinkedBlockingQueue): una cola de bloqueo de tamaño variable. Si el constructor contiene un parámetro de tamaño, la cola de bloqueo generada tiene un límite de tamaño; si el parámetro de tamaño no está incluido, la cola de bloqueo generada tiene un límite de tamaño. Si el constructor contiene el parámetro de tamaño, el tamaño de la cola de bloqueo generada está determinado por el número entero MAX_VALUE y los objetos se ordenan en orden FIFO (primero en entrar, primero en salir)

) Cola de bloqueo prioritario (PriorityBlockingQueue ): Similar a LinkedBlockingQueue, pero los objetos no se ordenan por orden de primero en entrar, primero en salir, sino según el tamaño de la cola de bloqueo y la cantidad de objetos. La clasificación no es primero en entrar, primero en salir, sino que se basa en el orden de clasificación natural del objeto o en el orden determinado por el comparador del constructor

) Cola síncrona (SynchronousQueue): una BlockingQueue especial cuyas operaciones debe realizarse entre put y take Alternar entre

Entre ellos, LinkedBlockingQueue y ArrayBlockingQueue son iguales. ArrayBlockingQueue Debido a que la estructura de datos detrás de LinkedBlockingQueue es diferente, LinkedBlockingQueue tiene un mayor rendimiento de datos que ArrayBlockingQueue, pero no es tan predecible como ArrayBlockingQueue cuando el número de subprocesos es grande.

Cinco BlockingQueue específicos.

strong> V Detalles internos de clases de implementación específicas de BlockingQueue

Para usuarios pacientes, consulte los detalles de clases de implementación específicas

ArrayBlockingQueue

ArrayBlockingQueue ArrayBlockingQueue

p>

ArrayBlockingQueue

ArrayBlockingQueue

ArrayBlockingQueue

ArrayBlockingQueue

ArrayBlockingQueue

ArrayBlockingQueue

ArrayBlockingQueue

ArrayBlockingQueue

ArrayBlockingQueue es una cola de bloqueo limitada respaldada por una matriz. Una matriz de tamaño fijo contendrá los elementos insertados por el productor y recuperados por el usuario. Una vez que se crea dicha caché, no se puede aumentar su capacidad. El número máximo de elementos que se pueden almacenar en un ArrayBlockingQueue se especifica en el momento de su creación (ya que no se expande automáticamente) y si es un bloqueo justo (parámetro justo)

De forma predeterminada, se crea un ArrayBlockingQueue con un bloqueo injusto, pero podemos especificar esto en su constructor.

public ReentrantLock(boolean fair) {

sync = (fair)?new FairSync() : new NonfairSync()

}

FairSync/NonfairSync es una clase interna de ReentrantLock

Los subprocesos solicitan bloqueos justos en orden, mientras que los bloqueos injustos pueden entrar. Cuando aún no haya ingresado a la cola de espera, se relacionará con el subproceso que espera el nodo principal de la cola. Si el estado del candado está disponible, el hilo que solicita el candado injusto puede avanzar en la cola de espera para adquirir el candado. La sincronización de bloqueo interno no proporciona una garantía explícita de equidad

Hay tres puntos a tener en cuenta sobre esta clase

El método add/put/offer para agregar nuevos elementos

Las varias variables de instancia de la clase. Variable de instancia takeIndex/putIndex/count/

Implementación condicional

Método para agregar nuevos elementos add/put/offer

En primer lugar, cuando se habla del método de agregar elementos Al hacer esto, primero es necesario analizar los bloqueos utilizados en el mecanismo de sincronización de esta clase.

Código Java

[java]

lock = new ReentrantLock(fair)

notEmpty = lock newCondition() //Variable de condición

notFull =?lock newCondition() // Variable de condición

Estas tres variables son todas variables de instancia de la clase. Solo hay un bloqueo, es decir, lock, y luego lock instancia dos. Las condiciones, concretamente notEmpty/noFull, se utilizan para coordinar lecturas y escrituras de subprocesos múltiples, respectivamente.

Código Java

[java]

oferta booleana pública (E e) {

if (e == null) throw new NullPointerException()

final ReentrantLock lock = este bloqueo; // Cada objeto corresponde a un bloqueo explícito

lock lock() // Solicita el bloqueo hasta que se adquiera (no se puede interrumpir). )

intente {

if (count == longitud de los elementos)// si la cola está llena

return false

return; falso ;

else {

insertar(e)

devuelve verdadero

}

} finalmente {

lock unlock() //

}

}

Ver método INSERT

private void insert( E x) {

items[putIndex] = x

//Aumentar el valor del índice global

/*

Dentro del cuerpo del método Inc

final int inc(int i) {

return ( i == longitud de los elementos)?: i; >

en Aquí puede ver que ArrayBlockingQueue inserta nuevos elementos en la matriz interna de adelante hacia atrás.

Si se hace esto, putIndex se puede volver a cambiar al mismo valor (siempre que se realice la operación de eliminación; de lo contrario, la cola estará llena en el juicio anterior)

*/

putIndex = inc(putIndex)

count;

notEmpty signal() //activa un hilo en espera

}

Código Java

[ java]

public void put(e) lanza InterruptedException {

if (e == null) lanza nueva NullPointerException()

final E[] items = ems;

final ReentrantLock lock = este bloqueo;

lock lockInterruptfully() //solicita el bloqueo hasta que se obtenga o se interrumpa

p>

try {

try {

while (count == longitud de los elementos)//Si está completo, el hilo actual entra en estado de espera noFull

notFull await()

} catch (InterruptedException es decir){

notFull signal() // propagar a no interrumpido.)// propagar a hilo no interrumpido

lanzar es decir;

}

insertar(e)

} finalmente {

bloquear desbloquear()

}

}

Código Java

[java]

oferta booleana pública (E e long timeoutTimeUnit unidad)

throws InterruptedException {

if (e == null) throw new NullPointerException()

long nanos = unit toNanos(timeout)

bloqueo final de ReentrantLock = este bloqueo;

bloqueo lockInterruptably()

prueba {

for ( ) {

if (count ! Interrumpe y espera a que regresen los nanos

nanos = notFull awaitNanos(nanos)

} catch (InterruptedException es decir) {

notFull signal() // Propagar a hilos no interrumpidos

notFull signal()

señal notFull()

señal notFull()

señal notFull()

Propagar a subprocesos no interrumpidos

lanzar es decir,

}

}

} finalmente {

bloquear desbloquear()

}

}

}

}

Código Java

[java]

adición booleana pública ( E e) {

return super add(e)

}

Clase principal

public boolean add(E e) {

if (oferta(e))

devuelve verdadero

else

lanza una nueva IllegalStateException(Cola llena)

}

Varias variables de instancia de esta clase takeIndex/ putIndex/count

Código Java

[java]

Tres números se utilizan para mantener los cambios de datos en la cola

/** índice de elementos para la próxima encuesta o eliminación */

private int takeIndex

/* * índice de elementos para la siguiente oferta de venta o agregar */

private int putIndex;

/** Número de elementos en la cola */

private int count;

Los tres métodos de extracción de elementos, tomar/encuesta/eliminar, llaman a este método internamente

Código Java

[java]

extracto E privado() {

elementos E[] finales = ems;

E x = elementos[takeIndex]

elementos[takeIndex] = nulo; //Eliminar el elemento extraído

takeIndex = inc(takeIndex) //Misma estrategia que al agregar elementos

count

notFull signal ( ) //Recordar; otros subprocesos esperan que la condición notFull intente funcionar

return x;

}

Como se puede ver en este método, tabkeIndex mantiene una posición de índice. en el que se puede extraer/eliminar el elemento.

Dado que takeIndex está aumentando, esta clase es una cola de primero en entrar, primero en salir

putIndex mantiene el índice de posición del elemento insertable

Obviamente, el recuento mantiene el número total de elementos que ya existen en la cola

p>

Implementación condicional

Las condiciones ahora solo se implementan como ConditionObject en el bloqueo actual AbstractQueueSynchoronizer y se exponen a través del método newCondition() de ReentranLock. Esto se debe a que el await()/sinal() condicional generalmente se ejecuta entre lock lock() y lock unlock(). Cuando se ejecuta el método condicional await (), primero liberará el bloqueo retenido por el hilo actual, luego ingresará a la cola de espera hasta que sinal () se despierte, luego intentará adquirir el bloqueo nuevamente y luego ejecutará el código en await ( ). Liberar y adquirir el bloqueo actual requiere el método tryAcquire (int arg) de ReentranLock para determinar y utilizar la recarga de ReentranLock

código Java

[java]

public final void await() lanza InterruptedException {

if (Thread interrumped())

throw new InterruptedException()

//agrega un nuevo nodo de camarero de condición < / p>

Nodo nodo = addConditionWaiter()

//Libera tu propio bloqueo

int saveState =fullyRelease(node)

int interruptMode = <; /p>

while (!isOnSyncQueue(node)) {

//Si el hilo actual Esperando un estado cuando CONDITION estaciona el hilo actual Esperando una señal de CONDITION para liberarlo

LockSupport park(this)

if ( (interruptMode = checkInterruptWaiting(node)) != )

break

}

p>

if (acquireQueued(node ​​​​savedState) amp; amp; interruptMode ! = THROW_IE)

interruptMode = REINTERRUPT

if (node ​​nextWaiter; ! = null)

unlinkCancelledWaiters()

if (interruptMode != )

reportInterruptAfterWait(interruptMode)

}

Cola de sincronización

Una cola de bloqueo en la que cada put debe esperar una toma, y ​​viceversa.

Las colas de sincronización no tienen capacidad interna, ni siquiera la capacidad de la cola. No puede echar un vistazo a la cola sincronizada porque el elemento solo existe cuando intenta obtenerlo y no puede agregar un elemento (usando cualquier método) a menos que otro hilo intente eliminarlo. Si no hay un hilo en la cola, no. Se agregará el elemento , el encabezado de la cola está vacío. El encabezado de la cola es el primer elemento del subproceso de la cola que intenta agregarse a la cola. Si no hay un subproceso de la cola, no se agregará ningún elemento. En comparación con otros métodos de recopilación (como contiene), la cola sincronizada. (SynchronousQueue) es como una reunión vacía. En este tipo de diseño, si un objeto que se ejecuta en un hilo quiere pasar información, eventos o tareas a un objeto que se ejecuta en otro hilo, debe sincronizarse con ese objeto.

Para los subprocesos de productor y usuario en espera, este tipo de cola admite una política de ordenamiento justo opcional. El ordenamiento justo no está garantizado de forma predeterminada, pero la cola creada después de establecer la equidad en verdadero garantiza que el acceso a los subprocesos sea el primero. orden de entrada, primero en salir. La equidad generalmente reduce el rendimiento, pero reduce la variabilidad y evita un servicio deficiente.

Cola de bloqueo de enlaces

Un rango arbitrario de colas de bloqueo basadas en nodos de enlace. El encabezado de la cola es el elemento con el tiempo más largo en la cola, la cola de la cola es el elemento con el tiempo más corto en la cola, se insertan nuevos elementos en la cola de la cola y la operación de recuperación de la cola obtiene el elemento al principio de la cola. Las colas de enlaces generalmente tienen un mayor rendimiento que las colas basadas en matrices. Estos dos bloqueos (putLock y takeLock) son instancias de las condiciones notFull y notEmpty respectivamente y se utilizan para coordinar el acceso multiproceso. Algunos métodos (como eliminar toArray toString, borrar, etc.) requieren adquirir estos dos bloqueos al mismo tiempo. Este orden es para evitar posibles interbloqueos (no sé por qué sucede esto.

PriorityBlockingQueue

La cola de bloqueo ilimitada utiliza las mismas reglas de ordenación que la clase PriorityQueue y proporciona recuperación de bloqueo Aunque la cola no está limitada lógicamente, los intentos de agregarla pueden fallar (lo que genera un error OutOfMemoryError). La priorización que se basa en el orden natural de la cola no permite la inserción de objetos no comparables. esto generará ClassCastException)

Básicamente, puedes entender la clase observando las tres propiedades de la clase

Código Java

[java]

PriorityQueue q final privado

bloqueo ReentrantLock final privado = nuevo ReentrantLock(true)

condición final privada notEmpty = bloqueo newCondition()

bloqueo significa que esta clase usa bloqueos para sincronizar operaciones como lectura y escritura

notEmpty coordina la disponibilidad de nuevos elementos en la cola. Cuando la cola está llena, llama al método de crecimiento de PriorityQueue para hacer crecer la cola.

DelayQueue

Una cola de retraso es una cola de bloqueo ilimitada de elementos retrasados ​​de la cual los elementos se pueden recuperar solo cuando expira el retraso. El encabezado de es el elemento retrasado que permanece por más tiempo. después de que expire el retraso, cuando el valor devuelto por el método getDelay (TimeUnit NANOSECONDS) del elemento es menor o igual a cero, la encuesta devolverá un valor nulo.

La interfaz de retraso hereda de Comparable y se implementa mediante los elementos que insertamos.

El diseño de DelayQueue se basa en la interfaz de retraso. p> Documentación API sobre el propósito de DelayQueue

Una cola de bloqueo ilimitada de elementos retrasados ​​que solo se puede utilizar cuando su retraso ha expirado. Si el retraso no ha expirado, no hay encabezado de cola y la encuesta devolverá nulo. El retraso ha expirado cuando el método getDelay(TimeUnit NANOSECONDS) del elemento devuelve un valor menor o igual a cero, incluso si los elementos no vencidos no se pueden eliminar usando. tomar o sondear, se tratan como un elemento normal. Por ejemplo, el método size devuelve un recuento de elementos caducados y no caducados. La cola no permite elementos vacíos.

Esto se debe a que el constructor DelayQueue no permite pasar paratores (en el modelo anterior no se permitía pasar paratores). PriorityBlockingQueue), es decir, solo puede definir reglas de comparación de prioridad en el método pare. Cuando el elemento getDelayed.Expiration (vencimiento retrasado), el encabezado de la cola es el elemento retrasado con el retraso más largo. Cuando el valor devuelto por el método getDelay (TimeUnit NANOSECONDS) del elemento es menor o igual a cero, se producirá la caducidad. ; = int

Arriba. El texto en inglés también explica que al sondear/extraer, el elemento de la cola determinará si el elemento ha alcanzado el tiempo de espera. Si no se ha alcanzado el tiempo de espera, el sondeo devuelve nulo y la extracción entra en estado de espera. Pero a excepción de estos dos métodos, los elementos de la cola se tratan como elementos ordinarios; por ejemplo, el método de tamaño devuelve el número de todos los elementos independientemente de si se agotó el tiempo de espera. Las condiciones disponibles para la coordinación solo son significativas para tomar y sondear.

También se debe agregar que el tipo de cola de trabajo en ScheduledThreadPoolExecutor es su clase interna DelayedWorkQueue y su contenedor de tareas es DelayedWorkQueue. El contenedor de tareas de DelayedWorkQueue es de tipo DelayQueue, y ScheduledFutureTask, que es la clase de implementación de Delay, es la clase de tarea encapsulada de Runnable, lo que significa que ScheduledThreadPoolExecutor ejecuta tareas utilizando las reglas de prioridad de DelayQueue

BlockingDque LinkedBlockingQueue lishixinzhi/Article/ program/Java/gj/201311/27544