Лекция по главе 21: Многопоточность в Rust

Содержание: Введение в многопоточность Потоки: std::thread Каналы: std::sync::mpsc Синхронизация: Mutex и RwLock Атомарные операции: Atomic types Thread-local storage (thread_local!) и rayon tokio Примеры: параллельная обработка данных Упражнение: многопоточный парсер

Добро пожаловать в главу 21 нашего курса по Rust! Сегодня мы отправляемся в захватывающее путешествие по миру многопоточности — одной из самых мощных и сложных тем в программировании, области, где программы превращаются из одиноких путников в слаженные команды, работающие параллельно. Представьте, что вы шеф-повар, а ваши потоки — помощники на кухне: каждый выполняет свою задачу (режет овощи, варит суп, жарит мясо), чтобы обед был готов быстрее. Но если не наладить координацию, кто-то может случайно вылить суп на плиту или порезать не тот ингредиент. Rust предоставляет уникальный подход к параллелизму, сочетая производительность с безопасностью благодаря своей системе типов и владения, помогает организовать эту "кухню" так, чтобы всё работало быстро, безопасно и без хаоса. Эта лекция — ваш путеводитель от базовых потоков до сложных инструментов синхронизации, с примерами, аналогиями и разбором всех "подводных камней", чтобы вы могли не только понять, но и полюбить многопоточность.


1. Введение в многопоточность

Многопоточность — позволяет выполнять несколько задач одновременно, используя возможности современных многоядерных процессоров. Однако она сопряжена с рисками: гонки данных, взаимные блокировки (deadlocks) и сложность отладки, это как оркестр: каждый музыкант играет свою партию, а вместе они создают гармоничную симфонию. В программировании потоки позволяют выполнять несколько задач одновременно, используя мощь многоядерных процессоров. Например, один поток может загружать данные из сети, другой — обрабатывать их, а третий — обновлять интерфейс. Но без правильной координации это может превратиться в какофонию: потоки начнут мешать друг другу, перезаписывать данные или ждать вечно.

Rust выделяется среди языков тем, что его система владения и заимствования защищает от типичных ошибок многопоточности, таких как гонки данных (когда два потока одновременно меняют одну переменную) или использование памяти после её освобождения. Мы разберём инструменты из стандартной библиотеки (std::thread, std::sync) и заглянем в библиотеку rayon, чтобы вы могли управлять потоками как опытный дирижёр.


2. Потоки: std::thread

Что такое потоки?

Поток — это независимая единица выполнения в рамках процесса, это как отдельный работник на вашей фабрике кода. Он выполняет задачу независимо от других, но все потоки делят одну "память" процесса. В Rust потоки создаются через std::thread::spawn.

Как это работает?

Представьте, что вы отправляете друга за пиццей, пока сами готовите салат. thread::spawn запускает новый поток и даёт вам "чек" (JoinHandle), чтобы вы могли позже проверить, вернулся ли он с пиццей.

Пример: базовый поток

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..5 {
            println!("Друг с пиццей: шаг {}", i);
            thread::sleep(Duration::from_millis(100)); // Имитация времени доставки
        }
    });

    for i in 1..5 {
        println!("Я готовлю салат: шаг {}", i);
        thread::sleep(Duration::from_millis(150)); // Я медленнее
    }

    handle.join().unwrap(); // Жду друга с пиццей
    println!("Ужин готов!");
}

Развёрнутые пояснения

Здесь главный поток (вы) и дочерний поток (друг) работают параллельно. thread::sleep добавляет задержку, чтобы показать, что они выполняются одновременно. Метод .join() — это как сказать: "Не начинаем есть, пока пицца не приедет".

Передача данных

Если нужно передать данные в поток, используйте move:

let pizza_order = String::from("Маргарита");
let handle = thread::spawn(move || {
    println!("Заказываю: {}", pizza_order);
});
handle.join().unwrap();

move передаёт владение pizza_order потоку, потому что Rust не позволит двум потокам одновременно владеть одной переменной. Без .join() главный поток может завершиться раньше, и дочерний поток будет прерван.

Нюансы

Подводные камни с пояснениями


3. Каналы: std::sync::mpsc

Что такое каналы?

Каналы (`mpsc` — multiple producer, single consumer) позволяют обмениваться данными между потоками, это как почтовый ящик между потоками. Один поток отправляет письмо (данные), другой его получает. mpsc означает "много отправителей, один получатель" — несколько потоков могут писать в один ящик, но читает только один.

Rust предоставляет два типа каналов:

Как это работает?

Канал создаётся функцией mpsc::channel(), которая возвращает пару: tx (transmitter, отправитель) и rx (receiver, получатель).

Пример: простой канал

use std::thread;
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel(); // Создаём почтовый ящик

    thread::spawn(move || {
        let letter = String::from("Пицца готова!");
        tx.send(letter).unwrap(); // Отправляем письмо
    });

    let received = rx.recv().unwrap(); // Читаем письмо
    println!("Получено: {}", received);
}

Развёрнутые пояснения

send возвращает Result, так как получатель может быть удалён. Представьте, что вы повар, а другой поток — курьер. Вы кладёте пиццу в ящик (tx.send), а курьер забирает её (rx.recv). Метод recv блокирует поток, пока письмо не придёт, как если бы курьер ждал у двери.

Много отправителей

Для нескольких отправителей клонируйте `tx`:

let (tx, rx) = mpsc::channel();
let tx2 = tx.clone(); // Второй курьер
thread::spawn(move || { tx.send("Пицца").unwrap(); });
thread::spawn(move || { tx2.send("Салат").unwrap(); });
while let Ok(item) = rx.recv() {
    println!("Доставлено: {}", item);
}

Нюансы

Подводные камни с пояснениями


4. Синхронизация: Mutex и Rw Aurelija

Что такое Mutex?

Mutex (mutual exclusion) защищает данные от одновременного доступа из нескольких потоков. Только один поток может заблокировать Mutex в данный момент, это замок на двери комнаты с общими данными. Только один поток может войти, пока другой ждёт снаружи.

Пример: счётчик с Mutex

use std::sync::{Mutex, Arc};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0)); // Общая касса
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap(); // Открываем кассу
            *num += 1; // Добавляем монетку
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Всего монет: {}", *counter.lock().unwrap()); // 10
}

Развёрнутые пояснения

Arc (Atomic Reference Counted) — это как общий ключ от комнаты, который можно скопировать. Mutex гарантирует, что только один поток изменяет counter в данный момент. Без него потоки могли бы одновременно записывать разные значения, и итог был бы хаотичным.

Что такое RwLock?

RwLock (read-write lock) позволяет множественным потокам читать данные одновременно, но только одному записывать, это как библиотека: много людей могут читать книги одновременно, но писать в них может только один.

Пример: RwLock

use std::sync::{RwLock, Arc};
use std::thread;

fn main() {
    let data = Arc::new(RwLock::new(42));
    let mut handles = vec![];

    for _ in 0..3 {
        let data = Arc::clone(&data);
        handles.push(thread::spawn(move || {
            let read = data.read().unwrap(); // Читаем книгу
            println!("Читатель видит: {}", *read);
        }));
    }

    let data_clone = Arc::clone(&data);
    handles.push(thread::spawn(move || {
        let mut write = data_clone.write().unwrap(); // Пишем в книгу
        *write = 100;
    }));

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Итог: {}", *data.read().unwrap());
}

Нюансы

Подводные камни с пояснениями


5. Атомарные операции: Atomic types

Что такое атомарные типы?

Атомарные операции (типы) (AtomicUsize, AtomicBool, etc.) предоставляют операции, которые выполняются как единое целое без прерываний. Они полезны для простых счётчиков или флагов, это как мгновенные записи в журнале: никто не может прервать вас посреди слова. Типы вроде AtomicUsize или AtomicBool идеальны для простых данных.

Пример: атомарный счётчик

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        handles.push(thread::spawn(move || {
            counter.fetch_add(1, Ordering::SeqCst); // Атомарно прибавляем 1
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Всего: {}", counter.load(Ordering::SeqCst)); // 10
}

Развёрнутые пояснения

Ordering определяет гарантии синхронизации:

fetch_add — это как нажать кнопку счётчика в кассе: операция неделима. Ordering::SeqCst (sequential consistency) гарантирует, что все потоки видят изменения в одном порядке.

Нюансы

Подводные камни с пояснениями


6. Thread-local storage (thread_local!) и rayon

Thread-local storage

thread_local! создаёт переменные, уникальные для каждого потока, это как личный блокнот каждого потока: записи видны только ему.

Пример: thread_local!

use std::cell::RefCell;
use std::thread;

thread_local! {
    static MY_NOTEBOOK: RefCell = RefCell::new(0);
}

fn main() {
    let handles: Vec<_> = (0..3)
        .map(|_| {
            thread::spawn(|| {
                MY_NOTEBOOK.with(|note| {
                    *note.borrow_mut() += 1;
                    println!("Мои заметки: {}", *note.borrow());
                });
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
}

Развёрнутые пояснения

Каждый поток видит свою копию MY_NOTEBOOK. Это полезно, например, для локальных счётчиков или кэша.

Rayon

rayon библиотека для параллельных вычислений, упрощающая работу с потоками, это как нанять бригаду рабочих с автоматическим распределением задач.

Пример: rayon

use rayon::prelude::*;

fn main() {
    let ingredients = vec![1, 2, 3, 4, 5];
    let total: i32 = ingredients.par_iter().map(|x| x * 2).sum();
    println!("Общий вес: {}", total); // 30
}

Нюансы

Подводные камни с пояснениями


Tokio

Tokio — это мощный асинхронный runtime для языка программирования Rust, который используется для построения высокопроизводительных приложений, работающих с сетью, вводом-выводом и другими асинхронными операциями. В отличие от стандартной многопоточности с std::thread, tokio предоставляет модель асинхронного выполнения на основе событийного цикла (event loop), что делает его особенно эффективным для задач с высокой конкуренцией (например, серверов, обрабатывающих тысячи подключений).

Как работает Tokio?

Tokio основан на концепции фьючерсов (futures) и задач (tasks). Основные компоненты:

  1. Runtime — ядро Tokio, которое управляет событийным циклом и распределяет задачи между потоками.
  2. Futures — асинхронные вычисления, которые завершаются в будущем. Они представляют собой результат операции, который будет доступен позже.
  3. Tasks — единицы работы, которые планируются runtime’ом для выполнения.
  4. Реактор — следит за событиями (например, готовность сокета для чтения) и уведомляет runtime.
  5. Executor — отвечает за выполнение задач.

Tokio использует модель кооперативной многозадачности: задачи сами решают, когда уступить управление другим задачам (через await), что позволяет избежать накладных расходов на переключение потоков.

Установка

Добавьте зависимости в Cargo.toml:


[dependencies]
tokio = { version = "1.0", features = ["full"] }
    

Флаг features = ["full"] включает все возможности Tokio, такие как таймеры, сетевые утилиты и синхронизация.

Пример: Простой сервер

Пример TCP-сервера, который принимает подключения и отправляет "Hello, World!":


use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> tokio::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8080").await?;
    println!("Server running on 127.0.0.1:8080");

    loop {
        let (mut socket, addr) = listener.accept().await?;
        println!("New connection: {}", addr);

        tokio::spawn(async move {
            let message = "Hello, World!\n";
            if let Err(e) = socket.write_all(message.as_bytes()).await {
                eprintln!("Failed to write to socket: {}", e);
            }
        });
    }
}
    

Основные компоненты Tokio

  1. Таймеры (tokio::time):
    Используются для задержек или периодических задач.
    
    use tokio::time::{sleep, Duration};
    
    #[tokio::main]
    async fn main() {
        println!("Start");
        sleep(Duration::from_secs(2)).await;
        println!("After 2 seconds");
    }
                
  2. Сетевые утилиты (tokio::net):
    Поддержка TCP, UDP, Unix сокетов и т.д.
  3. Синхронизация (tokio::sync):
    Mutex, RwLock, mpsc (многопроизводительный, однопотребительный канал) для асинхронных задач.
    
    use tokio::sync::mpsc;
    
    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel(32);
    
        tokio::spawn(async move {
            tx.send("Hello from channel").await.unwrap();
        });
    
        while let Some(message) = rx.recv().await {
            println!("Received: {}", message);
        }
    }
                
  4. Потоки ввода-вывода (tokio::io):
    Асинхронное чтение и запись.

Нюансы и подводные камни

  1. Блокирующие операции:
    Вызовы вроде std::thread::sleep или длительных синхронных операций блокируют runtime. Используйте tokio::task::spawn_blocking для таких задач:
    
    let result = tokio::task::spawn_blocking(|| {
        std::thread::sleep(std::time::Duration::from_secs(1));
        42
    }).await?;
                
  2. Ограничения .await:
    Нельзя вызывать .await внутри синхронного контекста или в замыканиях, не помеченных как async.
  3. Переполнение задач:
    Слишком большое количество tokio::spawn может перегрузить runtime. Используйте пулы задач или ограничивайте количество одновременных операций.
  4. Отмена задач:
    Tokio не предоставляет встроенной отмены задач. Используйте tokio::select! или тайм-ауты для управления временем жизни задач.

Практические советы

  1. Выбор runtime:
    Используйте #[tokio::main] для простоты или создайте собственный runtime с tokio::runtime::Builder для настройки числа потоков и поведения.
  2. Обработка ошибок:
    Всегда обрабатывайте результаты асинхронных операций (Result), чтобы избежать паники.
  3. Использование select!:
    Макрос tokio::select! позволяет обрабатывать несколько асинхронных операций одновременно:
    
    use tokio::time::{sleep, Duration};
    
    #[tokio::main]
    async fn main() {
        tokio::select! {
            _ = sleep(Duration::from_secs(1)) => println!("1 second passed"),
            _ = sleep(Duration::from_secs(2)) => println!("2 seconds passed"),
        }
    }
                
  4. Профилирование:
    Используйте tokio-console для анализа производительности асинхронных приложений.

Лучшие практики

  1. Минимизируйте ожидание:
    Старайтесь избегать долгих .await на операциях, которые можно оптимизировать.
  2. Используйте каналы для коммуникации:
    mpsc идеально подходит для передачи данных между задачами.
  3. Тестирование:
    Используйте #[tokio::test] для написания асинхронных тестов:
    
    #[tokio::test]
    async fn test_timer() {
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(1 + 1, 2);
    }
                
  4. Ограничение ресурсов:
    Для серверов используйте tokio::sync::Semaphore для ограничения числа одновременных подключений.

Пример: Асинхронный парсер URL

Пример парсинга нескольких URL параллельно:


use tokio::time::Duration;
use reqwest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let urls = vec![
        "https://www.rust-lang.org",
        "https://www.tokio.rs",
    ];

    let mut handles = vec![];

    for url in urls {
        let handle = tokio::spawn(async move {
            let resp = reqwest::get(url).await?.text().await?;
            println!("Fetched {}: {} bytes", url, resp.len());
            Ok::<_, reqwest::Error>(())
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.await??;
    }

    Ok(())
}
    

Упражнение: Многопоточный асинхронный парсер

  1. Напишите программу, которая принимает список URL из файла (например, urls.txt).
  2. Используйте Tokio для асинхронной загрузки содержимого каждого URL.
  3. Сохраните результаты (размер страницы в байтах и первые 100 символов) в файл results.txt.
  4. Добавьте обработку ошибок и лимит времени (например, 5 секунд на запрос) с помощью tokio::time::timeout.
  5. Используйте tokio::sync::Semaphore, чтобы ограничить количество одновременных запросов до 5.

Примерный план:


7. Примеры: параллельная обработка данных

Параллельная сумма

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let ingredients = vec![1, 2, 3, 4, 5, 6, 7, 8];
    let chunk_size = ingredients.len() / 4;
    let total_weight = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for chunk in ingredients.chunks(chunk_size) {
        let chunk = chunk.to_vec();
        let total_weight = Arc::clone(&total_weight);
        handles.push(thread::spawn(move || {
            let sum: i32 = chunk.iter().sum();
            let mut total = total_weight.lock().unwrap();
            *total += sum;
        }));
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Общий вес: {}", *total_weight.lock().unwrap()); // 36
}

Развёрнутые пояснения

Мы делим массив на куски, как ингредиенты для поваров. Каждый поток считает свою часть, а затем добавляет результат в общую "кастрюлю" через Mutex.


8. Упражнение: многопоточный парсер

Задача

Создайте программу, которая параллельно парсит длину содержимого веб-страниц по списку URL.

Решение

use std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc;
use reqwest::blocking::get;

fn fetch_url(url: &str) -> Result {
    let response = get(url)?;
    Ok(response.text()?.len())
}

fn main() {
    let urls = vec![
        "https://www.rust-lang.org",
        "https://www.google.com",
        "https://www.github.com",
    ];

    let results = Arc::new(Mutex::new(Vec::new()));
    let (tx, rx) = mpsc::channel();
    let mut handles = vec![];

    for url in urls {
        let tx = tx.clone();
        let url = url.to_string();
        handles.push(thread::spawn(move || {
            match fetch_url(&url) {
                Ok(len) => tx.send((url, len)).unwrap(),
                Err(e) => eprintln!("Ошибка для {}: {}", url, e),
            }
        }));
    }

    drop(tx); // Закрываем ящик после отправки

    for handle in handles {
        handle.join().unwrap();
    }

    while let Ok((url, len)) = rx.recv() {
        let mut res = results.lock().unwrap();
        res.push((url, len));
    }

    for (url, len) in results.lock().unwrap().iter() {
        println!("Страница: {}, Длина: {}", url, len);
    }
}

Развёрнутые пояснения

Каждый поток — как курьер, который едет за "посылкой" (HTML-страницей). Он отправляет результат через канал, а главный поток собирает все посылки в общую коробку (results).

Нюансы

Подводные камни с пояснениями


Заключение

Многопоточность в Rust — это как организовать вечеринку: потоки — гости, каналы — почта, а Mutex и RwLock — правила доступа к угощениям. Вы научились управлять этим праздником безопасно и эффективно. Практикуйтесь, экспериментируйте и наслаждайтесь мощью параллелизма!