-
-
Save sgodbillon/4250672 to your computer and use it in GitHub Desktop.
| package bugs | |
| import play.api.libs.iteratee._ | |
| import scala.util.Failure | |
| import scala.util.Success | |
| import scala.concurrent.Future | |
| import scala.concurrent.Promise | |
/**
 * Reproduction of a `StackOverflowError` seen when a long chain of futures drives an
 * enumerator built on Play iteratees.
 *
 * Call `test` to run the scenario.
 */
object StackOverflowErrorBug {
  import scala.concurrent.ExecutionContext.Implicits.global

  /** A batch of strings (`iterator`) together with an asynchronous link to a following cursor. */
  trait Cursor {
    /** Elements of the current batch. */
    def iterator: Iterator[String]

    /** Whether another cursor follows this one. */
    def hasNext: Boolean

    /** The following cursor. */
    def next: Future[Cursor]

    /** Index of this cursor; 0 unless overridden. */
    def i: Int = 0

    /**
     * Computes one unfolding step for `cursor`:
     *  - while its batch has elements, yields the same cursor with the next element;
     *  - when the batch is exhausted but `cursor.hasNext` holds, yields a fresh
     *    `DefaultCursor(cursor.i + 1)` with no element (note that `cursor.next` itself is
     *    not consulted here), printing the created future followed by a comma;
     *  - otherwise yields `None`.
     */
    def n(cursor: Cursor) =
      if (cursor.iterator.hasNext) {
        Future(Some((cursor, Some(cursor.iterator.next()))))
      } else if (cursor.hasNext) {
        // Alternative left disabled by the author: cursor.next.map(c => Some((c, None)))
        val upcoming = Future(Some(DefaultCursor(cursor.i + 1) -> None))
        print(upcoming + ",")
        upcoming
      } else {
        Future(None)
      }

    /**
     * Enumerator over the strings reachable from this cursor. The unfolding produced by `n`
     * is followed by EOF, a "done" message is registered through `onDoneEnumerating`, and
     * only the `Some` payloads are passed on.
     */
    def enumerate = {
      val unfolded = CustomEnumerator.unfoldM(this) { current => n(current) }
      val terminated = unfolded.andThen(Enumerator.eof).onDoneEnumerating {
        println("done")
      }
      terminated &> Enumeratee.collect {
        case Some(element) => element
      }
    }

    /** Hand-written `unfoldM` enumerator and the helpers it is built from. */
    object CustomEnumerator {

      /**
       * Builds an enumerator by repeatedly applying `f` to a state, starting with `s`.
       * A `Some((state, element))` result feeds `element` to the iteratee and continues from
       * `state`; a `None` result stops and returns the iteratee as a continuation.
       */
      def unfoldM[S, E](s: S)(f: S => Future[Option[(S, E)]]): Enumerator[E] =
        checkContinue1(s)(new TreatCont1[E, S] {
          def apply[A](
            loop: (Iteratee[E, A], S) => Future[Iteratee[E, A]],
            s: S,
            k: Input[E] => Iteratee[E, A]
          ): Future[Iteratee[E, A]] =
            f(s).flatMap {
              case Some((following, element)) =>
                // Without this intermediate promise a StackOverflowError is eventually thrown.
                // The original code returned the recursive call directly:
                //   loop(k(Input.El(e)), newS)
                val relay = Promise[play.api.libs.iteratee.Iteratee[E, A]]()
                loop(k(Input.El(element)), following).onComplete {
                  case Success(iteratee) =>
                    relay.success(iteratee)
                  case Failure(cause) =>
                    relay.failure(cause)
                }
                relay.future

              case None =>
                Future(Cont(k))
            }
        })

      /** Strategy invoked by `checkContinue1` whenever the iteratee is ready for more input. */
      trait TreatCont1[E, S] {
        /**
         * @param loop re-enters the enumeration with a new iteratee and state
         * @param s    current state
         * @param k    continuation of the iteratee awaiting input
         */
        def apply[A](
          loop: (Iteratee[E, A], S) => Future[Iteratee[E, A]],
          s: S,
          k: Input[E] => Iteratee[E, A]
        ): Future[Iteratee[E, A]]
      }

