multiprocessing.Pool()
:
Pool(processes=N)
- создает пул из N процессов. Если N не указано, то используется количество ядер CPU.pool.apply(func, args)
- выполняет функцию func
с аргументами args
в одном из процессов пула. Блокирует выполнение, пока функция не завершится.pool.apply_async(func, args, callback)
- аналогично apply
, но выполняется асинхронно, не блокируя. callback
- функция, вызываемая по завершении задачи.pool.map(func, iterable)
- применяет функцию func
к каждому элементу из iterable
параллельно. Возвращает список результатов в том же порядке, что и входные данные.pool.map_async(func, iterable, callback)
- асинхронный вариант map
.pool.close()
- закрывает пул, запрещая добавление новых задач.pool.join()
- ждет завершения всех задач в пуле. Нужно вызывать после pool.close()
.
from multiprocessing import Pool
def square(x):
return x * x
if __name__ == '__main__':
with Pool(processes=4) as pool:
results = pool.map(square, [1, 2, 3, 4, 5])
print(results) # Вывод: [1, 4, 9, 16, 25]
Пул процессов (Process Pool) - это механизм для управления группой процессов, которые выполняют задачи параллельно. Он позволяет переиспользовать процессы, сокращая накладные расходы на их создание и уничтожение для каждой задачи. Вместо того, чтобы создавать новый процесс для каждой задачи, задачи отправляются в пул свободных процессов, где они выполняются. Когда задача завершена, процесс возвращается в пул и готов для выполнения следующей задачи.
В Python модуль multiprocessing
предоставляет класс Pool
для реализации пулов процессов. Он значительно упрощает параллельное выполнение задач, особенно когда у вас есть большое количество независимых вычислений.
Как использовать multiprocessing.Pool()
:
multiprocessing
:
import multiprocessing
Pool
:
Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None)
processes
(опционально): Количество рабочих процессов в пуле. Если не указано, используется количество ядер ЦП.initializer
(опционально): Функция, которая будет вызвана в начале каждого рабочего процесса.initargs
(опционально): Аргументы, передаваемые в initializer
.maxtasksperchild
(опционально): Максимальное количество задач, которые может выполнить один рабочий процесс, прежде чем он будет заменен новым. Полезно для предотвращения утечек памяти.context
(опционально): Используемый контекст запуска процессов.pool = multiprocessing.Pool(processes=4) # Создаем пул с 4 процессами
apply
, apply_async
, map
, map_async
, imap
, imap_unordered
:
apply(func, args=(), kwds={})
: Выполняет функцию func
с аргументами args
и возвращает результат. Блокирует вызывающий процесс до завершения задачи. (Обычно не используется, т.к. блокирующий).apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
: Выполняет функцию func
асинхронно. Возвращает объект AsyncResult
, который можно использовать для получения результата. Не блокирует.map(func, iterable, chunksize=None)
: Применяет функцию func
к каждому элементу итерируемого объекта iterable
и возвращает список результатов. Блокирует до завершения всех задач.map_async(func, iterable, chunksize=None, callback=None, error_callback=None)
: Применяет функцию func
к каждому элементу итерируемого объекта iterable
асинхронно. Возвращает объект AsyncResult
. Не блокирует.imap(func, iterable, chunksize=1)
: Возвращает итератор, применяющий функцию func
к каждому элементу итерируемого объекта iterable
. Возвращает результаты по мере их готовности, не дожидаясь завершения всех задач. Полезно для больших объемов данных.imap_unordered(func, iterable, chunksize=1)
: Похож на imap
, но возвращает результаты в произвольном порядке (по мере завершения задач), что может быть быстрее.results = pool.map(my_function, my_list) # Применяем my_function к каждому элементу my_list
pool.close()
pool.join() # Дождаться завершения всех процессов в пуле
pool.close()
говорит пулу, что больше задач не будет отправлено. pool.join()
ждет завершения всех процессов в пуле. Важно вызывать close()
перед join()
, иначе пул может зависнуть.
Пример:
import multiprocessing
import time
def square(x):
time.sleep(1) # Имитируем долгую операцию
return x * x
if __name__ == '__main__':
# Создаем пул с 4 процессами
with multiprocessing.Pool(processes=4) as pool:
# Список чисел для обработки
numbers = [1, 2, 3, 4, 5, 6, 7, 8]
# Используем map для параллельного вычисления квадратов
results = pool.map(square, numbers)
# Выводим результаты
print(f"Квадраты чисел: {results}")
# Пул автоматически закрывается при выходе из блока 'with' и ждет завершения всех процессов
Преимущества использования пула процессов:
Когда следует использовать пул процессов:
Важно помнить о синхронизации данных и избегать гонок данных при работе с пулом процессов, особенно если процессы совместно используют ресурсы. Для этого можно использовать механизмы синхронизации, предоставляемые модулем multiprocessing
, такие как блокировки (Lock
), семафоры (Semaphore
) и очереди (Queue
).