create a http server and upload a file and return a binary response in rust

To create an HTTP server in Rust, you can use the hyper crate. To handle file uploads, we can use the multipart feature in hyper, and to return a binary response, we can use the hyper::Body.

Here's an example server that listens on port 8080, expects a POST request with a file attached, and returns the uploaded file as the response:

main.rs
use hyper::{Body, Request, Response, Server};
use hyper::header::{HeaderValue, CONTENT_TYPE};
use hyper::rt::Future;
use hyper::service::service_fn_ok;
use futures::StreamExt;
use std::fs::File;
use std::io::{self, Write};

fn main() {
    let addr = ([0, 0, 0, 0], 8080).into();
    let server = Server::bind(&addr)
        .serve(|| service_fn_ok(handle_request))
        .map_err(|e| eprintln!("server error: {}", e));

    hyper::rt::run(server);
}

fn handle_request(req: Request<Body>) -> Response<Body> {
    // Only accept POST requests
    if req.method() != &hyper::Method::POST {
        return Response::builder()
            .status(hyper::StatusCode::METHOD_NOT_ALLOWED)
            .body(Body::empty())
            .unwrap();
    }

    let boundary = req.headers()
        .get(CONTENT_TYPE)
        .and_then(|ct| ct.to_str().ok())
        .and_then(|ct| ct.find("boundary=").map(|i| i + 9))
        .and_then(|i| Some(&ct[i..]));

    if boundary.is_none() {
        return Response::builder()
            .status(hyper::StatusCode::BAD_REQUEST)
            .body(Body::empty())
            .unwrap();
    }

    let boundary = format!("--{}", boundary.unwrap());

    let (mut parts_tx, parts_rx) = futures::channel::mpsc::unbounded();
    let filename = "uploaded_file.dat";
    let file = File::create(filename).unwrap();

    let mut state = MultipartState::Start;

    let f = req.into_body()
        .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
        .fold((boundary, file, state), move |(boundary, mut file, mut state), chunk| {
            for line in chunk.split(|b| *b == b'\r').map(|s| &s[..s.len()-1]) {
                state = handle_multipart_line(line, &boundary, &mut file, state, &mut parts_tx);
            }
            Ok((boundary, file, state))
        })
        .map(move |(boundary, file, _)| {
            let mut res = Response::new(Body::wrap_stream(parts_rx.map(|part| {
                let mut part_res = Vec::new();
                part_res.extend_from_slice(b"Content-Type: ");
                part_res.extend_from_slice(part.content_type.as_slice());
                part_res.extend_from_slice(b"\r\n\r\n");
                part_res.extend_from_slice(&part.data);
                part_res
            })));

            let content_type = HeaderValue::from_static("application/octet-stream");
            let content_disposition = format!("attachment; filename=\"{}\"", filename);
            res.headers_mut().insert(CONTENT_TYPE, content_type);
            res.headers_mut().insert(hyper::header::CONTENT_DISPOSITION, HeaderValue::from_str(&content_disposition).unwrap());
            res.headers_mut().insert(hyper::header::TRANSFER_ENCODING, HeaderValue::from_static("chunked"));

            res
        });

    hyper::rt::spawn(f);

    Response::builder()
        .header(hyper::header::CONTENT_TYPE, HeaderValue::from_static("text/plain"))
        .body(Body::from("Upload received."))
        .unwrap()
}

struct Part {
    content_type: Vec<u8>,
    data: Vec<u8>,
}

enum MultipartState {
    Start,
    Headers(Vec<u8>),
    Data(Vec<u8>, usize),
    End(Vec<u8>),
}

fn handle_multipart_line(line: &[u8], boundary: &str, file: &mut File, state: MultipartState, parts_tx: &mut futures::channel::mpsc::UnboundedSender<Part>) -> MultipartState {
    match state {
        MultipartState::Start => {
            if line == boundary.as_bytes() {
                MultipartState::Headers(Vec::new())
            } else {
                MultipartState::Start
            }
        },
        MultipartState::Headers(headers) => {
            if line.is_empty() {
                MultipartState::Data(Vec::new(), 0)
            } else {
                headers.extend_from_slice(line);
                headers.extend_from_slice(b"\r\n");
                MultipartState::Headers(headers)
            }
        },
        MultipartState::Data(data, length) => {
            if line == boundary.as_bytes() {
                MultipartState::End(Vec::new())
            } else {
                let new_length = length + line.len() + 2;
                if new_length <= 1048576 {
                    data.extend_from_slice(line);
                    data.extend_from_slice(b"\r\n");
                    MultipartState::Data(data, new_length)
                } else {
                    let (left_slice, right_slice) = line.split_at(1048576 - length - 2);
                    data.extend_from_slice(left_slice);
                    data.extend_from_slice(b"\r\n");
                    let part = Part {
                        content_type: Vec::new(),
                        data: data,
                    };
                    parts_tx.unbounded_send(part).unwrap();
                    let new_data = Vec::from(&right_slice[2..]);
                    MultipartState::Data(new_data, right_slice.len() - 2)
                }
            }
        },
        MultipartState::End(_end_data) => {
            if line.is_empty() {
                let part = Part {
                    content_type: Vec::from("application/octet-stream".as_bytes()),
                    data: io::Read::by_ref(file).bytes().filter_map(|b| b.ok()).collect(),
                };
                parts_tx.unbounded_send(part).unwrap();
                MultipartState::Start
            } else {
                MultipartState::End(Vec::new())
            }
        },
    }
}
5418 chars
150 lines

This code defines a function handle_request which is the callback that gets executed every time a request hits the server. In this function first we check if the request method is POST. If it is not, we return a METHOD_NOT_ALLOWED response.

Then, we extract the boundary parameter from the Content-Type header, as this will be used to separate the parts of the multipart message.

We then create a new futures::channel::mpsc::UnboundedSender, which will be used to send the binary data of the file as the response.

Then, we create a File with uploaded_file.dat name, and then define a bunch of MultipartStates to track what part of the serialized message are we currently handling.

After first receiving the boundary content, depending on whether it matches the expected the expected boundary string or not, we enter MultipartState::Start or MultipartState::Headers.

In MultipartState::Headers, we continue accumulating headers until we hit a blank line.

MultipartState::Data accumulates all message body lines until the file size limit of 1MB, then publishes them with parts_tx.

MultipartState::End is used when a new part has started, we've arrived at the end of the file, and we accumulate headers for next part. Once we've received the blank line, we push the accumulated data to the parts channel.

Finally, we use the sender to send the file data as it's accumulated, calling parts_tx.unbounded_send in each iteration of the loop.

After having received all parts of the file, we construct the response based on the parts that have been received until now.

gistlibby LogSnag