Как обмениваться данными между процессами с использованием `multiprocessing.Queue()`?

Использовать multiprocessing.Queue() для обмена данными между процессами можно следующим образом:
  1. Создать экземпляр multiprocessing.Queue().
  2. В родительском процессе передать этот экземпляр дочерним процессам.
  3. Дочерние процессы могут использовать метод put() для отправки данных в очередь.
  4. Родительский процесс (или другой дочерний) может использовать метод get() для получения данных из очереди.
  5. Важно: необходимо закрывать очередь после использования (queue.close()) и дожидаться завершения работы процессов (process.join()).

Для обмена данными между процессами в Python с использованием multiprocessing.Queue(), необходимо выполнить следующие шаги:

  1. Создание очереди: Инициализируйте объект Queue. Очередь будет общим ресурсом, доступным для чтения и записи из разных процессов.
    from multiprocessing import Process, Queue
    
    q = Queue()
    
  2. Отправка данных (Producer): В процессе, который отправляет данные (Producer), используйте метод put(item) для добавления данных в очередь.
    def producer(queue):
        for i in range(5):
            queue.put(f"Сообщение от Producer {i}")
            print(f"Producer отправил: Сообщение от Producer {i}")
    
    
    Важно! Можно указать необязательный аргумент block=True и timeout для put(). Если block=True (по умолчанию), процесс будет заблокирован, пока не появится свободное место в очереди (если очередь имеет ограниченный размер). Если block=False, метод вызовет исключение queue.Full, если очередь заполнена. timeout задает максимальное время ожидания в секундах.
  3. Получение данных (Consumer): В процессе, который получает данные (Consumer), используйте метод get() для извлечения данных из очереди.
    def consumer(queue):
        while True:
            item = queue.get()
            if item is None:  # Сигнал завершения
                break
            print(f"Consumer получил: {item}")
    
    
    Важно! Можно указать необязательные аргументы block=True и timeout для get(). Если block=True (по умолчанию), процесс будет заблокирован, пока в очереди не появятся данные. Если block=False, метод вызовет исключение queue.Empty, если очередь пуста. timeout задает максимальное время ожидания в секундах.
  4. Запуск процессов: Создайте и запустите процессы Producer и Consumer, передавая им общую очередь в качестве аргумента.
    if __name__ == '__main__':
        q = Queue()
    
        p1 = Process(target=producer, args=(q,))
        p2 = Process(target=consumer, args=(q,))
    
        p1.start()
        p2.start()
    
        p1.join()  # Дождаться завершения Producer
        q.put(None) # Сигнал Consumer о завершении
        p2.join()  # Дождаться завершения Consumer
    
        print("Обмен данными завершен.")
    

Важные моменты:

  • Queue - это thread- и process-safe структура данных. Это означает, что она предназначена для безопасного использования несколькими потоками или процессами одновременно.
  • После завершения работы процесса Producer, необходимо отправить в очередь специальное значение (например, None), чтобы процесс Consumer мог корректно завершить свою работу. Это сигнализирует о том, что больше данных в очередь не будет добавлено.
  • Размер очереди можно ограничить при создании объекта Queue, указав параметр maxsize. Если очередь заполнена, метод put() будет блокироваться до тех пор, пока не появится свободное место (если block=True).
  • Очередь автоматически закрывается при удалении её объекта. Если вы хотите закрыть очередь вручную, используйте метод close(). После закрытия очереди дальнейшие операции put() и get() вызовут исключение.

Полный пример кода:

from multiprocessing import Process, Queue

def producer(queue):
    for i in range(5):
        message = f"Сообщение от Producer {i}"
        queue.put(message)
        print(f"Producer отправил: {message}")

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break  # Сигнал завершения
        print(f"Consumer получил: {item}")

if __name__ == '__main__':
    q = Queue()

    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))

    p1.start()
    p2.start()

    p1.join()
    q.put(None) # Сигнал завершения
    p2.join()

    print("Обмен данными завершен.")
0