How Hyper Server Setup Works
hello.rs from the official hyper repository
Below is a simple example of setting up a Hyper server that listens for incoming HTTP requests and responds with "Hello World !". It uses Tokio for asynchronous I/O and runs entirely in an async context.
```rust
#![deny(warnings)]

use std::convert::Infallible;
use std::net::SocketAddr;

use bytes::Bytes;
use http_body_util::Full;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response};
use tokio::net::TcpListener;

// This would normally come from the `hyper-util` crate, but we can't depend
// on that here because it would be a cyclical dependency.
#[path = "../benches/support/mod.rs"]
mod support;
use support::{TokioIo, TokioTimer};

// An async function that consumes a request, does nothing with it and returns a
// response.
async fn hello(_: Request<impl hyper::body::Body>) -> Result<Response<Full<Bytes>>, Infallible> {
    Ok(Response::new(Full::new(Bytes::from("Hello World !"))))
}

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    pretty_env_logger::init();

    // This address is localhost
    let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();

    // Bind to the port and listen for incoming TCP connections
    let listener = TcpListener::bind(addr).await?;
    println!("Listening on http://{}", addr);
    loop {
        // When an incoming TCP connection is received grab a TCP stream for
        // client<->server communication.
        //
        // Note, this is a .await point, this loop will loop forever but is not a busy loop. The
        // .await point allows the Tokio runtime to pull the task off of the thread until the task
        // has work to do. In this case, a connection arrives on the port we are listening on and
        // the task is woken up, at which point the task is then put back on a thread, and is
        // driven forward by the runtime, eventually yielding a TCP stream.
        let (tcp, _) = listener.accept().await?;
        // Use an adapter to access something implementing `tokio::io` traits as if they implement
        // `hyper::rt` IO traits.
        let io = TokioIo::new(tcp);

        // Spin up a new task in Tokio so we can continue to listen for new TCP connection on the
        // current task without waiting for the processing of the HTTP1 connection we just received
        // to finish
        tokio::task::spawn(async move {
            // Handle the connection from the client using HTTP1 and pass any
            // HTTP requests received on that connection to the `hello` function
            if let Err(err) = http1::Builder::new()
                .timer(TokioTimer::new())
                .serve_connection(io, service_fn(hello))
                .await
            {
                println!("Error serving connection: {:?}", err);
            }
        });
    }
}
```
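If you run this from a checkout of the hyper repository (`cargo run --example hello`), you should be able to test it with `curl http://127.0.0.1:3000` and get `Hello World !` back.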
First look
- use the `#[tokio::main]` macro to wrap the async `main` function
- use `SocketAddr` to define the address to bind to
- use `TcpListener` to bind to a port and listen for incoming connections
- use a `loop` to continuously accept incoming connections
- use `listener.accept()` to wait for an incoming connection
- use `TokioIo` to adapt the TCP stream to hyper's IO traits
- use `tokio::task::spawn` to handle each connection in a separate task
- use `http1::Builder` to create an HTTP/1 connection handler
- use `service_fn` to create a service that handles requests
- use `serve_connection` on the `Builder` to create a `Connection` that handles the client's requests
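Note that the example imports `TokioIo` and `TokioTimer` from a local `support` module only because it lives inside the hyper repository. In your own project those adapters come from the `hyper-util` crate. A minimal standalone sketch (the dependency versions and feature flags in the comment are my assumptions):

```rust
// Cargo.toml (assumed):
//   hyper = { version = "1", features = ["http1", "server"] }
//   hyper-util = { version = "0.1", features = ["tokio"] }
//   tokio = { version = "1", features = ["full"] }
//   http-body-util = "0.1"
//   bytes = "1"
use std::convert::Infallible;
use std::net::SocketAddr;

use bytes::Bytes;
use http_body_util::Full;
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper::{Request, Response};
use hyper_util::rt::{TokioIo, TokioTimer};
use tokio::net::TcpListener;

async fn hello(_: Request<impl hyper::body::Body>) -> Result<Response<Full<Bytes>>, Infallible> {
    Ok(Response::new(Full::new(Bytes::from("Hello World !"))))
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();
    let listener = TcpListener::bind(addr).await?;
    loop {
        let (tcp, _) = listener.accept().await?;
        // Adapt the Tokio TcpStream to hyper's IO traits.
        let io = TokioIo::new(tcp);
        tokio::task::spawn(async move {
            if let Err(err) = http1::Builder::new()
                .timer(TokioTimer::new())
                .serve_connection(io, service_fn(hello))
                .await
            {
                eprintln!("Error serving connection: {err:?}");
            }
        });
    }
}
```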
Deep dive
TokioIo
TokioIo is a wrapper that implements Tokio's IO traits for an inner type that implements hyper's IO traits, or vice versa. This lets hyper drive types that only implement Tokio's IO traits, and lets Tokio utilities operate on types that only implement hyper's.
```rust
// Excerpt from hyper-util's `rt::tokio` module (imports added for context).
use std::pin::Pin;
use std::task::{Context, Poll};

use pin_project_lite::pin_project;

pin_project! {
    /// A wrapper that implements Tokio's IO traits for an inner type that
    /// implements hyper's IO traits, or vice versa (implements hyper's IO
    /// traits for a type that implements Tokio's IO traits).
    #[derive(Debug)]
    pub struct TokioIo<T> {
        #[pin]
        inner: T,
    }
}

impl<T> hyper::rt::Read for TokioIo<T>
where
    T: tokio::io::AsyncRead,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        mut buf: hyper::rt::ReadBufCursor<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        let n = unsafe {
            let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
            match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) {
                Poll::Ready(Ok(())) => tbuf.filled().len(),
                other => return other,
            }
        };

        unsafe {
            buf.advance(n);
        }
        Poll::Ready(Ok(()))
    }
}

impl<T> hyper::rt::Write for TokioIo<T>
where
    T: tokio::io::AsyncWrite,
{
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, std::io::Error>> {
        tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
        tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
    }

    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
    }

    fn is_write_vectored(&self) -> bool {
        tokio::io::AsyncWrite::is_write_vectored(&self.inner)
    }

    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[std::io::IoSlice<'_>],
    ) -> Poll<Result<usize, std::io::Error>> {
        tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs)
    }
}

impl<T> tokio::io::AsyncRead for TokioIo<T>
where
    T: hyper::rt::Read,
{
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        tbuf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        //let init = tbuf.initialized().len();
        let filled = tbuf.filled().len();
        let sub_filled = unsafe {
            let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut());

            match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) {
                Poll::Ready(Ok(())) => buf.filled().len(),
                other => return other,
            }
        };

        let n_filled = filled + sub_filled;
        // At least sub_filled bytes had to have been initialized.
        let n_init = sub_filled;
        unsafe {
            tbuf.assume_init(n_init);
            tbuf.set_filled(n_filled);
        }

        Poll::Ready(Ok(()))
    }
}

impl<T> tokio::io::AsyncWrite for TokioIo<T>
where
    T: hyper::rt::Write,
{
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, std::io::Error>> {
        hyper::rt::Write::poll_write(self.project().inner, cx, buf)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
        hyper::rt::Write::poll_flush(self.project().inner, cx)
    }

    fn poll_shutdown(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), std::io::Error>> {
        hyper::rt::Write::poll_shutdown(self.project().inner, cx)
    }

    fn is_write_vectored(&self) -> bool {
        hyper::rt::Write::is_write_vectored(&self.inner)
    }

    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[std::io::IoSlice<'_>],
    ) -> Poll<Result<usize, std::io::Error>> {
        hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs)
    }
}
```
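The "vice versa" direction matters in practice for protocol upgrades: hyper's `Upgraded` stream implements hyper's IO traits, and wrapping it in `TokioIo` lets plain `tokio::io` utilities drive it. A hedged sketch (the echo handler itself is hypothetical; the wrap-and-use pattern is the point):

```rust
use hyper::upgrade::Upgraded;
use hyper_util::rt::TokioIo;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

// Hypothetical helper: echo bytes on an upgraded connection (e.g. after a
// `101 Switching Protocols` handshake). `Upgraded` implements hyper's IO
// traits, so wrapping it in `TokioIo` gives us Tokio's `AsyncReadExt` /
// `AsyncWriteExt` methods.
async fn echo_upgraded(upgraded: Upgraded) -> std::io::Result<()> {
    let mut io = TokioIo::new(upgraded);
    let mut buf = [0u8; 1024];
    loop {
        let n = io.read(&mut buf).await?;
        if n == 0 {
            return Ok(()); // peer closed the connection
        }
        io.write_all(&buf[..n]).await?;
    }
}
```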
Builder
`Builder` is used to configure the HTTP/1 server connection. It allows you to set options such as half-close behavior, keep-alive, header case handling, maximum buffer size, and more.
```rust
/// A configuration builder for HTTP/1 server connections.
///
/// **Note**: The default values of options are *not considered stable*. They
/// are subject to change at any time.
///
/// # Example
///
/// ```
/// # use std::time::Duration;
/// # use hyper::server::conn::http1::Builder;
/// # fn main() {
/// let mut http = Builder::new();
/// // Set options one at a time
/// http.half_close(false);
///
/// // Or, chain multiple options
/// http.keep_alive(false).title_case_headers(true).max_buf_size(8192);
///
/// # }
/// ```
///
/// Use [`Builder::serve_connection`](struct.Builder.html#method.serve_connection)
/// to bind the built connection to a service.
#[derive(Clone, Debug)]
pub struct Builder {
    h1_parser_config: httparse::ParserConfig,
    timer: Time,
    h1_half_close: bool,
    h1_keep_alive: bool,
    h1_title_case_headers: bool,
    h1_preserve_header_case: bool,
    h1_max_headers: Option<usize>,
    h1_header_read_timeout: Dur,
    h1_writev: Option<bool>,
    max_buf_size: Option<usize>,
    pipeline_flush: bool,
    date_header: bool,
}
```
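Applied to the hello.rs server, configuring the builder might look like the sketch below (the specific values are arbitrary assumptions, not recommendations). Note that, per the `serve_connection` docs further down, configuring a timeout without also supplying a `timer` will panic:

```rust
use std::time::Duration;

use hyper::server::conn::http1;
use hyper_util::rt::TokioTimer;

// A hedged sketch: build a hardened HTTP/1 config (the values are arbitrary).
fn configured_builder() -> http1::Builder {
    let mut http = http1::Builder::new();
    http.timer(TokioTimer::new()) // required for any timeout option to actually fire
        .keep_alive(true) // allow connection reuse between requests
        .half_close(false) // treat a read EOF as the end of the connection
        .header_read_timeout(Duration::from_secs(30)) // bound slow-header clients
        .max_buf_size(64 * 1024); // cap per-connection buffering
    http
}
```

In the accept loop, `configured_builder().serve_connection(io, service_fn(hello))` would then replace the inline builder.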
Let's see how `serve_connection` is used to create a `Connection`, the future that processes all HTTP state for the IO object.
```rust
impl Builder {
    /// Bind a connection together with a [`Service`](crate::service::Service).
    ///
    /// This returns a Future that must be polled in order for HTTP to be
    /// driven on the connection.
    ///
    /// # Panics
    ///
    /// If a timeout option has been configured, but a `timer` has not been
    /// provided, calling `serve_connection` will panic.
    ///
    /// # Example
    ///
    /// ```
    /// # use hyper::{body::Incoming, Request, Response};
    /// # use hyper::service::Service;
    /// # use hyper::server::conn::http1::Builder;
    /// # use hyper::rt::{Read, Write};
    /// # async fn run<I, S>(some_io: I, some_service: S)
    /// # where
    /// #     I: Read + Write + Unpin + Send + 'static,
    /// #     S: Service<hyper::Request<Incoming>, Response=hyper::Response<Incoming>> + Send + 'static,
    /// #     S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    /// #     S::Future: Send,
    /// # {
    /// let http = Builder::new();
    /// let conn = http.serve_connection(some_io, some_service);
    ///
    /// if let Err(e) = conn.await {
    ///     eprintln!("server connection error: {}", e);
    /// }
    /// # }
    /// # fn main() {}
    /// ```
    pub fn serve_connection<I, S>(&self, io: I, service: S) -> Connection<I, S>
    where
        S: HttpService<IncomingBody>,
        S::Error: Into<Box<dyn StdError + Send + Sync>>,
        S::ResBody: 'static,
        <S::ResBody as Body>::Error: Into<Box<dyn StdError + Send + Sync>>,
        I: Read + Write + Unpin,
    {
        let mut conn = proto::Conn::new(io);
        conn.set_h1_parser_config(self.h1_parser_config.clone());
        conn.set_timer(self.timer.clone());
        if !self.h1_keep_alive {
            conn.disable_keep_alive();
        }
        if self.h1_half_close {
            conn.set_allow_half_close();
        }
        if self.h1_title_case_headers {
            conn.set_title_case_headers();
        }
        if self.h1_preserve_header_case {
            conn.set_preserve_header_case();
        }
        if let Some(max_headers) = self.h1_max_headers {
            conn.set_http1_max_headers(max_headers);
        }
        if let Some(dur) = self
            .timer
            .check(self.h1_header_read_timeout, "header_read_timeout")
        {
            conn.set_http1_header_read_timeout(dur);
        };
        if let Some(writev) = self.h1_writev {
            if writev {
                conn.set_write_strategy_queue();
            } else {
                conn.set_write_strategy_flatten();
            }
        }
        conn.set_flush_pipeline(self.pipeline_flush);
        if let Some(max) = self.max_buf_size {
            conn.set_max_buf_size(max);
        }
        if !self.date_header {
            conn.disable_date_header();
        }
        let sd = proto::h1::dispatch::Server::new(service);
        let proto = proto::h1::Dispatcher::new(sd, conn);
        Connection { conn: proto }
    }
}
```
There are several steps involved in creating a `Connection` from the `Builder`:
```rust
let mut conn = proto::Conn::new(io);
let sd = proto::h1::dispatch::Server::new(service);
let proto = proto::h1::Dispatcher::new(sd, conn);
Connection { conn: proto }
```
Conn
The `Conn` struct is a low-level representation of an HTTP connection. It wraps an IO object that implements `Read` and `Write`, and it manages the state of the connection, including parsing incoming messages and writing responses.
The `Buffered` type handles buffered I/O, allowing data to be read and written efficiently.
```rust
/// This handles a connection, which will have been established over an
/// `Read + Write` (like a socket), and will likely include multiple
/// `Transaction`s over HTTP.
///
/// The connection will determine when a message begins and ends as well as
/// determine if this connection can be kept alive after the message,
/// or if it is complete.
pub(crate) struct Conn<I, B, T> {
    io: Buffered<I, EncodedBuf<B>>,
    state: State,
    _marker: PhantomData<fn(T)>,
}
```
Server
The `Server` struct is a wrapper around a service (`service: S`) that handles HTTP requests. It implements the `Dispatch` trait, which lets the dispatcher hand it incoming messages and poll it for outgoing ones. `recv_msg` receives a message consisting of the request head and body, builds a `Request` from them, calls the service with it, and stores the service's response future in `in_flight`. Later, `poll_msg` polls that in-flight future and returns the response head and body once it is ready.
```rust
use crate::service::HttpService;

pub(crate) struct Server<S: HttpService<B>, B> {
    in_flight: Pin<Box<Option<S::Future>>>,
    pub(crate) service: S,
}

impl<S, B> Server<S, B>
where
    S: HttpService<B>,
{
    pub(crate) fn new(service: S) -> Server<S, B> {
        Server {
            in_flight: Box::pin(None),
            service,
        }
    }

    pub(crate) fn into_service(self) -> S {
        self.service
    }
}

// Service is never pinned
impl<S: HttpService<B>, B> Unpin for Server<S, B> {}

impl<S, Bs> Dispatch for Server<S, IncomingBody>
where
    S: HttpService<IncomingBody, ResBody = Bs>,
    S::Error: Into<Box<dyn StdError + Send + Sync>>,
    Bs: Body,
{
    type PollItem = MessageHead<http::StatusCode>;
    type PollBody = Bs;
    type PollError = S::Error;
    type RecvItem = RequestHead;

    fn poll_msg(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>> {
        let mut this = self.as_mut();
        let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() {
            let resp = ready!(fut.as_mut().poll(cx)?);
            let (parts, body) = resp.into_parts();
            let head = MessageHead {
                version: parts.version,
                subject: parts.status,
                headers: parts.headers,
                extensions: parts.extensions,
            };
            Poll::Ready(Some(Ok((head, body))))
        } else {
            unreachable!("poll_msg shouldn't be called if no inflight");
        };

        // Since in_flight finished, remove it
        this.in_flight.set(None);

        ret
    }

    fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, IncomingBody)>) -> crate::Result<()> {
        let (msg, body) = msg?;
        let mut req = Request::new(body);
        *req.method_mut() = msg.subject.0;
        *req.uri_mut() = msg.subject.1;
        *req.headers_mut() = msg.headers;
        *req.version_mut() = msg.version;
        *req.extensions_mut() = msg.extensions;
        let fut = self.service.call(req);
        self.in_flight.set(Some(fut));
        Ok(())
    }

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
        if self.in_flight.is_some() {
            Poll::Pending
        } else {
            Poll::Ready(Ok(()))
        }
    }

    fn should_poll(&self) -> bool {
        self.in_flight.is_some()
    }
}
```
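The `Pin<Box<Option<S::Future>>>` slot is the heart of this type: each connection processes at most one request at a time, and `poll_ready` returns `Pending` while a response is still being computed. Here is a toy, hyper-free sketch of that same pattern (all names are illustrative, not hyper's):

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

// Toy version of the in-flight slot: hold at most one response future,
// poll it to completion, then clear the slot so the next request can start.
struct InFlight<F: Future> {
    slot: Pin<Box<Option<F>>>,
}

impl<F: Future> InFlight<F> {
    fn new() -> Self {
        InFlight { slot: Box::pin(None) }
    }

    // Mirrors Server::poll_ready: not ready while a response is pending.
    fn poll_ready(&self) -> Poll<()> {
        if self.slot.is_some() { Poll::Pending } else { Poll::Ready(()) }
    }

    // Mirrors Server::recv_msg: start working on a new request.
    fn start(&mut self, fut: F) {
        self.slot.set(Some(fut));
    }

    // Mirrors Server::poll_msg: drive the stored future, clearing it when done.
    fn poll_msg(&mut self, cx: &mut Context<'_>) -> Poll<F::Output> {
        let out = match self.slot.as_mut().as_pin_mut() {
            Some(fut) => match fut.poll(cx) {
                Poll::Ready(out) => out,
                Poll::Pending => return Poll::Pending,
            },
            None => panic!("poll_msg called with nothing in flight"),
        };
        self.slot.set(None);
        Poll::Ready(out)
    }
}
```

This is also why pipelined HTTP/1 requests on one connection are answered strictly in order: the dispatcher won't accept the next request head until the in-flight future completes.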
Dispatcher
`Dispatcher` is a struct that stores the connection (`Conn`) and the dispatch (`Server`). It also implements the `Future` trait, so the whole connection can be polled like any other future; each poll calls `poll_catch` to drive incoming messages through the dispatch and handle errors.
```rust
pub(crate) struct Dispatcher<D, Bs: Body, I, T> {
    conn: Conn<I, Bs::Data, T>,
    dispatch: D,
    body_tx: Option<crate::body::Sender>,
    body_rx: Pin<Box<Option<Bs>>>,
    is_closing: bool,
}

impl<D, Bs, I, T> Future for Dispatcher<D, Bs, I, T>
where
    D: Dispatch<
            PollItem = MessageHead<T::Outgoing>,
            PollBody = Bs,
            RecvItem = MessageHead<T::Incoming>,
        > + Unpin,
    D::PollError: Into<Box<dyn StdError + Send + Sync>>,
    I: Read + Write + Unpin,
    T: Http1Transaction + Unpin,
    Bs: Body + 'static,
    Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    type Output = crate::Result<Dispatched>;

    #[inline]
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        self.poll_catch(cx, true)
    }
}

impl<D, Bs, I, T> Dispatcher<D, Bs, I, T>
where
    D: Dispatch<
            PollItem = MessageHead<T::Outgoing>,
            PollBody = Bs,
            RecvItem = MessageHead<T::Incoming>,
        > + Unpin,
    D::PollError: Into<Box<dyn StdError + Send + Sync>>,
    I: Read + Write + Unpin,
    T: Http1Transaction + Unpin,
    Bs: Body + 'static,
    Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    fn poll_catch(
        &mut self,
        cx: &mut Context<'_>,
        should_shutdown: bool,
    ) -> Poll<crate::Result<Dispatched>> {
        Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| {
            // Be sure to alert a streaming body of the failure.
            if let Some(mut body) = self.body_tx.take() {
                body.send_error(crate::Error::new_body("connection error"));
            }
            // An error means we're shutting down either way.
            // We just try to give the error to the user,
            // and close the connection with an Ok. If we
            // cannot give it to the user, then return the Err.
            self.dispatch.recv_msg(Err(e))?;
            Ok(Dispatched::Shutdown)
        }))
    }

    fn poll_inner(
        &mut self,
        cx: &mut Context<'_>,
        should_shutdown: bool,
    ) -> Poll<crate::Result<Dispatched>> {
        T::update_date();

        ready!(self.poll_loop(cx))?;

        if self.is_done() {
            if let Some(pending) = self.conn.pending_upgrade() {
                self.conn.take_error()?;
                return Poll::Ready(Ok(Dispatched::Upgrade(pending)));
            } else if should_shutdown {
                ready!(self.conn.poll_shutdown(cx)).map_err(crate::Error::new_shutdown)?;
            }
            self.conn.take_error()?;
            Poll::Ready(Ok(Dispatched::Shutdown))
        } else {
            Poll::Pending
        }
    }

    fn poll_loop(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        // Limit the looping on this connection, in case it is ready far too
        // often, so that other futures don't starve.
        //
        // 16 was chosen arbitrarily, as that is number of pipelined requests
        // benchmarks often use. Perhaps it should be a config option instead.
        for _ in 0..16 {
            let _ = self.poll_read(cx)?;
            let _ = self.poll_write(cx)?;
            let _ = self.poll_flush(cx)?;

            // This could happen if reading paused before blocking on IO,
            // such as getting to the end of a framed message, but then
            // writing/flushing set the state back to Init. In that case,
            // if the read buffer still had bytes, we'd want to try poll_read
            // again, or else we wouldn't ever be woken up again.
            //
            // Using this instead of task::current() and notify() inside
            // the Conn is noticeably faster in pipelined benchmarks.
            if !self.conn.wants_read_again() {
                //break;
                return Poll::Ready(Ok(()));
            }
        }

        trace!("poll_loop yielding (self = {:p})", self);

        task::yield_now(cx).map(|never| match never {})
    }

    fn poll_read(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        loop {
            if self.is_closing {
                return Poll::Ready(Ok(()));
            } else if self.conn.can_read_head() {
                ready!(self.poll_read_head(cx))?;
            } else if let Some(mut body) = self.body_tx.take() {
                if self.conn.can_read_body() {
                    match body.poll_ready(cx) {
                        Poll::Ready(Ok(())) => (),
                        Poll::Pending => {
                            self.body_tx = Some(body);
                            return Poll::Pending;
                        }
                        Poll::Ready(Err(_canceled)) => {
                            // user doesn't care about the body
                            // so we should stop reading
                            trace!("body receiver dropped before eof, draining or closing");
                            self.conn.poll_drain_or_close_read(cx);
                            continue;
                        }
                    }
                    match self.conn.poll_read_body(cx) {
                        Poll::Ready(Some(Ok(frame))) => {
                            if frame.is_data() {
                                let chunk = frame.into_data().unwrap_or_else(|_| unreachable!());
                                match body.try_send_data(chunk) {
                                    Ok(()) => {
                                        self.body_tx = Some(body);
                                    }
                                    Err(_canceled) => {
                                        if self.conn.can_read_body() {
                                            trace!("body receiver dropped before eof, closing");
                                            self.conn.close_read();
                                        }
                                    }
                                }
                            } else if frame.is_trailers() {
                                let trailers =
                                    frame.into_trailers().unwrap_or_else(|_| unreachable!());
                                match body.try_send_trailers(trailers) {
                                    Ok(()) => {
                                        self.body_tx = Some(body);
                                    }
                                    Err(_canceled) => {
                                        if self.conn.can_read_body() {
                                            trace!("body receiver dropped before eof, closing");
                                            self.conn.close_read();
                                        }
                                    }
                                }
                            } else {
                                // we should have dropped all unknown frames in poll_read_body
                                error!("unexpected frame");
                            }
                        }
                        Poll::Ready(None) => {
                            // just drop, the body will close automatically
                        }
                        Poll::Pending => {
                            self.body_tx = Some(body);
                            return Poll::Pending;
                        }
                        Poll::Ready(Some(Err(e))) => {
                            body.send_error(crate::Error::new_body(e));
                        }
                    }
                } else {
                    // just drop, the body will close automatically
                }
            } else {
                return self.conn.poll_read_keep_alive(cx);
            }
        }
    }

    fn poll_read_head(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        // can dispatch receive, or does it still care about other incoming message?
        match ready!(self.dispatch.poll_ready(cx)) {
            Ok(()) => (),
            Err(()) => {
                trace!("dispatch no longer receiving messages");
                self.close();
                return Poll::Ready(Ok(()));
            }
        }

        // dispatch is ready for a message, try to read one
        match ready!(self.conn.poll_read_head(cx)) {
            Some(Ok((mut head, body_len, wants))) => {
                let body = match body_len {
                    DecodedLength::ZERO => IncomingBody::empty(),
                    other => {
                        let (tx, rx) =
                            IncomingBody::new_channel(other, wants.contains(Wants::EXPECT));
                        self.body_tx = Some(tx);
                        rx
                    }
                };
                if wants.contains(Wants::UPGRADE) {
                    let upgrade = self.conn.on_upgrade();
                    debug_assert!(!upgrade.is_none(), "empty upgrade");
                    debug_assert!(
                        head.extensions.get::<OnUpgrade>().is_none(),
                        "OnUpgrade already set"
                    );
                    head.extensions.insert(upgrade);
                }
                self.dispatch.recv_msg(Ok((head, body)))?;
                Poll::Ready(Ok(()))
            }
            Some(Err(err)) => {
                debug!("read_head error: {}", err);
                self.dispatch.recv_msg(Err(err))?;
                // if here, the dispatcher gave the user the error
                // somewhere else. we still need to shutdown, but
                // not as a second error.
                self.close();
                Poll::Ready(Ok(()))
            }
            None => {
                // read eof, the write side will have been closed too unless
                // allow_read_close was set to true, in which case just do
                // nothing...
                debug_assert!(self.conn.is_read_closed());
                if self.conn.is_write_closed() {
                    self.close();
                }
                Poll::Ready(Ok(()))
            }
        }
    }

    fn poll_write(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
        loop {
            if self.is_closing {
                return Poll::Ready(Ok(()));
            } else if self.body_rx.is_none()
                && self.conn.can_write_head()
                && self.dispatch.should_poll()
            {
                if let Some(msg) = ready!(Pin::new(&mut self.dispatch).poll_msg(cx)) {
                    let (head, body) = msg.map_err(crate::Error::new_user_service)?;

                    let body_type = if body.is_end_stream() {
                        self.body_rx.set(None);
                        None
                    } else {
                        let btype = body
                            .size_hint()
                            .exact()
                            .map(BodyLength::Known)
                            .or(Some(BodyLength::Unknown));
                        self.body_rx.set(Some(body));
                        btype
                    };
                    self.conn.write_head(head, body_type);
                } else {
                    self.close();
                    return Poll::Ready(Ok(()));
                }
            } else if !self.conn.can_buffer_body() {
                ready!(self.poll_flush(cx))?;
            } else {
                // A new scope is needed :(
                if let (Some(mut body), clear_body) =
                    OptGuard::new(self.body_rx.as_mut()).guard_mut()
                {
                    debug_assert!(!*clear_body, "opt guard defaults to keeping body");
                    if !self.conn.can_write_body() {
                        trace!(
                            "no more write body allowed, user body is_end_stream = {}",
                            body.is_end_stream(),
                        );
                        *clear_body = true;
                        continue;
                    }

                    let item = ready!(body.as_mut().poll_frame(cx));
                    if let Some(item) = item {
                        let frame = item.map_err(|e| {
                            *clear_body = true;
                            crate::Error::new_user_body(e)
                        })?;

                        if frame.is_data() {
                            let chunk = frame.into_data().unwrap_or_else(|_| unreachable!());
                            let eos = body.is_end_stream();
                            if eos {
                                *clear_body = true;
                                if chunk.remaining() == 0 {
                                    trace!("discarding empty chunk");
                                    self.conn.end_body()?;
                                } else {
                                    self.conn.write_body_and_end(chunk);
                                }
                            } else {
                                if chunk.remaining() == 0 {
                                    trace!("discarding empty chunk");
                                    continue;
                                }
                                self.conn.write_body(chunk);
                            }
                        } else if frame.is_trailers() {
                            *clear_body = true;
                            self.conn.write_trailers(
                                frame.into_trailers().unwrap_or_else(|_| unreachable!()),
                            );
                        } else {
                            trace!("discarding unknown frame");
                            continue;
                        }
                    } else {
                        *clear_body = true;
                        self.conn.end_body()?;
                    }
                } else {
                    // If there's no body_rx, end the body
                    if self.conn.can_write_body() {
                        self.conn.end_body()?;
                    } else {
                        return Poll::Pending;
                    }
                }
            }
        }
    }
}
```
Putting the pieces together:

- `poll` calls `poll_catch` to process the connection and handle incoming messages.
- `poll_catch` calls `poll_inner` and catches any error: it forwards the error to the streaming body and to the dispatch, then resolves with `Dispatched::Shutdown`.
- `poll_inner` calls `poll_loop`, then checks whether the connection is done. If it is, it checks for a pending upgrade, otherwise it shuts the connection down.
- `poll_loop` loops up to 16 times, polling the read, write, and flush operations on the connection, then yields so other tasks are not starved (a standalone sketch of this pattern follows below). The `poll_read`, `poll_write`, and `poll_flush` methods do the actual I/O.
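The budgeting trick in `poll_loop` applies to any hand-written future that can be ready repeatedly. A toy sketch of the same idea using Tokio's `yield_now` (hyper uses its own internal yield helper, so this is an approximation of the behavior, not hyper's code):

```rust
// Toy illustration of poll_loop's fairness trick: do at most 16 units of
// work, then yield back to the runtime so sibling tasks are not starved.
async fn drive_connection_fairly() {
    loop {
        for _ in 0..16 {
            if !do_one_unit_of_work() {
                return; // connection is done
            }
        }
        // Budget exhausted: reschedule ourselves instead of monopolizing the thread.
        tokio::task::yield_now().await;
    }
}

// Hypothetical stand-in for poll_read/poll_write/poll_flush. As written it
// always reports more work, so the loop above would yield every 16 iterations
// forever; a real implementation returns false when the connection closes.
fn do_one_unit_of_work() -> bool {
    true
}
```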
Connection
Now, let's look at how `Connection` is defined and how it uses the `Dispatcher`.

`Connection` is a future that processes all HTTP state for the IO object. Internally it uses a `Dispatcher` to manage the connection state and handle incoming and outgoing messages. The `Connection` future is designed to be spawned onto an executor, letting it process requests asynchronously; its `inner` field is the `Dispatcher` instance that does the actual HTTP processing.
```rust
/// A future that processes all HTTP state for the IO object.
///
/// In most cases, this should just be spawned into an executor, so that it
/// can process incoming and outgoing messages, notice hangups, and the like.
///
/// Instances of this type are typically created via the [`handshake`] function
#[must_use = "futures do nothing unless polled"]
pub struct Connection<T, B>
where
    T: Read + Write,
    B: Body + 'static,
{
    // The inner dispatcher we created earlier, which implements the Future
    // trait, so we can poll it to process the connection.
    inner: Dispatcher<T, B>,
}

impl<T, B> Future for Connection<T, B>
where
    T: Read + Write + Unpin,
    B: Body + 'static,
    B::Data: Send,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    type Output = crate::Result<()>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match ready!(Pin::new(&mut self.inner).poll(cx))? {
            proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
            proto::Dispatched::Upgrade(pending) => {
                // With no `Send` bound on `I`, we can't try to do
                // upgrades here. In case a user was trying to use
                // `upgrade` with this API, send a special
                // error letting them know about that.
                pending.manual();
                Poll::Ready(Ok(()))
            }
        }
    }
}
```
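Because `Connection` is just a future you own, you can do more than blindly `.await` it. For example, http1's `Connection` exposes a `graceful_shutdown` method on a pinned connection. A hedged sketch of combining it with a shutdown signal (this assumes the `hello` handler from the example at the top is in scope, plus Tokio's `signal` feature):

```rust
use hyper::server::conn::http1;
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use tokio::net::TcpStream;

// Sketch: serve one connection, but begin a graceful shutdown on ctrl-c so
// in-flight responses finish before the connection closes.
async fn serve_with_shutdown(tcp: TcpStream) {
    let io = TokioIo::new(tcp);
    let conn = http1::Builder::new().serve_connection(io, service_fn(hello));
    // graceful_shutdown takes Pin<&mut Connection>, so pin it on the stack first.
    tokio::pin!(conn);
    tokio::select! {
        res = conn.as_mut() => {
            if let Err(err) = res {
                eprintln!("connection error: {err:?}");
            }
        }
        _ = tokio::signal::ctrl_c() => {
            // Stop accepting new requests; finish what is already in flight.
            conn.as_mut().graceful_shutdown();
            if let Err(err) = conn.await {
                eprintln!("error during shutdown: {err:?}");
            }
        }
    }
}
```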