Created
March 16, 2025 20:15
-
-
Save sgobotta/c379205a6d139ab88e870923c259dddc to your computer and use it in GitHub Desktop.
Calculating an average score using streams
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule AverageScore do
  @moduledoc """
  Computes the average score of a collection of students, discarding
  invalid entries (a `nil` name, or a score that is not an integer in
  `0..100`).

  Three strategies are provided for benchmarking:

    * `average_score/1` — in-memory list, concurrent validation.
    * `process_average_scores/0` — lazy JSON stream, per-record tasks.
    * `process_average_scores_parallel/0` — lazy JSON stream, chunked
      map-reduce with one task per chunk.
  """

  # Swap the attribute to benchmark against differently sized fixtures.
  # @file_path "tiny-students.json"
  # @file_path "students.json"
  @file_path "big-students.json"

  require Logger

  # FIXME(review): `@students` was never defined in the original, so
  # `students/0` always returned `nil` and emitted a compile-time warning.
  # The attribute is declared explicitly to silence the warning while
  # keeping the `nil` return for any existing callers.
  @students nil

  @doc "Returns the (currently undefined) in-memory student fixture."
  def students, do: @students

  @doc """
  Averages the `:score` values of `students` — maps with atom keys.

  A student counts only when its `:name` is not `nil` and its `:score` is
  an integer in `0..100`; everything else is discarded. Returns `0` when
  there is no valid student, otherwise a float.
  """
  def average_score(students) do
    students
    # Validate entries concurrently; ordering is irrelevant for an average.
    |> Task.async_stream(&validate_student/1, ordered: false)
    # Keep only the scores that passed validation, then reduce lazily.
    # (The original called `Enum.to_list/1` here, which materialized the
    # whole result set and defeated the stream pipeline.)
    |> Stream.filter(fn {:ok, {status, _score}} -> status == :valid end)
    |> Stream.map(fn {:ok, {:valid, score}} -> score end)
    |> average_of_scores()
  end

  # Tags a student map (atom keys) as {:valid, score} or {:invalid, 0}.
  defp validate_student(student) do
    case student do
      %{name: nil} ->
        {:invalid, 0}

      %{score: score} when is_integer(score) and score >= 0 and score <= 100 ->
        {:valid, score}

      _invalid ->
        {:invalid, 0}
    end
  end

  # Reduces a stream of integer scores into their average with
  # `Stream.transform/5`: the reducer emits nothing and only accumulates
  # `{sum, count}`; `last_fun` emits the single final value — `0` for an
  # empty stream, otherwise `sum / count` (the `{_, 0}` clause guarantees
  # a non-zero divisor in the second clause).
  defp average_of_scores(scores) do
    scores
    |> Stream.transform(
      # start_fun: initial accumulator.
      fn -> {0, 0} end,
      # reducer: accumulate, emit nothing.
      fn score, {sum, count} -> {[], {sum + score, count + 1}} end,
      # last_fun: emit the average (or 0 when nothing was accumulated).
      fn
        {_sum, 0} -> {[0], nil}
        {sum, count} -> {[sum / count], nil}
      end,
      # after_fun: nothing to clean up.
      fn _acc -> :done end
    )
    |> Enum.at(0)
  end

  @doc "Reads and decodes the whole fixture file at once (atom keys)."
  def read_file do
    with {:ok, body} <- File.read(@file_path),
         {:ok, json} <- Jason.decode(body, keys: :atoms),
         do: {:ok, json}
  end

  @doc "Returns a lazy line stream over the fixture file."
  def read_file_stream do
    File.stream!(@file_path)
  end

  @doc """
  Streams the fixture JSON (string keys), validates each record in its own
  task, logs the elapsed time, and returns the average score.
  """
  def process_average_scores do
    {time_in_microseconds, result} =
      :timer.tc(fn ->
        read_file_stream()
        |> Jaxon.Stream.from_enumerable()
        |> Jaxon.Stream.query([:root, :all])
        |> Task.async_stream(&validate_decoded_student/1,
          ordered: false,
          max_concurrency: System.schedulers_online()
        )
        |> Stream.filter(fn {:ok, {status, _score}} -> status == :valid end)
        |> Stream.map(fn {:ok, {:valid, score}} -> score end)
        |> average_of_scores()
      end)

    Logger.info("Execution time: #{time_in_microseconds / 1_000} ms")
    result
  end

  # Tags a JSON-decoded student (string keys); mirrors validate_student/1.
  defp validate_decoded_student(student) do
    case student do
      %{"name" => nil} ->
        {:invalid, 0}

      %{"score" => score} when is_integer(score) and score >= 0 and score <= 100 ->
        {:valid, score}

      _invalid ->
        {:invalid, 0}
    end
  end

  @doc """
  Chunked-parallel variant: splits the decoded stream into chunks sized by
  `calculate_chunk_size/0`, reduces each chunk to a `{sum, count}` partial
  in its own task, merges the partials, logs the elapsed time, and returns
  the average (`0` when no valid student was seen).
  """
  def process_average_scores_parallel do
    chunk_size = calculate_chunk_size()

    {time_in_microseconds, result} =
      :timer.tc(fn ->
        read_file_stream()
        |> Jaxon.Stream.from_enumerable()
        |> Jaxon.Stream.query([:root, :all])
        |> Stream.chunk_every(chunk_size)
        |> Task.async_stream(&process_chunk/1, ordered: false)
        |> Stream.map(fn {:ok, partial} -> partial end)
        |> Enum.reduce({0, 0}, fn {chunk_sum, chunk_count}, {total_sum, total_count} ->
          {total_sum + chunk_sum, total_count + chunk_count}
        end)
        |> case do
          {_, 0} -> 0
          {sum, count} -> sum / count
        end
      end)

    Logger.info("Execution time: #{time_in_microseconds / 1_000} ms")
    result
  end

  # Folds one chunk of decoded students into a {sum, count} partial,
  # skipping invalid records (same validity rule as the other pipelines).
  defp process_chunk(chunk) do
    Enum.reduce(chunk, {0, 0}, fn student, {sum, count} ->
      case student do
        %{"name" => nil} ->
          {sum, count}

        %{"score" => score} when is_integer(score) and score >= 0 and score <= 100 ->
          {sum + score, count + 1}

        _invalid ->
          {sum, count}
      end
    end)
  end

  # Picks a chunk size proportional to the (estimated) dataset size.
  defp calculate_chunk_size do
    total_records = count_total_records()

    cond do
      total_records <= 100_000 -> 500
      total_records <= 1_000_000 -> 1_000
      total_records <= 10_000_000 -> 5_000
      true -> 10_000
    end
  end

  # Hard-coded estimate for example purposes — replace with real counting
  # logic (e.g. a streamed `Enum.count/1` over the fixture).
  defp count_total_records, do: 10_000_000
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment