Skip to content

Instantly share code, notes, and snippets.

@mark-jay
Created August 29, 2020 08:50
Show Gist options
  • Select an option

  • Save mark-jay/dfc4895375b2677fc89853e0a7edcf5e to your computer and use it in GitHub Desktop.

Select an option

Save mark-jay/dfc4895375b2677fc89853e0a7edcf5e to your computer and use it in GitHub Desktop.
import java.util.Date
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.concurrent.atomic.AtomicLong
import scala.util.{Failure, Success, Try}
case class SimpleLimiterQueueItem(weight: Long, date: Date)
case class SimpleLimiter(maxInvocation: Long, perPeriodMs: Long) extends Limiter {
private val queue = new ConcurrentLinkedDeque[SimpleLimiterQueueItem]()
private val size = new AtomicLong()
private def removeLatestIfPossible(currentTime: Date): Boolean = {
Try { queue.getLast } match {
case Success(last) => {
if (currentTime.getTime - last.date.getTime >= perPeriodMs) {
println(s"removing ${last}")
if (queue.removeLastOccurrence(last)) {
var s = size.get()
while (!size.compareAndSet(s, s - last.weight)) {
s = size.get()
}
}
return true
}
false
}
case Failure(noSuchElementException: NoSuchElementException) => {
false
}
}
}
private def cleanQueue(currentTime: Date) = {
while (removeLatestIfPossible(currentTime)) {}
}
def tryLimit(weight: Long, currentTime: Date): Boolean = {
cleanQueue(currentTime)
val s = size.get()
if (s + weight <= maxInvocation) {
if (size.compareAndSet(s, s + weight)) {
val item = SimpleLimiterQueueItem(weight, currentTime)
println(s"inserting ${item}, new size = ${size.get()}, s = ${s}")
queue.addFirst(item)
return true
}
}
false
}
override def tryLimit(weight: Long): Boolean = {
tryLimit(weight, new Date())
}
}
import java.util
import java.util.Date
import org.specs2.mutable.Specification
import scala.collection.{JavaConversions, JavaConverters}
class SimpleLimiterTest extends Specification {
val oneSecond = 1000
"SimpleLimiterTest" should {
"tryLimit" in {
val service = SimpleLimiter(10, oneSecond)
service.tryLimit(1, new Date(0L)) === true
service.tryLimit(9, new Date(1L)) === true
service.tryLimit(1, new Date(1L)) === false
service.tryLimit(1, new Date(999L)) === false
service.tryLimit(1, new Date(1000L)) === true
service.tryLimit(1, new Date(1000L)) === false
service.tryLimit(5, new Date(1001L)) === true
service.tryLimit(4, new Date(1001L)) === true
service.tryLimit(1, new Date(1001L)) === false
ok
}
"tryLimit multiple threads" in {
val maxInvocations = 1000
val service = SimpleLimiter(maxInvocations, oneSecond)
val list = new util.ArrayList[Long](Runtime.getRuntime.availableProcessors())
(1 to Runtime.getRuntime.availableProcessors()).foreach(i => list.add(0L))
(1 to Runtime.getRuntime.availableProcessors()).map(i => {
val thread = new Thread() {
override def run(): Unit = {
var local = 0L
while (service.tryLimit(i, new Date(0L))) {
local += i
}
list.set(i - 1, local)
}
}
thread.start()
thread
}).foreach(_.join())
// JavaConverters.asScalaBuffer(list).toList.sum should be lessThan maxInvocations
JavaConverters.asScalaBuffer(list).toList.sum === maxInvocations
ok
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment