Использование модуля multiprocessing
в Python позволяет эффективно реализовывать параллельное выполнение задач и, следовательно, паттерны проектирования, такие как "Процесс-Рабочий" и "Пул рабочих процессов". Вот как это можно сделать:
В этом паттерне главный процесс (мастер) создает дочерние процессы (рабочие), которые выполняют определенные задачи. Главный процесс обычно распределяет задачи между рабочими и собирает результаты. Этот паттерн полезен, когда у вас есть набор независимых задач, которые можно выполнять параллельно.
import multiprocessing
import time
def worker_process(task_queue, result_queue):
"""Функция, выполняемая каждым рабочим процессом."""
while True:
task = task_queue.get()
if task is None: # Сигнал остановки
break
result = process_task(task) # Ваша функция обработки задачи
result_queue.put(result)
def process_task(task):
"""Функция обработки задачи. Замените на свою логику."""
print(f"Обработка задачи: {task} в процессе {multiprocessing.current_process().name}")
time.sleep(1) # Имитация сложной задачи
return f"Результат для {task}"
if __name__ == '__main__':
num_workers = 4
tasks = [f"Задача {i}" for i in range(10)]
# Создание очередей для задач и результатов
task_queue = multiprocessing.Queue()
result_queue = multiprocessing.Queue()
# Создание рабочих процессов
processes = []
for i in range(num_workers):
p = multiprocessing.Process(target=worker_process, args=(task_queue, result_queue), name=f"Worker-{i}")
processes.append(p)
p.start()
# Добавление задач в очередь
for task in tasks:
task_queue.put(task)
# Отправка сигналов остановки рабочим процессам
for _ in range(num_workers):
task_queue.put(None)
# Ожидание завершения всех процессов
for p in processes:
p.join()
# Сбор результатов
results = []
while not result_queue.empty():
results.append(result_queue.get())
print("Все задачи выполнены.")
print("Результаты:", results)
Пул рабочих процессов предоставляет способ повторного использования рабочих процессов для выполнения нескольких задач. Модуль multiprocessing
предлагает удобный класс Pool
, который упрощает создание и управление пулом рабочих процессов.
import multiprocessing
import time
def process_task(task):
"""Функция обработки задачи. Замените на свою логику."""
print(f"Обработка задачи: {task} в процессе {multiprocessing.current_process().name}")
time.sleep(1) # Имитация сложной задачи
return f"Результат для {task}"
if __name__ == '__main__':
num_workers = multiprocessing.cpu_count() # Использование всех ядер процессора
tasks = [f"Задача {i}" for i in range(10)]
# Создание пула рабочих процессов
with multiprocessing.Pool(processes=num_workers) as pool:
# Применение функции process_task к каждой задаче в пуле
results = pool.map(process_task, tasks)
# (Явное закрытие пула не обязательно при использовании 'with')
# pool.close()
# pool.join() # Ожидание завершения всех процессов
print("Все задачи выполнены.")
print("Результаты:", results)
Ключевые моменты:
multiprocessing.Queue
): Используются для безопасной передачи данных между процессами.multiprocessing.Process
: Создает и управляет процессами.multiprocessing.Pool
: Упрощает управление пулом рабочих процессов. Методы map
, apply
, apply_async
, imap
позволяют распределять задачи между рабочими процессами.multiprocessing.cpu_count()
: Возвращает количество ядер процессора, что полезно для определения оптимального количества рабочих процессов.multiprocessing.Lock
) или другие механизмы синхронизации при необходимости.with multiprocessing.Pool(...) as pool:
обеспечивает автоматическое закрытие и ожидание завершения всех процессов в пуле после завершения работы. Это упрощает код и предотвращает утечки ресурсов.