multiprocessing.Queue()
для обмена данными между процессами можно следующим образом:
multiprocessing.Queue()
.put()
для отправки данных в очередь.get()
для получения данных из очереди.queue.close()
) и дожидаться завершения работы процессов (process.join()
).Для обмена данными между процессами в Python с использованием multiprocessing.Queue()
, необходимо выполнить следующие шаги:
Queue
. Очередь будет общим ресурсом, доступным для чтения и записи из разных процессов.
from multiprocessing import Process, Queue
q = Queue()
put(item)
для добавления данных в очередь.
def producer(queue):
for i in range(5):
queue.put(f"Сообщение от Producer {i}")
print(f"Producer отправил: Сообщение от Producer {i}")
Важно! Можно указать необязательный аргумент block=True
и timeout
для put()
. Если block=True
(по умолчанию), процесс будет заблокирован, пока не появится свободное место в очереди (если очередь имеет ограниченный размер). Если block=False
, метод вызовет исключение queue.Full
, если очередь заполнена. timeout
задает максимальное время ожидания в секундах.
get()
для извлечения данных из очереди.
def consumer(queue):
while True:
item = queue.get()
if item is None: # Сигнал завершения
break
print(f"Consumer получил: {item}")
Важно! Можно указать необязательные аргументы block=True
и timeout
для get()
. Если block=True
(по умолчанию), процесс будет заблокирован, пока в очереди не появятся данные. Если block=False
, метод вызовет исключение queue.Empty
, если очередь пуста. timeout
задает максимальное время ожидания в секундах.
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join() # Дождаться завершения Producer
q.put(None) # Сигнал Consumer о завершении
p2.join() # Дождаться завершения Consumer
print("Обмен данными завершен.")
Важные моменты:
Queue
- это thread- и process-safe структура данных. Это означает, что она предназначена для безопасного использования несколькими потоками или процессами одновременно.None
), чтобы процесс Consumer мог корректно завершить свою работу. Это сигнализирует о том, что больше данных в очередь не будет добавлено.Queue
, указав параметр maxsize
. Если очередь заполнена, метод put()
будет блокироваться до тех пор, пока не появится свободное место (если block=True
).close()
. После закрытия очереди дальнейшие операции put()
и get()
вызовут исключение.Полный пример кода:
from multiprocessing import Process, Queue
def producer(queue):
for i in range(5):
message = f"Сообщение от Producer {i}"
queue.put(message)
print(f"Producer отправил: {message}")
def consumer(queue):
while True:
item = queue.get()
if item is None:
break # Сигнал завершения
print(f"Consumer получил: {item}")
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
q.put(None) # Сигнал завершения
p2.join()
print("Обмен данными завершен.")