Безопасный обмен данными между потоками с использованием `queue.Queue` в Python строится вокруг нескольких ключевых принципов и техник, обеспечивающих корректную работу в конкурентной среде. Главная цель - избежать состояний гонки (race conditions) и повреждения данных.
Основные механизмы безопасности, предоставляемые `queue.Queue`:
Рекомендации по безопасной организации обмена данными:
Пример:
   
 import threading
 import queue
 import time
 import logging
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(threadName)s - %(levelname)s - %(message)s')
 def producer(queue, items_to_produce):
  for i in range(items_to_produce):
   time.sleep(0.1) # Simulate some work
   item = f"Item {i}"
   logging.info(f"Producing {item}")
   queue.put(item)
  # Signal completion
  for _ in range(NUM_CONSUMERS):  # Send a signal for each consumer
   queue.put(None)
  logging.info("Producer finished.")
 def consumer(queue, consumer_id):
  while True:
   item = queue.get()
   if item is None:
    logging.info(f"Consumer {consumer_id} received termination signal.")
    queue.task_done()  # Signal that the None task is done, important for join()
    break
   logging.info(f"Consumer {consumer_id} processing {item}")
   time.sleep(0.2) # Simulate processing time
   queue.task_done()
  logging.info(f"Consumer {consumer_id} finished.")
 if __name__ == "__main__":
  NUM_CONSUMERS = 2
  NUM_ITEMS = 10
  my_queue = queue.Queue()
  producer_thread = threading.Thread(target=producer, args=(my_queue, NUM_ITEMS), name="Producer")
  consumer_threads = []
  for i in range(NUM_CONSUMERS):
   consumer_thread = threading.Thread(target=consumer, args=(my_queue, i+1), name=f"Consumer-{i+1}")
   consumer_threads.append(consumer_thread)
  producer_thread.start()
  for consumer_thread in consumer_threads:
   consumer_thread.start()
  producer_thread.join()  #Wait for the producer to complete
  for consumer_thread in consumer_threads:
   consumer_thread.join()  # Wait for all consumers to complete.
  print("All threads finished.")
   
  
  В этом примере производитель помещает данные в очередь, а потребители извлекают и обрабатывают их. После завершения работы производитель добавляет `None` в очередь как сигнал завершения для каждого потребителя. Consumer, получая `None` выходит из цикла. `queue.task_done()` сообщает очереди о завершении задачи, а `queue.join()` ожидает завершения всех задач в очереди. Важно отметить вызов `queue.task_done()` и для "сигнального" элемента `None`.
Важно: Использование `queue.Queue` само по себе не решает всех проблем параллелизма. Необходимо внимательно проектировать взаимодействие между потоками, учитывать возможные взаимные блокировки (deadlocks) и обеспечивать корректную обработку ошибок.