Last active
March 28, 2017 15:02
-
-
Save Sailias/8c5d51938e964387911a3df348bd119d to your computer and use it in GitHub Desktop.
GenStage Producer that polls for new items to process when empty
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| require Logger | |
| defmodule MinerJobWoker do | |
| def start_link(worker_count, subscribe_options) do | |
| # Start the producer | |
| {:ok, producer_pid} = GenStage.start_link(__MODULE__.JobProducer, :ok) | |
| # Override consumer subscribe options | |
| subscribe_options = Keyword.put(subscribe_options, :to, producer_pid) | |
| # Start the worker consumers | |
| Enum.each(1..worker_count, fn _ -> | |
| # Start consumer | |
| {:ok, consumer_pid} = GenStage.start_link(__MODULE__.Worker, :ok) | |
| # Subscribe to producer | |
| GenStage.sync_subscribe(consumer_pid, subscribe_options) | |
| end) | |
| {:ok, producer_pid} | |
| end | |
| defmodule JobProducer do | |
| use GenStage | |
| def init(:ok), do: {:producer, {nil, 0}} | |
| def handle_demand(_demand, state) do | |
| take_jobs(state) | |
| end | |
| def handle_info(:poll, state) do | |
| take_jobs(state) | |
| end | |
| @doc """ | |
| If there is pending_demand, process the queue | |
| """ | |
| defp take_jobs({ next_timer, n }) do | |
| if (next_timer != nil) do | |
| Process.cancel_timer(next_timer) | |
| end | |
| IO.inspect(["count", n]) | |
| cond do | |
| n < 10 -> | |
| {:noreply, Enum.to_list(1..10), {nil, n + 1}} | |
| n >= 10 and n < 20 -> | |
| next_timer = Process.send_after(self(), :poll, 200) | |
| {:noreply, [], { next_timer, n + 1 }} | |
| n == 20 -> | |
| next_timer = Process.send_after(self(), :poll, 200) | |
| {:noreply, [], { next_timer, 0 }} | |
| end | |
| end | |
| end | |
| defmodule Worker do | |
| use GenStage | |
| def init(:ok), do: {:consumer, nil} | |
| def handle_events(jobs, _from, _next_timer) do | |
| Enum.each(jobs, fn (n) -> | |
| IO.inspect(n) | |
| end) | |
| {:noreply, [], nil} | |
| end | |
| end | |
| end | |
| {:ok, pid} = MinerJobWoker.start_link(2, max_demand: 1) | |
| Process.sleep(:infinity) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment