@sgobotta · Created March 16, 2025 20:15
Calculating an average score using streams
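The module below computes the average student score from a JSON file in three ways: `average_score/1` validates an already-decoded list concurrently, `process_average_scores/0` streams the file through Jaxon and folds it with `Stream.transform/5`, and `process_average_scores_parallel/0` chunks the stream so each task reduces a whole batch. Judging by the pattern matches, the input is expected to be a JSON array of objects like `[{"name": "Ada", "score": 90}, ...]`.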
defmodule AverageScore do
  # @file_path "tiny-students.json"
  # @file_path "students.json"
  @file_path "big-students.json"

  require Logger
  def average_score(students) do
    students
    # Validate each student concurrently, discarding invalid scores
    # (scores that are not integers in the 0..100 range, or entries
    # without a name).
    |> Task.async_stream(
      fn student ->
        case student do
          %{name: nil} ->
            {:invalid, 0}

          %{score: score} when is_integer(score) and score >= 0 and score <= 100 ->
            {:valid, score}

          _invalid_score ->
            {:invalid, 0}
        end
      end,
      ordered: false
    )
    # Filter out the invalid scores
    |> Stream.filter(fn {:ok, {score_status, _score}} -> score_status == :valid end)
    |> Stream.map(fn {:ok, {:valid, score}} -> score end)
    # Reduce the valid scores into the average. Stream.transform/5 is used
    # instead of Stream.transform/4 because its extra `last_fun` callback
    # is what lets us emit the final average once the stream is exhausted.
    |> Stream.transform(
      # The start function returns the initial {sum, count} accumulator
      fn -> {0, 0} end,
      # The reducer accumulates the score sum and the student count,
      # emitting nothing along the way
      fn score, {scores_sum, students_count} ->
        {[], {scores_sum + score, students_count + 1}}
      end,
      # The last function turns the final accumulator into the average
      fn
        # No students to process: emit 0
        {_scores_sum, 0} ->
          {[0], nil}

        # Otherwise emit the average score
        {sum, count} ->
          {[sum / count], nil}
      end,
      # The after function is for cleanup only; nothing to release here
      fn _acc -> :done end
    )
    |> Enum.at(0)
  end
  def read_file do
    with {:ok, body} <- File.read(@file_path),
         {:ok, json} <- Jason.decode(body, keys: :atoms),
         do: {:ok, json}
  end

  def read_file_stream do
    File.stream!(@file_path)
  end
  def process_average_scores do
    {time_in_microseconds, result} =
      :timer.tc(fn ->
        read_file_stream()
        |> Jaxon.Stream.from_enumerable()
        |> Jaxon.Stream.query([:root, :all])
        # Validate each decoded student concurrently. Jaxon yields maps
        # with string keys, hence the "name"/"score" patterns here.
        |> Task.async_stream(
          fn student ->
            case student do
              %{"name" => nil} ->
                {:invalid, 0}

              %{"score" => score} when is_integer(score) and score >= 0 and score <= 100 ->
                {:valid, score}

              _invalid_score ->
                {:invalid, 0}
            end
          end,
          ordered: false,
          max_concurrency: System.schedulers_online()
        )
        # Filter out the invalid scores
        |> Stream.filter(fn {:ok, {score_status, _score}} -> score_status == :valid end)
        |> Stream.map(fn {:ok, {:valid, score}} -> score end)
        # Reduce the valid scores into the average, again via Stream.transform/5
        |> Stream.transform(
          fn -> {0, 0} end,
          fn score, {scores_sum, students_count} ->
            {[], {scores_sum + score, students_count + 1}}
          end,
          fn
            {_scores_sum, 0} -> {[0], nil}
            {sum, count} -> {[sum / count], nil}
          end,
          fn _acc -> :done end
        )
        |> Enum.at(0)
      end)

    time_in_milliseconds = time_in_microseconds / 1_000
    Logger.info("Execution time: #{time_in_milliseconds} ms")
    result
  end
  def process_average_scores_parallel do
    chunk_size = calculate_chunk_size()

    {time_in_microseconds, result} =
      :timer.tc(fn ->
        read_file_stream()
        |> Jaxon.Stream.from_enumerable()
        |> Jaxon.Stream.query([:root, :all])
        # Chunk the stream so each task reduces a whole batch of students
        |> Stream.chunk_every(chunk_size)
        |> Task.async_stream(&process_chunk/1, ordered: false)
        # Each task returns a {sum, count} pair for its chunk
        |> Stream.map(fn {:ok, {sum, count}} -> {sum, count} end)
        # Combine the per-chunk pairs into a global sum and count
        |> Enum.reduce({0, 0}, fn {chunk_sum, chunk_count}, {total_sum, total_count} ->
          {total_sum + chunk_sum, total_count + chunk_count}
        end)
        |> case do
          {_, 0} -> 0
          {sum, count} -> sum / count
        end
      end)

    time_in_milliseconds = time_in_microseconds / 1_000
    Logger.info("Execution time: #{time_in_milliseconds} ms")
    result
  end
  # Helper function to reduce a chunk of students into a {sum, count} pair
  defp process_chunk(chunk) do
    Enum.reduce(chunk, {0, 0}, fn student, {sum, count} ->
      case student do
        %{"name" => nil} ->
          {sum, count}

        %{"score" => score} when is_integer(score) and score >= 0 and score <= 100 ->
          {sum + score, count + 1}

        _ ->
          {sum, count}
      end
    end)
  end

  # Dynamically pick a chunk size based on the dataset size
  defp calculate_chunk_size do
    total_records = count_total_records()

    cond do
      total_records <= 100_000 -> 500
      total_records <= 1_000_000 -> 1_000
      total_records <= 10_000_000 -> 5_000
      true -> 10_000
    end
  end

  # Mock function to count records (replace this with real logic)
  defp count_total_records do
    # For example purposes
    10_000_000
  end
end
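A minimal way to exercise the module, assuming the gist's implied setup: `jason` and `jaxon` as dependencies, and the module compiled in the current session (for instance in iex after `Mix.install([:jason, :jaxon])` and pasting the module). The driver below is hypothetical, not part of the gist; it generates a small `big-students.json` whose shape matches the patterns the module matches on, then runs all three entry points.

# try_average_score.exs — hypothetical driver, not part of the gist.
# Generates a small big-students.json, then runs the three entry points.
students =
  for i <- 1..1_000 do
    # Every tenth record gets a negative score so the validity
    # filters have something to discard
    score = if rem(i, 10) == 0, do: -1, else: rem(i, 101)
    %{name: "student-#{i}", score: score}
  end

File.write!("big-students.json", Jason.encode!(students))

# In-memory variant: decode first (read_file/0 yields atom keys), then average
{:ok, decoded} = AverageScore.read_file()
IO.inspect(AverageScore.average_score(decoded), label: "in-memory")

# The streaming variants read big-students.json themselves
IO.inspect(AverageScore.process_average_scores(), label: "streamed")
IO.inspect(AverageScore.process_average_scores_parallel(), label: "chunked")

On a file the size of the mocked 10M-record dataset, the chunked variant should have the edge: process_chunk/1 does a plain Enum.reduce/3 per batch, so the message-passing overhead of Task.async_stream/3 is paid once per chunk rather than once per student.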