Concurrencia (principalmente) en Python

2 01 2010

Este post y los que le sigan sobre concurrencia tendran un enfoque práctico, así que para entender varios temas sobre concurrencia en general aunque más específicos a Python, recomiendo leerse el articulo Python Threads and the Global Interpreter Lock. Para una introducción a threads (hilos) en Python también recomiendo Basic Threading in Python .

Este primer post tratará un ejemplo básico de Consumer/Producer utilizando Threads, en otros posts utilizaremos stackless, concurrenceconcurrence y finalmente compararemos con Erlang.

Queue

El modulo Queue implementa una cola multi-productora y multi-consumidora y es una forma sencilla y eficiente de implementar comunicación entre threads. Proporciona 3 tipos de colas Queue (FIFO), LifoQueue (LIFO) y PriorityQueue (por prioridad). Lo que haremos es implementar una cola FIFO (First In First Out), un consumidor que reciba mensajes de la cola, y una clase que produzca los mensajes. Los mensajes en este caso serán objetos tipo threads que imprimen un mensaje por pantalla. Como lo que queremos mostrar es que estos se ejecutan en paralelo, en lugar de imprimirlos en el orden que se introducen a la cola, cada thread tendra un timeout diferente y semi-aleatorio. El codigo es este:

## printer.py

import threading
import time
import Queue
import random

# Cola tipo FIFO
pool = Queue.Queue()

# Objeto que imprime un mensaje despues de 'secs' segundos
# corre en su propio thread.
class Printer(threading.Thread):
    def __init__(self, id, secs):
        self.id = id
        self.secs = secs
        threading.Thread.__init__ ( self )
        
    def run(self):
        time.sleep(self.secs)
        print self.id

# Escucha mensajes que entran en la cola y los ejecuta
class Consumer(threading.Thread):    
    def run(self):
        while True:
            # Obtenemos un cliente de la cola
            client = pool.get()

            # Nos aseguramos que haya un verdadero cliente en la
	    # variable 'client'.
            if client == "exit":
                    sys.exit(1) # Si el mensaje en la cola es el string "exit" terminamos la ejecución
                else:
                    client.start() # Ejecutamos el thread

# Producer que mete mensajes en la cola.
# Los mensajes son threads tipo "Printer"
def thread_factory():
    duration = [1,12,5,8,15]
    i = 0

    # Creamos y ejecutamos el thread consumidor
    consumer = Consumer()
    consumer.start()

    # Producimos 10 mensajes y los metemos en la cola
    while < 10:
        pool.put(Printer("Printer %d" % i, random.choice(duration)))
        i += 1

    pool.put("exit")

if __name__=='__main__':
    thread_factory()

Lo ejecutamos con python printer.py

La función thread_factory elige un numero de segundos al azar a partir de la lista duration y crea 10 threads tipo Printer, sin embargo, los threads no inician su ejecución hasta que consumer los ejecuta mediante client.start(). Lo interesante de esto es que consumer ejecutara cualquier objeto tipo thread que reciba en la cola. Por ejemplo, podemos crear una clase AutoPrinter que determine ella misma el tiempo a dormir antes de imprimir su mensaje:

class AutoPrinter(threading.Thread):
    
    def __init__(self, id):
        self.id = id
        duration = [1,12,5,8,15]
        self.secs = random.choice(duration)
        threading.Thread.__init__ ( self )
        
    def run(self):
        time.sleep(self.secs)
        print self.id

y lo metemos en la cola desde thread_factory:

def thread_factory():
    duration = [1,12,5,8,15]
    i = 0
    consumer = Consumer()
    consumer.start()
    while i<5:
        pool.put(Printer("Printer %d" % i, random.choice(duration)))
        i += 1
    
    while i<10:
        pool.put(AutoPrinter("Auto %d" % i))
        i += 1
    
    pool.put("exit")

Anuncios