Created
October 18, 2012 10:21
-
-
Save yakamoto69/3910888 to your computer and use it in GitHub Desktop.
Execute the Same Task Many Times as Much as Possible
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import com.twitter.concurrent.{NamedPoolThreadFactory, Broker} | |
| import com.twitter.util.{ExecutorServiceFuturePool, FuturePool} | |
| import java.util.concurrent.{Executors, TimeUnit} | |
| import annotation.tailrec | |
| object RepeatTask { | |
| /** | |
| * Executes the "task" many times as much as possible within given "repeatMilliSecs". | |
| * The "task" must be thread safe, since it is called simultaneously. | |
| */ | |
| def run(repeatMilliSecs: Long, task: => Unit): Int = { | |
| val b = new Broker[Unit] | |
| // copied from FuturePool.defaultPool | |
| // FuturePool.defaultPool does not fit this case, because a pool must be shutdown to wait until all running tasks finish. | |
| val pool = new ExecutorServiceFuturePool( | |
| Executors.newFixedThreadPool( | |
| scala.collection.parallel.availableProcessors, | |
| new NamedPoolThreadFactory("RepeatTask") | |
| ) | |
| ) | |
| val o = b.recv | |
| val maxRunningSize = scala.collection.parallel.availableProcessors | |
| val t = Timer.start(repeatMilliSecs) | |
| @tailrec | |
| def cycle(runningCnt: Int = 0, acc: Int = 0): Int = { | |
| if (t.isOver) return acc | |
| if (runningCnt < maxRunningSize) { | |
| pool(task).onSuccess { b ! _ } | |
| cycle(runningCnt + 1, acc + 1) | |
| } else { | |
| o.syncWait() | |
| cycle(runningCnt - 1, acc) | |
| } | |
| } | |
| val cnt = cycle() | |
| // wait until all tasks finish | |
| pool.executor.shutdown() | |
| assert(pool.executor.awaitTermination(1, TimeUnit.SECONDS)) | |
| cnt | |
| } | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| object Timer { | |
| def start(milliSecs: Long) = new Timer(milliSecs) | |
| } | |
| class Timer(milliSecs: Long) { | |
| val start = System.nanoTime() | |
| def isOver = System.nanoTime() - start > 1000000 * milliSecs | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment