Red de conocimiento informático - Problemas con los teléfonos móviles - El multiproceso de Python se bloquea cuando la cola no está vacía. Descubra por qué.

El multiproceso de Python se bloquea cuando la cola no está vacía. Descubra por qué.

Recientemente me encontré con un proyecto que requería ejecutar tareas en múltiples máquinas virtuales. Me referí al código de proyectos anteriores de otras personas y usé multiproceso para manejarlo, así que verifiqué en línea sobre multiproceso en. python

1. Primero, hablemos de Queue (objeto de cola)

Queue es una biblioteca estándar en Python y se puede importar y hacer referencia directamente. Cuando estaba estudiando antes, escuché. los famosos "come primero, tira primero" y "Come después, vomita primero" son en realidad la cola mencionada aquí. Al construir la cola, puedes definir su capacidad. No comas en exceso. Si comes demasiado, se producirá un error. se informará si no lo escribe o escribe un número menor que 1 al construirlo, indica un número infinito

cola de importación

q = Queue.Queue(10)

Poner valor en la cola (put)

q .put('yang')

q.put(4)

q.put (['yan', 'xing'])

En la cola Obtenga el valor en get()

La cola predeterminada es el primero en entrar, el primero en salir

gt; gt; q.get()

'yang'

gt; p>

gt; gt; gt; q.get()

['yan', 'xing']

Cuando una cola está vacía, si usa get to recupérelo, se bloqueará, por lo que al recuperar la cola, generalmente usa el método get_nowait(), este método generará una excepción vacía cuando obtenga un valor de una cola vacía

Entonces, el método más común es para determinar primero si una cola está vacía y luego obtener el valor si no está vacía

p>

Métodos comúnmente utilizados en colas

Queue.qsize() devuelve el tamaño de la cola

Queue.empty() Si la cola está vacía, devuelve True, de lo contrario False

Queue.full() Si la cola está llena, devuelve True, de lo contrario False

Queue.get([block[, timeout]]) Obtiene la cola, tiempo de espera de espera

Queue.get_nowait() es equivalente a Queue.get(False)

Queue.put(item) sin bloqueo escribe en la cola, tiempo de espera de espera

Queue.put_nowait(item) es equivalente a Queue.put(item, False)

2. Utilizando el concepto de subproceso en multiprocesamiento

desde proceso de importación de multiprocesamiento

Puedes construir un subproceso a través de Proceso

p = Process(target=fun) , args=(args))

Luego use p.start() para iniciar el proceso hijo

Luego use el método p.join () para ejecutar el proceso padre después del proceso hijo ha terminado de ejecutarse

desde el proceso de importación multiprocesamiento

importar sistema operativo

# Código que ejecutará el proceso hijo

p>

def run_proc (nombre):

print 'Ejecutar proceso hijo s(s)...' (nombre, os.getpid())

if __name__= ='__main__':

print 'Proceso principal

s s.' os.getpid()

p = Process(target=run_proc, args=('test',))

print 'El proceso comenzará.'

p.start()

p.join()

imprimir 'Fin del proceso'.

3. >

Si necesita varios procesos secundarios, puede considerar usar un grupo de procesos (pool) para administrarlos

desde el grupo de importación de multiprocesamiento

desde el grupo de importación de multiprocesamiento

importar sistema operativo, tiempo

def long_time_task(nombre):

imprimir 'Ejecutar tarea s(s)...' (nombre, os.getpid())

inicio = time.time()

time.sleep(3)

end = time.time()

imprimir 'Tarea s ejecuta 0.2f segundos.' (nombre, (final - inicio))

if __name__=='__main__':

imprimir 'Proceso principal s.' /p>

p = Pool()

para i en el rango(5):

p.apply_async(long_time_task, args=(i,))

print 'Esperando que todos los subprocesos hayan terminado...'

p.close()

p.join()

print 'Todos los subprocesos hayan terminado .'

El método para crear un proceso hijo en el grupo es diferente al de Process. Se implementa a través de

p.apply_async(func, args=(args)). Las tareas que se pueden ejecutar simultáneamente en un grupo dependen de usted. La cantidad de CPU en la computadora. Por ejemplo, mi computadora ahora tiene 4 CPU. Entonces los subprocesos tarea0, tarea1, tarea2 y tarea3 se pueden iniciar en el. al mismo tiempo, y la tarea4 comenzará después de que finalice el proceso anterior

El resultado después de ejecutar el programa anterior en realidad se realiza por separado de acuerdo con 1, 2 y 3 en la imagen de arriba. 1 se imprime primero, 2 es. se imprime después de 3 segundos y 3 se imprime después de 3 segundos.

