Как логировать данные из нескольких потоков или процессов с использованием `logging`?

Для логирования из нескольких потоков или процессов с использованием `logging`, есть несколько подходов:
  • `QueueHandler` и `QueueListener`: Потоки/процессы отправляют записи лога в очередь (`Queue`), а отдельный процесс/поток забирает их из очереди и записывает в файл. Это позволяет избежать конфликтов при записи в файл и повышает производительность.
  • `logging.handlers.SocketHandler` и `logging.handlers.SocketServer`: Потоки/процессы отправляют логи по сети на сервер, который записывает их в файл. Подходит для распределенных систем.
  • `logging.handlers.FileHandler` с блокировкой (multiprocessing.Lock): Использовать `FileHandler` и обернуть операции записи в файл в блокировку (`multiprocessing.Lock`), чтобы обеспечить потокобезопасность. Это простой подход, но может снизить производительность.
  • Использование библиотек, таких как `logstash-async` или `concurrent-log-handler`: Они предоставляют готовые решения для асинхронной и потокобезопасной записи логов.
Пример с `QueueHandler` и `QueueListener`:

    import logging
    import logging.handlers
    import multiprocessing
    import queue

    def worker_process(log_queue):
        logger = logging.getLogger('worker')
        queue_handler = logging.handlers.QueueHandler(log_queue)
        logger.addHandler(queue_handler)
        logger.setLevel(logging.INFO)
        logger.info('Сообщение из потока/процесса')

    if __name__ == '__main__':
        log_queue = queue.Queue(-1)
        listener = logging.handlers.QueueListener(log_queue, logging.StreamHandler()) # или FileHandler
        listener.start()

        processes = [multiprocessing.Process(target=worker_process, args=(log_queue,)) for _ in range(2)]
        for p in processes:
            p.start()
        for p in processes:
            p.join()

        listener.stop()
  

Логирование из нескольких потоков или процессов в Python с использованием модуля `logging` требует особого внимания, так как стандартный модуль `logging` не является потокобезопасным по умолчанию для одновременной записи в один и тот же файл. Это может привести к перемешиванию записей и повреждению файла.

Вот несколько подходов для решения этой задачи:

1. Использование `QueueHandler` и `QueueListener` (предпочтительный подход):

Этот подход обеспечивает потокобезопасное логирование, используя очередь для передачи записей журнала из потоков/процессов в основной поток, который выполняет фактическую запись в файл.


  import logging
  import logging.handlers
  import multiprocessing
  import threading
  import queue
  import time

  def worker_process(queue):
      logger = logging.getLogger("worker")
      handler = logging.handlers.QueueHandler(queue)
      logger.addHandler(handler)
      logger.setLevel(logging.INFO)

      for i in range(5):
          logger.info(f"Process {multiprocessing.current_process().name}: Log message {i}")
          time.sleep(0.1)


  def worker_thread(queue):
      logger = logging.getLogger("worker_thread")
      handler = logging.handlers.QueueHandler(queue)
      logger.addHandler(handler)
      logger.setLevel(logging.INFO)

      for i in range(5):
          logger.info(f"Thread {threading.current_thread().name}: Log message {i}")
          time.sleep(0.1)


  if __name__ == "__main__":
      log_queue = queue.Queue(-1)

      # Настройка обработчика для записи в файл (в главном процессе)
      file_handler = logging.FileHandler("multiprocess_logging.log")
      formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
      file_handler.setFormatter(formatter)

      # Настройка QueueListener
      listener = logging.handlers.QueueListener(log_queue, file_handler)
      listener.start()

      # Создание и запуск процессов
      processes = []
      for i in range(2):
          process = multiprocessing.Process(target=worker_process, args=(log_queue,), name=f"Process-{i}")
          processes.append(process)
          process.start()

      # Создание и запуск потоков
      threads = []
      for i in range(2):
          thread = threading.Thread(target=worker_thread, args=(log_queue,), name=f"Thread-{i}")
          threads.append(thread)
          thread.start()


      # Ожидание завершения процессов
      for process in processes:
          process.join()

      # Ожидание завершения потоков
      for thread in threads:
          thread.join()


      listener.stop()  # Останавливаем QueueListener после завершения работы
  

Как это работает:

  • Каждый процесс/поток создает свой логгер и `QueueHandler`, который отправляет записи в `log_queue`.
  • Главный процесс имеет `QueueListener`, который прослушивает `log_queue` и пересылает записи в `FileHandler`, который фактически записывает их в файл.

Преимущества:

  • Потокобезопасность и многопроцессорность: Записи из разных процессов и потоков не перемешиваются.
  • Обработка логирования происходит в отдельном потоке (через `QueueListener`), что снижает влияние на производительность рабочих потоков/процессов.

2. Использование `logging.handlers.RotatingFileHandler` или `logging.handlers.TimedRotatingFileHandler`:

Эти обработчики могут помочь предотвратить повреждение файла, ротируя файлы журналов по размеру или по времени. Однако они не решают проблему конкурентного доступа напрямую.


  import logging
  import logging.handlers

  logger = logging.getLogger(__name__)
  logger.setLevel(logging.INFO)

  # Ротация файла по размеру (например, каждые 10MB)
  handler = logging.handlers.RotatingFileHandler("my_log.log", maxBytes=10*1024*1024, backupCount=5)

  # Или ротация файла по времени (например, каждый день)
  # handler = logging.handlers.TimedRotatingFileHandler("my_log.log", when="D", interval=1, backupCount=5)

  formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
  handler.setFormatter(formatter)
  logger.addHandler(handler)

  # Внутри потока или процесса
  logger.info("This is a log message from a thread or process.")
  

Важно: Даже с ротацией файлов, при очень высокой интенсивности записи из множества потоков/процессов могут возникать проблемы. Поэтому рекомендуется использовать `QueueHandler` и `QueueListener` для надежности.

3. Использование блокировок (менее предпочтительно, сложно и потенциально снижает производительность):

Можно использовать блокировку (например, `threading.Lock` или `multiprocessing.Lock`) вокруг операций записи в файл. Однако это может значительно снизить производительность, так как потоки/процессы будут ждать освобождения блокировки.


  import logging
  import threading

  log_lock = threading.Lock()
  logger = logging.getLogger(__name__)
  logger.setLevel(logging.INFO)

  file_handler = logging.FileHandler("my_log.log")
  formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
  file_handler.setFormatter(formatter)
  logger.addHandler(file_handler)


  def log_message(message):
      with log_lock: # захват блокировки перед записью
          logger.info(message)

  # Внутри потока или процесса:
  log_message("This is a log message from a thread.")
  

Почему блокировки не рекомендуются:

  • Снижение производительности: Потоки вынуждены ждать доступа к файлу.
  • Сложность: Легко допустить ошибки при управлении блокировками.

4. Использование сторонних библиотек:

Существуют сторонние библиотеки для логирования, которые могут предоставлять более продвинутые возможности, включая потокобезопасное логирование. Примеры: structlog, loguru.

В заключение: Наиболее рекомендуемый подход – использование `QueueHandler` и `QueueListener`, так как он обеспечивает потокобезопасность, многопроцессорность и не оказывает значительного влияния на производительность рабочих потоков/процессов.

0