multiprocessing
.
Вот пример:
import multiprocessing
def task(item):
# Обработка задачи item
return item * 2
if __name__ == '__main__':
pool_size = multiprocessing.cpu_count() # Рекомендуется использовать количество ядер CPU
items = range(10)
with multiprocessing.Pool(processes=pool_size) as pool:
results = pool.map(task, items) # Выполняет task для каждого элемента в items
print(results) # Вывод результатов обработки
В этом примере multiprocessing.Pool
создает пул из pool_size
процессов. Метод pool.map
применяет функцию task
к каждому элементу в списке items
, распределяя задачи между процессами в пуле. Важно оборачивать код создания пула в if __name__ == '__main__':
из-за особенностей работы multiprocessing в Windows и macOS.
Для запуска пула процессов с определенным количеством процессов в Python можно использовать модуль multiprocessing
, в частности класс multiprocessing.Pool
.
Вот основные шаги и пример кода:
multiprocessing
:import multiprocessing
def process_task(data):
# Здесь код обработки данных
result = data * 2 # Пример обработки: умножение на 2
return result
Pool
, указав количество процессов:num_processes = 4 # Например, 4 процесса
pool = multiprocessing.Pool(processes=num_processes)
Существует несколько способов передачи задач:
pool.map(function, iterable)
: Применяет функцию к каждому элементу итерируемого объекта. Результаты возвращаются в виде списка, сохраняя порядок ввода.pool.apply(function, args=(), kwds={})
: Применяет функцию с заданными аргументами к одному процессу. Чаще используется для задач, где не важен порядок и нужно дождаться завершения одной задачи, прежде чем перейти к следующей. Блокирует основной поток, пока задача не будет выполнена.pool.apply_async(function, args=(), kwds={}, callback=None)
: Аналогичен pool.apply
, но выполняется асинхронно. Позволяет продолжить выполнение основного потока, а результат будет доступен позже. Можно указать функцию обратного вызова (callback
), которая будет выполнена после завершения задачи.pool.imap(function, iterable, chunksize=1)
: Ленивая версия pool.map
. Возвращает итератор, который выдает результаты по мере их готовности. chunksize
определяет, сколько задач за раз передается одному процессу.pool.imap_unordered(function, iterable, chunksize=1)
: Похож на pool.imap
, но результаты возвращаются в произвольном порядке, по мере их готовности. Это может быть более эффективным, если порядок не важен.Пример использования pool.map
:
data = [1, 2, 3, 4, 5]
results = pool.map(process_task, data)
print(results) # Выведет: [2, 4, 6, 8, 10]
Пример использования pool.apply_async
:
def callback(result):
print(f"Результат: {result}")
results = []
for i in range(5):
result = pool.apply_async(process_task, args=(i,), callback=callback)
results.append(result)
# Необходимо дождаться завершения всех задач.
pool.close()
pool.join()
# results теперь содержит AsyncResult объекты, а callback функция уже вывела результаты.
pool.close()
pool.join()
pool.close()
сообщает пулу, что больше не будет новых задач.
pool.join()
ждет завершения всех задач в пуле. Важно вызвать pool.close()
перед pool.join()
, иначе программа может зависнуть.
Полный пример кода:
import multiprocessing
def process_task(data):
"""Функция, обрабатывающая данные."""
result = data * 2
print(f"Обработка {data} в процессе с ID {multiprocessing.current_process().pid}")
return result
if __name__ == '__main__':
num_processes = 4
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
with multiprocessing.Pool(processes=num_processes) as pool:
results = pool.map(process_task, data)
print(f"Результаты: {results}")
Важные моменты:
__main__
: При использовании multiprocessing
в Windows необходимо поместить код, использующий пул процессов, в блок if __name__ == '__main__':
. Это связано с тем, как Windows запускает процессы. В Unix-подобных системах это не обязательно, но рекомендуется для переносимости.