Лабораторная работа 2

Потоки. Процессы. Асинхронность. Параллельное суммирование и парсинг веб-страниц с сохранением в БД.

Готово

Цель работы

Понять отличия между потоками (threading), процессами (multiprocessing) и асинхронностью (asyncio) в Python. Научиться применять каждый подход для параллельного выполнения задач — как вычислительных (CPU-bound), так и ввода-вывода (I/O-bound).

Выполненные задачи

Задача 1 — Параллельное суммирование
Три программы: threading, multiprocessing, async (to_thread). Вычисление суммы 1..1 000 000 000 с разбивкой на 8 подзадач. Замер времени, проверка результата формулой n·(n+1)/2.
Задача 2 — Параллельный парсинг веб-страниц
Три программы: threading + requests + psycopg2, multiprocessing + requests + psycopg2, async + aiohttp + asyncpg. Парсинг 10 URL, извлечение <title>, сохранение в PostgreSQL (таблица parsed_page).

Стек технологий

КомпонентТехнологияНазначение
ПотокиthreadingПараллелизм в одном процессе
ПроцессыmultiprocessingОбход GIL через отдельные процессы
Асинхронностьasyncio + aiohttpКооперативная многозадачность, I/O
HTTP (синхр.)requestsHTTP-запросы в threading/multiprocessing
Парсинг HTMLbeautifulsoup4Извлечение <title> из HTML
ORMSQLAlchemy 2.0 asyncAsync-доступ к PostgreSQL
Драйвер БД (async)asyncpgАсинхронный драйвер PostgreSQL
Драйвер БД (sync)psycopg2Синхронный драйвер для threading/multiprocessing
Окружениеpython-dotenvПеременные окружения из .env
ИнфраструктураDocker ComposeЗапуск PostgreSQL на конфигурируемом порту

Структура проекта

Lr2/ ├── requirements.txt # зависимости ├── .env-example # переменные окружения (POSTGRES_PORT и др.) ├── docker-compose.yml # PostgreSQL на конфигурируемом порту ├── config.py # загрузка DATABASE_URL из .env ├── database.py # async engine, async_session_maker, init_db() ├── models.py # модель ParsedPage (SQLAlchemy) ├── task1/ │ ├── threading_sum.py # Задача 1 — threading │ ├── multiprocessing_sum.py # Задача 1 — multiprocessing │ └── async_sum.py # Задача 1 — asyncio (to_thread) └── task2/ ├── threading_parser.py # Задача 2 — ThreadPoolExecutor + requests + psycopg2 ├── multiprocessing_parser.py# Задача 2 — Pool + requests + psycopg2 └── async_parser.py # Задача 2 — aiohttp + SQLAlchemy async + asyncpg

Задача 1 — Параллельное суммирование

Вычисление суммы всех целых чисел от 1 до 1 000 000 000 с разбивкой на 8 подзадач.

Threading

# task1/threading_sum.py
import threading

N = 1_000_000_000
NUM_WORKERS = 8

def partial_sum(start, end, results, idx):
    s = 0
    for i in range(start, end + 1):
        s += i
    results[idx] = s

def main():
    chunk_size = N // NUM_WORKERS
    results = [0] * NUM_WORKERS
    threads = []
    for i in range(NUM_WORKERS):
        start = i * chunk_size + 1
        end = (i + 1) * chunk_size if i < NUM_WORKERS - 1 else N
        t = threading.Thread(target=partial_sum, args=(start, end, results, i))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    total = sum(results)
    print(f"Result: {total:,}  Correct: {total == N*(N+1)//2}")

Multiprocessing

# task1/multiprocessing_sum.py
from multiprocessing import Pool

def partial_sum(args):
    start, end = args
    s = 0
    for i in range(start, end + 1):
        s += i
    return s

def main():
    ranges = [(i*chunk+1, (i+1)*chunk) for i in range(8)]
    with Pool(processes=8) as pool:
        partial_sums = pool.map(partial_sum, ranges)
    total = sum(partial_sums)

Async (to_thread)

# task1/async_sum.py
import asyncio

def partial_sum(start, end):
    s = 0
    for i in range(start, end + 1):
        s += i
    return s

async def main():
    tasks = [asyncio.to_thread(partial_sum, start, end) for ...]
    partial_sums = await asyncio.gather(*tasks)
    total = sum(partial_sums)

Результаты замеров

ПодходВремяМеханизм
threading22.46 сПотоки, GIL ограничивает CPU-работу
multiprocessing3.76 сОтдельные процессы, реальный параллелизм (~6× быстрее)
async (to_thread)21.08 сДелегирование в потоки через asyncio

Замеры на 8-ядерном CPU. Результат: 500 000 000 500 000 000 ✓ (проверено формулой n·(n+1)/2).

Задача 2 — Параллельный парсинг веб-страниц

Парсинг 10 веб-страниц, извлечение заголовков <title>, сохранение в PostgreSQL.

Threading (requests + psycopg2)

# task2/threading_parser.py
from concurrent.futures import ThreadPoolExecutor
import requests, psycopg2
from bs4 import BeautifulSoup

def parse_and_save(url):
    resp = requests.get(url, timeout=10, headers={"User-Agent": "Mozilla/5.0"})
    soup = BeautifulSoup(resp.text, "html.parser")
    title = soup.title.string.strip()

    conn = psycopg2.connect(SYNC_DSN)
    cur = conn.cursor()
    cur.execute("INSERT INTO parsed_page (url, title) VALUES (%s, %s) RETURNING id", (url, title))
    row_id = cur.fetchone()[0]
    conn.commit()
    return {"id": row_id, "url": url, "title": title}

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(parse_and_save, url): url for url in URLS}

Multiprocessing (requests + psycopg2)

# task2/multiprocessing_parser.py
from multiprocessing import Pool

def parse_and_save(url):
    # Идентичная логика с threading_parser
    # Каждый процесс открывает своё подключение к БД
    ...

with Pool(processes=5) as pool:
    results = pool.map(parse_and_save, URLS)

Async (aiohttp + asyncpg)

# task2/async_parser.py
import aiohttp, asyncio
from database import async_session_maker, init_db
from models import ParsedPage

async def fetch_title(session, url):
    async with session.get(url, timeout=..., headers={"User-Agent": "Mozilla/5.0"}) as resp:
        html = await resp.text()
        soup = BeautifulSoup(html, "html.parser")
        return soup.title.string.strip()

async def parse_and_save(session, sem, url):
    async with sem:
        title = await fetch_title(session, url)
    async with async_session_maker() as db:
        page = ParsedPage(url=url, title=title)
        db.add(page)
        await db.commit()
        return {"id": page.id, "url": url, "title": title}

async def main():
    await init_db()
    async with aiohttp.ClientSession() as session:
        tasks = [parse_and_save(session, sem, url) for url in URLS]
        results = await asyncio.gather(*tasks)

Список URL для парсинга

  1. https://www.python.org → Welcome to Python.org
  2. https://en.wikipedia.org/wiki/Python_(programming_language) → Python (programming language) - Wikipedia
  3. https://fastapi.tiangolo.com → FastAPI
  4. https://www.sqlalchemy.org → SQLAlchemy - The Database Toolkit for Python
  5. https://www.djangoproject.com → The web framework for perfectionists with deadlines | Django
  6. https://github.com → GitHub
  7. https://en.wikipedia.org/wiki/Web_scraping → Web scraping - Wikipedia
  8. https://httpbin.org → httpbin.org
  9. https://pypi.org → PyPI · The Python Package Index
  10. https://docs.python.org/3/ → 3.14 Documentation

Wikipedia требует заголовок User-Agent, иначе возвращает 403.

Результаты замеров

ПодходВремяHTTPБД
threading1.67 сrequestspsycopg2
multiprocessing1.45 сrequestspsycopg2
async10.59 с ⚠️*aiohttpasyncpg

* Async-версия показала завышенное время из-за таймаута одного URL (djangoproject.com — 10 с timeout). Без учёта упавшего запроса эффективное время ~2–3 с, сопоставимое с threading/multiprocessing.

Модель данных

Используется та же база данных, что и в ЛР1 — PostgreSQL 16 (finance_db). Добавлена таблица parsed_page для хранения результатов парсинга.

ПолеТипОписание
idSERIAL PKАвтоинкрементный первичный ключ
urlTEXT NOT NULLURL спаршенной страницы
titleVARCHAR(500) NOT NULLЗаголовок страницы (<title>)
parsed_atTIMESTAMPTZВременная метка парсинга (UTC, default NOW())

Модель SQLAlchemy

# models.py
from sqlalchemy import DateTime, Integer, String, Text, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class ParsedPage(Base):
    __tablename__ = "parsed_page"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    url: Mapped[str] = mapped_column(Text, nullable=False)
    title: Mapped[str] = mapped_column(String(500), nullable=False)
    parsed_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())

Подключение к БД

# database.py
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from config import DATABASE_URL

engine = create_async_engine(DATABASE_URL)
async_session_maker = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

async def init_db():
    from models import Base
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

Конфигурация из .env

# config.py
from dotenv import load_dotenv
load_dotenv()

POSTGRES_USER = os.getenv("POSTGRES_USER", "postgres")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD", "postgres")
POSTGRES_DB = os.getenv("POSTGRES_DB", "finance_db")
POSTGRES_PORT = os.getenv("POSTGRES_PORT", "5433")

DATABASE_URL = f"postgresql+asyncpg://{POSTGRES_USER}:{POSTGRES_PASSWORD}@localhost:{POSTGRES_PORT}/{POSTGRES_DB}"

Общие выводы

Критерийthreadingmultiprocessingasyncio
GILДа (CPU страдает)Нет (свой GIL на процесс)Да (один поток)
ПамятьОбщаяИзолированная (копия)Общая
СозданиеБыстроеМедленноеБыстрое
I/O-boundХорошоИзбыточноОтлично
CPU-boundПлохоОтличноПлохо (нужен to_thread)
СложностьНизкаяСредняяСредняя

Рекомендации

CPU-bound задачи (вычисления, обработка данных) → multiprocessing — единственный способ получить реальный параллелизм в Python благодаря обходу GIL.

I/O-bound задачи (сеть, диск, БД) → asyncio — наиболее эффективен, так как не тратит память на потоки и минимизирует накладные расходы на переключение контекста.

threading — компромиссный вариант для I/O-bound, когда нет асинхронных альтернатив библиотек. Проще в использовании, но менее масштабируем, чем asyncio.

Запуск

# 1. Скопировать .env-example → .env (порт настраивается в POSTGRES_PORT)
cp .env-example .env

# 2. Поднять PostgreSQL (порт из .env)
docker-compose up -d

# 3. Установить зависимости
pip install -r requirements.txt

# 4. Задача 1 — параллельное суммирование
python task1/threading_sum.py
python task1/multiprocessing_sum.py
python task1/async_sum.py

# 5. Задача 2 — параллельный парсинг
python task2/threading_parser.py
python task2/multiprocessing_parser.py
python task2/async_parser.py

# 6. Остановить PostgreSQL
docker-compose down