multiprocessing.Pool():
Pool(processes=N) - создает пул из N процессов. Если N не указано, то используется количество ядер CPU.pool.apply(func, args) - выполняет функцию func с аргументами args в одном из процессов пула. Блокирует выполнение, пока функция не завершится.pool.apply_async(func, args, callback) - аналогично apply, но выполняется асинхронно, не блокируя. callback - функция, вызываемая по завершении задачи.pool.map(func, iterable) - применяет функцию func к каждому элементу из iterable параллельно. Возвращает список результатов в том же порядке, что и входные данные.pool.map_async(func, iterable, callback) - асинхронный вариант map.pool.close() - закрывает пул, запрещая добавление новых задач.pool.join() - ждет завершения всех задач в пуле. Нужно вызывать после pool.close().
from multiprocessing import Pool
def square(x):
return x * x
if __name__ == '__main__':
with Pool(processes=4) as pool:
results = pool.map(square, [1, 2, 3, 4, 5])
print(results) # Вывод: [1, 4, 9, 16, 25]
Пул процессов (Process Pool) - это механизм для управления группой процессов, которые выполняют задачи параллельно. Он позволяет переиспользовать процессы, сокращая накладные расходы на их создание и уничтожение для каждой задачи. Вместо того, чтобы создавать новый процесс для каждой задачи, задачи отправляются в пул свободных процессов, где они выполняются. Когда задача завершена, процесс возвращается в пул и готов для выполнения следующей задачи.
В Python модуль multiprocessing предоставляет класс Pool для реализации пулов процессов. Он значительно упрощает параллельное выполнение задач, особенно когда у вас есть большое количество независимых вычислений.
Как использовать multiprocessing.Pool():
multiprocessing:
import multiprocessing
Pool:
Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None, context=None)
processes (опционально): Количество рабочих процессов в пуле. Если не указано, используется количество ядер ЦП.initializer (опционально): Функция, которая будет вызвана в начале каждого рабочего процесса.initargs (опционально): Аргументы, передаваемые в initializer.maxtasksperchild (опционально): Максимальное количество задач, которые может выполнить один рабочий процесс, прежде чем он будет заменен новым. Полезно для предотвращения утечек памяти.context (опционально): Используемый контекст запуска процессов.pool = multiprocessing.Pool(processes=4) # Создаем пул с 4 процессами
apply, apply_async, map, map_async, imap, imap_unordered:
apply(func, args=(), kwds={}): Выполняет функцию func с аргументами args и возвращает результат. Блокирует вызывающий процесс до завершения задачи. (Обычно не используется, т.к. блокирующий).apply_async(func, args=(), kwds={}, callback=None, error_callback=None): Выполняет функцию func асинхронно. Возвращает объект AsyncResult, который можно использовать для получения результата. Не блокирует.map(func, iterable, chunksize=None): Применяет функцию func к каждому элементу итерируемого объекта iterable и возвращает список результатов. Блокирует до завершения всех задач.map_async(func, iterable, chunksize=None, callback=None, error_callback=None): Применяет функцию func к каждому элементу итерируемого объекта iterable асинхронно. Возвращает объект AsyncResult. Не блокирует.imap(func, iterable, chunksize=1): Возвращает итератор, применяющий функцию func к каждому элементу итерируемого объекта iterable. Возвращает результаты по мере их готовности, не дожидаясь завершения всех задач. Полезно для больших объемов данных.imap_unordered(func, iterable, chunksize=1): Похож на imap, но возвращает результаты в произвольном порядке (по мере завершения задач), что может быть быстрее.results = pool.map(my_function, my_list) # Применяем my_function к каждому элементу my_list
pool.close()
pool.join() # Дождаться завершения всех процессов в пуле
pool.close() говорит пулу, что больше задач не будет отправлено. pool.join() ждет завершения всех процессов в пуле. Важно вызывать close() перед join(), иначе пул может зависнуть.
Пример:
import multiprocessing
import time
def square(x):
time.sleep(1) # Имитируем долгую операцию
return x * x
if __name__ == '__main__':
# Создаем пул с 4 процессами
with multiprocessing.Pool(processes=4) as pool:
# Список чисел для обработки
numbers = [1, 2, 3, 4, 5, 6, 7, 8]
# Используем map для параллельного вычисления квадратов
results = pool.map(square, numbers)
# Выводим результаты
print(f"Квадраты чисел: {results}")
# Пул автоматически закрывается при выходе из блока 'with' и ждет завершения всех процессов
Преимущества использования пула процессов:
Когда следует использовать пул процессов:
Важно помнить о синхронизации данных и избегать гонок данных при работе с пулом процессов, особенно если процессы совместно используют ресурсы. Для этого можно использовать механизмы синхронизации, предоставляемые модулем multiprocessing, такие как блокировки (Lock), семафоры (Semaphore) и очереди (Queue).