Skip to content

Commit

Permalink
let's delete the callbacks... ?
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Hamblen committed Feb 6, 2011
1 parent 960912e commit 49f94f8
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 33 deletions.
4 changes: 2 additions & 2 deletions futures/src/main/scala/Futures.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object ActorsFuture extends Futures {
def future[T](result: => T) = scala.actors.Futures.future(result)
}

trait AbortableFuture[T] extends (() => T) {
trait StoppableFuture[T] extends (() => T) {
def isSet: Boolean
def abort()
def stop()
}
6 changes: 3 additions & 3 deletions http/src/main/scala/dispatch/Threads.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ trait ThreadSafety { self: BlockingHttp =>
}

trait ThreadFuture extends ThreadSafety { self: BlockingHttp =>
type HttpPackage[T] = dispatch.futures.AbortableFuture[T]
def pack[T](request: HttpRequestBase, result: => T) = new AbortableFuture[T] {
type HttpPackage[T] = dispatch.futures.StoppableFuture[T]
def pack[T](request: HttpRequestBase, result: => T) = new StoppableFuture[T] {
val delegate = DefaultFuture.future(result)
def apply() = delegate.apply()
def isSet = delegate.isSet
def abort() = request.abort()
def stop() = request.abort()
}
}

Expand Down
89 changes: 61 additions & 28 deletions nio/src/main/scala/nio.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import org.apache.http.client.methods._
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.nio.{ContentDecoder,IOControl,NHttpConnection}
import org.apache.http.nio.client.HttpAsyncResponseConsumer
import org.apache.http.nio.entity.ConsumingNHttpEntity
import org.apache.http.impl.nio.client.BasicHttpAsyncResponseConsumer
import org.apache.http.nio.concurrent.FutureCallback
import java.net.InetSocketAddress
import java.io.IOException
Expand All @@ -19,27 +20,61 @@ object Http {
}

class Http extends dispatch.HttpExecutor {
val client = new DefaultHttpAsyncClient()
val client = new DefaultHttpAsyncClient
client.start()

type HttpPackage[T] = dispatch.futures.AbortableFuture[T]
type HttpPackage[T] = dispatch.futures.StoppableFuture[T]

class StoppableConsumer extends BasicHttpAsyncResponseConsumer {
@volatile private var stopped = false
final override def consumeContent(decoder: ContentDecoder, ioctrl: IOControl) {
if (stopped) {
ioctrl.shutdown()
cancel()
}
else consume(decoder, ioctrl)
}
def consume(decoder: ContentDecoder, ioctrl: IOControl) {
super.consumeContent(decoder, ioctrl)
}
def stop() { stopped = true }
}

class ConsumerFuture(
underlying: Future,
consumer: StoppableConsumer
) extends dispatch.futures.StoppableFuture[T] {
def apply() = { fut.get(); result.get }
def isSet = fut.isDone
def stop() = {
consumer.stop()
fut.cancel(true)
}
}

def execute[T](host: HttpHost, credsopt: Option[dispatch.Credentials],
req: HttpRequestBase, block: HttpResponse => T) = {
credsopt.map { creds =>
error("todo")
} getOrElse {
var result: Option[T] = None
val fut = client.execute(host, req, new FutureCallback[HttpResponse] {
def cancelled() { }
def completed(res: HttpResponse) { result = Some(block(res)) }
def failed(ex: Exception) { ex.printStackTrace() }
})
new dispatch.futures.AbortableFuture[T] {
def apply() = { fut.get(); result.get }
def isSet = fut.isDone
def abort() = req.abort()
}
val consumer = new StoppableConsumer
val context = new BasicHttpContext
val fut = client.execute(
new Producer(host, req),
consumer,
new FutureCallback[HttpResponse] {
var isCancelled = false
def cancelled() {
isCancelled = true
}
def completed(res: HttpResponse) {
result = Some(block(res))
}
def failed(ex: Exception) { ex.printStackTrace() }
}
)
new WrappedFuture
}
}

Expand All @@ -49,31 +84,29 @@ class Http extends dispatch.HttpExecutor {
error("todo")
} getOrElse {
val ioc = DecodingCallback(callback)
var response: Option[HttpResponse] = None
var result: Option[T] = None
val fut = client.execute(new Producer(host, req),
new HttpAsyncResponseConsumer[HttpResponse] {
def cancel() { }
def responseReceived(res: HttpResponse) {
val consumer = new StoppableConsumer {
override def responseReceived(res: HttpResponse) {
response = Some(res)
}
def consumeContent(decoder: ContentDecoder, ioctrl: IOControl) {
override def consume(decoder: ContentDecoder, ioctrl: IOControl) {
ioc.with_decoder(response.get, decoder)
}
def failed(ex: Exception) { ex.printStackTrace() }
def responseCompleted() {
result = Some(ioc.callback.finish(response.get))
}
def getResult() = response.get
}, new FutureCallback[HttpResponse] {
}

val fut = client.execute(new Producer(host, req),
consumer,
new FutureCallback[HttpResponse] {
def cancelled() { }
def completed(res: HttpResponse) { }
def completed(res: HttpResponse) {
result = Some(ioc.callback.finish(response))
}
def failed(ex: Exception) { ex.printStackTrace() }
})
new dispatch.futures.AbortableFuture[T] {
new dispatch.futures.StoppableFuture[T] {
def apply() = { fut.get(); result.get }
def isSet = fut.isDone
def abort() = req.abort()
def stop() = { consumer.stop(); fut.cancel(true) }
}
}
}
Expand Down

0 comments on commit 49f94f8

Please sign in to comment.