La p en el código .close() cierra el grupo de procesos y ya no le agrega procesos llamando al método join() en el grupo. El objeto esperará a que todos los procesos secundarios completen la ejecución. Antes de llamar a join (), primero debe llamar a close (), no puede continuar agregando nuevos procesos.

En ese momento, también puede definir el número de procesos para el grupo de instancias

Si p=Pool(5) en el código anterior, entonces todos los procesos secundarios se pueden procesar en al mismo tiempo

3. Comunicación entre múltiples subprocesos

La comunicación entre múltiples subprocesos debe utilizar la cola mencionada en el primer paso. Por ejemplo, si existe lo siguiente. requisito, un subproceso envía un mensaje a la cola Escribe datos y otro proceso toma datos de la cola,

#coding: gbk

del proceso de importación multiprocesamiento, cola

importar sistema operativo, tiempo, aleatorio

# Código ejecutado por el proceso de escritura de datos:

def write(q):

for valor en ['A', 'B', 'C' ]:

imprimir el valor de 'Poner s en la cola...'

q.put(valor)

time.sleep(random.random())

# Código ejecutado por el proceso de lectura de datos:

def read(q):

while Verdadero:

si no es q.empty():

valor = q.get(True)

imprimir 'Obtener s de la cola.' p>

time.sleep(random.random() )

else:

descanso

if __name__=='__main__':

# El proceso padre crea una Cola y la pasa a cada Subproceso:

q = Cola()

pw = Proceso(target=write, args=(q, ))

pr = Process(target= read, args=(q,))

# Inicie el subproceso pw, escriba:

pw. start()

# Espere a que finalice pw:

pw.join()

# Inicie el subproceso pr, lea:

pr.start()

pr.join()

# El proceso pr está en un bucle infinito y no puede esperar a que finalice. Solo se puede terminar por la fuerza:

imprimir

imprimir 'Todos los datos se escriben y se leen Fin'

4. Varias preguntas interesantes sobre el código anterior

if __name__ =='__main__':

# El proceso padre crea una cola y la pasa a cada proceso hijo:

q = Queue()

p = Pool ()

pw = p.apply_async(write, args=(q,) )

pr = p.apply_async(read, args=(q,))

p.close()

p.join()

imprimir

imprimir 'Todos los datos se escriben y leen'

Si la función principal está escrita como en el ejemplo anterior, lo que quiero es obtener una cola, pasarla como parámetro a cada proceso hijo en el grupo de procesos, pero obtener

RuntimeError: los objetos de la cola solo deben ser compartido entre procesos a través

ugh herencia

Después de verificar, la idea general es que el objeto de la cola no puede comunicarse entre el proceso padre y el proceso hijo. Si desea usar la cola en el grupo de procesos, debe usar la clase Manager de. multiproceso

if __name__=='__main__':

manager = multiprocessing.Manager()

# El proceso padre crea una cola y la pasa a cada hijo proceso:

q = manager.Queue()

p = Pool()

pw = p.apply_async(write, args=(q,))

time.sleep(0.5)

pr = p.apply_async(read, args=(q,))

p.close()

p. join()

imprimir

imprimir 'Todos los datos se escriben y leen'

De esta manera, este objeto de cola puede comunicarse entre el proceso padre y el proceso hijo, si no usa un grupo, no necesita un Administrador. Extendamos la clase Administrador en multiproceso más adelante

En cuanto a la aplicación de bloqueos, si operan diferentes programas. en la misma cola al mismo tiempo, para evitar errores, cuando una función opera una cola, puede agregarle un bloqueo, de modo que solo un proceso secundario pueda operar en la cola al mismo tiempo, y el bloqueo debe también habrá un bloqueo en el objeto administrador

#coding: gbk

desde multiprocesamiento importar Proceso, Cola, Pool

importar multiprocesamiento

importar sistema operativo, tiempo, aleatorio

# Escribir código de ejecución del proceso de datos:

def write(q, lock):

lock.acquire() #Agregar bloqueo

para el valor en ['A', ' B', 'C']:

imprimir el valor de 'Poner s en cola...'

q .put(value)

lock.release( ) #Liberar bloqueo

# Código ejecutado por el proceso de lectura de datos:

def read(q):

mientras es Verdadero:

si no q.empty():

valor = q.get(False)

imprimir 'Obtener s de la cola.' value

time.sleep(random .random())

else:

break

if __name__== '__main__':

manager = multiprocessing.Manager()

# El proceso padre crea una cola y la pasa a cada proceso hijo:

q = manager .Queue()

lock = manager.Lock() # Inicializar un bloqueo

p = Pool()

pw = p.apply_async(write, args =(q, bloquear))

pr = p.apply_async(read, args=(q,))

p.close()

p.join()

imprimir

imprimir 'Todos los datos se escriben y leen'