Implementación en Python de una cola de tareas multiproceso simple
Recientemente, cuando utilicé el algoritmo de descenso de gradiente para dibujar datos de redes neuronales, encontré algunos problemas de rendimiento del algoritmo. El código del algoritmo de descenso de gradiente es el siguiente (pseudocódigo):
defgradient_descent(): #Código de descenso de gradiente plotly.write(X, Y)
En términos generales, cuando la red solicita plot. Cuando se dibuja ly, se bloqueará y esperará el regreso, lo que también afectará la velocidad de ejecución de otras funciones de descenso de gradiente.
Una solución es iniciar un nuevo hilo cada vez que se llama a la función plotly.write, pero este método no se siente muy bien. No quiero utilizar un marco de cola de tareas tan grande y completo como Cerely (cola de tareas distribuida), porque el marco es demasiado pesado para mis necesidades y mi dibujo no requiere redis para conservar los datos.
¿Cómo solucionarlo? Escribí una pequeña cola de tareas en Python que puede llamar a la función plotly.write en un hilo separado. A continuación se muestra el código del programa.
fromthreadingimportThreadimportQueueimporttime clase task Queue(queue.queue):
Primero, heredamos la cola. Clase de cola. de la cola. Clase de cola, puede heredar los métodos get y put, así como el comportamiento de la cola.
def__init__(self, num _ trabajadores = 1): cola . _ _ init _ _(self)self .
Al inicializar, podemos ignorar la cantidad de subprocesos de trabajo.
defadd_task(self, tarea, *args, **kwargs): args = args or()kwargs = kwargs o { } self put((tarea, args, kwargs))
Almacenamos tareas, argumentos y kwargs en la cola en forma de tuplas. *args puede pasar un número variable de parámetros y **kwargs puede pasar parámetros con nombre.
def start _ Workers(self):foriinrange(self . num _ Workers):t = Thread(target = self . trabajador)t daemon = True t()
El siguiente es el código de la función de trabajo:
def trabajador(self): while true:tupl = self. get()item, args, kwargs=self.get( ) item( *args, **kwargs) self.task_done()
La función de trabajo coloca la tarea en la parte superior de la cola y la ejecuta basándose en los argumentos de entrada, pero no en otras funciones. El siguiente es el código para la cola:
Podemos probarlo con el siguiente código:
defblokkah(*args, **kwargs):time.sleep(5) print " ¡Blokkah mofo!" q = cola de tareas(num _ trabajadores = 5)for iteminrange(1):q . add _ task(blok kah)q . join()#Espera a que se completen todas las tareas. Imprimir "¡Todo listo!"
Blokkah es el nombre de la tarea que estamos a punto de realizar. La cola ya está almacenada en la memoria caché y no realiza muchas tareas.
El siguiente paso es ejecutar la cola principal como un proceso separado para que las tareas de la cola no dejen de ejecutarse cuando el programa principal sale y realiza la persistencia de la base de datos. Pero este ejemplo muestra cómo escribir un programa tan complejo como una cola de trabajos a partir de una pequeña tarea muy simple.
defgradient_descent(): #Código de descenso de gradiente queue.add_task(plotly.write, x=X, y=Y)
Después de la modificación, mi algoritmo de descenso de gradiente parece ser más eficiente . Aquellos que estén interesados pueden consultar el código a continuación. fromthreadingimportThreadimportQueueimporttime clase tarea Cola(queue.Queue): def__init__(self, num_workers = 1): cola . _ _ init _ _ (self) self num _ trabajadores = num _ trabajadores self . (self, tarea, *args, ** kwargs): args = arg sor() kwargs = kwargsor { } self put ((tarea, args, kwargs)) def start _ Workers (self): foriinrange (self . num _ trabajadores):t = Thread(target = self . trabajador)t daemon = trueq = cola de tareas (num _ trabajadores = 5) para iteminrange(10):q agregue _ task(blokkah)q . todas las tareas se completan y se imprime "¡Todo listo!" if__name__=="__main__":tests()