El multiproceso de Python se bloquea cuando la cola no está vacía. Descubra por qué.
Recientemente me encontré con un proyecto que requería ejecutar tareas en múltiples máquinas virtuales. Me referí al código de proyectos anteriores de otras personas y usé multiproceso para manejarlo, así que verifiqué en línea sobre multiproceso en. python
1. Primero, hablemos de Queue (objeto de cola)
Queue es una biblioteca estándar en Python y se puede importar y hacer referencia directamente. Cuando estaba estudiando antes, escuché. los famosos "come primero, tira primero" y "Come después, vomita primero" son en realidad la cola mencionada aquí. Al construir la cola, puedes definir su capacidad. No comas en exceso. Si comes demasiado, se producirá un error. se informará si no lo escribe o escribe un número menor que 1 al construirlo, indica un número infinito
cola de importación
q = Queue.Queue(10)
Poner valor en la cola (put)
q .put('yang')
q.put(4)
q.put (['yan', 'xing'])
En la cola Obtenga el valor en get()
La cola predeterminada es el primero en entrar, el primero en salir
gt; gt; q.get()
'yang'
gt; p>
gt; gt; gt; q.get()
['yan', 'xing']
Cuando una cola está vacía, si usa get to recupérelo, se bloqueará, por lo que al recuperar la cola, generalmente usa el método get_nowait(), este método generará una excepción vacía cuando obtenga un valor de una cola vacía
Entonces, el método más común es para determinar primero si una cola está vacía y luego obtener el valor si no está vacía
p>
Métodos comúnmente utilizados en colas
Queue.qsize() devuelve el tamaño de la cola
Queue.empty() Si la cola está vacía, devuelve True, de lo contrario False p>
Queue.full() Si la cola está llena, devuelve True, de lo contrario False
Queue.get([block[, timeout]]) Obtiene la cola, tiempo de espera de espera
Queue.get_nowait() es equivalente a Queue.get(False) p>
Queue.put(item) sin bloqueo escribe en la cola, tiempo de espera de espera
Queue.put_nowait(item) es equivalente a Queue.put(item, False)
2. Utilizando el concepto de subproceso en multiprocesamiento
desde proceso de importación de multiprocesamiento
Puedes construir un subproceso a través de Proceso
p = Process(target=fun) , args=(args))
Luego use p.start() para iniciar el proceso hijo
Luego use el método p.join () para ejecutar el proceso padre después del proceso hijo ha terminado de ejecutarse
desde el proceso de importación multiprocesamiento
importar sistema operativo
# Código que ejecutará el proceso hijo
p>def run_proc (nombre):
print 'Ejecutar proceso hijo s(s)...' (nombre, os.getpid())
if __name__= ='__main__': p>
print 'Proceso principal
s s.' os.getpid()
p = Process(target=run_proc, args=('test',))
print 'El proceso comenzará.'
p.start()
p.join()
imprimir 'Fin del proceso'.
3. >
Si necesita varios procesos secundarios, puede considerar usar un grupo de procesos (pool) para administrarlos
desde el grupo de importación de multiprocesamiento
desde el grupo de importación de multiprocesamiento
importar sistema operativo, tiempo
def long_time_task(nombre):
imprimir 'Ejecutar tarea s(s)...' (nombre, os.getpid()) p>
inicio = time.time()
time.sleep(3)
end = time.time()
imprimir 'Tarea s ejecuta 0.2f segundos.' (nombre, (final - inicio))
if __name__=='__main__':
imprimir 'Proceso principal s.' /p>
p = Pool()
para i en el rango(5):
p.apply_async(long_time_task, args=(i,))
print 'Esperando que todos los subprocesos hayan terminado...'
p.close()
p.join()
print 'Todos los subprocesos hayan terminado .'
El método para crear un proceso hijo en el grupo es diferente al de Process. Se implementa a través de
p.apply_async(func, args=(args)). Las tareas que se pueden ejecutar simultáneamente en un grupo dependen de usted. La cantidad de CPU en la computadora. Por ejemplo, mi computadora ahora tiene 4 CPU. Entonces los subprocesos tarea0, tarea1, tarea2 y tarea3 se pueden iniciar en el. al mismo tiempo, y la tarea4 comenzará después de que finalice el proceso anterior
El resultado después de ejecutar el programa anterior en realidad se realiza por separado de acuerdo con 1, 2 y 3 en la imagen de arriba. 1 se imprime primero, 2 es. se imprime después de 3 segundos y 3 se imprime después de 3 segundos.
La p en el código .close() cierra el grupo de procesos y ya no le agrega procesos llamando al método join() en el grupo. El objeto esperará a que todos los procesos secundarios completen la ejecución. Antes de llamar a join (), primero debe llamar a close (), no puede continuar agregando nuevos procesos.
En ese momento, también puede definir el número de procesos para el grupo de instancias
Si p=Pool(5) en el código anterior, entonces todos los procesos secundarios se pueden procesar en al mismo tiempo
3. Comunicación entre múltiples subprocesos
La comunicación entre múltiples subprocesos debe utilizar la cola mencionada en el primer paso. Por ejemplo, si existe lo siguiente. requisito, un subproceso envía un mensaje a la cola Escribe datos y otro proceso toma datos de la cola,
#coding: gbk
del proceso de importación multiprocesamiento, cola
importar sistema operativo, tiempo, aleatorio
# Código ejecutado por el proceso de escritura de datos:
def write(q):
for valor en ['A', 'B', 'C' ]:
imprimir el valor de 'Poner s en la cola...'
q.put(valor)
time.sleep(random.random())
# Código ejecutado por el proceso de lectura de datos:
def read(q):
while Verdadero:
si no es q.empty():
valor = q.get(True)
imprimir 'Obtener s de la cola.' p>
time.sleep(random.random() )
else:
descanso
if __name__=='__main__':
# El proceso padre crea una Cola y la pasa a cada Subproceso:
q = Cola()
pw = Proceso(target=write, args=(q, ))
pr = Process(target= read, args=(q,))
# Inicie el subproceso pw, escriba:
pw. start()
# Espere a que finalice pw:
pw.join()
# Inicie el subproceso pr, lea:
pr.start()
pr.join()
# El proceso pr está en un bucle infinito y no puede esperar a que finalice. Solo se puede terminar por la fuerza:
imprimir
imprimir 'Todos los datos se escriben y se leen Fin'
4. Varias preguntas interesantes sobre el código anterior
if __name__ =='__main__':
# El proceso padre crea una cola y la pasa a cada proceso hijo:
q = Queue()
p = Pool ()
pw = p.apply_async(write, args=(q,) )
pr = p.apply_async(read, args=(q,))
p.close()
p.join()
imprimir
imprimir 'Todos los datos se escriben y leen'
Si la función principal está escrita como en el ejemplo anterior, lo que quiero es obtener una cola, pasarla como parámetro a cada proceso hijo en el grupo de procesos, pero obtener
RuntimeError: los objetos de la cola solo deben ser compartido entre procesos a través
ugh herencia
Después de verificar, la idea general es que el objeto de la cola no puede comunicarse entre el proceso padre y el proceso hijo. Si desea usar la cola en el grupo de procesos, debe usar la clase Manager de. multiproceso
if __name__=='__main__':
manager = multiprocessing.Manager()
# El proceso padre crea una cola y la pasa a cada hijo proceso:
q = manager.Queue()
p = Pool()
pw = p.apply_async(write, args=(q,))
time.sleep(0.5)
pr = p.apply_async(read, args=(q,))
p.close()
p. join()
imprimir
imprimir 'Todos los datos se escriben y leen'
De esta manera, este objeto de cola puede comunicarse entre el proceso padre y el proceso hijo, si no usa un grupo, no necesita un Administrador. Extendamos la clase Administrador en multiproceso más adelante
En cuanto a la aplicación de bloqueos, si operan diferentes programas. en la misma cola al mismo tiempo, para evitar errores, cuando una función opera una cola, puede agregarle un bloqueo, de modo que solo un proceso secundario pueda operar en la cola al mismo tiempo, y el bloqueo debe también habrá un bloqueo en el objeto administrador
#coding: gbk
desde multiprocesamiento importar Proceso, Cola, Pool
importar multiprocesamiento
importar sistema operativo, tiempo, aleatorio
# Escribir código de ejecución del proceso de datos:
def write(q, lock):
lock.acquire() #Agregar bloqueo
para el valor en ['A', ' B', 'C']:
imprimir el valor de 'Poner s en cola...'
q .put(value)
lock.release( ) #Liberar bloqueo
# Código ejecutado por el proceso de lectura de datos:
def read(q):
mientras es Verdadero:
si no q.empty():
valor = q.get(False)
imprimir 'Obtener s de la cola.' value
time.sleep(random .random())
else:
break
if __name__== '__main__':
manager = multiprocessing.Manager()
# El proceso padre crea una cola y la pasa a cada proceso hijo:
q = manager .Queue()
lock = manager.Lock() # Inicializar un bloqueo
p = Pool()
pw = p.apply_async(write, args =(q, bloquear))
pr = p.apply_async(read, args=(q,))
p.close()
p.join()
imprimir
imprimir 'Todos los datos se escriben y leen'