Skip to content

Commit

Permalink
api: Pure netty HTTP server. I hope @mpenet can forgive me.
Browse files Browse the repository at this point in the history
  • Loading branch information
pyr committed Oct 16, 2015
1 parent 5ccaf42 commit 8af6055
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 5 deletions.
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
[spootnik/uncaught "0.5.2"]
[spootnik/globber "0.4.1"]
[instaparse "1.4.1"]
[cheshire "5.5.0"]
[metrics-clojure "2.5.1"]
[ring/ring-codec "1.0.0"]
[clj-yaml "0.4.0"]
[cc.qbits/jet "0.6.6"]
[cc.qbits/alia "2.7.2"]
[com.boundary/high-scale-lib "1.0.6"]
[net.jpountz.lz4/lz4 "1.3"]
Expand Down
8 changes: 4 additions & 4 deletions src/io/cyanite/api.clj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
[io.cyanite.index :as index]
[io.cyanite.store :as store]
[io.cyanite.query :as query]
[qbits.jet.server :refer [run-jetty]]
[io.cyanite.http :as http]
[io.cyanite.utils :refer [nbhm assoc-if-absent! now!]]
[clojure.tools.logging :refer [info debug error]]
[clojure.string :refer [lower-case blank?]]))
Expand Down Expand Up @@ -128,9 +128,9 @@
(if (:disabled options)
this
(let [handler (make-handler store index engine)
server (run-jetty (assoc options :ring-handler handler :join? false))]
server (http/run-server options handler)]
(assoc this :server server))))
(stop [this]
(when server
(.stop server))
(when (fn? server)
(server))
(assoc this :server nil)))
139 changes: 139 additions & 0 deletions src/io/cyanite/http.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
(ns io.cyanite.http
"Small wrapper around netty for HTTP servers"
(:require [com.stuartsierra.component :as component]
[clojure.tools.logging :refer [error info]])
(:import io.netty.channel.ChannelHandlerContext
io.netty.channel.ChannelHandlerAdapter
io.netty.channel.ChannelInboundHandlerAdapter
io.netty.channel.ChannelOutboundHandlerAdapter
io.netty.channel.ChannelHandler
io.netty.channel.ChannelOption
io.netty.channel.ChannelInitializer
io.netty.channel.ChannelFutureListener
io.netty.channel.nio.NioEventLoopGroup
io.netty.channel.socket.nio.NioServerSocketChannel
io.netty.channel.epoll.Epoll
io.netty.channel.epoll.EpollServerSocketChannel
io.netty.channel.epoll.EpollEventLoopGroup
io.netty.handler.logging.LoggingHandler
io.netty.handler.logging.LogLevel
io.netty.handler.codec.http.FullHttpRequest
io.netty.handler.codec.http.HttpServerCodec
io.netty.handler.codec.http.HttpMethod
io.netty.handler.codec.http.HttpHeaders
io.netty.handler.codec.http.HttpResponseStatus
io.netty.handler.codec.http.DefaultFullHttpResponse
io.netty.handler.codec.http.HttpVersion
io.netty.handler.codec.http.HttpObjectAggregator
io.netty.bootstrap.ServerBootstrap
io.netty.buffer.Unpooled
java.nio.charset.Charset))

(defn epoll?
"Find out if epoll is available on the underlying platform."
[]
(Epoll/isAvailable))

(defn bb->string
"Convert a ByteBuf to a UTF-8 String."
[bb]
(.toString bb (Charset/forName "UTF-8")))

(def method->data
"Yield a keyword representing an HTTP method."
{HttpMethod/CONNECT :connect
HttpMethod/DELETE :delete
HttpMethod/GET :get
HttpMethod/HEAD :head
HttpMethod/OPTIONS :options
HttpMethod/PATCH :patch
HttpMethod/POST :post
HttpMethod/PUT :put
HttpMethod/TRACE :trace})

(defn headers
"Get a map out of netty headers."
[^HttpHeaders headers]
(into
{}
(for [[^String k ^String v] (-> headers .entries seq)]
[(-> k .toLowerCase keyword) v])))

(defn data->response
"Create a netty full http response from a map."
[{:keys [status body headers]} version]
(let [resp (DefaultFullHttpResponse.
version
(HttpResponseStatus/valueOf (int status))
(Unpooled/wrappedBuffer (.getBytes body)))
hmap (.headers resp)]
(doseq [[k v] headers]
(.set hmap (name k) v))
resp))

(defn request-handler
"Capture context and msg and yield a closure
which generates a response.
The closure may be called at once or submitted to a pool."
[f ^ChannelHandlerContext ctx ^FullHttpRequest msg]
(fn []
(let [req {:uri (.getUri msg)
:request-method (method->data (.getMethod msg))
:version (-> msg .getProtocolVersion .text)
:headers (headers (.headers msg))
:body (bb->string (.content msg))}
resp (data->response (f req) (.getProtocolVersion msg))]
(-> (.writeAndFlush ctx resp)
(.addListener ChannelFutureListener/CLOSE)))))

(defn netty-handler
"Simple netty-handler, everything may happen in
channel read, since we're expecting a full http request."
[f]
(proxy [ChannelInboundHandlerAdapter] []
(exceptionCaught [^ChannelHandlerContext ctx e]
(error e "http server exception caught"))
(channelRead [^ChannelHandlerContext ctx ^FullHttpRequest msg]
(let [callback (request-handler f ctx msg)]
(callback)))))

(defn initializer
"Our channel initializer."
[handler]
(proxy [ChannelInitializer] []
(initChannel [channel]
(let [pipeline (.pipeline channel)]
(.addLast pipeline "codec" (HttpServerCodec.))
(.addLast pipeline "aggregator" (HttpObjectAggregator. 1048576))
(.addLast pipeline "handler" (netty-handler handler))))))

(defn run-server
"Prepare a bootstrap channel and start it."
([options handler]
(run-server (assoc options :ring-handler handler)))
([options]
(let [thread-count (or (:loop-thread-count options) 1)
boss-group (if (and (epoll?) (not (:disable-epoll options)))
(EpollEventLoopGroup. thread-count)
(NioEventLoopGroup. thread-count))
so-backlog (int (or (:so-backlog options) 1024))]
(try
(let [bootstrap (doto (ServerBootstrap.)
(.option ChannelOption/SO_BACKLOG so-backlog)
(.group boss-group)
(.channel (if (epoll?)
EpollServerSocketChannel
NioServerSocketChannel))
(.handler (LoggingHandler. LogLevel/INFO))
(.childHandler (initializer (:ring-handler options))))
channel (-> bootstrap
(.bind ^String (or (:host options) "127.0.0.1")
(int (or (:port options) 8080)))
(.sync)
(.channel))]
(future
(-> channel .closeFuture .sync))
(fn []
(.close channel)
(.shutdownGracefully boss-group)))))))

0 comments on commit 8af6055

Please sign in to comment.