multiprocessing.Queue() для обмена данными между процессами можно следующим образом:
multiprocessing.Queue().put() для отправки данных в очередь.get() для получения данных из очереди.queue.close()) и дожидаться завершения работы процессов (process.join()).Для обмена данными между процессами в Python с использованием multiprocessing.Queue(), необходимо выполнить следующие шаги:
Queue. Очередь будет общим ресурсом, доступным для чтения и записи из разных процессов.
from multiprocessing import Process, Queue
q = Queue()
put(item) для добавления данных в очередь.
def producer(queue):
for i in range(5):
queue.put(f"Сообщение от Producer {i}")
print(f"Producer отправил: Сообщение от Producer {i}")
Важно! Можно указать необязательный аргумент block=True и timeout для put(). Если block=True (по умолчанию), процесс будет заблокирован, пока не появится свободное место в очереди (если очередь имеет ограниченный размер). Если block=False, метод вызовет исключение queue.Full, если очередь заполнена. timeout задает максимальное время ожидания в секундах.
get() для извлечения данных из очереди.
def consumer(queue):
while True:
item = queue.get()
if item is None: # Сигнал завершения
break
print(f"Consumer получил: {item}")
Важно! Можно указать необязательные аргументы block=True и timeout для get(). Если block=True (по умолчанию), процесс будет заблокирован, пока в очереди не появятся данные. Если block=False, метод вызовет исключение queue.Empty, если очередь пуста. timeout задает максимальное время ожидания в секундах.
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join() # Дождаться завершения Producer
q.put(None) # Сигнал Consumer о завершении
p2.join() # Дождаться завершения Consumer
print("Обмен данными завершен.")
Важные моменты:
Queue - это thread- и process-safe структура данных. Это означает, что она предназначена для безопасного использования несколькими потоками или процессами одновременно.None), чтобы процесс Consumer мог корректно завершить свою работу. Это сигнализирует о том, что больше данных в очередь не будет добавлено.Queue, указав параметр maxsize. Если очередь заполнена, метод put() будет блокироваться до тех пор, пока не появится свободное место (если block=True).close(). После закрытия очереди дальнейшие операции put() и get() вызовут исключение.Полный пример кода:
from multiprocessing import Process, Queue
def producer(queue):
for i in range(5):
message = f"Сообщение от Producer {i}"
queue.put(message)
print(f"Producer отправил: {message}")
def consumer(queue):
while True:
item = queue.get()
if item is None:
break # Сигнал завершения
print(f"Consumer получил: {item}")
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
q.put(None) # Сигнал завершения
p2.join()
print("Обмен данными завершен.")