Channels in Clojure
Thu, Jan 21, 2010
I have been kicking the tires on Google’s Go. I was especially impressed with its RPC framework example. I wanted to implement a similar framework in Clojure.
Let’s start with the client interface:
(ns rpc-example
  (:import [java.util.concurrent BlockingQueue
                                 LinkedBlockingQueue]))

(defn rpc
  "Performs a blocking call using the given queue."
  [#^BlockingQueue q f & args]
  ;; The promise is the reply slot: a handler delivers into it.
  (let [result (promise)]
    (.put q {:fn f :args args :result result})
    ;; Block until a handler has delivered the result.
    @result))
While Clojure does not currently provide anything like Go’s channels, Java’s BlockingQueue is close enough [1]. A request is simply a map of a function, a list of arguments, and a promise [2]. This map is put on the queue for a waiting handler.
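To make the request shape concrete, here is a minimal sketch that builds one request by hand and then plays the handler's part at the REPL. The names demo-q, result and request are made up for illustration and are not part of the framework.

```clojure
(import '[java.util.concurrent LinkedBlockingQueue])

(def demo-q (LinkedBlockingQueue. 10))

;; A promise starts out empty; deliver binds it exactly once.
(def result (promise))
(def request {:fn + :args [1 2 3] :result result})

(.put demo-q request)

;; Play the role of the handler: take the request, run it, deliver the answer.
(let [req (.take demo-q)]
  (deliver (:result req) (apply (:fn req) (:args req))))

@result ; => 6
```

Dereferencing result before the deliver call would block, which is exactly what the client relies on.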
The handler function takes requests off the queue, processes them, and delivers the result:
(defn handler
  "Repeatedly processes requests on the given queue."
  [#^BlockingQueue q]
  ;; .take blocks until a request is available.
  (doseq [req (repeatedly #(.take q))]
    (let [result (apply (:fn req) (:args req))]
      (deliver (:result req) result))))
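One thing to keep in mind with this loop: if the requested function throws, the exception escapes doseq, the handler stops, and the caller's promise is never delivered. The sketch below is one possible way around that, not part of the original design. safe-handler is a hypothetical variant that delivers the caught exception as the result, and the decision to hand the exception back as a plain value is an assumption of this example.

```clojure
(import '[java.util.concurrent BlockingQueue LinkedBlockingQueue])

(defn safe-handler
  "Hypothetical variant of handler: delivers a caught exception instead of dying."
  [#^BlockingQueue q]
  ;; .take stays outside the try, so cancelling the future still stops the loop.
  (doseq [req (repeatedly #(.take q))]
    (deliver (:result req)
             (try
               (apply (:fn req) (:args req))
               (catch Exception e e)))))

(def safe-q (LinkedBlockingQueue. 10))
(def safe-worker (future (safe-handler safe-q)))

(def failed (promise))
(def ok (promise))
(.put safe-q {:fn / :args [1 0] :result failed}) ; division by zero
(.put safe-q {:fn + :args [1 2] :result ok})

(instance? ArithmeticException @failed) ; => true
@ok                                     ; => 3, the same worker is still serving
(future-cancel safe-worker)
```

A caller would then have to check whether the value it got back is an exception, which is the price of keeping the handler alive.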
Finally, I define some functions to manage the execution of handlers. Like the Go example, I want the ability to run handlers concurrently. For that, I use future [3].
(defn serve
  "Create n handlers for the given queue."
  [#^BlockingQueue q n]
  (doall (for [_ (range n)]
           (future (handler q)))))
(defn quit
  "Cancel all handlers for the given server."
  [server]
  (doall (map future-cancel server)))
Putting the pieces together:
(def q (LinkedBlockingQueue. 10))
; #'rpc-example/q
(def rpc-call (future (rpc q + 1 2 3)))
; #'rpc-example/rpc-call
rpc-call
; #<Object$IDeref$Future$2b9be1f9@13c4c09: :pending>
(def server (serve q 3))
; #'rpc-example/server
rpc-call
; #<Object$IDeref$Future$2b9be1f9@13c4c09: 6>
(quit server)
; (true true true)
PS. An asynchronous client interface:
(defn rpc-async [#^BlockingQueue q callback f & args]
  (future (callback (apply rpc q f args))))
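A short usage sketch for the asynchronous client. The definitions are repeated from the post so the block runs on its own. async-q, async-worker, answer and call are names invented for this example, and storing the result in an atom is just one choice of callback.

```clojure
(import '[java.util.concurrent BlockingQueue LinkedBlockingQueue])

;; Repeated from the post so this sketch is self-contained.
(defn rpc [#^BlockingQueue q f & args]
  (let [result (promise)]
    (.put q {:fn f :args args :result result})
    @result))

(defn handler [#^BlockingQueue q]
  (doseq [req (repeatedly #(.take q))]
    (deliver (:result req) (apply (:fn req) (:args req)))))

(defn rpc-async [#^BlockingQueue q callback f & args]
  (future (callback (apply rpc q f args))))

(def async-q (LinkedBlockingQueue. 10))
(def async-worker (future (handler async-q)))

;; Any one-argument function can serve as the callback.
(def answer (atom nil))
(def call (rpc-async async-q #(reset! answer %) * 6 7))

@call   ; blocks until the callback has run
@answer ; => 42
(future-cancel async-worker)
```

rpc-async returns immediately. The blocking rpc call happens inside the future, and the callback runs on that future's thread once a handler has delivered the result.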
[1] Like channels, BlockingQueue provides both blocking (put and take) and non-blocking (offer and poll) queue operations. However, unlike Java’s poll, which uses null to signal an empty queue, Go’s channels signal receive failures out-of-band, thus allowing them to transmit nil values.
[2] Introduced in Clojure’s 1.1 release, a promise is a dataflow variable: it can be bound to a value once, and computations that rely on its value block until the value is provided.
[3] Also a Clojure 1.1 feature, a future represents an asynchronous computation. The body is executed on another thread. Once the future is created, requests for the result of the computation block until it completes. However, in this case the computation is infinite and yields no value, so I use future-cancel to terminate it.
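The non-blocking operations mentioned in the first note can be tried at the REPL. This is a small sketch using a queue with room for one element. small-q is a name made up for the example.

```clojure
(import '[java.util.concurrent LinkedBlockingQueue])

(def small-q (LinkedBlockingQueue. 1)) ; capacity of one

(.offer small-q :a) ; => true, there was room
(.offer small-q :b) ; => false, the queue is full and the call returns at once
(.poll small-q)     ; => :a
(.poll small-q)     ; => nil, the queue is empty

;; BlockingQueue rejects null elements, so nil from poll can only mean "empty".
(try
  (.offer small-q nil)
  (catch NullPointerException _ :rejected)) ; => :rejected
```

Because nil cannot be enqueued, a request that needs to carry "no value" has to wrap it, for example in a map like the request map used by rpc.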