      /**
       * Enumerator that folds over the iteratee's step: `Done` and `Error` are returned as
       * they are, `Cont` is delegated to `inner` together with the current state.
       */
      def checkContinue1[E, S](s: S)(inner: TreatCont1[E, S]) = new Enumerator[E] {
        def apply[A](it: Iteratee[E, A]): Future[Iteratee[E, A]] = {
          def advance(current: Iteratee[E, A], state: S): Future[Iteratee[E, A]] =
            current.fold {
              case Step.Done(a, e) => Future(Done(a, e))
              case Step.Cont(k) => inner[A](advance, state, k)
              case Step.Error(msg, e) => Future(Error(msg, e))
            }

          advance(it, s)
        }
      }

      /**
       * Single step over `cursor`, in (element, cursor) order: the next batch element with the
       * same cursor, or no element with the cursor obtained from `cursor.next`, or `None`
       * when nothing is left. Not referenced by the other members shown here.
       */
      def loop(cursor: Cursor): Future[Option[(Option[String], Cursor)]] = {
        if (cursor.iterator.hasNext)
          Future(Some(Some(cursor.iterator.next()) -> cursor))
        else if (cursor.hasNext)
          cursor.next.map(following => Some(None -> following))
        else
          Future(None)
      }
    }
  }

  /** Cursor number `i` holding the two strings "<i>0" and "<i>1"; cursors exist while `i < 5000`. */
  case class DefaultCursor(override val i: Int) extends Cursor {
    val iterator = (0 to 1).map(j => i + "" + j).toIterator

    def hasNext = i < 5000

    def next = Future(DefaultCursor(i + 1))
  }

  /** Cursor with an empty batch whose successor is the given future cursor. */
  case class FlattenedCursor(cursor: Future[Cursor]) extends Cursor {
    val iterator = Iterator.empty

    def hasNext = true

    def next = cursor
  }

  // should print "done: <some result>" at the end
  def test = {
    val enumerator = FlattenedCursor(Future(DefaultCursor(0))).enumerate
    val applied = enumerator(Iteratee.foreach { element =>
      // println(element)
    })
    Iteratee.flatten(applied).run.onComplete { outcome =>
      println("done: " + outcome)
    }
  }
}
// sbt build definition for the reproduction project.
name := "TestIteratees"

version := "1.0"

scalaVersion := "2.10.0-RC1"

// Typesafe repositories used to resolve the Play iteratees artifact.
resolvers ++= Seq(
  "Typesafe repository snapshots" at "http://repo.typesafe.com/typesafe/snapshots/",
  "Typesafe repository releases" at "http://repo.typesafe.com/typesafe/releases/"
)

libraryDependencies += "play" % "play-iteratees_2.10" % "2.1-RC1"
Tried it, sadly, completeWith also fails :/
does this count as a valid (but minimal) test?
Enumerator.unfoldM(0){ (i => Future(Option((i+1,i)).filterNot(_ => i > 5000 )))} |>>> Iteratee.getChunks
not minimal enough? here might be a smaller one:
def tata(f:Future[Int]):Future[Int] = f.flatMap(i => if (i < 0) tata(Future(i+1)) else Future(i))
// small enough number, -750 is enough in the console
tata(Future(-1000))
java.lang.IllegalStateException: Promise already completed.
at scala.concurrent.Promise$class.complete(Promise.scala:55)
at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:58)
at scala.concurrent.Promise$class.failure(Promise.scala:107)
at scala.concurrent.impl.Promise$DefaultPromise.failure(Promise.scala:58)
at scala.concurrent.Future$$anonfun$flatMap$1.liftedTree3$1(Future.scala:283)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:277)
at scala.concurrent.Future$$anonfun$flatMap$1.apply(Future.scala:274)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:29)
at scala.concurrent.forkjoin.ForkJoinTask$AdaptedRunnableAction.exec(ForkJoinTask.java:1417)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:262)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:915)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:980)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
You're right :)
You can most likely replace the following:
val promise = Promise[play.api.libs.iteratee.Iteratee[E,A]]()
loop(k(Input.El(e)),newS).onComplete {
case Success(s) =>
promise.success(s)
case Failure(f) =>
promise.failure(f)
}
promise.future
with