Безопасный обмен данными между потоками с использованием `queue.Queue` в Python строится вокруг нескольких ключевых принципов и техник, обеспечивающих корректную работу в конкурентной среде. Главная цель - избежать состояний гонки (race conditions) и повреждения данных.
Основные механизмы безопасности, предоставляемые `queue.Queue`:
Рекомендации по безопасной организации обмена данными:
Пример:
import threading
import queue
import time
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s')
def producer(queue, items_to_produce):
for i in range(items_to_produce):
time.sleep(0.1) # Simulate some work
item = f"Item {i}"
logging.info(f"Producing {item}")
queue.put(item)
# Signal completion
for _ in range(NUM_CONSUMERS): # Send a signal for each consumer
queue.put(None)
logging.info("Producer finished.")
def consumer(queue, consumer_id):
while True:
item = queue.get()
if item is None:
logging.info(f"Consumer {consumer_id} received termination signal.")
queue.task_done() # Signal that the None task is done, important for join()
break
logging.info(f"Consumer {consumer_id} processing {item}")
time.sleep(0.2) # Simulate processing time
queue.task_done()
logging.info(f"Consumer {consumer_id} finished.")
if __name__ == "__main__":
NUM_CONSUMERS = 2
NUM_ITEMS = 10
my_queue = queue.Queue()
producer_thread = threading.Thread(target=producer, args=(my_queue, NUM_ITEMS), name="Producer")
consumer_threads = []
for i in range(NUM_CONSUMERS):
consumer_thread = threading.Thread(target=consumer, args=(my_queue, i+1), name=f"Consumer-{i+1}")
consumer_threads.append(consumer_thread)
producer_thread.start()
for consumer_thread in consumer_threads:
consumer_thread.start()
producer_thread.join() #Wait for the producer to complete
for consumer_thread in consumer_threads:
consumer_thread.join() # Wait for all consumers to complete.
print("All threads finished.")
В этом примере производитель помещает данные в очередь, а потребители извлекают и обрабатывают их. После завершения работы производитель добавляет `None` в очередь как сигнал завершения для каждого потребителя. Consumer, получая `None` выходит из цикла. `queue.task_done()` сообщает очереди о завершении задачи, а `queue.join()` ожидает завершения всех задач в очереди. Важно отметить вызов `queue.task_done()` и для "сигнального" элемента `None`.
Важно: Использование `queue.Queue` само по себе не решает всех проблем параллелизма. Необходимо внимательно проектировать взаимодействие между потоками, учитывать возможные взаимные блокировки (deadlocks) и обеспечивать корректную обработку ошибок.