Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save asardaes/f48ae9ca631060e00fae19eb5eb5fdf4 to your computer and use it in GitHub Desktop.

Select an option

Save asardaes/f48ae9ca631060e00fae19eb5eb5fdf4 to your computer and use it in GitHub Desktop.
Streaming JSON Body Extractor
package org.test;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonPointer;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.async.ByteBufferFeeder;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.util.TokenBuffer;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ResolvableType;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.ReactiveHttpInputMessage;
import org.springframework.web.reactive.function.BodyExtractor;
import reactor.core.publisher.Flux;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
// adapted from https://gist.github.com/HaloFour/148002a3da8e8b5b48c6a55a548a3b97
/**
 * Implementation of a {@link BodyExtractor} that can asynchronously parse and scan into a JSON document to find
 * an array or object at a specified {@link JsonPointer} and deserialize each entity at that location without
 * buffering the entire response payload into memory.
 * <p>
 * Pointer segments are compared against JSON property names only (the name returned by
 * {@link JsonParser#currentName()} for each enclosing container). An empty pointer
 * ({@code JsonPointer.compile("")}) selects the root array or object of the document.
 * Entities that deserialize to {@code null} (for example JSON {@code null} array elements) are skipped,
 * because a {@link Flux} cannot emit {@code null}.
 * <p>
 * Given the following JSON document:
 * <pre>{@code
 * {
 *   "resolveChannelsResponse": {
 *     "channels": [
 *       {
 *         "channelId": "merlin:linear:channel:5903704021496414104"
 *       },
 *       {
 *         "channelId": "merlin:linear:channel:8692448035122410104"
 *       }
 *     ],
 *     "version": "123"
 *   }
 * }
 * }</pre>
 * <p>
 * You can use the extractor as follows:
 *
 * <pre>{@code
 * return client.post()
 *         .uri(serviceDiscovery.getServiceEndpoint(ServiceTypes.GRID_WS), builder -> {
 *             builder = withCommonComponents(builder);
 *             return builder.build();
 *         })
 *         .contentType(MediaType.APPLICATION_JSON)
 *         .bodyValue(Map.of("resolveChannels", request))
 *         .accept(MediaType.APPLICATION_JSON)
 *         .exchangeToFlux(response -> {
 *             if (response.statusCode().isError()) {
 *                 return response.createError().flux();
 *             }
 *             return response.body(streamingJsonBodyExtractor.toFlux(ChannelInfo.class,
 *                     JsonPointer.compile("/resolveChannelsResponse/channels")));
 *         });
 * }</pre>
 * <p>
 * The result will be a {@link Flux} with each element emitted as it is parsed and deserialized.
 */
public class StreamingJsonBodyExtractor {
    private final ObjectMapper mapper;

    /**
     * Creates an instance of the {@link StreamingJsonBodyExtractor} for the configured {@link ObjectMapper}.
     *
     * @param mapper the {@link ObjectMapper} used to create parsers and to deserialize the entities
     */
    public StreamingJsonBodyExtractor(ObjectMapper mapper) {
        this.mapper = mapper;
    }

    /**
     * Extractor to stream deserialized entities from a JSON pointer to a node within the response.
     *
     * @param type    the {@link Class} of the element type to decode to
     * @param pointer the {@link JsonPointer} to the nested location within the JSON body to start streaming
     * @param <T>     the element type to decode to
     * @param <M>     the type of the HTTP input message
     * @return the {@link BodyExtractor} to stream the entities into a {@link Flux}
     */
    public <T, M extends ReactiveHttpInputMessage> BodyExtractor<Flux<T>, M> toFlux(Class<T> type, JsonPointer pointer) {
        return toFlux(ResolvableType.forClass(type), pointer);
    }

    /**
     * Extractor to stream deserialized entities from a JSON pointer to a node within the response.
     *
     * @param type    the {@link ParameterizedTypeReference} of the generic element type to decode to
     * @param pointer the {@link JsonPointer} to the nested location within the JSON body to start streaming
     * @param <T>     the element type to decode to
     * @param <M>     the type of the HTTP input message
     * @return the {@link BodyExtractor} to stream the entities into a {@link Flux}
     */
    public <T, M extends ReactiveHttpInputMessage> BodyExtractor<Flux<T>, M> toFlux(ParameterizedTypeReference<T> type, JsonPointer pointer) {
        return toFlux(ResolvableType.forType(type), pointer);
    }

    /**
     * Extractor to stream deserialized entities from a JSON pointer to a node within the response.
     *
     * @param type    the {@link ResolvableType} of the element type to decode to
     * @param pointer the {@link JsonPointer} to the nested location within the JSON body to start streaming
     * @param <T>     the element type to decode to
     * @param <M>     the type of the HTTP input message
     * @return the {@link BodyExtractor} to stream the entities into a {@link Flux}
     */
    public <T, M extends ReactiveHttpInputMessage> BodyExtractor<Flux<T>, M> toFlux(ResolvableType type, JsonPointer pointer) {
        return (inputMessage, context) -> extract(inputMessage, type, pointer);
    }

    /**
     * Extracts the streaming entities from the given message.
     *
     * @param inputMessage the request to extract from
     * @param type         the {@link ResolvableType} of the element type to decode to
     * @param pointer      the {@link JsonPointer} to the nested location within the JSON body to start streaming
     * @param <T>          the element type to decode to
     * @param <M>          the type of the HTTP input message
     * @return the {@link Flux} of the entities being streamed
     */
    public <T, M extends ReactiveHttpInputMessage> Flux<T> extract(M inputMessage, ResolvableType type, JsonPointer pointer) {
        return extract(inputMessage.getBody(), type, pointer);
    }

    /**
     * Extracts the streaming entities from the data buffers.
     * <p>
     * A new non-blocking parser (and tokenizer state) is created for every subscription, so subscribers never
     * share parser state. The parser is closed when the stream completes, fails or is cancelled. Any failure
     * while creating the parser or resolving the element type is delivered as an error signal.
     *
     * @param dataBuffers the streaming body
     * @param type        the {@link ResolvableType} of the element type to decode to
     * @param pointer     the {@link JsonPointer} to the nested location within the JSON body to start streaming
     * @param <T>         the element type to decode to
     * @return the {@link Flux} of the entities being streamed
     */
    private <T> Flux<T> extract(Flux<DataBuffer> dataBuffers, ResolvableType type, JsonPointer pointer) {
        return Flux.using(
                () -> mapper.getFactory().createNonBlockingByteBufferParser(),
                (JsonParser parser) -> {
                    var feeder = (ByteBufferFeeder) parser.getNonBlockingInputFeeder();
                    Supplier<TokenBuffer> tokenBufferSupplier =
                            () -> new TokenBuffer(parser, mapper.getDeserializationContext());
                    var tokenizer = new StreamingTokenizer(parser, feeder, tokenBufferSupplier, pointer);
                    Function<TokenBuffer, T> deserializer = deserialize(mapper, type);
                    return dataBuffers.concatMap(tokenizer::tokenize)
                            .concatWith(tokenizer.endOfInput())
                            .<T>handle((tokenBuffer, sink) -> {
                                T value = deserializer.apply(tokenBuffer);
                                // Reactive Streams forbids null elements, so entities that
                                // deserialize to null (e.g. JSON null) are skipped
                                if (value != null) {
                                    sink.next(value);
                                }
                            });
                },
                StreamingJsonBodyExtractor::closeParser);
    }

    /**
     * Closes the given parser, rethrowing any {@link IOException} as an {@link UncheckedIOException}.
     *
     * @param parser the parser to close
     */
    private static void closeParser(JsonParser parser) {
        try {
            parser.close();
        } catch (IOException exception) {
            throw new UncheckedIOException(exception);
        }
    }

    /**
     * Returns a function to deserialize the buffered tokens into an entity.
     *
     * @param mapper the {@link ObjectMapper} used to deserialize the entities
     * @param type   the {@link ResolvableType} of the element type to decode to
     * @param <T>    the element type to decode to
     * @return the {@link Function} that deserializes each {@link TokenBuffer}; it throws
     * {@link UncheckedIOException} when deserialization fails
     */
    private <T> Function<TokenBuffer, T> deserialize(ObjectMapper mapper, ResolvableType type) {
        var javaType = mapper.getTypeFactory().constructType(type.getType());
        return tokenBuffer -> {
            try (var parser = tokenBuffer.asParser()) {
                return mapper.readValue(parser, javaType);
            } catch (IOException exception) {
                throw new UncheckedIOException(exception);
            }
        };
    }

    /**
     * Helper state machine class that tracks the current state and position as it parses through
     * the {@link JsonToken} extracted from the {@link JsonParser} and fills a {@link TokenBuffer}
     * with the complete graph for each entity to be deserialized.
     */
    private static final class StreamingTokenizer {
        private final JsonParser parser;
        private final ByteBufferFeeder feeder;
        private final Supplier<TokenBuffer> tokenBufferSupplier;
        /** Expected property names, outermost first, taken from the JsonPointer. */
        private final String[] segments;
        /** Property names of the containers currently enclosing the parser, outermost first. */
        private final String[] actual;
        /**
         * Before the stream: container nesting depth within the document (-1 outside the root).
         * Within the stream: nesting depth relative to the streamed node (0 = direct children).
         */
        private int depth;
        private ParserState state = ParserState.BEFORE_STREAM;
        private TokenBuffer currentTokenBuffer;

        StreamingTokenizer(JsonParser parser,
                           ByteBufferFeeder feeder,
                           Supplier<TokenBuffer> tokenBufferSupplier,
                           JsonPointer pointer) {
            this.parser = parser;
            this.feeder = feeder;
            this.tokenBufferSupplier = tokenBufferSupplier;
            this.segments = pointerToSegments(pointer);
            this.actual = new String[segments.length];
            this.depth = -1;
        }

        /**
         * Converts the {@link JsonPointer} into an array of the path segments to make it easier
         * to detect depth and compare the current location within the {@link JsonParser}.
         * An empty (or {@code null}) pointer yields no segments, which selects the document root.
         *
         * @param pointer the pointer to a node within the JSON document
         * @return an array of the segments
         */
        private static String[] pointerToSegments(JsonPointer pointer) {
            var list = new ArrayList<String>();
            // matches() is true only for the trailing "empty" pointer, which carries no segment
            for (var current = pointer; current != null && !current.matches(); current = current.tail()) {
                list.add(current.getMatchingProperty());
            }
            return list.toArray(new String[0]);
        }

        /**
         * Feeds the current {@link DataBuffer} to the {@link JsonParser} and parses through parsed tokens.
         * The data buffer is released, and its byte buffer iterator closed, when the returned
         * {@link Flux} completes, fails or is cancelled.
         *
         * @param dataBuffer the data buffer
         * @return a {@link Flux} of the {@link TokenBuffer} of any object graphs to be deserialized
         */
        Flux<TokenBuffer> tokenize(DataBuffer dataBuffer) {
            Flux<TokenBuffer> tokens = Flux.using(
                    dataBuffer::readableByteBuffers,
                    iterator -> Flux.fromIterable(() -> iterator)
                            .concatMap(byteBuffer -> {
                                try {
                                    feeder.feedInput(byteBuffer);
                                    return parseTokens();
                                } catch (IOException e) {
                                    return Flux.error(e);
                                }
                            }),
                    DataBuffer.ByteBufferIterator::close);
            // doFinally (unlike doOnTerminate) also runs on cancellation, so the buffer is never leaked
            return tokens.doFinally(signal -> DataBufferUtils.release(dataBuffer));
        }

        /**
         * Signals that the end of the message has been reached and to parse any remaining tokens.
         *
         * @return a {@link Flux} of the {@link TokenBuffer} of any object graphs to be deserialized
         */
        Flux<TokenBuffer> endOfInput() {
            return Flux.defer(() -> {
                feeder.endOfInput();
                return parseTokens();
            });
        }

        /**
         * Parses any tokens currently available in the {@link JsonParser}.
         *
         * @return a {@link Flux} of the {@link TokenBuffer} of any object graphs to be deserialized
         */
        private Flux<TokenBuffer> parseTokens() {
            return Flux.defer(() -> {
                try {
                    List<TokenBuffer> tokenBuffers = Collections.emptyList();
                    while (!parser.isClosed()) {
                        var token = parser.nextToken();
                        // a single null may separate documents; a second consecutive null ends parsing
                        if (token == null) {
                            token = parser.nextToken();
                        }
                        if (token == null || token == JsonToken.NOT_AVAILABLE) {
                            break;
                        }
                        tokenBuffers = parseToken(parser, token, tokenBuffers);
                    }
                    return Flux.fromIterable(tokenBuffers);
                } catch (IOException exception) {
                    return Flux.error(exception);
                }
            });
        }

        /**
         * Parses the current {@link JsonToken} from the {@link JsonParser} and tracks the current position and state
         * within the JSON document.
         *
         * @param parser       the json parser
         * @param token        the current token being parsed
         * @param tokenBuffers the current list of token buffers
         * @return a list of the {@link TokenBuffer} for any parsed object graphs to be deserialized
         * @throws IOException an exception occurred parsing the JSON
         */
        private List<TokenBuffer> parseToken(JsonParser parser, JsonToken token, List<TokenBuffer> tokenBuffers) throws IOException {
            switch (state) {
                case BEFORE_STREAM:
                    if (isAtStreamingStructStart(parser, token)) {
                        state = ParserState.WITHIN_STREAM;
                        // reset the node depth as we don't care about how deep we are into the
                        // entire JSON document anymore, only the depth from the current node
                        depth = 0;
                    }
                    break;
                case WITHIN_STREAM:
                    if (token.isStructEnd() && depth == 0) {
                        // We've reached the end token of the streamed node itself,
                        // so ignore the rest of the document from now on
                        state = ParserState.AFTER_STREAM;
                        return appendTokenBuffer(tokenBuffers);
                    }
                    // Append the current token to the current token buffer
                    appendToken(parser);
                    // Track the start and end of any object or array nodes so that we know
                    // when the object graph of the current entity is complete
                    if (token.isStructStart()) {
                        depth += 1;
                    } else if (token.isStructEnd()) {
                        depth -= 1;
                        if (depth == 0) {
                            // End of the current entity: hand its buffer to the subscriber
                            return appendTokenBuffer(tokenBuffers);
                        }
                    } else if (depth == 0 && token.isScalarValue()) {
                        // Array of scalars (or a scalar property of a streamed object)
                        return appendTokenBuffer(tokenBuffers);
                    }
                    break;
                case AFTER_STREAM:
                default:
                    // do nothing, ignore the remainder of the tokens
                    break;
            }
            return tokenBuffers;
        }

        /**
         * Determines if the current {@link JsonToken} is at the position specified by the {@link JsonPointer},
         * updating the tracked document depth and enclosing property names as containers are entered and left.
         *
         * @param parser the json parser
         * @param token  the current token being parsed
         * @return {@code true} if the current token is at the position specified by the {@link JsonPointer}
         * @throws IOException an exception occurred parsing the JSON
         */
        private boolean isAtStreamingStructStart(JsonParser parser, JsonToken token) throws IOException {
            if (token.isStructEnd()) {
                // leaving a container that did not match: step back out to its parent's depth,
                // otherwise any sibling container preceding the target would hide the pointer
                depth -= 1;
                return false;
            }
            if (!token.isStructStart()) {
                return false;
            }
            depth += 1;
            if (depth > segments.length) {
                // deeper than the pointer, nothing in here can be the target node
                return false;
            }
            if (depth > 0) {
                // record the name of the current node to the array of actual path segments
                actual[depth - 1] = parser.currentName();
            }
            // at the expected depth, the node is the target only if every path segment matches;
            // with an empty pointer this selects the root container (depth 0)
            return depth == segments.length && compareSegments(segments, actual);
        }

        /**
         * Compares the expected JSON path segments with the current position of the {@link JsonParser} to
         * determine if the parser is currently at the position specified by the {@link JsonPointer}.
         *
         * @param segments the expected JSON path segments
         * @param actual   the current JSON path segments (entries may be {@code null}, e.g. for array elements)
         * @return {@code true} if the segments match; otherwise, {@code false}
         */
        private boolean compareSegments(String[] segments, String[] actual) {
            for (int i = 0; i < segments.length; i++) {
                if (!segments[i].equals(actual[i])) {
                    return false;
                }
            }
            return true;
        }

        /**
         * Appends the current token from the parser to the current {@link TokenBuffer}, creating one
         * if necessary.
         *
         * @param parser the JSON parser
         * @throws IOException an exception occurred parsing the JSON
         */
        private void appendToken(JsonParser parser) throws IOException {
            if (currentTokenBuffer == null) {
                currentTokenBuffer = tokenBufferSupplier.get();
            }
            currentTokenBuffer.copyCurrentEvent(parser);
        }

        /**
         * Moves the current token buffer (if any) into the list of completed token buffers.
         *
         * @param list the current list of token buffers
         * @return a list containing the additional token buffer; the input list itself when there was
         * no current token buffer
         */
        private List<TokenBuffer> appendTokenBuffer(List<TokenBuffer> list) {
            var tokenBuffer = currentTokenBuffer;
            currentTokenBuffer = null;
            if (tokenBuffer == null) {
                return list;
            }
            if (list.isEmpty()) {
                // most calls complete a single entity, so avoid allocating a mutable list
                return Collections.singletonList(tokenBuffer);
            }
            if (list.size() == 1) {
                // the singleton list is immutable: copy it into a growable list
                var newList = new ArrayList<TokenBuffer>(2);
                newList.add(list.get(0));
                newList.add(tokenBuffer);
                return newList;
            }
            list.add(tokenBuffer);
            return list;
        }

        /**
         * The current state of the parser state machine.
         */
        private enum ParserState {
            /**
             * Indicates that the {@link JsonParser} has not yet reached the
             * node indicated by the {@link JsonPointer}
             */
            BEFORE_STREAM,
            /**
             * Indicates that the {@link JsonParser} is currently within the
             * object graph under the node indicated by the {@link JsonPointer}
             */
            WITHIN_STREAM,
            /**
             * Indicates that the {@link JsonParser} has passed the node indicated
             * by the {@link JsonPointer} and that the remaining payload will be ignored
             */
            AFTER_STREAM
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment