Skip to content

Instantly share code, notes, and snippets.

@petrosagg
Created January 28, 2021 13:21
Show Gist options
  • Select an option

  • Save petrosagg/977ce0b7e3941be69733be011121210a to your computer and use it in GitHub Desktop.

Select an option

Save petrosagg/977ce0b7e3941be69733be011121210a to your computer and use it in GitHub Desktop.
Computed an arranged collection from a stream of upserts
use differential_dataflow::AsCollection;
use differential_dataflow::operators::arrange::ArrangeByKey;
use timely::dataflow::operators::Input;
use differential_dataflow::operators::reduce::ReduceCore;
use differential_dataflow::trace::implementations::ord::OrdValSpine as DefaultKeyTrace;
fn main() {
timely::execute_from_args(std::env::args(), move |worker| {
let mut input = timely::dataflow::InputHandle::<_, ((usize, Option<String>), _, _)>::new();
let mut probe = timely::dataflow::ProbeHandle::new();
worker.dataflow::<u64, _, _>(|scope| {
let msg_stream = scope.input_from(&mut input);
let arranged = msg_stream
.as_collection()
.arrange_by_key()
.reduce_core::<_,DefaultKeyTrace<_, String, _, i64>>("UpsertReduce", |_key, vals, output, updates| {
if !vals.is_empty() { // there is either an upsert or delete for this key
updates.extend(output.drain(..).map(|(x,d)| (x,-d))); // retract the previous output
match vals[0].0 {
Some(s) => updates.push((s.clone(), 1)), // set the new value
None => (), // nothing to do, we've already retraced the previous output
}
}
})
.as_collection(|k, v| (k.clone(), v.clone()))
.inspect(|x| println!("Observed: {:?}", x))
.probe_with(&mut probe);
});
// Simulate what AltNeu does with even/odd timestamps
// Each update is added in odd times and retracted in even.
//
// This is to keep the input arrangement of reduce_core pratically empty
// Upsert(1, petros)
input.send(((1, Some("petros".to_string())), 1, 1));
input.send(((1, Some("petros".to_string())), 2, -1));
// Upsert(1, petros2)
input.send(((1, Some("petros2".to_string())), 3, 1));
input.send(((1, Some("petros2".to_string())), 4, -1));
// Delete(1)
input.send(((1, None), 5, 1));
input.send(((1, None), 6, -1));
}).expect("Computation terminated abnormally");
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment