Skip to content

Instantly share code, notes, and snippets.

@Sailias
Last active March 28, 2017 15:02
Show Gist options
  • Select an option

  • Save Sailias/8c5d51938e964387911a3df348bd119d to your computer and use it in GitHub Desktop.

Select an option

Save Sailias/8c5d51938e964387911a3df348bd119d to your computer and use it in GitHub Desktop.
GenStage Producer that polls for new items to process when empty
require Logger
defmodule MinerJobWoker do
  @moduledoc """
  Demo of a GenStage pipeline whose producer polls for new work when it has
  nothing to emit.

  `start_link/2` starts one `MinerJobWoker.JobProducer` and a number of
  `MinerJobWoker.Worker` consumers subscribed to it.
  """

  @doc """
  Starts the producer and `worker_count` consumers subscribed to it.

  `subscribe_options` is handed to `GenStage.sync_subscribe/2` for every
  consumer; any `:to` entry in it is overridden with the producer pid.
  A `worker_count` of zero or less starts no consumers.

  Returns `{:ok, producer_pid}`. Raises `MatchError` if a stage fails to
  start or a consumer fails to subscribe.
  """
  def start_link(worker_count, subscribe_options) do
    {:ok, producer_pid} = GenStage.start_link(__MODULE__.JobProducer, :ok)

    # Every consumer must subscribe to the producer started above.
    subscribe_options = Keyword.put(subscribe_options, :to, producer_pid)

    # `1..0` is a descending range with two elements, so a non-positive count
    # has to be guarded explicitly or it would start workers anyway.
    if worker_count > 0 do
      Enum.each(1..worker_count, fn _ -> start_worker(subscribe_options) end)
    end

    {:ok, producer_pid}
  end

  # Starts one consumer and subscribes it; a failed subscription crashes the
  # caller instead of silently leaving an idle worker behind.
  defp start_worker(subscribe_options) do
    {:ok, consumer_pid} = GenStage.start_link(__MODULE__.Worker, :ok)
    {:ok, _subscription} = GenStage.sync_subscribe(consumer_pid, subscribe_options)
    :ok
  end

  defmodule JobProducer do
    @moduledoc """
    Producer that simulates a job queue which periodically runs dry.

    State is `{poll_timer, round}`: `poll_timer` is the reference of the
    pending `:poll` timer (or `nil`), `round` counts calls to `take_jobs/1`.
    During the first #{10} rounds a batch of jobs is emitted; after that the
    producer emits nothing and re-polls itself until the round counter wraps
    back to zero.
    """
    use GenStage

    # Rounds `0..@emit_rounds - 1` emit a batch of jobs.
    @emit_rounds 10
    # Rounds up to and including this one emit nothing; the counter then resets.
    @reset_round 20
    # Delay before the producer checks for new jobs again.
    @poll_interval_ms 200

    @impl true
    def init(:ok), do: {:producer, {nil, 0}}

    # NOTE: the requested demand is ignored; a full batch is emitted whenever
    # jobs are available. This relies on GenStage buffering events that exceed
    # the outstanding demand (see the producer `:buffer_size` option).
    @impl true
    def handle_demand(_demand, state), do: take_jobs(state)

    @impl true
    def handle_info(:poll, state), do: take_jobs(state)

    # Cancels any pending poll, then either emits a batch of jobs or schedules
    # the next poll, depending on the current round.
    defp take_jobs({next_timer, n}) do
      cancel_poll(next_timer)
      IO.inspect(["count", n])
      advance(n)
    end

    # Jobs available: emit a batch, no poll needed.
    defp advance(n) when n < @emit_rounds do
      {:noreply, Enum.to_list(1..10), {nil, n + 1}}
    end

    # Queue empty: emit nothing and poll again later.
    defp advance(n) when n < @reset_round do
      {:noreply, [], {schedule_poll(), n + 1}}
    end

    # Last empty round: poll again and restart the cycle.
    defp advance(@reset_round) do
      {:noreply, [], {schedule_poll(), 0}}
    end

    defp schedule_poll, do: Process.send_after(self(), :poll, @poll_interval_ms)

    defp cancel_poll(nil), do: :ok

    defp cancel_poll(timer) do
      # `false` means the timer is no longer active. If it fired but its
      # message has not been handled yet, drop that stale `:poll` so it does
      # not trigger an extra round later.
      if Process.cancel_timer(timer) == false do
        receive do
          :poll -> :ok
        after
          0 -> :ok
        end
      end

      :ok
    end
  end

  defmodule Worker do
    @moduledoc """
    Consumer that prints every job it receives. It keeps no state.
    """
    use GenStage

    @impl true
    def init(:ok), do: {:consumer, nil}

    # Consumers never emit events, hence the empty event list in the reply.
    @impl true
    def handle_events(jobs, _from, _state) do
      Enum.each(jobs, &IO.inspect/1)
      {:noreply, [], nil}
    end
  end
end
# Start the demo pipeline: two workers, each asking for one job at a time.
# The producer pid is not needed afterwards, hence the underscore.
{:ok, _pid} = MinerJobWoker.start_link(2, max_demand: 1)

# Keep the script process alive so the linked stages keep running.
Process.sleep(:infinity)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment