Skip to content

Instantly share code, notes, and snippets.

@yakamoto69
Created October 18, 2012 10:21
Show Gist options
  • Select an option

  • Save yakamoto69/3910888 to your computer and use it in GitHub Desktop.

Select an option

Save yakamoto69/3910888 to your computer and use it in GitHub Desktop.
Execute the Same Task Many Times as Much as Possible
import com.twitter.concurrent.{NamedPoolThreadFactory, Broker}
import com.twitter.util.{ExecutorServiceFuturePool, FuturePool}
import java.util.concurrent.{Executors, TimeUnit}
import annotation.tailrec
object RepeatTask {
/**
* Executes the "task" many times as much as possible within given "repeatMilliSecs".
* The "task" must be thread safe, since it is called simultaneously.
*/
def run(repeatMilliSecs: Long, task: => Unit): Int = {
val b = new Broker[Unit]
// copied from FuturePool.defaultPool
// FuturePool.defaultPool does not fit this case, because a pool must be shutdown to wait until all running tasks finish.
val pool = new ExecutorServiceFuturePool(
Executors.newFixedThreadPool(
scala.collection.parallel.availableProcessors,
new NamedPoolThreadFactory("RepeatTask")
)
)
val o = b.recv
val maxRunningSize = scala.collection.parallel.availableProcessors
val t = Timer.start(repeatMilliSecs)
@tailrec
def cycle(runningCnt: Int = 0, acc: Int = 0): Int = {
if (t.isOver) return acc
if (runningCnt < maxRunningSize) {
pool(task).onSuccess { b ! _ }
cycle(runningCnt + 1, acc + 1)
} else {
o.syncWait()
cycle(runningCnt - 1, acc)
}
}
val cnt = cycle()
// wait until all tasks finish
pool.executor.shutdown()
assert(pool.executor.awaitTermination(1, TimeUnit.SECONDS))
cnt
}
}
object Timer {
def start(milliSecs: Long) = new Timer(milliSecs)
}
class Timer(milliSecs: Long) {
val start = System.nanoTime()
def isOver = System.nanoTime() - start > 1000000 * milliSecs
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment