Эффективная обработка ошибок в многопроцессной среде Python включает:
multiprocessing.Queue
): Передача информации об ошибках от дочерних процессов родительскому.logging
модуль).multiprocessing.Pool.apply_async
с error_callback
): Обработка исключений, возникших в процессах пула.try...except
блоков внутри каждого процесса для локализации и обработки исключений.Комбинирование этих техник позволяет надежно отслеживать и обрабатывать ошибки, возникшие в дочерних процессах.
При работе с несколькими процессами в Python обработка ошибок становится особенно важной, так как ошибки в одном процессе не должны приводить к падению всей программы. Вот несколько эффективных стратегий:
import multiprocessing
import traceback
import sys
def worker(queue):
try:
# Код, который может вызвать исключение
result = 1 / 0
queue.put("OK")
except Exception as e:
exc_type, exc_value, exc_traceback = sys.exc_info()
formatted_traceback = ''.join(traceback.format_exception(exc_type, exc_value, exc_traceback))
queue.put((False, str(e), formatted_traceback)) # (success, message, traceback)
if __name__ == '__main__':
error_queue = multiprocessing.Queue()
process = multiprocessing.Process(target=worker, args=(error_queue,))
process.start()
process.join()
if not error_queue.empty():
result = error_queue.get()
if isinstance(result, tuple) and not result[0]:
print("Ошибка в дочернем процессе:")
print(f"Сообщение: {result[1]}")
print(f"Трассировка:\n{result[2]}")
else:
print("Процесс завершился успешно.")
import multiprocessing
def worker(x):
if x == 0:
raise ValueError("Деление на ноль!")
return 1 / x
def success_callback(result):
print(f"Результат: {result}")
def error_callback(exception):
print(f"Ошибка: {exception}")
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4) # Используйте столько процессов, сколько нужно
for i in range(-2, 3):
pool.apply_async(worker, args=(i,), callback=success_callback, error_callback=error_callback)
pool.close()
pool.join() # Дождаться завершения всех процессов
import multiprocessing
import signal
import os
import sys
import traceback
def signal_handler(signum, frame):
print(f"Главный процесс получил сигнал {signum}")
# Здесь можно выполнить логику обработки сигнала, например, перезапустить дочерний процесс
sys.exit(1) # Завершаем главный процесс (или перезапускаем дочерний)
def worker():
try:
# Код, который может вызвать исключение
result = 1 / 0
except Exception as e:
# Отправляем сигнал родителю
os.kill(os.getppid(), signal.SIGUSR1) # Предполагаем, что родитель перехватывает SIGUSR1
exc_type, exc_value, exc_traceback = sys.exc_info()
formatted_traceback = ''.join(traceback.format_exception(exc_type, exc_value, exc_traceback))
print(f"Ошибка в дочернем процессе: {e}\nТрассировка:\n{formatted_traceback}")
sys.exit(1)
if __name__ == '__main__':
signal.signal(signal.SIGUSR1, signal_handler) # Устанавливаем обработчик сигнала в главном процессе
process = multiprocessing.Process(target=worker)
process.start()
process.join()
print("Главный процесс завершил работу.")
import logging
import logging.handlers
import multiprocessing
import time
def worker(queue):
# Настройка логгера в дочернем процессе
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) # Установите нужный уровень
handler = logging.handlers.QueueHandler(queue)
logger.addHandler(handler)
try:
result = 1 / 0 # Вызываем ошибку
except Exception as e:
logger.exception("Произошла ошибка!") # Записываем информацию об ошибке
logger.info("Дочерний процесс завершил работу.")
def listener_process(queue):
# Настройка логгера в процессе-слушателе
logger = logging.getLogger()
logger.setLevel(logging.DEBUG) # Установите нужный уровень
# Файловый обработчик для записи логов в файл
file_handler = logging.FileHandler("multiprocessing.log")
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
listener = logging.handlers.QueueListener(queue, *logger.handlers)
listener.start()
listener.join()
if __name__ == '__main__':
log_queue = multiprocessing.Queue(-1)
# Запускаем процесс-слушатель
listener = multiprocessing.Process(target=listener_process, args=(log_queue,))
listener.start()
# Запускаем дочерние процессы
processes = []
for i in range(2):
p = multiprocessing.Process(target=worker, args=(log_queue,))
processes.append(p)
p.start()
for p in processes:
p.join()
log_queue.put_nowait(None) # Сигнализируем слушателю о завершении
listener.join()
print("Главный процесс завершил работу.")
Рекомендации: