std::thread
Каналы: std::sync::mpsc
Синхронизация: Mutex
и RwLock
Атомарные операции: Atomic types
Thread-local storage (thread_local!
) и rayon
tokio
Примеры: параллельная обработка данных
Упражнение: многопоточный парсер
Добро пожаловать в главу 21 нашего курса по Rust! Сегодня мы отправляемся в захватывающее путешествие по миру многопоточности — одной из самых мощных и сложных тем в программировании, области, где программы превращаются из одиноких путников в слаженные команды, работающие параллельно. Представьте, что вы шеф-повар, а ваши потоки — помощники на кухне: каждый выполняет свою задачу (режет овощи, варит суп, жарит мясо), чтобы обед был готов быстрее. Но если не наладить координацию, кто-то может случайно вылить суп на плиту или порезать не тот ингредиент. Rust предоставляет уникальный подход к параллелизму, сочетая производительность с безопасностью благодаря своей системе типов и владения, помогает организовать эту "кухню" так, чтобы всё работало быстро, безопасно и без хаоса. Эта лекция — ваш путеводитель от базовых потоков до сложных инструментов синхронизации, с примерами, аналогиями и разбором всех "подводных камней", чтобы вы могли не только понять, но и полюбить многопоточность.
Многопоточность — позволяет выполнять несколько задач одновременно, используя возможности современных многоядерных процессоров. Однако она сопряжена с рисками: гонки данных, взаимные блокировки (deadlocks) и сложность отладки, это как оркестр: каждый музыкант играет свою партию, а вместе они создают гармоничную симфонию. В программировании потоки позволяют выполнять несколько задач одновременно, используя мощь многоядерных процессоров. Например, один поток может загружать данные из сети, другой — обрабатывать их, а третий — обновлять интерфейс. Но без правильной координации это может превратиться в какофонию: потоки начнут мешать друг другу, перезаписывать данные или ждать вечно.
Rust выделяется среди языков тем, что его система владения и заимствования защищает от типичных ошибок многопоточности, таких как гонки данных (когда два потока одновременно меняют одну переменную) или использование памяти после её освобождения. Мы разберём инструменты из стандартной библиотеки (std::thread
, std::sync
) и заглянем в библиотеку rayon
, чтобы вы могли управлять потоками как опытный дирижёр.
std::thread
Поток — это независимая единица выполнения в рамках процесса, это как отдельный работник на вашей фабрике кода. Он выполняет задачу независимо от других, но все потоки делят одну "память" процесса. В Rust потоки создаются через std::thread::spawn
.
thread::spawn
создаёт новый поток и возвращает JoinHandle
, который позволяет дождаться его завершения.move
).Представьте, что вы отправляете друга за пиццей, пока сами готовите салат. thread::spawn
запускает новый поток и даёт вам "чек" (JoinHandle
), чтобы вы могли позже проверить, вернулся ли он с пиццей.
use std::thread;
use std::time::Duration;
fn main() {
let handle = thread::spawn(|| {
for i in 1..5 {
println!("Друг с пиццей: шаг {}", i);
thread::sleep(Duration::from_millis(100)); // Имитация времени доставки
}
});
for i in 1..5 {
println!("Я готовлю салат: шаг {}", i);
thread::sleep(Duration::from_millis(150)); // Я медленнее
}
handle.join().unwrap(); // Жду друга с пиццей
println!("Ужин готов!");
}
Здесь главный поток (вы) и дочерний поток (друг) работают параллельно. thread::sleep
добавляет задержку, чтобы показать, что они выполняются одновременно. Метод .join()
— это как сказать: "Не начинаем есть, пока пицца не приедет".
Если нужно передать данные в поток, используйте move
:
let pizza_order = String::from("Маргарита");
let handle = thread::spawn(move || {
println!("Заказываю: {}", pizza_order);
});
handle.join().unwrap();
move
передаёт владение pizza_order
потоку, потому что Rust не позволит двум потокам одновременно владеть одной переменной. Без .join()
главный поток может завершиться раньше, и дочерний поток будет прерван.
.join()
, главный поток завершится, а дочерний будет прерван, как если бы вы ушли из дома, не дождавшись пиццы. Всегда ждите важные потоки!rayon
).std::sync::mpsc
Каналы (`mpsc` — multiple producer, single consumer) позволяют обмениваться данными между потоками, это как почтовый ящик между потоками. Один поток отправляет письмо (данные), другой его получает. mpsc
означает "много отправителей, один получатель" — несколько потоков могут писать в один ящик, но читает только один.
Rust предоставляет два типа каналов:
Канал создаётся функцией mpsc::channel()
, которая возвращает пару: tx
(transmitter, отправитель) и rx
(receiver, получатель).
use std::thread;
use std::sync::mpsc;
fn main() {
let (tx, rx) = mpsc::channel(); // Создаём почтовый ящик
thread::spawn(move || {
let letter = String::from("Пицца готова!");
tx.send(letter).unwrap(); // Отправляем письмо
});
let received = rx.recv().unwrap(); // Читаем письмо
println!("Получено: {}", received);
}
send
возвращает Result
, так как получатель может быть удалён. Представьте, что вы повар, а другой поток — курьер. Вы кладёте пиццу в ящик (tx.send
), а курьер забирает её (rx.recv
). Метод recv
блокирует поток, пока письмо не придёт, как если бы курьер ждал у двери.
Для нескольких отправителей клонируйте `tx`:
let (tx, rx) = mpsc::channel();
let tx2 = tx.clone(); // Второй курьер
thread::spawn(move || { tx.send("Пицца").unwrap(); });
thread::spawn(move || { tx2.send("Салат").unwrap(); });
while let Ok(item) = rx.recv() {
println!("Доставлено: {}", item);
}
mpsc::channel
) имеет неограниченный буфер, что может привести к утечкам памяти при чрезмерной нагрузке. Если отправлять слишком много сообщений, а никто не забирает, ящик "переполнится" в памяти.recv
вернёт ошибку, если все отправители исчезли (ящик закрыт).mpsc::sync_channel
) с ограниченным буфером.recv
), а отправитель никогда не придёт, программа застрянет, как курьер, ждущий пиццу, которой нет.Mutex
и Rw Aurelija
Mutex
?Mutex
(mutual exclusion) защищает данные от одновременного доступа из нескольких потоков. Только один поток может заблокировать Mutex
в данный момент, это замок на двери комнаты с общими данными. Только один поток может войти, пока другой ждёт снаружи.
Mutex
use std::sync::{Mutex, Arc};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0)); // Общая касса
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap(); // Открываем кассу
*num += 1; // Добавляем монетку
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Всего монет: {}", *counter.lock().unwrap()); // 10
}
Arc
(Atomic Reference Counted) — это как общий ключ от комнаты, который можно скопировать. Mutex
гарантирует, что только один поток изменяет counter
в данный момент. Без него потоки могли бы одновременно записывать разные значения, и итог был бы хаотичным.
RwLock
?RwLock
(read-write lock) позволяет множественным потокам читать данные одновременно, но только одному записывать, это как библиотека: много людей могут читать книги одновременно, но писать в них может только один.
RwLock
use std::sync::{RwLock, Arc};
use std::thread;
fn main() {
let data = Arc::new(RwLock::new(42));
let mut handles = vec![];
for _ in 0..3 {
let data = Arc::clone(&data);
handles.push(thread::spawn(move || {
let read = data.read().unwrap(); // Читаем книгу
println!("Читатель видит: {}", *read);
}));
}
let data_clone = Arc::clone(&data);
handles.push(thread::spawn(move || {
let mut write = data_clone.write().unwrap(); // Пишем в книгу
*write = 100;
}));
for handle in handles {
handle.join().unwrap();
}
println!("Итог: {}", *data.read().unwrap());
}
lock()
и write()
возвращают временную ссылку, которая "отпирает" замок при выходе из области видимости.Mutex
(например, из-за паники), другие будут ждать вечно — как очередь в библиотеку, где библиотекарь ушёл с ключом. Всегда проверяйте логику освобождения.RwLock
: Если читатели приходят постоянно, писатель может никогда не получить доступ — как автор, которого не пускают в библиотеку из-за толпы читателей. Используйте справедливые блокировки из crates вроде parking_lot
.Атомарные операции (типы) (AtomicUsize
, AtomicBool
, etc.) предоставляют операции, которые выполняются как единое целое без прерываний. Они полезны для простых счётчиков или флагов, это как мгновенные записи в журнале: никто не может прервать вас посреди слова. Типы вроде AtomicUsize
или AtomicBool
идеальны для простых данных.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
fn main() {
let counter = Arc::new(AtomicUsize::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
handles.push(thread::spawn(move || {
counter.fetch_add(1, Ordering::SeqCst); // Атомарно прибавляем 1
}));
}
for handle in handles {
handle.join().unwrap();
}
println!("Всего: {}", counter.load(Ordering::SeqCst)); // 10
}
Ordering
определяет гарантии синхронизации:
SeqCst
: Полная последовательность.Relaxed
: Минимум гарантий, максимум производительности.fetch_add
— это как нажать кнопку счётчика в кассе: операция неделима. Ordering::SeqCst
(sequential consistency) гарантирует, что все потоки видят изменения в одном порядке.
Relaxed
: Как записать заметку на доске — быстро, но другие могут не сразу увидеть.SeqCst
: Как объявление по громкоговорителю — все видят одновременно.Mutex
для простых операций.Ordering
: Использование Relaxed
вместо SeqCst
может привести к гонкам данных — как если бы кассиры считали посетителей, но не договорились о порядке. Тщательно выбирайте режим.thread_local!
) и rayon
thread_local!
создаёт переменные, уникальные для каждого потока, это как личный блокнот каждого потока: записи видны только ему.
thread_local!
use std::cell::RefCell;
use std::thread;
thread_local! {
static MY_NOTEBOOK: RefCell = RefCell::new(0);
}
fn main() {
let handles: Vec<_> = (0..3)
.map(|_| {
thread::spawn(|| {
MY_NOTEBOOK.with(|note| {
*note.borrow_mut() += 1;
println!("Мои заметки: {}", *note.borrow());
});
})
})
.collect();
for handle in handles {
handle.join().unwrap();
}
}
Каждый поток видит свою копию MY_NOTEBOOK
. Это полезно, например, для локальных счётчиков или кэша.
rayon
библиотека для параллельных вычислений, упрощающая работу с потоками, это как нанять бригаду рабочих с автоматическим распределением задач.
rayon
use rayon::prelude::*;
fn main() {
let ingredients = vec![1, 2, 3, 4, 5];
let total: i32 = ingredients.par_iter().map(|x| x * 2).sum();
println!("Общий вес: {}", total); // 30
}
thread_local!
требует RefCell
для изменения, так как данные неизменяемы по умолчанию.rayon
использует пул потоков, что экономит ресурсы.thread_local!
не делится данными между потоками — как если бы повара не могли передать друг другу специи. Используйте каналы для обмена.rayon
: Нужно добавить в Cargo.toml
(rayon = "1.5"
), иначе компилятор не найдёт библиотеку.Tokio
— это мощный асинхронный runtime для языка программирования Rust, который используется для построения высокопроизводительных приложений, работающих с сетью, вводом-выводом и другими асинхронными операциями. В отличие от стандартной многопоточности с std::thread
, tokio
предоставляет модель асинхронного выполнения на основе событийного цикла (event loop), что делает его особенно эффективным для задач с высокой конкуренцией (например, серверов, обрабатывающих тысячи подключений).
Tokio основан на концепции фьючерсов (futures) и задач (tasks). Основные компоненты:
Tokio использует модель кооперативной многозадачности: задачи сами решают, когда уступить управление другим задачам (через await
), что позволяет избежать накладных расходов на переключение потоков.
Добавьте зависимости в Cargo.toml
:
[dependencies]
tokio = { version = "1.0", features = ["full"] }
Флаг features = ["full"]
включает все возможности Tokio, такие как таймеры, сетевые утилиты и синхронизация.
Пример TCP-сервера, который принимает подключения и отправляет "Hello, World!":
use tokio::net::TcpListener;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
#[tokio::main]
async fn main() -> tokio::io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Server running on 127.0.0.1:8080");
loop {
let (mut socket, addr) = listener.accept().await?;
println!("New connection: {}", addr);
tokio::spawn(async move {
let message = "Hello, World!\n";
if let Err(e) = socket.write_all(message.as_bytes()).await {
eprintln!("Failed to write to socket: {}", e);
}
});
}
}
#[tokio::main]
— макрос, который создаёт runtime и запускает асинхронную функцию main
.TcpListener::bind
— асинхронно привязывает сервер к адресу.tokio::spawn
— создаёт новую задачу, которая выполняется параллельно.tokio::time
):
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
println!("Start");
sleep(Duration::from_secs(2)).await;
println!("After 2 seconds");
}
tokio::net
):tokio::sync
):Mutex
, RwLock
, mpsc
(многопроизводительный, однопотребительный канал) для асинхронных задач.
use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let (tx, mut rx) = mpsc::channel(32);
tokio::spawn(async move {
tx.send("Hello from channel").await.unwrap();
});
while let Some(message) = rx.recv().await {
println!("Received: {}", message);
}
}
tokio::io
):std::thread::sleep
или длительных синхронных операций блокируют runtime. Используйте tokio::task::spawn_blocking
для таких задач:
let result = tokio::task::spawn_blocking(|| {
std::thread::sleep(std::time::Duration::from_secs(1));
42
}).await?;
.await
:.await
внутри синхронного контекста или в замыканиях, не помеченных как async
.tokio::spawn
может перегрузить runtime. Используйте пулы задач или ограничивайте количество одновременных операций.tokio::select!
или тайм-ауты для управления временем жизни задач.#[tokio::main]
для простоты или создайте собственный runtime с tokio::runtime::Builder
для настройки числа потоков и поведения.Result
), чтобы избежать паники.select!
:tokio::select!
позволяет обрабатывать несколько асинхронных операций одновременно:
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() {
tokio::select! {
_ = sleep(Duration::from_secs(1)) => println!("1 second passed"),
_ = sleep(Duration::from_secs(2)) => println!("2 seconds passed"),
}
}
tokio-console
для анализа производительности асинхронных приложений..await
на операциях, которые можно оптимизировать.mpsc
идеально подходит для передачи данных между задачами.#[tokio::test]
для написания асинхронных тестов:
#[tokio::test]
async fn test_timer() {
tokio::time::sleep(Duration::from_millis(100)).await;
assert_eq!(1 + 1, 2);
}
tokio::sync::Semaphore
для ограничения числа одновременных подключений.Пример парсинга нескольких URL параллельно:
use tokio::time::Duration;
use reqwest;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let urls = vec![
"https://www.rust-lang.org",
"https://www.tokio.rs",
];
let mut handles = vec![];
for url in urls {
let handle = tokio::spawn(async move {
let resp = reqwest::get(url).await?.text().await?;
println!("Fetched {}: {} bytes", url, resp.len());
Ok::<_, reqwest::Error>(())
});
handles.push(handle);
}
for handle in handles {
handle.await??;
}
Ok(())
}
urls.txt
).
results.txt
.tokio::time::timeout
.tokio::sync::Semaphore
, чтобы ограничить количество одновременных запросов до 5.Примерный план:
tokio::spawn
.reqwest
для HTTP-запросов.tokio::time::timeout
и tokio::sync::Semaphore
.use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let ingredients = vec![1, 2, 3, 4, 5, 6, 7, 8];
let chunk_size = ingredients.len() / 4;
let total_weight = Arc::new(Mutex::new(0));
let mut handles = vec![];
for chunk in ingredients.chunks(chunk_size) {
let chunk = chunk.to_vec();
let total_weight = Arc::clone(&total_weight);
handles.push(thread::spawn(move || {
let sum: i32 = chunk.iter().sum();
let mut total = total_weight.lock().unwrap();
*total += sum;
}));
}
for handle in handles {
handle.join().unwrap();
}
println!("Общий вес: {}", *total_weight.lock().unwrap()); // 36
}
Мы делим массив на куски, как ингредиенты для поваров. Каждый поток считает свою часть, а затем добавляет результат в общую "кастрюлю" через Mutex
.
Создайте программу, которая параллельно парсит длину содержимого веб-страниц по списку URL.
use std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc;
use reqwest::blocking::get;
fn fetch_url(url: &str) -> Result {
let response = get(url)?;
Ok(response.text()?.len())
}
fn main() {
let urls = vec![
"https://www.rust-lang.org",
"https://www.google.com",
"https://www.github.com",
];
let results = Arc::new(Mutex::new(Vec::new()));
let (tx, rx) = mpsc::channel();
let mut handles = vec![];
for url in urls {
let tx = tx.clone();
let url = url.to_string();
handles.push(thread::spawn(move || {
match fetch_url(&url) {
Ok(len) => tx.send((url, len)).unwrap(),
Err(e) => eprintln!("Ошибка для {}: {}", url, e),
}
}));
}
drop(tx); // Закрываем ящик после отправки
for handle in handles {
handle.join().unwrap();
}
while let Ok((url, len)) = rx.recv() {
let mut res = results.lock().unwrap();
res.push((url, len));
}
for (url, len) in results.lock().unwrap().iter() {
println!("Страница: {}, Длина: {}", url, len);
}
}
reqwest
для HTTP-запросов (добавьте в Cargo.toml
: reqwest = { version = "0.11", features = ["blocking"] }
).Arc<Mutex<Vec>>
.Каждый поток — как курьер, который едет за "посылкой" (HTML-страницей). Он отправляет результат через канал, а главный поток собирает все посылки в общую коробку (results
).
drop(tx)
нужен, чтобы канал закрылся после завершения потоков, иначе recv
будет ждать вечно.reqwest
ждёт ответа — как курьер, застрявший в пробке. Рассмотрите асинхронный подход с tokio
.Многопоточность в Rust — это как организовать вечеринку: потоки — гости, каналы — почта, а Mutex
и RwLock
— правила доступа к угощениям. Вы научились управлять этим праздником безопасно и эффективно. Практикуйтесь, экспериментируйте и наслаждайтесь мощью параллелизма!