Send HTTP Request in handle function or in started function When Using the Actix crate
- Send HTTP Request in handle function
- Send HTTP Request in started function
- Full source code
- Appendix
Send HTTP Request in handle function
When using actors to develop concurrent applications, you may need to run asynchronous functions, such as sending HTTP requests, when an actor is started or when handling specific messages.
The Actor trait has a method called started that we can override when implementing the trait. The Actor trait is defined as follows:
    pub trait Actor: Sized + Unpin + 'static {
        /// Actor execution context type
        type Context: ActorContext;

        /// Called when an actor gets polled the first time.
        fn started(&mut self, ctx: &mut Self::Context) {}

        /// Called after an actor is in `Actor::Stopping` state.
        ///
        /// There can be several reasons for stopping:
        ///
        /// - `Context::stop` gets called by the actor itself.
        /// - All addresses to the current actor get dropped and no more
        ///   evented objects are left in the context.
        ///
        /// An actor can return from the stopping state to the running
        /// state by returning `Running::Continue`.
        fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
            Running::Stop
        }

        /// Called after an actor is stopped.
        ///
        /// This method can be used to perform any needed cleanup work or
        /// to spawn more actors. This is the final state, after this
        /// method got called, the actor will be dropped.
        fn stopped(&mut self, ctx: &mut Self::Context) {}

        /// Start a new asynchronous actor, returning its address.
        fn start(self) -> Addr<Self>
        where
            Self: Actor<Context = Context<Self>>,
        {
            Context::new().run(self)
        }

        /// Construct and start a new asynchronous actor, returning its
        /// address.
        ///
        /// This is constructs a new actor using the `Default` trait, and
        /// invokes its `start` method.
        fn start_default() -> Addr<Self>
        where
            Self: Actor<Context = Context<Self>> + Default,
        {
            Self::default().start()
        }

        /// Start new actor in arbiter's thread.
        fn start_in_arbiter<F>(wrk: &ArbiterHandle, f: F) -> Addr<Self>
        where
            Self: Actor<Context = Context<Self>>,
            F: FnOnce(&mut Context<Self>) -> Self + Send + 'static,
        {
            let (tx, rx) = channel::channel(DEFAULT_CAPACITY);

            // create actor
            wrk.spawn_fn(move || {
                let mut ctx = Context::with_receiver(rx);
                let act = f(&mut ctx);
                let fut = ctx.into_future(act);

                actix_rt::spawn(fut);
            });

            Addr::new(tx)
        }

        /// Start a new asynchronous actor given a `Context`.
        ///
        /// Use this method if you need the `Context` object during actor
        /// initialization.
        fn create<F>(f: F) -> Addr<Self>
        where
            Self: Actor<Context = Context<Self>>,
            F: FnOnce(&mut Context<Self>) -> Self,
        {
            let mut ctx = Context::new();
            let act = f(&mut ctx);
            ctx.run(act)
        }
    }
The started function is called when the actor starts, but if we await an async function directly in started (e.g. to send an HTTP request), we get a compile error. The same happens in handle, which is where the compiler output below comes from:
    error[E0728]: `await` is only allowed inside `async` functions and blocks
      --> src/bin/call-async-in-non-async-function.rs:25:57
       |
    22 | /     fn handle(&mut self, _: Msg, _: &mut Context<Self>) -> Self::Result {
    23 | |         // async move { Ok(()) }
    24 | |
    25 | |         let response = reqwest::get("https://hyper.rs").await.unwrap();
       | |                                                         ^^^^^ only allowed inside `async` functions and blocks
    ...  |
    35 | |         // })
    36 | |     }
       | |_____- this is not `async`

    For more information about this error, try `rustc --explain E0728`.
    warning: `actix_example` (bin "call-async-in-non-async-function") generated 6 warnings
    error: could not compile `actix_example` (bin "call-async-in-non-async-function") due to previous error; 6 warnings emitted
In Rust, await can only be used within an async function or an async block. You can refer to the Async book for more details.
The solution is easy, and I'll explain it step by step.
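The rule can be seen in isolation without Actix. The following is a minimal sketch that uses only the standard library: `fetch_status` is a hypothetical stand-in for an HTTP call, and `block_on` is a toy executor written for this example, not part of Actix or reqwest. The non-async `handle` function cannot use `.await` in its own body, but it can return a pinned, boxed async block that does. As far as I know, this is the same shape as Actix's `ResponseFuture`, which is used in the next section.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough for futures that are ready immediately.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor (hypothetical helper): polls the future until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Hypothetical stand-in for an async HTTP call.
async fn fetch_status() -> u16 {
    200
}

/// A plain (non-async) function. Writing `fetch_status().await` directly
/// in this body would fail with E0728; inside the async block it is allowed.
fn handle() -> Pin<Box<dyn Future<Output = Result<u16, ()>>>> {
    Box::pin(async move {
        let status = fetch_status().await;
        Ok(status)
    })
}

fn main() {
    let result = block_on(handle());
    println!("{result:?}");
}
```

The function itself stays synchronous; only the value it returns is asynchronous, and whoever receives that value (here `block_on`, in Actix the actor's context) is responsible for driving it to completion.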
Return type Result<(), ()>
Let's start by calling an async function or async block in the handle method.
We can declare the result as ResponseFuture<Result<(), ()>> and wrap the async block with Box::pin.
    #[derive(Message)]
    #[rtype(result = "Result<(), ()>")]
    struct Msg;

    struct MyActor2;

    impl Actor for MyActor2 {
        type Context = Context<Self>;
    }

    impl Handler<Msg> for MyActor2 {
        type Result = ResponseFuture<Result<(), ()>>;

        fn handle(&mut self, _: Msg, _: &mut Context<Self>) -> Self::Result {
            Box::pin(async move {
                // Some async computation
                println!("Box::pin called");
                Ok(())
            })
        }
    }
Because we use ResponseFuture<Result<(), ()>> as the Handler trait's associated type Result, we can return a boxed future from the handle method using the Box::pin function.
Return type Result<usize, ()>
Now, let's change the return type from Result<(), ()> to Result<usize, ()>, so that the async block returns a usize.
    #[derive(Message)]
    #[rtype(result = "Result<usize, ()>")]
    struct Msg3;

    struct MyActor3;

    impl Actor for MyActor3 {
        type Context = Context<Self>;
    }

    impl Handler<Msg3> for MyActor3 {
        type Result = ResponseActFuture<Self, Result<usize, ()>>;

        fn handle(&mut self, _: Msg3, _: &mut Context<Self>) -> Self::Result {
            Box::pin(
                async {
                    println!("will return 42");
                    // Some async computation
                    42
                }
                .into_actor(self) // converts future to ActorFuture
                .map(|res, _act, _ctx| {
                    println!("map");
                    // Do some computation with actor's state or context
                    Ok(res)
                }),
            )
        }
    }
We need to change three places:
- Use the #[rtype(result = "Result<usize, ()>")] attribute on the Msg3 struct
- Change the associated type from ResponseFuture<Result<(), ()>> to ResponseActFuture<Self, Result<usize, ()>>, and convert the async block with into_actor so that map can access the actor and its context
- Change the async block to return a value of type usize
Return type Result<u16, ()>
If we care about the status code of the HTTP response, what should we do? Obviously, we can declare a Result<u16, ()> type, where u16 represents the status code of the HTTP response.
    #[derive(Message)]
    #[rtype(result = "Result<u16, ()>")] // return http status code
    struct Msg4;

    struct MyActor4;

    impl Actor for MyActor4 {
        type Context = Context<Self>;
    }

    impl Handler<Msg4> for MyActor4 {
        // type Result = ResponseActFuture<Self, Result<usize, ()>>;
        type Result = ResponseActFuture<Self, Result<u16, ()>>;

        fn handle(&mut self, _: Msg4, _: &mut Context<Self>) -> Self::Result {
            // let res = reqwest::get("https://hyper.rs").await?;
            // println!("Status: {}", res.status());
            // let body = res.text().await?;
            Box::pin(
                async {
                    println!("will return 42");
                    let status_code = match reqwest::get("https://hyper.rs").await {
                        Ok(response) => {
                            println!("Got status from hyper.rs {}", response.status());
                            response.status().as_u16()
                        },
                        Err(err) => {
                            println!("get response error : {err}");
                            42 as u16
                        },
                    };
                    status_code
                }
                .into_actor(self) // converts future to ActorFuture
                .map(|res, _act, _ctx| {
                    println!("result in map process : {res}");
                    // Do some computation with actor's state or context
                    Ok(res)
                }),
            )
        }
    }
In the async block, we return the status code using response.status().as_u16(). If the request fails, the example falls back to the placeholder value 42.
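The fallback value 42 is a magic number that every caller has to know about. Since the message result is already a Result, one alternative is to report the failure as Err(()). The sketch below shows that idea using only the standard library; `fetch_status` (with its String error) is a hypothetical stand-in for `reqwest::get` plus reading the status, and `block_on` is a toy executor for this example, so none of these names come from Actix or reqwest.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough for futures that are ready immediately.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor (hypothetical helper): polls the future until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Hypothetical stand-in for sending a request and reading its status code.
async fn fetch_status(url: &str) -> Result<u16, String> {
    if url.starts_with("https://") {
        Ok(200)
    } else {
        Err(format!("unsupported URL: {url}"))
    }
}

/// Shaped like the body of `handle`: a failed request becomes `Err(())`
/// instead of a placeholder status code.
fn status_or_err(url: &'static str) -> Pin<Box<dyn Future<Output = Result<u16, ()>>>> {
    Box::pin(async move {
        fetch_status(url).await.map_err(|err| {
            // Log the error, then discard it because the error type is `()`.
            println!("get response error : {err}");
        })
    })
}

fn main() {
    println!("{:?}", block_on(status_or_err("https://hyper.rs")));
    println!("{:?}", block_on(status_or_err("ftp://hyper.rs")));
}
```

With this shape, the `map` step in the actor would pass the Result through unchanged instead of wrapping every value in Ok.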
Return type Result<String, ()>
What if we want to use the response body? It is easy to change the type from u16 to String. The code looks like this:
    #[derive(Message)]
    #[rtype(result = "Result<String, ()>")] // return http response body
    struct Msg5;

    struct MyActor5;

    impl Actor for MyActor5 {
        type Context = Context<Self>;
    }

    impl Handler<Msg5> for MyActor5 {
        // type Result = ResponseActFuture<Self, Result<usize, ()>>;
        type Result = ResponseActFuture<Self, Result<String, ()>>;

        fn handle(&mut self, _: Msg5, _: &mut Context<Self>) -> Self::Result {
            // let res = reqwest::get("https://hyper.rs").await?;
            // println!("Status: {}", res.status());
            // let body = res.text().await?;
            Box::pin(
                async {
                    // Either the response body or a description of what went wrong
                    let body = match reqwest::get("https://hyper.rs").await {
                        Ok(response) => {
                            println!("Response Ok from hyper.rs {}", response.status());
                            match response.text().await {
                                Ok(body) => body,
                                Err(err) => {
                                    format!("Convert response to string error : {err}")
                                }
                            }
                        },
                        Err(err) => {
                            format!("Response error from hyper.rs, error : {err}")
                        },
                    };
                    body
                }
                .into_actor(self) // converts future to ActorFuture
                .map(|res, _act, _ctx| {
                    println!("result in map process : {res}");
                    // Do some computation with actor's state or context
                    Ok(res)
                }),
            )
        }
    }
Now we use response.text().await to read the response as a string and return the response body for later use.
Send HTTP Request in started function
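If we want to store some state in the actor and initialize it when the actor is started, we can use context.wait to wait for an async block. We turn the async block into an actor future with into_actor and store its return value in the then method. While the future passed to context.wait is pending, the actor does not process other incoming messages.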
    #[derive(Clone)]
    struct MyActor {
        status_code: Option<u16>,
    }

    impl MyActor {
        fn print_status_code(&mut self, context: &mut Context<Self>) {
            println!("status code: {:?}", self.status_code);
        }
    }

    impl Actor for MyActor {
        type Context = Context<Self>;

        fn started(&mut self, context: &mut Context<Self>) {
            println!("In started");

            // ✅NOTE: This will run
            context.wait(
                async move {
                    // send http reqwest
                    let status_code = match reqwest::get("https://hyper.rs").await {
                        Ok(response) => {
                            println!(
                                "Got status from hyper.rs {}",
                                response.status()
                            );
                            response.status().as_u16()
                        }
                        Err(err) => {
                            println!("get response error : {err}");
                            42 as u16
                        }
                    };
                    println!("status code: {status_code}");
                    status_code
                }
                .into_actor(self)
                .then(|output, s, ctx| {
                    s.status_code = Some(output);
                    fut::ready(())
                }),
            );

            IntervalFunc::new(Duration::from_millis(5000), Self::print_status_code)
                .finish()
                .spawn(context);

            context.run_later(Duration::from_millis(20000), |_, _| {
                System::current().stop()
            });
        }
    }
In this example, we store the status code as Option<u16> in MyActor and set it in the then method from the ActorFutureExt trait:
    fn started(&mut self, context: &mut Context<Self>) {
        context.wait(
            async move {
                // send http reqwest
                let status_code = match reqwest::get("https://hyper.rs").await {
                    Ok(response) => {
                        response.status().as_u16()
                    }
                    Err(err) => {
                        42 as u16
                    }
                };
                status_code
            }
            .into_actor(self)
            .then(|output, s, ctx| {
                s.status_code = Some(output);
                fut::ready(())
            }),
        );
    }
Here is the definition of the ActorFutureExt trait.
    pub trait ActorFutureExt<A: Actor>: ActorFuture<A> {
        /// Map this future's result to a different type, returning a new future of
        /// the resulting type.
        fn map<F, U>(self, f: F) -> Map<Self, F>
        where
            F: FnOnce(Self::Output, &mut A, &mut A::Context) -> U,
            Self: Sized,
        {
            Map::new(self, f)
        }

        /// Chain on a computation for when a future finished, passing the result of
        /// the future to the provided closure `f`.
        fn then<F, Fut>(self, f: F) -> Then<Self, Fut, F>
        where
            F: FnOnce(Self::Output, &mut A, &mut A::Context) -> Fut,
            Fut: ActorFuture<A>,
            Self: Sized,
        {
            then::new(self, f)
        }

        /// Add timeout to futures chain.
        ///
        /// `Err(())` returned as a timeout error.
        fn timeout(self, timeout: Duration) -> Timeout<Self>
        where
            Self: Sized,
        {
            Timeout::new(self, timeout)
        }

        /// Wrap the future in a Box, pinning it.
        ///
        /// A shortcut for wrapping in [`Box::pin`].
        fn boxed_local(self) -> LocalBoxActorFuture<A, Self::Output>
        where
            Self: Sized + 'static,
        {
            Box::pin(self)
        }
    }
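To make the role of then more concrete, here is a toy model written with only the standard library. It is not how Actix implements context.wait or then; `wait_then`, `block_on`, and `fetch_status` are hypothetical names invented for this sketch. The point it illustrates is the division of labour: the async block only computes a value and never touches the actor, and a follow-up closure receives that value together with mutable access to the actor.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// A waker that does nothing; enough for futures that are ready immediately.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor (hypothetical helper): polls the future until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Hypothetical stand-in for an async HTTP call.
async fn fetch_status() -> u16 {
    200
}

struct MyActor {
    status_code: Option<u16>,
}

/// Toy model of "wait for the future, then hand its output to a closure
/// that can mutate the actor". The future must be `'static`, so it cannot
/// borrow the actor; only the closure sees `&mut A`.
fn wait_then<A, F, T>(actor: &mut A, fut: F, f: impl FnOnce(T, &mut A))
where
    F: Future<Output = T> + 'static,
{
    let output = block_on(fut);
    f(output, actor);
}

/// Builds an actor and initializes its state from the async result.
fn start_actor() -> MyActor {
    let mut actor = MyActor { status_code: None };
    wait_then(&mut actor, async { fetch_status().await }, |output, act| {
        act.status_code = Some(output);
    });
    actor
}

fn main() {
    let actor = start_actor();
    println!("status code: {:?}", actor.status_code);
}
```

In the real code, the closure passed to then additionally receives the context and must return another actor future, which is why the examples end with fut::ready(()).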
Full source code
Send HTTP request in handle function
    use actix::prelude::*;
    use anyhow::Result;
    use futures::prelude::*;
    use tokio::time::{sleep, Duration};

    #[derive(Message)]
    #[rtype(result = "Result<(), ()>")]
    struct Msg;

    struct MyActor2;

    impl Actor for MyActor2 {
        type Context = Context<Self>;
    }

    impl Handler<Msg> for MyActor2 {
        type Result = ResponseFuture<Result<(), ()>>;

        fn handle(&mut self, _: Msg, _: &mut Context<Self>) -> Self::Result {
            Box::pin(async move {
                // Some async computation
                println!("Box::pin called");
                Ok(())
            })
        }
    }

    #[derive(Message)]
    #[rtype(result = "Result<usize, ()>")]
    struct Msg3;

    struct MyActor3;

    impl Actor for MyActor3 {
        type Context = Context<Self>;
    }

    impl Handler<Msg3> for MyActor3 {
        type Result = ResponseActFuture<Self, Result<usize, ()>>;

        fn handle(&mut self, _: Msg3, _: &mut Context<Self>) -> Self::Result {
            Box::pin(
                async {
                    println!("will return 42");
                    // Some async computation
                    42
                }
                .into_actor(self) // converts future to ActorFuture
                .map(|res, _act, _ctx| {
                    println!("map");
                    // Do some computation with actor's state or context
                    Ok(res)
                }),
            )
        }
    }

    #[derive(Message)]
    #[rtype(result = "Result<u16, ()>")] // return http status code
    struct Msg4;

    struct MyActor4;

    impl Actor for MyActor4 {
        type Context = Context<Self>;
    }

    impl Handler<Msg4> for MyActor4 {
        // type Result = ResponseActFuture<Self, Result<usize, ()>>;
        type Result = ResponseActFuture<Self, Result<u16, ()>>;

        fn handle(&mut self, _: Msg4, _: &mut Context<Self>) -> Self::Result {
            // let res = reqwest::get("https://hyper.rs").await?;
            // println!("Status: {}", res.status());
            // let body = res.text().await?;
            Box::pin(
                async {
                    println!("will return 42");
                    let status_code = match reqwest::get("https://hyper.rs").await {
                        Ok(response) => {
                            println!("Got status from hyper.rs {}", response.status());
                            response.status().as_u16()
                        },
                        Err(err) => {
                            println!("get response error : {err}");
                            42 as u16
                        },
                    };
                    status_code
                }
                .into_actor(self) // converts future to ActorFuture
                .map(|res, _act, _ctx| {
                    println!("result in map process : {res}");
                    // Do some computation with actor's state or context
                    Ok(res)
                }),
            )
        }
    }

    #[derive(Message)]
    #[rtype(result = "Result<String, ()>")] // return http response body
    struct Msg5;

    struct MyActor5;

    impl Actor for MyActor5 {
        type Context = Context<Self>;
    }

    impl Handler<Msg5> for MyActor5 {
        // type Result = ResponseActFuture<Self, Result<usize, ()>>;
        type Result = ResponseActFuture<Self, Result<String, ()>>;

        fn handle(&mut self, _: Msg5, _: &mut Context<Self>) -> Self::Result {
            // let res = reqwest::get("https://hyper.rs").await?;
            // println!("Status: {}", res.status());
            // let body = res.text().await?;
            Box::pin(
                async {
                    // Either the response body or a description of what went wrong
                    let body = match reqwest::get("https://hyper.rs").await {
                        Ok(response) => {
                            println!("Response Ok from hyper.rs {}", response.status());
                            match response.text().await {
                                Ok(body) => body,
                                Err(err) => {
                                    format!("Convert response to string error : {err}")
                                }
                            }
                        },
                        Err(err) => {
                            format!("Response error from hyper.rs, error : {err}")
                        },
                    };
                    body
                }
                .into_actor(self) // converts future to ActorFuture
                .map(|res, _act, _ctx| {
                    println!("result in map process : {res}");
                    // Do some computation with actor's state or context
                    Ok(res)
                }),
            )
        }
    }

    fn main() -> Result<()> {
        let mut sys = actix::System::new();

        sys.block_on(async {
            // let _addr = MyActor {}.start();
            // let _addr = MyActor2 {}.start();
            // let addr = MyActor3 {}.start();
            // addr.do_send(Msg3 {}) // OK
            // let addr = MyActor4 {}.start();
            // addr.do_send(Msg4 {}) // OK
            let addr = MyActor5 {}.start();
            addr.do_send(Msg5 {})
        });

        sys.run()?;
        Ok(())
    }
Send HTTP request in started function
    use actix::prelude::*;
    use actix::utils::IntervalFunc;
    use std::sync::Arc;
    use std::time::Duration;
    use tokio::sync::oneshot::channel;
    use tokio::sync::Mutex;

    #[derive(Clone)]
    struct MyActor {
        status_code: Option<u16>,
    }

    impl MyActor {
        fn tick(&mut self, context: &mut Context<Self>) {
            println!("tick");
        }

        fn print_status_code(&mut self, context: &mut Context<Self>) {
            println!("status code: {:?}", self.status_code);
        }
    }

    impl Actor for MyActor {
        type Context = Context<Self>;

        fn started(&mut self, context: &mut Context<Self>) {
            println!("In started");

            // ✅NOTE: This will run
            context.wait(
                async move {
                    // send http reqwest
                    let status_code = match reqwest::get("https://hyper.rs").await {
                        Ok(response) => {
                            println!(
                                "Got status from hyper.rs {}",
                                response.status()
                            );
                            response.status().as_u16()
                        }
                        Err(err) => {
                            println!("get response error : {err}");
                            42 as u16
                        }
                    };
                    println!("status code: {status_code}");
                    status_code
                }
                .into_actor(self)
                .then(|output, s, ctx| {
                    s.status_code = Some(output);
                    fut::ready(())
                }),
            );

            IntervalFunc::new(Duration::from_millis(5000), Self::print_status_code)
                .finish()
                .spawn(context);

            context.run_later(Duration::from_millis(20000), |_, _| {
                System::current().stop()
            });
        }
    }

    fn main() {
        let mut sys = System::new();
        let addr = sys.block_on(async { MyActor { status_code: None }.start() });
        sys.run();
    }
Appendix
Actor trait
    /// Actors are objects which encapsulate state and behavior.
    ///
    /// Actors run within a specific execution context
    /// [`Context<A>`](struct.Context.html). The context object is available
    /// only during execution. Each actor has a separate execution
    /// context. The execution context also controls the lifecycle of an
    /// actor.
    ///
    /// Actors communicate exclusively by exchanging messages. The sender
    /// actor can wait for a response. Actors are not referenced directly,
    /// but by address [`Addr`](struct.Addr.html) To be able to handle a
    /// specific message actor has to provide a
    /// [`Handler<M>`](trait.Handler.html) implementation for this
    /// message. All messages are statically typed. A message can be
    /// handled in asynchronous fashion. An actor can spawn other actors
    /// or add futures or streams to the execution context. The actor
    /// trait provides several methods that allow controlling the actor
    /// lifecycle.
    ///
    /// # Actor lifecycle
    ///
    /// ## Started
    ///
    /// An actor starts in the `Started` state, during this state the
    /// `started` method gets called.
    ///
    /// ## Running
    ///
    /// After an actor's `started` method got called, the actor
    /// transitions to the `Running` state. An actor can stay in the
    /// `running` state for an indefinite amount of time.
    ///
    /// ## Stopping
    ///
    /// The actor's execution state changes to `stopping` in the following
    /// situations:
    ///
    /// * `Context::stop` gets called by actor itself
    /// * all addresses to the actor get dropped
    /// * no evented objects are registered in its context.
    ///
    /// An actor can return from the `stopping` state to the `running`
    /// state by creating a new address or adding an evented object, like
    /// a future or stream, in its `Actor::stopping` method.
    ///
    /// If an actor changed to a `stopping` state because
    /// `Context::stop()` got called, the context then immediately stops
    /// processing incoming messages and calls the `Actor::stopping()`
    /// method. If an actor does not return back to a `running` state,
    /// all unprocessed messages get dropped.
    ///
    /// ## Stopped
    ///
    /// If an actor does not modify execution context while in stopping
    /// state, the actor state changes to `Stopped`. This state is
    /// considered final and at this point the actor gets dropped.
    #[allow(unused_variables)]
    pub trait Actor: Sized + Unpin + 'static {
        /// Actor execution context type
        type Context: ActorContext;

        /// Called when an actor gets polled the first time.
        fn started(&mut self, ctx: &mut Self::Context) {}

        /// Called after an actor is in `Actor::Stopping` state.
        ///
        /// There can be several reasons for stopping:
        ///
        /// - `Context::stop` gets called by the actor itself.
        /// - All addresses to the current actor get dropped and no more
        ///   evented objects are left in the context.
        ///
        /// An actor can return from the stopping state to the running
        /// state by returning `Running::Continue`.
        fn stopping(&mut self, ctx: &mut Self::Context) -> Running {
            Running::Stop
        }

        /// Called after an actor is stopped.
        ///
        /// This method can be used to perform any needed cleanup work or
        /// to spawn more actors. This is the final state, after this
        /// method got called, the actor will be dropped.
        fn stopped(&mut self, ctx: &mut Self::Context) {}

        /// Start a new asynchronous actor, returning its address.
        ///
        /// # Examples
        ///
        /// ```
        /// use actix::prelude::*;
        ///
        /// struct MyActor;
        /// impl Actor for MyActor {
        ///     type Context = Context<Self>;
        /// }
        ///
        /// #[actix::main]
        /// async fn main() {
        ///     // start actor and get its address
        ///     let addr = MyActor.start();
        ///     # System::current().stop();
        /// }
        /// ```
        fn start(self) -> Addr<Self>
        where
            Self: Actor<Context = Context<Self>>,
        {
            Context::new().run(self)
        }

        /// Construct and start a new asynchronous actor, returning its
        /// address.
        ///
        /// This is constructs a new actor using the `Default` trait, and
        /// invokes its `start` method.
        fn start_default() -> Addr<Self>
        where
            Self: Actor<Context = Context<Self>> + Default,
        {
            Self::default().start()
        }

        /// Start new actor in arbiter's thread.
        fn start_in_arbiter<F>(wrk: &ArbiterHandle, f: F) -> Addr<Self>
        where
            Self: Actor<Context = Context<Self>>,
            F: FnOnce(&mut Context<Self>) -> Self + Send + 'static,
        {
            let (tx, rx) = channel::channel(DEFAULT_CAPACITY);

            // create actor
            wrk.spawn_fn(move || {
                let mut ctx = Context::with_receiver(rx);
                let act = f(&mut ctx);
                let fut = ctx.into_future(act);

                actix_rt::spawn(fut);
            });

            Addr::new(tx)
        }

        /// Start a new asynchronous actor given a `Context`.
        ///
        /// Use this method if you need the `Context` object during actor
        /// initialization.
        ///
        /// # Examples
        ///
        /// ```
        /// use actix::prelude::*;
        ///
        /// struct MyActor {
        ///     val: usize,
        /// }
        /// impl Actor for MyActor {
        ///     type Context = Context<Self>;
        /// }
        ///
        /// #[actix::main]
        /// async fn main() {
        ///     let addr = MyActor::create(|ctx: &mut Context<MyActor>| MyActor { val: 10 });
        ///     # System::current().stop();
        /// }
        /// ```
        fn create<F>(f: F) -> Addr<Self>
        where
            Self: Actor<Context = Context<Self>>,
            F: FnOnce(&mut Context<Self>) -> Self,
        {
            let mut ctx = Context::new();
            let act = f(&mut ctx);
            ctx.run(act)
        }
    }