Организация параллельного выполнения задач, избегая блокировок в многозадачных Python-приложениях, требует использования нескольких подходов. Выбор подходящего метода зависит от характера задач и их зависимостей.
1. Многопоточность (Threading):
with
statement): Облегчают управление ресурсами и обеспечивают их гарантированное освобождение, даже при возникновении исключений. Используются, например, для блокировок: with lock: ...
import threading
import queue
import time
def worker(q, lock, result_list):
while True:
item = q.get()
if item is None:
break
# Симулируем долгую операцию
time.sleep(0.1)
with lock: # Защищаем доступ к result_list
result_list.append(item * 2)
q.task_done() #Сообщаем очереди, что задача выполнена.
q = queue.Queue()
lock = threading.Lock()
results = []
threads = []
for i in range(4): # Создаем 4 потока
t = threading.Thread(target=worker, args=(q, lock, results))
threads.append(t)
t.start()
for i in range(10):
q.put(i)
# Останавливаем потоки
for i in range(4):
q.put(None)
# Ждем завершения всех потоков
q.join()
for t in threads:
t.join()
print(results) # Результаты
2. Многопроцессорность (Multiprocessing):
queue.Queue
, так как данные должны быть сериализуемы.multiprocessing.Array
) или объекты (multiprocessing.Value
). Требует синхронизации с помощью блокировок и семафоров.
import multiprocessing
import time
def worker(item):
# Симулируем долгую операцию
time.sleep(0.1)
return item * 2
if __name__ == '__main__': # Обязательно для Windows
with multiprocessing.Pool(processes=4) as pool:
results = pool.map(worker, range(10))
print(results)
3. Асинхронное программирование (asyncio):
async
и await
: Ключевые слова для определения корутин и приостановки их выполнения до завершения асинхронных операций.asyncio.to_thread
или asyncio.create_task
с передачей в отдельный поток/процесс.asyncio
(например, aiohttp
для сетевых запросов).
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, f"https://example.com/{i}") for i in range(5)]
results = await asyncio.gather(*tasks)
print(len(results))
if __name__ == "__main__":
start = time.time()
asyncio.run(main())
end = time.time()
print(f"Время выполнения: {end - start}")
4. Celery (распределенная очередь задач):
В заключение: Для эффективной работы с параллелизмом в Python необходимо учитывать характеристики задач и выбирать соответствующие инструменты. Важно избегать блокировок, используя механизмы синхронизации (блокировки, очереди) и асинхронные подходы, а также правильно проектировать архитектуру приложения.