@shamsimam
Last active March 30, 2022 04:03
Websocket Out of Memory Errors
2022-03-28 14:09:51,353 ERROR waiter.websocket [async-dispatch-48] - [CID=oom-12329bd58ba60-62fdcc507e6a1643] error from instance websocket request
java.lang.OutOfMemoryError: Java heap space
at java.base/java.io.ByteArrayOutputStream.<init>(ByteArrayOutputStream.java:81)
at org.eclipse.jetty.util.ByteArrayOutputStream2.<init>(ByteArrayOutputStream2.java:36)
at org.eclipse.jetty.websocket.common.message.SimpleBinaryMessage.appendFrame(SimpleBinaryMessage.java:60)
at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.appendMessage(AbstractEventDriver.java:61)
at org.eclipse.jetty.websocket.common.events.JettyListenerEventDriver.onBinaryFrame(JettyListenerEventDriver.java:74)
at org.eclipse.jetty.websocket.common.events.AbstractEventDriver.incomingFrame(AbstractEventDriver.java:145)
at org.eclipse.jetty.websocket.common.WebSocketSession.incomingFrame(WebSocketSession.java:326)
at org.eclipse.jetty.websocket.common.extensions.ExtensionStack.incomingFrame(ExtensionStack.java:202)
at org.eclipse.jetty.websocket.common.Parser.notifyFrame(Parser.java:225)
at org.eclipse.jetty.websocket.common.Parser.parseSingleFrame(Parser.java:259)
at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:459)
at org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection.onFillable(AbstractWebSocketConnection.java:440)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129)
at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:375)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
at java.base/java.lang.Thread.run(Thread.java:833)
"clojure.core.async.timers/timeout-daemon" #78 daemon prio=5 os_prio=31 cpu=32888.53ms elapsed=14315.14s tid=0x00007ff56cb72600 nid=0x11c03 runnable [0x0000700008476000]
java.lang.Thread.State: TIMED_WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@17.0.1/Native Method)
- parking to wait for <0x00000007009b7ff0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(java.base@17.0.1/LockSupport.java:252)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(java.base@17.0.1/AbstractQueuedSynchronizer.java:1672)
at java.util.concurrent.DelayQueue.take(java.base@17.0.1/DelayQueue.java:229)
at clojure.core.async.impl.timers$timeout_worker.invokeStatic(timers.clj:47)
at clojure.core.async.impl.timers$timeout_worker.invoke(timers.clj:43)
at clojure.lang.AFn.run(AFn.java:22)
at java.lang.Thread.run(java.base@17.0.1/Thread.java:833)
(deftest ^:parallel ^:integration-fast test-request-stream-bytes-and-string
  (testing-using-waiter-url
    (let [auth-cookie-value (auth-cookie waiter-url)
          uncorrupted-data-streamed-atom (atom false)
          correlation-id (str "oom-" (utils/unique-identifier))
          waiter-headers (assoc (kitchen-request-headers)
                           "x-cid" correlation-id
                           "x-waiter-metric-group" "waiter_ws_test"
                           "x-waiter-name" (rand-name))]
      (is auth-cookie-value)
      (try
        (println "CID:" correlation-id)
        (.flush System/out)
        (Thread/sleep 1000)
        (let [response-promise (promise)
              ^WebSocketClient websocket-client (websocket-client-factory)
              message-length 23000000 ;; jetty default is 65536
              max-message-length (+ 1024 message-length)]
          (update-max-message-sizes websocket-client max-message-length max-message-length)
          (ws-client/connect!
            websocket-client
            (ws-url waiter-url "/events/connect")
            (fn [{:keys [in out]}]
              (async/go
                (async/>! out "hi")
                (println (async/<! in)) ;; hi response
                (let [repeats 200
                      eager-term-repeats (- repeats 150)
                      message-size (* 22 1000 1000)
                      delay-ms 0]
                  (async/>! out (str "send " repeats " " message-size " " delay-ms))
                  (dotimes [index eager-term-repeats]
                    (let [backend-bytes (async/<! in)]
                      (if (instance? ByteBuffer backend-bytes)
                        (println (str "received-" index) " bytes:" (.capacity backend-bytes))
                        (println (str "received-" index) backend-bytes)))))
                (async/>! out "bye")
                (println (async/<! in)) ;; connection closed
                (deliver response-promise :done)
                (async/close! out)))
            {:middleware (fn [_ ^UpgradeRequest request]
                           (websocket/add-headers-to-upgrade-request! request waiter-headers)
                           (add-auth-cookie request auth-cookie-value))})
          (is (= :done (deref response-promise default-timeout-period :timed-out))))
        (is @uncorrupted-data-streamed-atom)
        (finally
          (delete-service waiter-url waiter-headers))))))
2022-03-28 14:08:49,955 INFO waiter.websocket [async-dispatch-22] - [CID=oom-1231bde10dbca-208026a54c1af5e0] forwarding request for service w9091-waiwebinttestesreqstrbytandstr320077575689638-7c974a17cd27930f1d5c06e687f0f486 to ws://127.0.0.1:8090/events/connect
2022-03-28 14:08:49,978 INFO waiter.websocket [waiter-client-ws-66] - [CID=oom-1231bde10dbca-208026a54c1af5e0] successfully connected with backend
2022-03-28 14:08:49,986 INFO waiter.websocket [async-dispatch-26] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 0 received 2 bytes from client
2022-03-28 14:08:50,000 INFO waiter.websocket [async-dispatch-49] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 0 received 2 bytes from instance
2022-03-28 14:08:50,005 INFO waiter.websocket [async-dispatch-43] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 1 received 19 bytes from client
2022-03-28 14:08:50,759 INFO waiter.websocket [async-dispatch-10] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 1 received 22000000 bytes from instance
2022-03-28 14:08:50,880 INFO waiter.websocket [async-dispatch-28] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 2 received 22000000 bytes from instance
2022-03-28 14:08:51,059 INFO waiter.websocket [async-dispatch-29] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 3 received 22000000 bytes from instance
2022-03-28 14:08:51,219 INFO waiter.websocket [async-dispatch-31] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 4 received 22000000 bytes from instance
2022-03-28 14:08:51,312 INFO waiter.websocket [async-dispatch-35] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 5 received 22000000 bytes from instance
2022-03-28 14:08:51,444 INFO waiter.websocket [async-dispatch-24] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 6 received 22000000 bytes from instance
2022-03-28 14:08:51,506 INFO waiter.websocket [async-dispatch-38] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 7 received 22000000 bytes from instance
2022-03-28 14:08:51,606 INFO waiter.websocket [async-dispatch-41] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 8 received 22000000 bytes from instance
2022-03-28 14:08:53,171 INFO waiter.websocket [async-dispatch-19] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 9 received 22000000 bytes from instance
2022-03-28 14:08:54,623 INFO waiter.websocket [async-dispatch-10] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 10 received 22000000 bytes from instance
2022-03-28 14:08:56,351 INFO waiter.websocket [async-dispatch-5] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 11 received 22000000 bytes from instance
2022-03-28 14:08:57,484 INFO waiter.websocket [async-dispatch-26] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 12 received 22000000 bytes from instance
2022-03-28 14:08:58,735 INFO waiter.websocket [async-dispatch-20] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 13 received 22000000 bytes from instance
2022-03-28 14:09:00,194 INFO waiter.websocket [async-dispatch-16] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 14 received 22000000 bytes from instance
2022-03-28 14:09:01,673 INFO waiter.websocket [async-dispatch-58] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 15 received 22000000 bytes from instance
2022-03-28 14:09:02,912 INFO waiter.websocket [async-dispatch-37] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 16 received 22000000 bytes from instance
2022-03-28 14:09:04,456 INFO waiter.websocket [async-dispatch-36] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 17 received 22000000 bytes from instance
2022-03-28 14:09:06,511 INFO waiter.websocket [async-dispatch-25] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 18 received 22000000 bytes from instance
2022-03-28 14:09:07,428 INFO waiter.websocket [async-dispatch-50] - [CID=oom-1231bde10dbca-208026a54c1af5e0] received on instance ctrl chan: :qbits.jet.websocket/error nil
2022-03-28 14:09:07,427 INFO waiter.websocket [async-dispatch-55] - [CID=oom-1231bde10dbca-208026a54c1af5e0] propagating error to response in case websocket connection failed
2022-03-28 14:09:07,433 ERROR waiter.websocket [async-dispatch-50] - [CID=oom-1231bde10dbca-208026a54c1af5e0] error from instance websocket request
2022-03-28 14:09:07,444 INFO waiter.websocket [async-dispatch-50] - [CID=oom-1231bde10dbca-208026a54c1af5e0] instance requesting close of websocket: :instance-error nil
2022-03-28 14:09:07,445 INFO waiter.websocket [async-dispatch-43] - [CID=oom-1231bde10dbca-208026a54c1af5e0] websocket connections requested to be closed due to :instance :instance-error nil
2022-03-28 14:09:08,038 INFO waiter.websocket [async-dispatch-4] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 19 received 22000000 bytes from instance
2022-03-28 14:09:08,593 INFO waiter.websocket [async-dispatch-46] - [CID=oom-1231bde10dbca-208026a54c1af5e0] closing websocket channels
2022-03-28 14:09:08,594 INFO waiter.process-request [async-dispatch-40] - [CID=oom-1231bde10dbca-208026a54c1af5e0] done processing request :instance-error
2022-03-28 14:09:08,595 INFO waiter.websocket [async-dispatch-21] - [CID=oom-1231bde10dbca-208026a54c1af5e0] closing client session with code 1011 Java heap space
2022-03-28 14:09:08,758 INFO waiter.websocket [async-dispatch-65] - [CID=oom-1231bde10dbca-208026a54c1af5e0] client input channel has been closed, bytes streamed: 21
2022-03-28 14:09:08,759 INFO waiter.websocket [async-dispatch-35] - [CID=oom-1231bde10dbca-208026a54c1af5e0] received on client ctrl chan: :qbits.jet.websocket/close status code 1011
2022-03-28 14:09:08,759 INFO waiter.state.responder [async-dispatch-36] - [CID=oom-1231bde10dbca-208026a54c1af5e0] w9091-waiwebinttestesreqstrbytandstr320077575689638-7c974a17cd27930f1d5c06e687f0f486.1231c2b6ccdeb-163835a5447f009 with status :instance-error has 1 consecutive failures
2022-03-28 14:09:08,761 INFO waiter.core [qtp308770021-172] - [CID=oom-1231bde10dbca-208026a54c1af5e0] request received: {:internal-protocol HTTP/1.1, :request-id 12320e5da6cfc-1e63b9f90f43b690-http, :remote-addr 127.0.0.1, :client-protocol HTTP/1.1, :headers {host 127.0.0.1:9091, x-waiter-cmd-type shell, x-waiter-name waiwebinttestesreqstrbytandstr320077575689638, x-waiter-version version-does-not-matter, user-agent waiter-test/websocket-oom-fix-20220324.http1, x-waiter-grace-period-secs 120, x-waiter-idle-timeout-mins 10, x-waiter-metric-group waiter_test, x-waiter-health-check-url /status, x-cid oom-1231bde10dbca-208026a54c1af5e0, x-waiter-min-instances 1, accept-encoding gzip, x-waiter-cpus 0.1, x-waiter-mem 256, x-waiter-cmd /Users/shamsimam/projects/github-projects/twosigma-waiter/containers/test-app...}, :content-length nil, :content-type nil, :character-encoding nil, :uri /service-id, :query-string nil, :router-id r9091-12319cef7deb1-1e824d65fe3b7bd8, :scheme :http, :request-method :get}
2022-03-28 14:09:08,762 INFO waiter.websocket [async-dispatch-35] - [CID=oom-1231bde10dbca-208026a54c1af5e0] client requesting close of websocket: :connection-closed Java heap space
2022-03-28 14:09:08,764 INFO waiter.state.responder [async-dispatch-36] - [CID=oom-1231bde10dbca-208026a54c1af5e0] ejecting instance w9091-waiwebinttestesreqstrbytandstr320077575689638-7c974a17cd27930f1d5c06e687f0f486.1231c2b6ccdeb-163835a5447f009 for 10000.0 ms.
2022-03-28 14:09:08,781 INFO waiter.websocket [async-dispatch-18] - [CID=oom-1231bde10dbca-208026a54c1af5e0] frame 20 received 22000000 bytes from instance
2022-03-28 14:09:18,803 INFO waiter.state.responder [async-dispatch-55] - [CID=oom-1231bde10dbca-208026a54c1af5e0] requesting instance w9091-waiwebinttestesreqstrbytandstr320077575689638-7c974a17cd27930f1d5c06e687f0f486.1231c2b6ccdeb-163835a5447f009 to be unejected
2022-03-28 14:09:28,784 ERROR waiter.websocket [async-dispatch-37] - [CID=oom-1231bde10dbca-208026a54c1af5e0] unable to stream to client {:cid oom-1231bde10dbca-208026a54c1af5e0, :bytes-streamed 418000002}
package shams;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.server.NativeWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter;

import javax.servlet.ServletException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Locale;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

public class WebsocketServer {

    public static void main(String[] args) throws ServletException {
        Server server = new Server();
        ServerConnector connector = new ServerConnector(server);
        connector.setPort(8090);
        server.addConnector(connector);

        // Setup the basic application "context" for this application at "/"
        // This is also known as the handler tree (in jetty speak)
        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
        context.setContextPath("/");
        server.setHandler(context);

        // Configure specific websocket behavior
        NativeWebSocketServletContainerInitializer.configure(context,
            (servletContext, nativeWebSocketConfiguration) ->
            {
                // Configure default max size
                nativeWebSocketConfiguration.getPolicy().setMaxTextMessageBufferSize(65535);

                // Add websockets
                nativeWebSocketConfiguration.addMapping("/events/*", EventSocket.class);
            });

        // Add generic filter that will accept WebSocket upgrade.
        WebSocketUpgradeFilter.configure(context);

        try {
            server.start();
            server.join();
        } catch (Throwable t) {
            t.printStackTrace(System.err);
        }
    }

    // Builds a random string of lowercase ASCII letters of the requested length.
    public static String generateRandomAlphabeticString(int targetStringLength) {
        int leftLimit = 97; // letter 'a'
        int rightLimit = 122; // letter 'z'
        Random random = new Random();
        return random.ints(leftLimit, rightLimit + 1)
            .limit(targetStringLength)
            .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
            .toString();
    }

    public static class EventSocket extends WebSocketAdapter {
        private final CountDownLatch closureLatch = new CountDownLatch(1);

        @Override
        public void onWebSocketConnect(Session sess) {
            super.onWebSocketConnect(sess);
            System.out.println("Socket Connected: " + sess);
        }

        // Protocol: "hi" is echoed back, "send <repeats> <length> <delayMs>" streams
        // binary messages, and "bye" closes the session.
        @Override
        public void onWebSocketText(String message) {
            super.onWebSocketText(message);
            System.out.println("Received TEXT message: " + message);
            final String[] fragments = message.split(" ");
            System.out.println("Fragments: " + Arrays.toString(fragments));
            if (message.toLowerCase(Locale.US).contains("hi")) {
                try {
                    getRemote().sendString("Hi");
                } catch (IOException e) {
                    throw new RuntimeException("Error in sending string via websocket", e);
                }
            } else if ("send".equals(fragments[0])) {
                int repeats = Integer.parseInt(fragments[1]);
                int length = Integer.parseInt(fragments[2]);
                int delayMs = Integer.parseInt(fragments[3]);
                String randomString = generateRandomAlphabeticString(length);
                for (int i = 0; i < repeats; i++) {
                    try {
                        System.out.println("Iteration-" + i + " sending " + randomString.length() + " bytes");
                        // The payload is pure ASCII, so one character encodes to one byte.
                        ByteBuffer responseBuffer = ByteBuffer.wrap(randomString.getBytes(StandardCharsets.US_ASCII));
                        getRemote().sendBytes(responseBuffer);
                    } catch (IOException e) {
                        throw new RuntimeException("Error in sending bytes via websocket", e);
                    }
                    if (delayMs > 0) {
                        try {
                            Thread.sleep(delayMs);
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag before propagating the failure.
                            Thread.currentThread().interrupt();
                            throw new RuntimeException("Error in sleeping inside websocket", e);
                        }
                    }
                }
                try {
                    System.out.println("Flushing data sent to remote");
                    getRemote().flush();
                } catch (IOException e) {
                    throw new RuntimeException("Error in flushing websocket", e);
                }
            } else if (message.toLowerCase(Locale.US).contains("bye")) {
                getSession().close(StatusCode.NORMAL, "Thanks");
            } else {
                System.out.println("Unsupported message: " + message);
            }
        }

        @Override
        public void onWebSocketClose(int statusCode, String reason) {
            super.onWebSocketClose(statusCode, reason);
            System.out.println("Socket Closed: [" + statusCode + "] " + reason);
            closureLatch.countDown();
        }

        @Override
        public void onWebSocketError(Throwable cause) {
            super.onWebSocketError(cause);
            cause.printStackTrace(System.err);
        }

        public void awaitClosure() throws InterruptedException {
            System.out.println("Awaiting closure from remote");
            closureLatch.await();
        }
    }
}
@shamsimam
Author

Example with OOM:

Screen Shot 2022-03-29 at 9 38 55 PM

Example with backpressure and no OOM:

Screen Shot 2022-03-29 at 9 38 44 PM
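
For readers unfamiliar with the term, the general idea behind backpressure can be sketched in plain Java. This is an illustration only, not how Waiter or Jetty implements it: the class `BackpressureSketch`, the method `peakBufferedFrames`, and the 1 KiB frame size are all hypothetical. A fast producer (standing in for the backend instance) hands frames to a slow consumer (standing in for the client) through a bounded queue, so the producer blocks instead of letting unconsumed frames pile up on the heap.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration; none of these names come from Waiter or Jetty.
class BackpressureSketch {

    /**
     * Streams `frames` frames from a fast producer to a slow consumer through
     * a queue bounded at `capacity`, and returns the deepest queue size the
     * producer observed.
     */
    static int peakBufferedFrames(int frames, int capacity, long consumerDelayMs)
            throws InterruptedException {
        BlockingQueue<byte[]> queue = new ArrayBlockingQueue<>(capacity);
        AtomicInteger peak = new AtomicInteger();

        // Slow consumer: takes one frame at a time, then pauses.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < frames; i++) {
                    queue.take();
                    Thread.sleep(consumerDelayMs);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Fast producer: put() blocks whenever the queue is full, which is
        // the backpressure. An unbounded queue would keep growing instead.
        for (int i = 0; i < frames; i++) {
            queue.put(new byte[1024]);
            peak.accumulateAndGet(queue.size(), Math::max);
        }

        consumer.join();
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("peak buffered frames: " + peakBufferedFrames(50, 4, 2));
    }
}
```

With a bounded queue the number of frames held in memory can never exceed `capacity`, however many frames the producer wants to send. With 22 MB frames, as in the test above, that bound is the difference between a fixed memory footprint and one that grows with every frame the client has not yet read.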

@shamsimam
Author

Dominator tree with OOM:

Screen Shot 2022-03-29 at 9 38 17 PM

Dominator tree with backpressure and no OOM:

Screen Shot 2022-03-29 at 9 38 28 PM

@shamsimam
Author

After upgrading core.async to version 1.5.648, the memory leak appears to be gone when running the same client, which streams large amounts of data:

Screen Shot 2022-03-29 at 11 01 38 PM

Screen Shot 2022-03-29 at 11 01 51 PM
