Skip to content

Instantly share code, notes, and snippets.

@jj11hh
Last active June 26, 2020 08:35
Show Gist options
  • Select an option

  • Save jj11hh/8089b5578c5bb1a4f0b7e4929948341c to your computer and use it in GitHub Desktop.

Select an option

Save jj11hh/8089b5578c5bb1a4f0b7e4929948341c to your computer and use it in GitHub Desktop.
StreamSummary data structure for Space Saving algorithm
#ifndef STREAMSUMMARY_H
#define STREAMSUMMARY_H
#include <algorithm>
#include <cstddef>
#include <cstdlib>
#include <iterator>
#include <list>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>
// See https://www.cse.ust.hk/~raywong/comp5331/References/EfficientComputationOfFrequentAndTop-kElementsInDataStreams.pdf
/// Stream-Summary structure for the Space-Saving algorithm.
///
/// Monitors at most `max_element` distinct values. Counters are grouped into
/// buckets of equal count, kept in strictly increasing count order, so the
/// element with the fewest hits is always found in `buckets.front()`.
///
/// When max_element > 0, every call to feed() raises the sum of all counters
/// by exactly one, so that sum always equals the number of items fed.
template<typename T>
class StreamSummary
{
    struct Bucket;

    // Book-keeping for one monitored value.
    struct Element {
        T data;
        // Bucket that holds this element's current count.
        typename std::list<Bucket>::iterator bucket{};
        // Epsilon: the minimum count inherited when this element replaced
        // an evicted one (0 if it was added while the summary had room).
        size_t over_est = 0;
        explicit Element(T data): data(data) {}
    };

    // All monitored values that currently share the same counter value.
    struct Bucket {
        size_t count;
        std::vector<T> elements;
    };

    // Buckets in strictly increasing `count` order; front() is the minimum.
    std::list<Bucket> buckets;
    std::unordered_map<T, Element> elements;
    size_t max_element;

    // Moves an already-monitored element from its bucket to the bucket for
    // count + 1, creating that bucket if it does not exist yet and removing
    // the old bucket if it becomes empty.
    void increment(Element &entry) {
        auto bucket = entry.bucket;
        auto next = std::next(bucket);
        if (next == buckets.end() || next->count != bucket->count + 1) {
            // list::insert places the new bucket before `next`, i.e. right
            // after `bucket`, which keeps the list sorted by count.
            next = buckets.insert(next, Bucket{bucket->count + 1, {}});
        }
        auto &members = bucket->elements;
        members.erase(std::find(members.begin(), members.end(), entry.data));
        next->elements.push_back(entry.data);
        entry.bucket = next;
        if (members.empty()) {
            buckets.erase(bucket);
        }
    }

public:
    StreamSummary(size_t max_element): max_element(max_element) {
    }

    // Space-Saving(m counters, stream S):
    //   for each element e in S:
    //     if e is monitored:
    //       increment the counter of e
    //     else:
    //       let e_m be the element with least hits, min
    //       replace e_m with e
    //       increment count_m   (so e starts at min + 1)
    //       assign epsilon_m the value min
    //
    // While the summary still has room there is no e_m to replace, so e
    // starts at count 1 with epsilon 0. Ties for the minimum are broken with
    // rand(). A summary with max_element == 0 monitors nothing.
    void feed(T data) {
        if (max_element == 0) {
            return;
        }

        auto found = elements.find(data);
        if (found != elements.end()) {
            increment(found->second);
            return;
        }

        // Not monitored yet: evict a least-counted element if the summary is full.
        size_t over_est = 0;
        if (elements.size() >= max_element) {
            auto min_bucket = buckets.begin();
            over_est = min_bucket->count;
            auto &candidates = min_bucket->elements;
            auto pick = rand() % candidates.size();
            T evicted = candidates[pick];
            candidates.erase(candidates.begin() + pick);
            if (candidates.empty()) {
                buckets.erase(min_bucket);
            }
            elements.erase(evicted);
        }

        // The newcomer takes over the evicted counter incremented by one
        // (min + 1), or starts at 1 when nothing was evicted.
        const size_t new_count = over_est + 1;
        auto pos = buckets.begin();
        // new_count is at most one above the current minimum, so this scan
        // advances at most a couple of buckets.
        while (pos != buckets.end() && pos->count < new_count) {
            ++pos;
        }
        if (pos == buckets.end() || pos->count != new_count) {
            pos = buckets.insert(pos, Bucket{new_count, {}});
        }
        pos->elements.push_back(data);

        Element entry(data);
        entry.bucket = pos;
        entry.over_est = over_est;
        elements.emplace(data, entry);
    }

    /// Returns up to `k` (count, value) pairs, highest counts first. Values
    /// with equal counts appear in the order they entered that bucket.
    std::vector<std::pair<size_t, T>> top_k(size_t k) const {
        std::vector<std::pair<size_t, T>> result;
        result.reserve(std::min(k, elements.size()));
        for (auto it = buckets.rbegin(); it != buckets.rend(); ++it) {
            for (const auto &element : it->elements) {
                if (result.size() == k) {
                    return result;
                }
                result.emplace_back(it->count, element);
            }
        }
        return result;
    }
};
#endif // STREAMSUMMARY_H
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment