Skip to content

Instantly share code, notes, and snippets.

@ardiereally
Created September 23, 2025 10:56
Show Gist options
  • Select an option

  • Save ardiereally/b0dab02c10a5488b0772d6a7bc96fcf7 to your computer and use it in GitHub Desktop.

Select an option

Save ardiereally/b0dab02c10a5488b0772d6a7bc96fcf7 to your computer and use it in GitHub Desktop.
package com.rdas.lld.streamjoin;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import lombok.AllArgsConstructor;
import lombok.Data;
/**
 * A single record carried on an input stream; the same shape is reused for
 * the joined records that {@code AudioStreamJoiner} emits.
 *
 * <p>Records are matched on the pair ({@code audioDevice}, {@code utcMinute}).
 * The joiner treats a {@code null} {@code bitRate} or {@code volume} as
 * "this record does not carry that measurement".
 */
@Data
@AllArgsConstructor
class StreamData {
    /** Device the record belongs to; first half of the join key. */
    private String audioDevice;

    /**
     * Minute the record belongs to; second half of the join key.
     * NOTE(review): presumably a UTC minute count -- confirm the exact unit with the producer.
     */
    private Long utcMinute;

    // data

    /** Bit-rate measurement, or {@code null} when this record does not carry one. */
    private Integer bitRate;

    /** Volume measurement, or {@code null} when this record does not carry one. */
    private Integer volume;
}
/**
 * The measurements buffered for one device and minute while a join is pending.
 *
 * <p>Either field may be {@code null} until the record carrying it has been
 * received; {@code AudioStreamJoiner} considers an entry complete once both
 * are non-null.
 */
@Data
@AllArgsConstructor
class AudioDeviceData {
    /** Bit-rate measurement, or {@code null} if not received yet. */
    private Integer bitRate;

    /** Volume measurement, or {@code null} if not received yet. */
    private Integer volume;
}
public class AudioStreamJoiner {
private Map<String, Map<Long, AudioDeviceData>> buffer;
private ExecutorService streamReader;
long bufferEntries;
public AudioStreamJoiner(BlockingQueue<StreamData> audioStream, BlockingQueue<StreamData> bitRateStream, long bufferEntries) {
buffer = new ConcurrentHashMap<>();
streamReader = Executors.newFixedThreadPool(2); // only 2 streams
streamReader.submit(() -> consumeFromQueue(audioStream));
streamReader.submit(() -> consumeFromQueue(bitRateStream));
this.bufferEntries = bufferEntries;
}
public void joinStreams() {
while (true) {
// scan the buffer & see if we have any finished joins
List<StreamData> joinedData = scanForComplete();
// emit finished entries & remove from buffer
joinedData.forEach(System.out::println);
// check buffer size & remove older entries
trimBuffer();
}
}
private List<StreamData> scanForComplete() {
return buffer.entrySet()
.stream()
.flatMap(outer ->
outer.getValue().entrySet().stream()
.filter(this::isComplete) // filter criterion
.map(inner -> new StreamData(outer.getKey(), inner.getKey(), inner.getValue().getBitRate(), inner.getValue().getVolume()))
)
.collect(Collectors.toList());
}
private boolean isComplete(Map.Entry<Long, AudioDeviceData> longAudioDeviceDataEntry) {
var audioDeviceData = longAudioDeviceDataEntry.getValue();
return Objects.nonNull(audioDeviceData.getVolume()) && Objects.nonNull(audioDeviceData.getBitRate());
}
private void trimBuffer() {
long currentSize = findCurrentSize();
if (currentSize > this.bufferEntries) {
removeOlderEntries();
}
}
private void removeOlderEntries() {
int count = 0;
Iterator<Map.Entry<String, Map<Long, AudioDeviceData>>> outerIt = buffer.entrySet().iterator();
while (outerIt.hasNext()) {
Map.Entry<String, Map<Long, AudioDeviceData>> outerEntry = outerIt.next();
Map<Long, AudioDeviceData> inner = outerEntry.getValue();
Iterator<Map.Entry<Long, AudioDeviceData>> innerIt = inner.entrySet().iterator();
while (innerIt.hasNext()) {
if (count >= this.bufferEntries) {
// remove remaining elements
innerIt.remove();
} else {
innerIt.next();
count++;
}
}
}
}
private long findCurrentSize() {
return buffer.values()
.stream()
.mapToInt(Map::size)
.sum();
}
private void consumeFromQueue(BlockingQueue<StreamData> stream) {
while (true) {
try {
var dataPoint = stream.poll(1000, TimeUnit.MILLISECONDS);
if (dataPoint != null) {
// add to buffer
if (!buffer.containsKey(dataPoint.getAudioDevice())) {
buffer.put(dataPoint.getAudioDevice(), new ConcurrentHashMap<>());
}
var dataForDevice = buffer.get(dataPoint.getAudioDevice());
if (dataForDevice.containsKey(dataPoint.getUtcMinute())) {
var existingData = dataForDevice.get(dataPoint.getUtcMinute());
if (dataPoint.getVolume() != null) {
existingData.setVolume(dataPoint.getVolume());
}
if (dataPoint.getBitRate() != null) {
existingData.setBitRate(dataPoint.getBitRate());
}
} else {
dataForDevice.put(
dataPoint.getUtcMinute(),
new AudioDeviceData(
dataPoint.getBitRate(),
dataPoint.getVolume())
);
}
}
} catch (InterruptedException e) {
// todo handle this
throw new RuntimeException(e);
}
}
}
public static void main(String[] args) {
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment