import logging
import logging.handlers
import multiprocessing
import queue
def worker_process(log_queue):
logger = logging.getLogger('worker')
queue_handler = logging.handlers.QueueHandler(log_queue)
logger.addHandler(queue_handler)
logger.setLevel(logging.INFO)
logger.info('Сообщение из потока/процесса')
if __name__ == '__main__':
log_queue = queue.Queue(-1)
listener = logging.handlers.QueueListener(log_queue, logging.StreamHandler()) # или FileHandler
listener.start()
processes = [multiprocessing.Process(target=worker_process, args=(log_queue,)) for _ in range(2)]
for p in processes:
p.start()
for p in processes:
p.join()
listener.stop()
Логирование из нескольких потоков или процессов в Python с использованием модуля `logging` требует особого внимания, так как стандартный модуль `logging` не является потокобезопасным по умолчанию для одновременной записи в один и тот же файл. Это может привести к перемешиванию записей и повреждению файла.
Вот несколько подходов для решения этой задачи:
1. Использование `QueueHandler` и `QueueListener` (предпочтительный подход):
Этот подход обеспечивает потокобезопасное логирование, используя очередь для передачи записей журнала из потоков/процессов в основной поток, который выполняет фактическую запись в файл.
import logging
import logging.handlers
import multiprocessing
import threading
import queue
import time
def worker_process(queue):
logger = logging.getLogger("worker")
handler = logging.handlers.QueueHandler(queue)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
for i in range(5):
logger.info(f"Process {multiprocessing.current_process().name}: Log message {i}")
time.sleep(0.1)
def worker_thread(queue):
logger = logging.getLogger("worker_thread")
handler = logging.handlers.QueueHandler(queue)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
for i in range(5):
logger.info(f"Thread {threading.current_thread().name}: Log message {i}")
time.sleep(0.1)
if __name__ == "__main__":
log_queue = queue.Queue(-1)
# Настройка обработчика для записи в файл (в главном процессе)
file_handler = logging.FileHandler("multiprocess_logging.log")
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
file_handler.setFormatter(formatter)
# Настройка QueueListener
listener = logging.handlers.QueueListener(log_queue, file_handler)
listener.start()
# Создание и запуск процессов
processes = []
for i in range(2):
process = multiprocessing.Process(target=worker_process, args=(log_queue,), name=f"Process-{i}")
processes.append(process)
process.start()
# Создание и запуск потоков
threads = []
for i in range(2):
thread = threading.Thread(target=worker_thread, args=(log_queue,), name=f"Thread-{i}")
threads.append(thread)
thread.start()
# Ожидание завершения процессов
for process in processes:
process.join()
# Ожидание завершения потоков
for thread in threads:
thread.join()
listener.stop() # Останавливаем QueueListener после завершения работы
Как это работает:
Преимущества:
2. Использование `logging.handlers.RotatingFileHandler` или `logging.handlers.TimedRotatingFileHandler`:
Эти обработчики могут помочь предотвратить повреждение файла, ротируя файлы журналов по размеру или по времени. Однако они не решают проблему конкурентного доступа напрямую.
import logging
import logging.handlers
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Ротация файла по размеру (например, каждые 10MB)
handler = logging.handlers.RotatingFileHandler("my_log.log", maxBytes=10*1024*1024, backupCount=5)
# Или ротация файла по времени (например, каждый день)
# handler = logging.handlers.TimedRotatingFileHandler("my_log.log", when="D", interval=1, backupCount=5)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
# Внутри потока или процесса
logger.info("This is a log message from a thread or process.")
Важно: Даже с ротацией файлов, при очень высокой интенсивности записи из множества потоков/процессов могут возникать проблемы. Поэтому рекомендуется использовать `QueueHandler` и `QueueListener` для надежности.
3. Использование блокировок (менее предпочтительно, сложно и потенциально снижает производительность):
Можно использовать блокировку (например, `threading.Lock` или `multiprocessing.Lock`) вокруг операций записи в файл. Однако это может значительно снизить производительность, так как потоки/процессы будут ждать освобождения блокировки.
import logging
import threading
log_lock = threading.Lock()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
file_handler = logging.FileHandler("my_log.log")
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
def log_message(message):
with log_lock: # захват блокировки перед записью
logger.info(message)
# Внутри потока или процесса:
log_message("This is a log message from a thread.")
Почему блокировки не рекомендуются:
4. Использование сторонних библиотек:
Существуют сторонние библиотеки для логирования, которые могут предоставлять более продвинутые возможности, включая потокобезопасное логирование. Примеры: structlog, loguru.
В заключение: Наиболее рекомендуемый подход – использование `QueueHandler` и `QueueListener`, так как он обеспечивает потокобезопасность, многопроцессорность и не оказывает значительного влияния на производительность рабочих потоков/процессов.