@fancellu
Created December 3, 2025 17:08
Example scio/beam usage inside of ZIO

ZIO Beam WordCount

A simple word count program built with ZIO and Apache Beam/Scio that demonstrates retry logic and error handling.

Features

  • Counts words in a text file using Apache Beam
  • Built with ZIO for functional effect management
  • Includes retry logic with fixed delays
  • Simulates random failures for testing resilience

Running the Program

From sbt terminal:

sbt "runMain beam.WordCountZIO --output=wc-output"

From IntelliJ:

Run the WordCountZIO object directly with program arguments: --output=wc-output

Program Arguments

  • --input=<path> - Input file path (defaults to src/main/resources/kinglear.txt)
  • --output=<path> - Output directory (defaults to wc-output)
  • --runner=DirectRunner - Beam runner (optional, DirectRunner is default)

Behavior

The program:

  1. Generates a random number and fails 90% of the time to demonstrate retry logic
  2. Retries up to 10 times with a fixed 100-millisecond delay between attempts
  3. Processes the input text file and counts word occurrences
  4. Outputs results to numbered part files in the specified directory
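
As a rough check on how often the retry budget runs out, the odds can be worked out directly. This is only a sketch: it assumes each attempt is independent, and the names RetryOdds and allFail are made up for illustration and are not part of the gist.

```scala
object RetryOdds {
  // Assumption: attempts are independent, and there is one initial try
  // plus `retries` retries, so retries + 1 attempts in total.
  def allFail(pFail: Double, retries: Int): Double =
    math.pow(pFail, retries + 1)

  def main(args: Array[String]): Unit = {
    val p = allFail(0.9, 10) // 0.9 to the power 11, roughly 0.314
    println(s"P(all 11 attempts fail) = $p")
    println(s"P(word count eventually runs) = ${1 - p}")
  }
}
```

Under these assumptions roughly one run in three exhausts every retry without ever reaching the word count.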

Output

Results are written as text files in the output directory with format: word: count
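
To make the splitting rule and the output format concrete, here is a plain-collections sketch of the same logic. It uses the regex and the "word: count" formatting from the pipeline, but groupBy stands in for Scio's countByValue, and WordCountSketch, tokenize, and countLines are hypothetical names used only for this illustration.

```scala
object WordCountSketch {
  // Same rule as the pipeline: any run of characters other than letters
  // or apostrophes separates words. A leading separator yields an empty
  // first token, which is why the filter is needed.
  def tokenize(line: String): List[String] =
    line.trim.split("[^a-zA-Z']+").filter(_.nonEmpty).toList

  // Stand-in for countByValue on an in-memory Seq, rendered as "word: count".
  // Sorting is added here only to make the result deterministic.
  def countLines(lines: Seq[String]): Seq[String] =
    lines
      .flatMap(line => tokenize(line))
      .groupBy(identity)
      .map { case (word, occurrences) => s"$word: ${occurrences.size}" }
      .toSeq
      .sorted

  def main(args: Array[String]): Unit =
    countLines(Seq("To be, or not to be", "That's the question")).foreach(println)
}
```

Counting is case-sensitive, so "To" and "to" are reported separately, and apostrophes are kept inside words such as "That's".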

0.32632995
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.8723043
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.89009273
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.4717887
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.63323087
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.7908458
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.7886189
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.6163985
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.85410094
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.64915174
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.8425201
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
Task failed after all retries: Random failure occurred
timestamp=2025-12-03T16:35:28.393615Z level=ERROR thread=#zio-fiber-1804108919 message="" cause="java.lang.RuntimeException: Random failure occurred
at beam.WordCountZIO$.$anonfun$wordCount$9(WordCountZIO.scala:20)
at zio.ZIO$.$anonfun$fail$1(ZIO.scala:3252)
at zio.ZIO$.$anonfun$failCause$1(ZIO.scala:3263)
at beam.WordCountZIO.wordCount(WordCountZIO.scala:20)
at beam.WordCountZIO.wordCount(WordCountZIO.scala:19)
at beam.WordCountZIO.run(WordCountZIO.scala:10)
at beam.WordCountZIO.run(WordCountZIO.scala:11)
at beam.WordCountZIO.run(WordCountZIO.scala:12)
0.7614539
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.88121456
Should fail!
Failed, retrying because java.lang.RuntimeException: Random failure occurred ...
0.940289
Should work now
package beam

import com.spotify.scio._
import zio.Console.printLine
import zio._

object WordCountZIO extends ZIOAppDefault {

  // Log each failure, retry up to 10 times with a 100 ms delay,
  // then report once if every attempt has failed.
  def run =
    wordCount
      .tapError(th => printLine(s"Failed, retrying because $th ..."))
      .retry(Schedule.recurs(10).delayed(_ => 100.millis))
      .tapError(th => printLine(s"Task failed after all retries: ${th.getMessage}"))
  // of course, we could make this more sophisticated with use of andThen/linear/jittered/fibonacci etc
  // if this were not a toy program

  private val wordCount = {
    for {
      randomFloat <- Random.nextFloat
      _ <- printLine(randomFloat)
      // Fail roughly 90% of the time to exercise the retry logic
      shouldFailBool = randomFloat < 0.9f
      _ <- ZIO.when(shouldFailBool)(
             printLine("Should fail!")
               *> ZIO.fail(new RuntimeException("Random failure occurred"))
           )
      _ <- printLine("Should work now")
      rawArgs <- getArgs
      // Build the Scio context and parse the --input/--output arguments
      result <- ZIO.attempt(ContextAndArgs(rawArgs.toArray))
      (sc, args) = result
      exampleData = "src/main/resources/kinglear.txt"
      input = args.getOrElse("input", exampleData)
      output = args.getOrElse("output", "wc-output")
      // Define the pipeline: split lines into words, count them, write "word: count"
      _ <-
        ZIO.attempt(
          sc.textFile(input)
            .map(_.trim)
            .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty))
            .countByValue
            .map(t => t._1 + ": " + t._2)
            .saveAsTextFile(output)
        )
      // Run the pipeline and block until it completes
      scioResult <- ZIO.attempt(sc.run().waitUntilFinish())
    } yield scioResult
  }
}
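
The comment in run hints at richer retry schedules. One possible shape is sketched below, assuming ZIO 2.x is on the classpath: it caps retries at 10 as in the original, but replaces the fixed delay with jittered exponential backoff. RetrySketch, backoff, flaky, and demo are hypothetical names, and flaky is a deterministic stand-in for the random failure in the gist.

```scala
import zio._

object RetrySketch {
  // At most 10 retries, with delays growing from 100 ms (doubling each time)
  // and randomized by jitter. `&&` continues only while both schedules do.
  val backoff = Schedule.recurs(10) && Schedule.exponential(100.millis).jittered

  // Hypothetical flaky effect: fails on the first `failures` calls,
  // then succeeds with the number of the successful attempt.
  def flaky(counter: Ref[Int], failures: Int): ZIO[Any, RuntimeException, Int] =
    counter.updateAndGet(_ + 1).flatMap { n =>
      if (n <= failures) ZIO.fail(new RuntimeException(s"attempt $n failed"))
      else ZIO.succeed(n)
    }

  // Fails twice, then succeeds on the third attempt.
  val demo: ZIO[Any, RuntimeException, Int] =
    for {
      counter <- Ref.make(0)
      attempt <- flaky(counter, failures = 2).retry(backoff)
    } yield attempt
}
```

In WordCountZIO the same policy could be passed to retry in place of Schedule.recurs(10).delayed(_ => 100.millis); whether backoff is preferable to a fixed delay depends on what is actually failing.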