try...except для перехвата пользовательских исключений и их локальной обработки.threading.Lock). Поток/задача, поймав исключение, устанавливает переменную и освобождает блокировку, а основной поток периодически проверяет эту переменную.Использование пользовательских исключений в многозадачных (многопоточных или асинхронных) программах на Python требует особого внимания из-за особенностей обработки исключений в контексте параллельного выполнения.
Основные моменты, которые следует учитывать:
try...except для обработки возможных исключений. Необрабатываемые исключения могут привести к аварийному завершению потока/задачи, не затрагивая остальные (в случае потоков) или к прерыванию асинхронного цикла (в случае asyncio).
    Примеры использования:
Многопоточность (threading):
import threading
import logging
import time
class CustomError(Exception):
    pass
def worker(data):
    try:
        # Выполняем какие-то действия, которые могут привести к исключению
        if data < 0:
            raise CustomError("Данные должны быть положительными")
        result = 10 / data # Может вызвать ZeroDivisionError
        print(f"Результат: {result} в потоке {threading.current_thread().name}")
    except CustomError as e:
        logging.error(f"Пользовательское исключение в потоке {threading.current_thread().name}: {e}")
    except ZeroDivisionError as e:
        logging.error(f"Ошибка деления на ноль в потоке {threading.current_thread().name}: {e}")
    except Exception as e:
        logging.exception(f"Непредвиденное исключение в потоке {threading.current_thread().name}: {e}")
def main():
    logging.basicConfig(level=logging.ERROR)
    threads = []
    data_list = [2, 0, -1, 5]
    for data in data_list:
        t = threading.Thread(target=worker, args=(data,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join() # Ждем завершения всех потоков
if __name__ == "__main__":
    main()
  Асинхронность (asyncio):
import asyncio
import logging
class CustomError(Exception):
    pass
async def worker(data):
    try:
        if data < 0:
            raise CustomError("Данные должны быть положительными")
        result = 10 / data
        print(f"Результат: {result}")
        return result
    except CustomError as e:
        logging.error(f"Пользовательское исключение: {e}")
        return None
    except ZeroDivisionError as e:
        logging.error(f"Ошибка деления на ноль: {e}")
        return None
    except Exception as e:
        logging.exception(f"Непредвиденное исключение: {e}")
        return None
async def main():
    logging.basicConfig(level=logging.ERROR)
    tasks = []
    data_list = [2, 0, -1, 5]
    for data in data_list:
        tasks.append(asyncio.create_task(worker(data)))
    results = await asyncio.gather(*tasks, return_exceptions=True) # Перехватываем исключения от задач
    for result in results:
      if isinstance(result, Exception):
          logging.error(f"Задача завершилась с исключением: {result}")
if __name__ == "__main__":
    asyncio.run(main())
  Ключевые моменты в примерах:
CustomError.
      try...except.
      logging, включая трассировку стека (в примере многопоточности).
      asyncio используется asyncio.gather(..., return_exceptions=True), чтобы перехватить исключения, возникшие в задачах, и обработать их.
      Дополнительные рекомендации:
queue.Queue для потоков, asyncio.Queue для асинхронности).
    concurrent.futures:  Для упрощения работы с многопоточностью и многопроцессорностью можно использовать модуль concurrent.futures, который предоставляет более высокоуровневый интерфейс. Он позволяет получать результаты работы задач, в том числе и исключения, через объекты Future.
    В заключение, использование пользовательских исключений в многозадачных программах требует тщательного планирования и обработки исключений на каждом уровне параллельного выполнения, чтобы обеспечить стабильность и надежность приложения.