Zware waarschuwing Ik heb deze bibliotheek nog nooit eerder gebruikt, en mijn kennis op laag niveau van sommige concepten is een beetje... ontbreekt. Meestal lees ik de tutorial door. Ik ben er vrij zeker van dat iedereen die async werk heeft gedaan dit zal lezen en erom zal lachen, maar het kan een nuttig startpunt zijn voor andere mensen. Waarschuwing emptor!
Laten we beginnen met iets eenvoudigers, en laten zien hoe een Stream
werken. We kunnen een iterator van Result
. converteren s in een stream:
extern crate futures;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
let payloads = stream::iter(payloads.into_iter());
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
}
Dit toont ons een manier om de stream te consumeren. We gebruiken and_then
om iets met elke lading te doen (hier drukt u het gewoon af) en dan for_each
om de Stream
te converteren terug naar een Future
. We kunnen dan de toekomst uitvoeren door de vreemd genaamde forget
. aan te roepen methode.
Het volgende is om de Redis-bibliotheek in de mix te binden en slechts één bericht te verwerken. Sinds de get_message()
methode blokkeert, moeten we enkele threads in de mix introduceren. Het is geen goed idee om grote hoeveelheden werk uit te voeren in dit type asynchroon systeem, omdat al het andere wordt geblokkeerd. Bijvoorbeeld:
Tenzij anders is afgesproken, moet ervoor worden gezorgd dat implementaties van deze functie zeer snel worden voltooid .
In een ideale wereld zou de redis-krat bovenop een bibliotheek zoals futures worden gebouwd en dit alles native blootleggen.
extern crate redis;
extern crate futures;
use std::thread;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let msg = pubsub.get_message().expect("Unable to get message");
let payload: Result<String, _> = msg.get_payload();
tx.send(payload).forget();
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
Mijn begrip wordt hier vager. In een aparte thread blokkeren we voor het bericht en duwen het in het kanaal wanneer we het ontvangen. Wat ik niet begrijp is waarom we het handvat van de draad moeten vasthouden. Ik zou verwachten dat foo.forget
zou zichzelf blokkeren, wachtend tot de stream leeg is.
Stuur in een telnet-verbinding met de Redis-server dit:
publish rust awesome
En je zult zien dat het werkt. Het toevoegen van printstatements laat zien dat de (voor mij) de foo.forget
statement wordt uitgevoerd voordat de thread wordt voortgebracht.
Meerdere berichten is lastiger. De Sender
verbruikt zichzelf om te voorkomen dat de producerende kant te ver voorloopt op de consumerende kant. Dit wordt bereikt door een andere toekomst terug te sturen van send
! We moeten het daar weer wegbrengen om het opnieuw te gebruiken voor de volgende iteratie van de lus:
extern crate redis;
extern crate futures;
use std::thread;
use std::sync::mpsc;
use futures::Future;
use futures::stream::{self, Stream};
fn main() {
let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");
let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");
let (tx, payloads) = stream::channel();
let redis_thread = thread::spawn(move || {
let mut tx = tx;
while let Ok(msg) = pubsub.get_message() {
let payload: Result<String, _> = msg.get_payload();
let (next_tx_tx, next_tx_rx) = mpsc::channel();
tx.send(payload).and_then(move |new_tx| {
next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
futures::finished(())
}).forget();
tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
}
});
let foo = payloads
.and_then(|payload| futures::finished(println!("{}", payload)))
.for_each(|_| Ok(()));
foo.forget();
redis_thread.join().expect("unable to join to thread");
}
Ik ben er zeker van dat er in de loop van de tijd meer ecosystemen zullen zijn voor dit soort samenwerking. De futures-cpupool-krat kan bijvoorbeeld waarschijnlijk worden uitgebreid om een soortgelijke usecase te ondersteunen.