Skip to content

Instantly share code, notes, and snippets.

@nickva
Created February 14, 2026 20:16
Show Gist options
  • Select an option

  • Save nickva/d03cbf3c6456c5125ca71a1e79197b1c to your computer and use it in GitHub Desktop.

Select an option

Save nickva/d03cbf3c6456c5125ca71a1e79197b1c to your computer and use it in GitHub Desktop.
Erlang/OTP segfault
-module(cwb_bench2).
-export([run/5, status/1, stop/1]).
-export([init/1, terminate/2]).
-export([handle_call/3, handle_cast/2, handle_info/2]).
-behaviour(gen_server).
-define(HIBERNATE_DEFAULT, true).
-record(q, {
queue = queue:new(),
blocked = [],
max_size,
max_items,
items = 0,
size = 0,
worker = undefined,
close_on_dequeue = false,
hibernate = ?HIBERNATE_DEFAULT
}).
new(Options) ->
gen_server:start_link(?MODULE, Options, []).
queue(Wq, Item) when is_binary(Item) ->
gen_server:call(Wq, {queue, Item, byte_size(Item)}, infinity);
queue(Wq, Item) ->
gen_server:call(Wq, {queue, Item, erlang:external_size(Item)}, infinity).
dequeue(Wq) ->
dequeue(Wq, all).
dequeue(Wq, MaxItems) ->
try
gen_server:call(Wq, {dequeue, MaxItems}, infinity)
catch
_:_ -> closed
end.
close(Wq) ->
gen_server:cast(Wq, close).
init(Options) ->
Q = #q{
max_size = get_value(max_size, Options, nil),
max_items = get_value(max_items, Options, nil),
hibernate = get_value(hibernate, Options, ?HIBERNATE_DEFAULT)
},
{ok, Q}.
terminate(_Reason, #q{worker = undefined}) ->
ok;
terminate(_Reason, #q{worker = {W, _}}) ->
gen_server:reply(W, closed),
ok.
handle_call({queue, Item, Size}, From, #q{worker = undefined} = Q0) ->
Q = Q0#q{
size = Q0#q.size + Size,
items = Q0#q.items + 1,
queue = queue:in({Item, Size}, Q0#q.queue)
},
IsFull = (Q#q.size >= Q#q.max_size) orelse (Q#q.items >= Q#q.max_items),
case {IsFull, Q#q.hibernate} of
{true, true} -> {noreply, Q#q{blocked = [From | Q#q.blocked]}, hibernate};
{true, false} -> {noreply, Q#q{blocked = [From | Q#q.blocked]}};
{false, true} -> {reply, ok, Q, hibernate};
{false, false} -> {reply, ok, Q}
end;
handle_call({queue, Item, _}, _From, #q{worker = {W, _Max}} = Q) ->
gen_server:reply(W, {ok, [Item]}),
case Q#q.hibernate of
true -> {reply, ok, Q#q{worker = undefined}, hibernate};
false -> {reply, ok, Q#q{worker = undefined}}
end;
handle_call({dequeue, _Max}, _From, #q{worker = {_, _}}) ->
% Something went wrong - the same or a different worker is
% trying to dequeue an item. We only allow one worker to wait
% for work at a time, so we exit with an error.
exit(multiple_workers_error);
handle_call({dequeue, Max}, From, #q{worker = undefined, items = Count} = Q) ->
case Count of
0 ->
{noreply, Q#q{worker = {From, Max}}};
C when C > 0 ->
deliver_queue_items(Max, Q)
end;
handle_call(item_count, _From, Q) ->
{reply, Q#q.items, Q};
handle_call(size, _From, Q) ->
{reply, Q#q.size, Q}.
deliver_queue_items(Max, Q) ->
#q{
queue = Queue,
items = Count,
size = Size,
close_on_dequeue = Close,
blocked = Blocked
} = Q,
case (Max =:= all) orelse (Max >= Count) of
false ->
{Items, Size2, Queue2, Blocked2} = dequeue_items(
Max, Size, Queue, Blocked, []
),
Q2 = Q#q{
items = Count - Max, size = Size2, blocked = Blocked2, queue = Queue2
},
{reply, {ok, Items}, Q2};
true ->
lists:foreach(fun(F) -> gen_server:reply(F, ok) end, Blocked),
Q2 = Q#q{items = 0, size = 0, blocked = [], queue = queue:new()},
Items = [Item || {Item, _} <- queue:to_list(Queue)],
case Close of
false ->
{reply, {ok, Items}, Q2};
true ->
{stop, normal, {ok, Items}, Q2}
end
end.
dequeue_items(0, Size, Queue, Blocked, DequeuedAcc) ->
{lists:reverse(DequeuedAcc), Size, Queue, Blocked};
dequeue_items(NumItems, Size, Queue, Blocked, DequeuedAcc) ->
{{value, {Item, ItemSize}}, Queue2} = queue:out(Queue),
case Blocked of
[] ->
Blocked2 = Blocked;
[From | Blocked2] ->
gen_server:reply(From, ok)
end,
dequeue_items(
NumItems - 1, Size - ItemSize, Queue2, Blocked2, [Item | DequeuedAcc]
).
handle_cast(close, #q{items = 0} = Q) ->
{stop, normal, Q};
handle_cast(close, Q) ->
{noreply, Q#q{close_on_dequeue = true}}.
handle_info(X, Q) ->
{stop, X, Q}.
run(BatchSize, BinarySize, MaxQueueItems, MaxQueueBytes, Hibernate) ->
io:format("Batch Size (List Length) : ~p~n", [BatchSize]),
io:format("Binary Size : ~p bytes~n", [BinarySize]),
io:format("Max Queue Items : ~p~n", [MaxQueueItems]),
io:format("Max Queue Size : ~p bytes~n", [MaxQueueBytes]),
io:format("Hibernate : ~p ~n", [Hibernate]),
{ok, Q} = new([
{max_items, MaxQueueItems},
{max_size, MaxQueueBytes},
{hibernate, Hibernate}
]),
rand:seed(default, {1, 2, 3}),
Consumer = spawn_link(fun() ->
rand:seed(default, {1, 2, 3}),
consumer(Q)
end),
Producer = spawn_link(fun() ->
rand:seed(default, {1, 2, 3}),
producer(Q, BinarySize, BatchSize)
end),
spawn_link(fun() ->
rand:seed(default, {1, 2, 3}),
checker(10, Producer, Consumer, 0, 0)
end).
status(Pid) ->
case is_process_alive(Pid) of
false ->
dead;
true ->
Pid ! {status, self()},
receive
{Pid, Produced, Consumed} ->
{Produced, Consumed}
end
end.
stop(Pid) ->
case is_process_alive(Pid) of
false ->
dead;
true ->
Pid ! stop
end.
checker(MaxN, Producer, Consumer, Produced, Produced) ->
N = rand:uniform(MaxN),
Producer ! {produce, N, self()},
checker(MaxN, Producer, Consumer, Produced + N, Produced);
checker(MaxN, Producer, Consumer, Produced, Consumed) ->
receive
{consumed, N} ->
checker(MaxN, Producer, Consumer, Produced, Consumed + N);
{status, From} ->
From ! {self(), Produced, Consumed},
checker(MaxN, Producer, Consumer, Produced, Consumed);
stop ->
Producer ! close
end.
payload(BatchSize, BinarySize) ->
{[
{rand:bytes(rand:uniform(10)), rand:bytes(rand:uniform(BinarySize))}
|| _ <- lists:seq(0, BatchSize)]}.
producer(Q, BinarySize, BatchSize) ->
receive
{produce, N, NotifyPid} ->
lists:foreach(fun(_) ->
Payload = payload(BatchSize, BinarySize),
queue(Q, {Payload, NotifyPid}),
case rand:uniform() < 0.2 of
true -> timer:sleep(round(rand:uniform()*20));
false -> ok
end
end, lists:seq(1, N)),
producer(Q, BinarySize, BatchSize);
close ->
close(Q)
end.
consumer(Q) ->
case dequeue(Q) of
{ok, Items} ->
[{_Payload, NotifyPid}|_]= Items,
NotifyPid ! {consumed, length(Items)},
case rand:uniform() < 0.2 of
true -> timer:sleep(round(rand:uniform()*20));
false -> ok
end,
consumer(Q);
closed ->
close(Q)
end.
get_value(Key, List, Default) ->
case lists:keysearch(Key, 1, List) of
{value, {K, Value}} when K =:= Key ->
Value;
false ->
Default
end.
@nickva
Copy link
Author

nickva commented Feb 16, 2026

Unfortunately I can't reproduce it any longer. Perhaps it was a temporary hardware issue (overheating) or there was something else consuming memory in the background at that time.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment