Потоки. Процессы. Асинхронность. Параллельное суммирование и парсинг веб-страниц с сохранением в БД.
Понять отличия между потоками (threading), процессами (multiprocessing) и асинхронностью (asyncio) в Python. Научиться применять каждый подход для параллельного выполнения задач — как вычислительных (CPU-bound), так и ввода-вывода (I/O-bound).
| Компонент | Технология | Назначение |
|---|---|---|
| Потоки | threading | Параллелизм в одном процессе |
| Процессы | multiprocessing | Обход GIL через отдельные процессы |
| Асинхронность | asyncio + aiohttp | Кооперативная многозадачность, I/O |
| HTTP (синхр.) | requests | HTTP-запросы в threading/multiprocessing |
| Парсинг HTML | beautifulsoup4 | Извлечение <title> из HTML |
| ORM | SQLAlchemy 2.0 async | Async-доступ к PostgreSQL |
| Драйвер БД (async) | asyncpg | Асинхронный драйвер PostgreSQL |
| Драйвер БД (sync) | psycopg2 | Синхронный драйвер для threading/multiprocessing |
| Окружение | python-dotenv | Переменные окружения из .env |
| Инфраструктура | Docker Compose | Запуск PostgreSQL на конфигурируемом порту |
Вычисление суммы всех целых чисел от 1 до 1 000 000 000 с разбивкой на 8 подзадач.
# task1/threading_sum.py
import threading
N = 1_000_000_000
NUM_WORKERS = 8
def partial_sum(start, end, results, idx):
s = 0
for i in range(start, end + 1):
s += i
results[idx] = s
def main():
chunk_size = N // NUM_WORKERS
results = [0] * NUM_WORKERS
threads = []
for i in range(NUM_WORKERS):
start = i * chunk_size + 1
end = (i + 1) * chunk_size if i < NUM_WORKERS - 1 else N
t = threading.Thread(target=partial_sum, args=(start, end, results, i))
threads.append(t)
t.start()
for t in threads:
t.join()
total = sum(results)
print(f"Result: {total:,} Correct: {total == N*(N+1)//2}")
# task1/multiprocessing_sum.py
from multiprocessing import Pool
def partial_sum(args):
start, end = args
s = 0
for i in range(start, end + 1):
s += i
return s
def main():
ranges = [(i*chunk+1, (i+1)*chunk) for i in range(8)]
with Pool(processes=8) as pool:
partial_sums = pool.map(partial_sum, ranges)
total = sum(partial_sums)
# task1/async_sum.py
import asyncio
def partial_sum(start, end):
s = 0
for i in range(start, end + 1):
s += i
return s
async def main():
tasks = [asyncio.to_thread(partial_sum, start, end) for ...]
partial_sums = await asyncio.gather(*tasks)
total = sum(partial_sums)
| Подход | Время | Механизм |
|---|---|---|
threading | 22.46 с | Потоки, GIL ограничивает CPU-работу |
multiprocessing | 3.76 с | Отдельные процессы, реальный параллелизм (~6× быстрее) |
async (to_thread) | 21.08 с | Делегирование в потоки через asyncio |
Замеры на 8-ядерном CPU. Результат: 500 000 000 500 000 000 ✓ (проверено формулой n·(n+1)/2).
Парсинг 10 веб-страниц, извлечение заголовков <title>, сохранение в PostgreSQL.
# task2/threading_parser.py
from concurrent.futures import ThreadPoolExecutor
import requests, psycopg2
from bs4 import BeautifulSoup
def parse_and_save(url):
resp = requests.get(url, timeout=10, headers={"User-Agent": "Mozilla/5.0"})
soup = BeautifulSoup(resp.text, "html.parser")
title = soup.title.string.strip()
conn = psycopg2.connect(SYNC_DSN)
cur = conn.cursor()
cur.execute("INSERT INTO parsed_page (url, title) VALUES (%s, %s) RETURNING id", (url, title))
row_id = cur.fetchone()[0]
conn.commit()
return {"id": row_id, "url": url, "title": title}
with ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(parse_and_save, url): url for url in URLS}
# task2/multiprocessing_parser.py
from multiprocessing import Pool
def parse_and_save(url):
# Идентичная логика с threading_parser
# Каждый процесс открывает своё подключение к БД
...
with Pool(processes=5) as pool:
results = pool.map(parse_and_save, URLS)
# task2/async_parser.py
import aiohttp, asyncio
from database import async_session_maker, init_db
from models import ParsedPage
async def fetch_title(session, url):
async with session.get(url, timeout=..., headers={"User-Agent": "Mozilla/5.0"}) as resp:
html = await resp.text()
soup = BeautifulSoup(html, "html.parser")
return soup.title.string.strip()
async def parse_and_save(session, sem, url):
async with sem:
title = await fetch_title(session, url)
async with async_session_maker() as db:
page = ParsedPage(url=url, title=title)
db.add(page)
await db.commit()
return {"id": page.id, "url": url, "title": title}
async def main():
await init_db()
async with aiohttp.ClientSession() as session:
tasks = [parse_and_save(session, sem, url) for url in URLS]
results = await asyncio.gather(*tasks)
Wikipedia требует заголовок User-Agent, иначе возвращает 403.
| Подход | Время | HTTP | БД |
|---|---|---|---|
| threading | 1.67 с | requests | psycopg2 |
| multiprocessing | 1.45 с | requests | psycopg2 |
| async | 10.59 с ⚠️* | aiohttp | asyncpg |
* Async-версия показала завышенное время из-за таймаута одного URL (djangoproject.com — 10 с timeout). Без учёта упавшего запроса эффективное время ~2–3 с, сопоставимое с threading/multiprocessing.
Используется та же база данных, что и в ЛР1 — PostgreSQL 16 (finance_db). Добавлена таблица parsed_page для хранения результатов парсинга.
| Поле | Тип | Описание |
|---|---|---|
id | SERIAL PK | Автоинкрементный первичный ключ |
url | TEXT NOT NULL | URL спаршенной страницы |
title | VARCHAR(500) NOT NULL | Заголовок страницы (<title>) |
parsed_at | TIMESTAMPTZ | Временная метка парсинга (UTC, default NOW()) |
# models.py
from sqlalchemy import DateTime, Integer, String, Text, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass
class ParsedPage(Base):
__tablename__ = "parsed_page"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
url: Mapped[str] = mapped_column(Text, nullable=False)
title: Mapped[str] = mapped_column(String(500), nullable=False)
parsed_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
# database.py
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from config import DATABASE_URL
engine = create_async_engine(DATABASE_URL)
async_session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
async def init_db():
from models import Base
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# config.py
from dotenv import load_dotenv
load_dotenv()
POSTGRES_USER = os.getenv("POSTGRES_USER", "postgres")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "postgres")
POSTGRES_DB = os.getenv("POSTGRES_DB", "finance_db")
POSTGRES_PORT = os.getenv("POSTGRES_PORT", "5433")
DATABASE_URL = f"postgresql+asyncpg://{POSTGRES_USER}:{POSTGRES_PASSWORD}@localhost:{POSTGRES_PORT}/{POSTGRES_DB}"
| Критерий | threading | multiprocessing | asyncio |
|---|---|---|---|
| GIL | Да (CPU страдает) | Нет (свой GIL на процесс) | Да (один поток) |
| Память | Общая | Изолированная (копия) | Общая |
| Создание | Быстрое | Медленное | Быстрое |
| I/O-bound | Хорошо | Избыточно | Отлично |
| CPU-bound | Плохо | Отлично | Плохо (нужен to_thread) |
| Сложность | Низкая | Средняя | Средняя |
CPU-bound задачи (вычисления, обработка данных) → multiprocessing — единственный способ получить реальный параллелизм в Python благодаря обходу GIL.
I/O-bound задачи (сеть, диск, БД) → asyncio — наиболее эффективен, так как не тратит память на потоки и минимизирует накладные расходы на переключение контекста.
threading — компромиссный вариант для I/O-bound, когда нет асинхронных альтернатив библиотек. Проще в использовании, но менее масштабируем, чем asyncio.
# 1. Скопировать .env-example → .env (порт настраивается в POSTGRES_PORT)
cp .env-example .env
# 2. Поднять PostgreSQL (порт из .env)
docker-compose up -d
# 3. Установить зависимости
pip install -r requirements.txt
# 4. Задача 1 — параллельное суммирование
python task1/threading_sum.py
python task1/multiprocessing_sum.py
python task1/async_sum.py
# 5. Задача 2 — параллельный парсинг
python task2/threading_parser.py
python task2/multiprocessing_parser.py
python task2/async_parser.py
# 6. Остановить PostgreSQL
docker-compose down