Skip to content

Commit

Permalink
auto-read body for handler; add read_body fn for handler_func
Browse files Browse the repository at this point in the history
  • Loading branch information
rawhat committed Jul 1, 2022
1 parent 5e66231 commit a449adb
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 116 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2.0.0
- uses: erlef/setup-beam@v1.9.0
- uses: actions/checkout@v3.0.2
- uses: erlef/setup-beam@v1.11.2
with:
otp-version: "25.0"
gleam-version: "0.22.0"
gleam-version: "0.22.1"
- run: gleam format --check src test
- run: gleam deps download
- run: gleam test
30 changes: 25 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub fn main() {
"hello, world!":utf8,
>>))
},
max_body_limit: 4_000_000
)
erlang.sleep_forever()
}
Expand All @@ -43,22 +44,41 @@ pub fn main() {
assert Ok(_) =
serve(
8080,
http.handler_func(fn(req: Request(BitString)) {
case request.path_segments(req) {
["echo", "test"] ->
mhttp.handler_func(fn(req) {
case req.method, request.path_segments(req) {
http.Get, ["echo", "test"] ->
websocket.echo_handler
|> websocket.with_handler
|> websocket.on_init(on_init) // do something with the Sender
|> websocket.on_close(on_close) // same
|> Upgrade
["home"] ->
http.Post, ["echo", "body"] ->
req
|> mhttp.read_body
|> result.map(fn(req) {
response.new(200)
|> response.set_body(BitBuilderBody(bit_builder.from_bit_string(
req.body,
)))
|> response.prepend_header(
"content-type",
request.get_header(req, "content-type")
|> result.unwrap("application/octet-stream"),
)
})
|> result.unwrap(
response.new(400)
|> response.set_body(BitBuilderBody(bit_builder.new())),
)
|> Response
http.Get, ["home"] ->
response.new(200)
|> response.set_body(BitBuilderBody(bit_builder.from_bit_string(<<
"sup home boy":utf8,
>>)))
// NOTE: This is response from `mist/http`
|> Response
_ ->
_, _ ->
response.new(200)
|> response.set_body(BitBuilderBody(bit_builder.from_bit_string(<<
"Hello, world!":utf8,
Expand Down
10 changes: 7 additions & 3 deletions src/mist.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@ import glisten/tcp
import mist/http.{State} as mhttp

/// Runs an HTTP Request->Response server at the given port, with your defined
/// handler.
/// handler. This will automatically read the full body contents up to the
/// specified `max_body_limit` in bytes. If you'd prefer to have finer-grain
/// control over this behavior, consider using `mist.serve`.
pub fn run_service(
port: Int,
handler: mhttp.Handler,
max_body_limit max_body_limit: Int,
) -> Result(Nil, glisten.StartError) {
handler
|> mhttp.handler
|> mhttp.handler(max_body_limit)
|> tcp.acceptor_pool_with_data(mhttp.new_state())
|> glisten.serve(port, _)
}

/// Slightly more flexible alternative to `run_service`. This allows hooking
/// into the `mist/http.{handler_func}` method.
/// into the `mist/http.{handler_func}` method. Note that the request body
/// will not be automatically read. You will need to call `http.read_body`.
pub fn serve(
port: Int,
handler: tcp.LoopFn(State),
Expand Down
1 change: 1 addition & 0 deletions src/mist/encoder.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub fn status_to_bit_string(status: Int) -> BitString {
403 -> <<"Forbidden":utf8>>
404 -> <<"Not Found":utf8>>
405 -> <<"Method Not Allowed":utf8>>
413 -> <<"Request Entity Too Large":utf8>>
500 -> <<"Internal Server Error":utf8>>
502 -> <<"Bad Gateway":utf8>>
503 -> <<"Service Unavailable":utf8>>
Expand Down
178 changes: 132 additions & 46 deletions src/mist/http.gleam
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import gleam/http
import gleam/http/request.{Request}
import gleam/http/response
import gleam/int
import gleam/list
import gleam/map.{Map}
import gleam/option.{None, Option, Some}
import gleam/otp/actor
import gleam/otp/process
import gleam/pair
import gleam/result
import gleam/string
import glisten/tcp.{LoopState, Socket}
Expand Down Expand Up @@ -41,12 +43,14 @@ pub type DecodedPacket {
}

pub type DecodeError {
MalformedRequest
InvalidMethod
InvalidPath
UnknownHeader
UnknownMethod
// TODO: better name?
InvalidBody
DiscardPacket
}

external fn decode_packet(
Expand Down Expand Up @@ -127,49 +131,60 @@ fn decode_atom(value: Dynamic) -> Result(Atom, List(dynamic.DecodeError)) {
pub fn parse_request(
bs: BitString,
socket: Socket,
) -> Result(request.Request(BitString), DecodeError) {
try BinaryData(req, rest) = decode_packet(HttpBin, bs, [])
assert HttpRequest(http_method, AbsPath(path), _version) = req

try method =
http_method
|> decode_atom
|> result.map(atom.to_string)
|> result.or(dynamic.string(http_method))
|> result.replace_error(Nil)
|> result.then(http.parse_method)
|> result.replace_error(UnknownMethod)

try #(headers, rest) = parse_headers(rest, socket, map.new())

try path =
path
|> bit_string.to_string
|> result.replace_error(InvalidPath)

let body_size =
headers
|> map.get("content-length")
|> result.then(int.parse)
|> result.unwrap(0)

let remaining = body_size - bit_string.byte_size(rest)
try body = case body_size, remaining {
0, 0 -> Ok(<<>>)
0, _n ->
// is this pipelining? check for GET?
Ok(rest)
_n, 0 -> Ok(rest)
_size, _rem -> read_data(socket, Buffer(remaining, rest), InvalidBody)
) -> Result(request.Request(Body), DecodeError) {
case decode_packet(HttpBin, bs, []) {
Ok(BinaryData(HttpRequest(http_method, AbsPath(path), _version), rest)) -> {
try method =
http_method
|> decode_atom
|> result.map(atom.to_string)
|> result.or(dynamic.string(http_method))
|> result.replace_error(Nil)
|> result.then(http.parse_method)
|> result.replace_error(UnknownMethod)
try #(headers, rest) = parse_headers(rest, socket, map.new())
try path =
path
|> bit_string.to_string
|> result.replace_error(InvalidPath)
let req =
request.new()
|> request.set_body(Unread(rest, socket))
|> request.set_method(method)
|> request.set_path(path)
Ok(request.Request(..req, headers: map.to_list(headers)))
}
_ -> Error(DiscardPacket)
}
}

let req =
request.new()
|> request.set_body(body)
|> request.set_method(method)
|> request.set_path(path)
pub opaque type Body {
Unread(rest: BitString, socket: Socket)
Read(data: BitString)
}

Ok(request.Request(..req, headers: map.to_list(headers)))
pub fn read_body(req: Request(Body)) -> Result(Request(BitString), DecodeError) {
case req.body {
Unread(rest, socket) -> {
let body_size =
req.headers
|> list.find(fn(tup) { pair.first(tup) == "content-length" })
|> result.map(pair.second)
|> result.then(int.parse)
|> result.unwrap(0)
let remaining = body_size - bit_string.byte_size(rest)
case body_size, remaining {
0, 0 -> Ok(<<>>)
0, _n -> Ok(rest)
// is this pipelining? check for GET?
_n, 0 -> Ok(rest)
_size, _rem -> read_data(socket, Buffer(remaining, rest), InvalidBody)
}
|> result.map(request.set_body(req, _))
|> result.replace_error(InvalidBody)
}
Read(_data) -> Error(InvalidBody)
}
}

pub type Handler =
Expand Down Expand Up @@ -207,15 +222,39 @@ pub type HandlerResponse {
}

pub type HandlerFunc =
fn(Request(BitString)) -> HandlerResponse
fn(Request(Body)) -> HandlerResponse

const stop_normal = actor.Stop(process.Normal)

/// Creates a standard HTTP handler service to pass to `mist.serve`
pub fn handler(handler: Handler) -> tcp.LoopFn(State) {
pub fn handler(handler: Handler, max_body_limit: Int) -> tcp.LoopFn(State) {
let bad_request =
response.new(400)
|> response.set_body(bit_builder.new())
handler_func(fn(req) {
req
|> handler
case request.get_header(req, "content-length") {
Ok("0") | Error(Nil) ->
req
|> request.set_body(<<>>)
|> handler
Ok(size) ->
size
|> int.parse
|> result.map(fn(size) {
case size > max_body_limit {
True ->
response.new(413)
|> response.set_body(bit_builder.new())
|> response.prepend_header("connection", "close")
False ->
req
|> read_body
|> result.map(handler)
|> result.unwrap(bad_request)
}
})
|> result.unwrap(bad_request)
}
|> response.map(BitBuilderBody)
|> Response
})
Expand Down Expand Up @@ -287,7 +326,16 @@ pub fn handler_func(handler: HandlerFunc) -> tcp.LoopFn(State) {
}
msg
|> parse_request(socket)
|> result.map_error(logger.error)
|> result.map_error(fn(err) {
case err {
DiscardPacket -> Nil
_ -> {
logger.error(err)
tcp.close(socket)
Nil
}
}
})
|> result.replace_error(stop_normal)
|> result.map(fn(req) {
case rescue(fn() { handler(req) }) {
Expand Down Expand Up @@ -346,7 +394,7 @@ pub fn handler_func(handler: HandlerFunc) -> tcp.LoopFn(State) {
|> result.unwrap_both
Ok(Upgrade(with_handler)) ->
req
|> websocket.upgrade(socket, _)
|> upgrade(socket, _)
|> result.map(fn(_nil) {
let _ = case with_handler.on_init {
Some(func) -> func(sender)
Expand Down Expand Up @@ -383,3 +431,41 @@ pub fn handler_func(handler: HandlerFunc) -> tcp.LoopFn(State) {
}
})
}

pub fn upgrade_socket(
req: Request(Body),
) -> Result(response.Response(BitBuilder), Request(Body)) {
try _upgrade =
request.get_header(req, "upgrade")
|> result.replace_error(req)
try key =
request.get_header(req, "sec-websocket-key")
|> result.replace_error(req)
try _version =
request.get_header(req, "sec-websocket-version")
|> result.replace_error(req)

let accept_key = websocket.parse_key(key)

response.new(101)
|> response.set_body(bit_builder.from_bit_string(<<"":utf8>>))
|> response.prepend_header("Upgrade", "websocket")
|> response.prepend_header("Connection", "Upgrade")
|> response.prepend_header("Sec-WebSocket-Accept", accept_key)
|> Ok
}

// TODO: improve this error type
pub fn upgrade(socket: Socket, req: Request(Body)) -> Result(Nil, Nil) {
try resp =
upgrade_socket(req)
|> result.replace_error(Nil)

try _sent =
resp
|> encoder.to_bit_builder
|> tcp.send(socket, _)
|> result.replace_error(Nil)

Ok(Nil)
}
Loading

0 comments on commit a449adb

Please sign in to comment.