Skip to content

Instantly share code, notes, and snippets.

@kojilin
Created May 10, 2021 12:26
Show Gist options
  • Select an option

  • Save kojilin/b671b822a7e944463754911d395ad1d9 to your computer and use it in GitHub Desktop.

Select an option

Save kojilin/b671b822a7e944463754911d395ad1d9 to your computer and use it in GitHub Desktop.
package com.linecorp.armeria.client.grpc;
import static com.linecorp.armeria.grpc.testing.Messages.PayloadType.COMPRESSABLE;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import com.google.protobuf.ByteString;
import com.linecorp.armeria.client.Clients;
import com.linecorp.armeria.client.circuitbreaker.CircuitBreaker;
import com.linecorp.armeria.client.circuitbreaker.CircuitBreakerClient;
import com.linecorp.armeria.client.circuitbreaker.CircuitBreakerDecision;
import com.linecorp.armeria.client.circuitbreaker.CircuitBreakerRuleWithContent;
import com.linecorp.armeria.client.retry.RetryDecision;
import com.linecorp.armeria.client.retry.RetryRuleWithContent;
import com.linecorp.armeria.client.retry.RetryingClient;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.SessionProtocol;
import com.linecorp.armeria.common.grpc.GrpcSerializationFormats;
import com.linecorp.armeria.grpc.testing.Messages.Payload;
import com.linecorp.armeria.grpc.testing.Messages.ResponseParameters;
import com.linecorp.armeria.grpc.testing.Messages.StreamingOutputCallRequest;
import com.linecorp.armeria.grpc.testing.Messages.StreamingOutputCallResponse;
import com.linecorp.armeria.grpc.testing.TestServiceGrpc.TestServiceBlockingStub;
import com.linecorp.armeria.internal.common.grpc.TestServiceImpl;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.grpc.GrpcService;
import com.linecorp.armeria.testing.junit5.server.ServerExtension;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.ResourceLeakDetector.Level;
/**
 * Exercises a gRPC blocking stub that is decorated with both a {@link CircuitBreakerClient} and a
 * {@link RetryingClient}, each configured with a content-aware rule that aggregates the response.
 *
 * <p>The test issues many server-streaming calls while Netty's resource leak detector is set to
 * {@link Level#PARANOID} and checks that every call yields the expected payloads.
 */
class GrpcRetryWithCircuitBreakerTest {

    /** Number of streaming calls issued by {@link #retryWithCircuitBreaker()}. */
    private static final int ITERATIONS = 10000;

    /** Pause between two consecutive calls, in milliseconds. */
    private static final long PAUSE_BETWEEN_CALLS_MILLIS = 100;

    /** Body size of the first streamed payload. */
    private static final int FIRST_PAYLOAD_SIZE = 31;

    /** Body size of the second streamed payload. */
    private static final int SECOND_PAYLOAD_SIZE = 9;

    /** Interval requested for each response parameter, in microseconds. */
    private static final int RESPONSE_INTERVAL_US = 1000;

    @RegisterExtension
    static ServerExtension server = new ServerExtension() {
        @Override
        protected void configure(ServerBuilder sb) throws Exception {
            // NOTE(review): the executor handed to TestServiceImpl is never shut down in this class.
            sb.service(GrpcService.builder()
                                  .addService(new TestServiceImpl(
                                          Executors.newSingleThreadScheduledExecutor()))
                                  .build());
        }
    };

    /**
     * Sends {@link #ITERATIONS} streaming requests through a client decorated with a circuit
     * breaker and a retrying client, asserting that each call returns exactly the golden responses.
     *
     * @throws InterruptedException if the thread is interrupted while pausing between calls
     */
    @Test
    void retryWithCircuitBreaker() throws InterruptedException {
        // The leak detection level is JVM-global state; remember it so it can be restored and
        // does not leak into other tests running in the same JVM.
        final Level previousLevel = ResourceLeakDetector.getLevel();
        ResourceLeakDetector.setLevel(Level.PARANOID);
        try {
            // Both rules aggregate the whole response before deciding; the retry rule never
            // retries and the circuit breaker rule always reports success.
            final RetryRuleWithContent<HttpResponse> retryRuleWithContent =
                    (ctx, response, cause) ->
                            response.aggregate().thenApply(aggregated -> RetryDecision.noRetry());
            final CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaultName();
            final CircuitBreakerRuleWithContent<HttpResponse> cbRuleWithContent =
                    (ctx, response, cause) ->
                            response.aggregate()
                                    .thenApply(aggregated -> CircuitBreakerDecision.success());

            final TestServiceBlockingStub blockingStub =
                    Clients.builder(server.uri(SessionProtocol.HTTP, GrpcSerializationFormats.PROTO))
                           .decorator(CircuitBreakerClient.newDecorator(circuitBreaker,
                                                                        cbRuleWithContent))
                           .decorator(RetryingClient.newDecorator(retryRuleWithContent))
                           .build(TestServiceBlockingStub.class);

            // The request and the expected responses are immutable and identical for every
            // iteration, so build them once instead of once per call.
            final StreamingOutputCallRequest request = newRequest();
            final List<StreamingOutputCallResponse> goldenResponses = newGoldenResponses();

            for (int i = 0; i < ITERATIONS; i++) {
                final List<StreamingOutputCallResponse> responses = new ArrayList<>();
                final Iterator<StreamingOutputCallResponse> it =
                        blockingStub.streamingOutputCall(request);
                while (it.hasNext()) {
                    responses.add(it.next());
                }
                assertThat(responses).containsExactlyElementsOf(goldenResponses);
                // NOTE(review): presumably gives leaked buffers a chance to be collected and
                // reported by the leak detector between calls - confirm.
                Thread.sleep(PAUSE_BETWEEN_CALLS_MILLIS);
            }
        } finally {
            ResourceLeakDetector.setLevel(previousLevel);
        }
    }

    /** Builds the streaming request asking for two compressable payloads of 31 and 9 bytes. */
    private static StreamingOutputCallRequest newRequest() {
        return StreamingOutputCallRequest.newBuilder()
                                         .setResponseType(COMPRESSABLE)
                                         .addResponseParameters(
                                                 ResponseParameters.newBuilder()
                                                                   .setSize(FIRST_PAYLOAD_SIZE)
                                                                   .setIntervalUs(RESPONSE_INTERVAL_US))
                                         .addResponseParameters(
                                                 ResponseParameters.newBuilder()
                                                                   .setSize(SECOND_PAYLOAD_SIZE)
                                                                   .setIntervalUs(RESPONSE_INTERVAL_US))
                                         .build();
    }

    /** Builds the responses expected for the request created by {@link #newRequest()}. */
    private static List<StreamingOutputCallResponse> newGoldenResponses() {
        return Arrays.asList(goldenResponse(FIRST_PAYLOAD_SIZE),
                             goldenResponse(SECOND_PAYLOAD_SIZE));
    }

    /** Builds one expected response whose compressable payload body is {@code size} zero bytes. */
    private static StreamingOutputCallResponse goldenResponse(int size) {
        return StreamingOutputCallResponse.newBuilder()
                                          .setPayload(Payload.newBuilder()
                                                             .setType(COMPRESSABLE)
                                                             .setBody(ByteString.copyFrom(
                                                                     new byte[size])))
                                          .build();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment