Created
September 23, 2025 10:56
-
-
Save ardiereally/b0dab02c10a5488b0772d6a7bc96fcf7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.rdas.lld.streamjoin; | |
| import lombok.AllArgsConstructor; | |
| import lombok.Data; | |
| import java.util.Iterator; | |
| import java.util.List; | |
| import java.util.Map; | |
| import java.util.Objects; | |
| import java.util.concurrent.*; | |
| import java.util.stream.Collectors; | |
| @Data | |
| @AllArgsConstructor | |
| class StreamData { | |
| private String audioDevice; | |
| private Long utcMinute; | |
| // data | |
| private Integer bitRate, volume; | |
| } | |
| @Data | |
| @AllArgsConstructor | |
| class AudioDeviceData { | |
| private Integer bitRate, volume; | |
| } | |
| public class AudioStreamJoiner { | |
| private Map<String, Map<Long, AudioDeviceData>> buffer; | |
| private ExecutorService streamReader; | |
| long bufferEntries; | |
| public AudioStreamJoiner(BlockingQueue<StreamData> audioStream, BlockingQueue<StreamData> bitRateStream, long bufferEntries) { | |
| buffer = new ConcurrentHashMap<>(); | |
| streamReader = Executors.newFixedThreadPool(2); // only 2 streams | |
| streamReader.submit(() -> consumeFromQueue(audioStream)); | |
| streamReader.submit(() -> consumeFromQueue(bitRateStream)); | |
| this.bufferEntries = bufferEntries; | |
| } | |
| public void joinStreams() { | |
| while (true) { | |
| // scan the buffer & see if we have any finished joins | |
| List<StreamData> joinedData = scanForComplete(); | |
| // emit finished entries & remove from buffer | |
| joinedData.forEach(System.out::println); | |
| // check buffer size & remove older entries | |
| trimBuffer(); | |
| } | |
| } | |
| private List<StreamData> scanForComplete() { | |
| return buffer.entrySet() | |
| .stream() | |
| .flatMap(outer -> | |
| outer.getValue().entrySet().stream() | |
| .filter(this::isComplete) // filter criterion | |
| .map(inner -> new StreamData(outer.getKey(), inner.getKey(), inner.getValue().getBitRate(), inner.getValue().getVolume())) | |
| ) | |
| .collect(Collectors.toList()); | |
| } | |
| private boolean isComplete(Map.Entry<Long, AudioDeviceData> longAudioDeviceDataEntry) { | |
| var audioDeviceData = longAudioDeviceDataEntry.getValue(); | |
| return Objects.nonNull(audioDeviceData.getVolume()) && Objects.nonNull(audioDeviceData.getBitRate()); | |
| } | |
| private void trimBuffer() { | |
| long currentSize = findCurrentSize(); | |
| if (currentSize > this.bufferEntries) { | |
| removeOlderEntries(); | |
| } | |
| } | |
| private void removeOlderEntries() { | |
| int count = 0; | |
| Iterator<Map.Entry<String, Map<Long, AudioDeviceData>>> outerIt = buffer.entrySet().iterator(); | |
| while (outerIt.hasNext()) { | |
| Map.Entry<String, Map<Long, AudioDeviceData>> outerEntry = outerIt.next(); | |
| Map<Long, AudioDeviceData> inner = outerEntry.getValue(); | |
| Iterator<Map.Entry<Long, AudioDeviceData>> innerIt = inner.entrySet().iterator(); | |
| while (innerIt.hasNext()) { | |
| if (count >= this.bufferEntries) { | |
| // remove remaining elements | |
| innerIt.remove(); | |
| } else { | |
| innerIt.next(); | |
| count++; | |
| } | |
| } | |
| } | |
| } | |
| private long findCurrentSize() { | |
| return buffer.values() | |
| .stream() | |
| .mapToInt(Map::size) | |
| .sum(); | |
| } | |
| private void consumeFromQueue(BlockingQueue<StreamData> stream) { | |
| while (true) { | |
| try { | |
| var dataPoint = stream.poll(1000, TimeUnit.MILLISECONDS); | |
| if (dataPoint != null) { | |
| // add to buffer | |
| if (!buffer.containsKey(dataPoint.getAudioDevice())) { | |
| buffer.put(dataPoint.getAudioDevice(), new ConcurrentHashMap<>()); | |
| } | |
| var dataForDevice = buffer.get(dataPoint.getAudioDevice()); | |
| if (dataForDevice.containsKey(dataPoint.getUtcMinute())) { | |
| var existingData = dataForDevice.get(dataPoint.getUtcMinute()); | |
| if (dataPoint.getVolume() != null) { | |
| existingData.setVolume(dataPoint.getVolume()); | |
| } | |
| if (dataPoint.getBitRate() != null) { | |
| existingData.setBitRate(dataPoint.getBitRate()); | |
| } | |
| } else { | |
| dataForDevice.put( | |
| dataPoint.getUtcMinute(), | |
| new AudioDeviceData( | |
| dataPoint.getBitRate(), | |
| dataPoint.getVolume()) | |
| ); | |
| } | |
| } | |
| } catch (InterruptedException e) { | |
| // todo handle this | |
| throw new RuntimeException(e); | |
| } | |
| } | |
| } | |
| public static void main(String[] args) { | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment