Created
February 14, 2026 20:16
-
-
Save nickva/d03cbf3c6456c5125ca71a1e79197b1c to your computer and use it in GitHub Desktop.
Erlang/OTP segfault
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| -module(cwb_bench2). | |
| -export([run/5, status/1, stop/1]). | |
| -export([init/1, terminate/2]). | |
| -export([handle_call/3, handle_cast/2, handle_info/2]). | |
| -behaviour(gen_server). | |
| -define(HIBERNATE_DEFAULT, true). | |
%% State of the work-queue gen_server.
-record(q, {
    %% FIFO of {Item, Size} pairs waiting to be dequeued.
    queue = queue:new(),
    %% gen_server `From' handles of producers whose queue call has not been
    %% answered yet because the queue was full when they enqueued.
    blocked = [],
    %% Limits compared against `size' and `items'; init/1 fills them from
    %% the start options.
    max_size,
    max_items,
    %% Number of queued items and the sum of their sizes.
    items = 0,
    size = 0,
    %% `undefined', or {From, Max} for a dequeue caller waiting on an
    %% empty queue.
    worker = undefined,
    %% Set by a close cast that arrives while items are still queued; the
    %% server stops once the queue has been fully delivered.
    close_on_dequeue = false,
    %% Whether replies to queue calls ask the gen_server to hibernate.
    hibernate = ?HIBERNATE_DEFAULT
}).
%% Start a work-queue server linked to the calling process.
%%
%% `Options' is passed unchanged to init/1, which looks up the keys
%% max_size, max_items and hibernate in it.
%% Returns the result of gen_server:start_link/3.
new(Options) ->
    StartOpts = [],
    gen_server:start_link(?MODULE, Options, StartOpts).
%% Enqueue `Item' on the work queue `Wq'.
%%
%% The size sent with the request is byte_size/1 for binaries and
%% erlang:external_size/1 for every other term. The call uses an infinite
%% timeout, so it only returns once the server replies.
queue(Wq, Item) ->
    Size =
        case is_binary(Item) of
            true -> byte_size(Item);
            false -> erlang:external_size(Item)
        end,
    gen_server:call(Wq, {queue, Item, Size}, infinity).
%% Dequeue everything currently queued on `Wq'.
%% Shorthand for dequeue(Wq, all).
dequeue(Wq) ->
    Everything = all,
    dequeue(Wq, Everything).
%% Ask the queue `Wq' for up to `MaxItems' items (`all' for everything).
%%
%% Returns the server's reply. Any exception raised by the call is turned
%% into `closed'.
dequeue(Wq, MaxItems) ->
    Request = {dequeue, MaxItems},
    try gen_server:call(Wq, Request, infinity) of
        Reply -> Reply
    catch
        % Best effort: every failure of the call is reported as `closed'.
        _:_ -> closed
    end.
%% Asynchronously ask the queue `Wq' to close.
%% Sends a `close' cast and returns the result of gen_server:cast/2.
close(Wq) ->
    Request = close,
    gen_server:cast(Wq, Request).
%% gen_server callback: build the initial queue state from `Options'.
%%
%% max_size and max_items default to the atom `nil'; hibernate defaults to
%% ?HIBERNATE_DEFAULT.
init(Options) ->
    Lookup = fun(Key, Default) -> get_value(Key, Options, Default) end,
    {ok, #q{
        max_size = Lookup(max_size, nil),
        max_items = Lookup(max_items, nil),
        hibernate = Lookup(hibernate, ?HIBERNATE_DEFAULT)
    }}.
%% gen_server callback: on shutdown, tell a dequeue caller that is still
%% waiting for work that the queue is `closed'. Does nothing when no
%% worker is waiting.
terminate(_Reason, #q{worker = {Waiting, _Max}}) ->
    gen_server:reply(Waiting, closed),
    ok;
terminate(_Reason, #q{worker = undefined}) ->
    ok.
%% gen_server callback for synchronous requests.
%%
%% {queue, Item, Size}
%%   With no worker waiting: the item is appended and the counters are
%%   updated. If the queue is then at or over either limit the caller is
%%   parked in `blocked' without a reply, otherwise it gets `ok'.
%%   With a worker waiting: the item is handed straight to that worker and
%%   the caller gets `ok'.
%% {dequeue, Max}
%%   Exits if a worker is already waiting. On an empty queue the caller
%%   becomes the waiting worker, otherwise items are delivered through
%%   deliver_queue_items/2.
%% item_count / size
%%   Return the current counters.
handle_call({queue, Item, Size}, From, #q{worker = undefined} = State0) ->
    #q{size = Bytes0, items = Count0, queue = Queue0, blocked = Waiting} = State0,
    State = State0#q{
        size = Bytes0 + Size,
        items = Count0 + 1,
        queue = queue:in({Item, Size}, Queue0)
    },
    OverBytes = State#q.size >= State#q.max_size,
    OverItems = State#q.items >= State#q.max_items,
    Result =
        case OverBytes orelse OverItems of
            % Full: leave the producer unanswered until items are dequeued.
            true -> {noreply, State#q{blocked = [From | Waiting]}};
            false -> {reply, ok, State}
        end,
    handle_call_hibernate(State#q.hibernate, Result);
handle_call({queue, Item, _Size}, _From, #q{worker = {Worker, _Max}} = State) ->
    % A worker is already waiting, so the item bypasses the queue.
    gen_server:reply(Worker, {ok, [Item]}),
    handle_call_hibernate(
        State#q.hibernate, {reply, ok, State#q{worker = undefined}}
    );
handle_call({dequeue, _Max}, _From, #q{worker = {_, _}}) ->
    % Something went wrong - the same or a different worker is
    % trying to dequeue an item. We only allow one worker to wait
    % for work at a time, so we exit with an error.
    exit(multiple_workers_error);
handle_call({dequeue, Max}, From, #q{worker = undefined, items = 0} = State) ->
    % Nothing queued: remember the caller and answer when an item arrives.
    {noreply, State#q{worker = {From, Max}}};
handle_call({dequeue, Max}, _From, #q{worker = undefined, items = Count} = State) when
    Count > 0
->
    deliver_queue_items(Max, State);
handle_call(item_count, _From, State) ->
    {reply, State#q.items, State};
handle_call(size, _From, State) ->
    {reply, State#q.size, State}.

%% Append the `hibernate' action to a callback result when hibernation is
%% enabled; return the result untouched when it is disabled.
handle_call_hibernate(true, {noreply, State}) ->
    {noreply, State, hibernate};
handle_call_hibernate(true, {reply, Reply, State}) ->
    {reply, Reply, State, hibernate};
handle_call_hibernate(false, Result) ->
    Result.
%% Build the handle_call result for a dequeue request on a non-empty queue.
%%
%% When `Max' is `all' or at least the number of queued items, the whole
%% queue is delivered: every blocked producer is answered with `ok', the
%% state is reset to empty, and the server stops with reason `normal' if a
%% close was requested earlier.
%% Otherwise exactly `Max' items are taken through dequeue_items/5.
deliver_queue_items(Max, #q{items = Count} = State) when Max =:= all; Max >= Count ->
    #q{queue = Pending, blocked = Waiting, close_on_dequeue = Closing} = State,
    lists:foreach(fun(Producer) -> gen_server:reply(Producer, ok) end, Waiting),
    Drained = State#q{items = 0, size = 0, blocked = [], queue = queue:new()},
    Reply = {ok, [Item || {Item, _Size} <- queue:to_list(Pending)]},
    case Closing of
        true -> {stop, normal, Reply, Drained};
        false -> {reply, Reply, Drained}
    end;
deliver_queue_items(Max, State) ->
    #q{queue = Pending, items = Count, size = Bytes, blocked = Waiting} = State,
    {Items, BytesLeft, Rest, StillWaiting} =
        dequeue_items(Max, Bytes, Pending, Waiting, []),
    Remaining = State#q{
        items = Count - Max,
        size = BytesLeft,
        blocked = StillWaiting,
        queue = Rest
    },
    {reply, {ok, Items}, Remaining}.
%% Take `NumItems' entries from the front of `Queue'.
%%
%% For every entry removed, at most one blocked producer (the head of
%% `Blocked') is answered with `ok'. `Size' is reduced by the size stored
%% with each removed entry.
%% Returns {Items, RemainingSize, RemainingQueue, RemainingBlocked} with
%% `Items' in queue order.
dequeue_items(0, Size, Queue, Blocked, DequeuedAcc) ->
    {lists:reverse(DequeuedAcc), Size, Queue, Blocked};
dequeue_items(NumItems, Size, Queue, Blocked, DequeuedAcc) ->
    {{value, {Item, ItemSize}}, Rest} = queue:out(Queue),
    StillBlocked = dequeue_items_unblock(Blocked),
    dequeue_items(
        NumItems - 1, Size - ItemSize, Rest, StillBlocked, [Item | DequeuedAcc]
    ).

%% Answer the first blocked producer, if any, and return the others.
dequeue_items_unblock([]) ->
    [];
dequeue_items_unblock([From | Others]) ->
    gen_server:reply(From, ok),
    Others.
%% gen_server callback for the `close' cast.
%%
%% An empty queue stops the server right away with reason `normal'. A
%% non-empty queue only records the request in `close_on_dequeue'.
handle_cast(close, State) ->
    case State#q.items of
        0 -> {stop, normal, State};
        _Pending -> {noreply, State#q{close_on_dequeue = true}}
    end.
%% gen_server callback for plain messages.
%% Any such message stops the server, using the message itself as the
%% stop reason.
handle_info(Unexpected, State) ->
    {stop, Unexpected, State}.
%% Start one benchmark run.
%%
%% Prints the parameters, starts a work queue with the given limits and
%% hibernate flag, then spawns three linked processes: a consumer, a
%% producer and a checker. Each of them, and the calling process, seeds
%% `rand' with the same fixed seed.
%% Returns the pid of the checker, which is the pid status/1 and stop/1
%% expect.
run(BatchSize, BinarySize, MaxQueueItems, MaxQueueBytes, Hibernate) ->
    Banner = [
        {"Batch Size (List Length) : ~p~n", BatchSize},
        {"Binary Size : ~p bytes~n", BinarySize},
        {"Max Queue Items : ~p~n", MaxQueueItems},
        {"Max Queue Size : ~p bytes~n", MaxQueueBytes},
        {"Hibernate : ~p ~n", Hibernate}
    ],
    lists:foreach(fun({Format, Value}) -> io:format(Format, [Value]) end, Banner),
    QueueOptions = [
        {max_items, MaxQueueItems},
        {max_size, MaxQueueBytes},
        {hibernate, Hibernate}
    ],
    {ok, Q} = new(QueueOptions),
    Seed = {1, 2, 3},
    rand:seed(default, Seed),
    % Every helper process starts from the same seed before running its loop.
    SpawnSeeded = fun(Loop) ->
        spawn_link(fun() ->
            rand:seed(default, Seed),
            Loop()
        end)
    end,
    Consumer = SpawnSeeded(fun() -> consumer(Q) end),
    Producer = SpawnSeeded(fun() -> producer(Q, BinarySize, BatchSize) end),
    SpawnSeeded(fun() -> checker(10, Producer, Consumer, 0, 0) end).
%% Ask the checker process `Pid' for its progress counters.
%%
%% Returns {Produced, Consumed} as reported by the checker, or `dead' if
%% the process is not running or exits before it answers.
%%
%% A monitor is used instead of is_process_alive/1 followed by a bare
%% receive: if the process exits after the liveness check, no reply ever
%% arrives and the caller would block forever. The 'DOWN' message covers
%% both the already-dead case and that race.
-spec status(pid()) -> dead | {non_neg_integer(), non_neg_integer()}.
status(Pid) ->
    MRef = erlang:monitor(process, Pid),
    Pid ! {status, self()},
    receive
        {Pid, Produced, Consumed} ->
            % Drop the monitor and any 'DOWN' message it already delivered.
            erlang:demonitor(MRef, [flush]),
            {Produced, Consumed};
        {'DOWN', MRef, process, Pid, _Reason} ->
            dead
    end.
%% Tell the checker process `Pid' to stop.
%%
%% Returns `dead' when the process is not alive. Otherwise sends it the
%% atom `stop' and returns the sent message, i.e. `stop'.
-spec stop(pid()) -> dead | stop.
stop(Pid) ->
    Alive = is_process_alive(Pid),
    if
        Alive -> Pid ! stop;
        true -> dead
    end.
%% Loop that drives the producer and tracks progress.
%%
%% Whenever the produced and consumed counters are equal, a new batch of
%% 1..MaxN items is requested from `Producer'. Otherwise the loop waits
%% for one of:
%%   {consumed, N}  - add N to the consumed counter
%%   {status, From} - send {self(), Produced, Consumed} to From
%%   stop           - send `close' to the producer and end the loop
checker(MaxN, Producer, Consumer, Produced, Consumed) when Produced =:= Consumed ->
    Batch = rand:uniform(MaxN),
    Producer ! {produce, Batch, self()},
    checker(MaxN, Producer, Consumer, Produced + Batch, Consumed);
checker(MaxN, Producer, Consumer, Produced, Consumed) ->
    receive
        stop ->
            Producer ! close;
        {status, From} ->
            From ! {self(), Produced, Consumed},
            checker(MaxN, Producer, Consumer, Produced, Consumed);
        {consumed, Count} ->
            checker(MaxN, Producer, Consumer, Produced, Consumed + Count)
    end.
%% Build one random payload: a 1-tuple holding a list of exactly
%% `BatchSize' {Key, Value} pairs of random binaries.
%%
%% Keys are 1..10 bytes long and values 1..BinarySize bytes long.
%%
%% The sequence starts at 1 so the list length equals `BatchSize', which
%% is what run/5 reports as "Batch Size (List Length)". Starting at 0
%% produced one extra pair.
payload(BatchSize, BinarySize) ->
    Pairs = [
        {rand:bytes(rand:uniform(10)), rand:bytes(rand:uniform(BinarySize))}
     || _ <- lists:seq(1, BatchSize)
    ],
    {Pairs}.
%% Producer loop.
%%
%% On {produce, N, NotifyPid} it enqueues N fresh payloads, each paired
%% with NotifyPid, sleeping for up to 20 ms after roughly one in five of
%% them, and then waits for the next message.
%% On `close' it closes the queue and ends the loop.
producer(Q, BinarySize, BatchSize) ->
    receive
        close ->
            close(Q);
        {produce, N, NotifyPid} ->
            EnqueueOne = fun(_Seq) ->
                Payload = payload(BatchSize, BinarySize),
                queue(Q, {Payload, NotifyPid}),
                Roll = rand:uniform(),
                if
                    Roll < 0.2 -> timer:sleep(round(rand:uniform() * 20));
                    true -> ok
                end
            end,
            lists:foreach(EnqueueOne, lists:seq(1, N)),
            producer(Q, BinarySize, BatchSize)
    end.
%% Consumer loop.
%%
%% Dequeues everything available, reports the number of items to the
%% NotifyPid carried by the first item, sleeps for up to 20 ms roughly one
%% time in five, and repeats. Once the queue reports `closed' it sends a
%% close request and ends the loop.
consumer(Q) ->
    case dequeue(Q) of
        closed ->
            close(Q);
        {ok, Items} ->
            % Only the first item's NotifyPid is used for the whole batch.
            [{_Payload, NotifyPid} | _] = Items,
            NotifyPid ! {consumed, length(Items)},
            Roll = rand:uniform(),
            if
                Roll < 0.2 -> timer:sleep(round(rand:uniform() * 20));
                true -> ok
            end,
            consumer(Q)
    end.
%% Look up `Key' in a list of {Key, Value} pairs.
%% Returns the value of the first tuple whose first element is `Key', or
%% `Default' when there is none.
get_value(Key, List, Default) ->
    case lists:keyfind(Key, 1, List) of
        false -> Default;
        {Key, Value} -> Value
    end.
Author
Author
Unfortunately I can't reproduce it any longer. Perhaps it was a temporary hardware issue (overheating) or there was something else consuming memory in the background at that time.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
A run without the extra allocator args segfaults but in another area:
gdb