Created
January 28, 2021 13:21
-
-
Save petrosagg/977ce0b7e3941be69733be011121210a to your computer and use it in GitHub Desktop.
Computed an arranged collection from a stream of upserts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| use differential_dataflow::AsCollection; | |
| use differential_dataflow::operators::arrange::ArrangeByKey; | |
| use timely::dataflow::operators::Input; | |
| use differential_dataflow::operators::reduce::ReduceCore; | |
| use differential_dataflow::trace::implementations::ord::OrdValSpine as DefaultKeyTrace; | |
| fn main() { | |
| timely::execute_from_args(std::env::args(), move |worker| { | |
| let mut input = timely::dataflow::InputHandle::<_, ((usize, Option<String>), _, _)>::new(); | |
| let mut probe = timely::dataflow::ProbeHandle::new(); | |
| worker.dataflow::<u64, _, _>(|scope| { | |
| let msg_stream = scope.input_from(&mut input); | |
| let arranged = msg_stream | |
| .as_collection() | |
| .arrange_by_key() | |
| .reduce_core::<_,DefaultKeyTrace<_, String, _, i64>>("UpsertReduce", |_key, vals, output, updates| { | |
| if !vals.is_empty() { // there is either an upsert or delete for this key | |
| updates.extend(output.drain(..).map(|(x,d)| (x,-d))); // retract the previous output | |
| match vals[0].0 { | |
| Some(s) => updates.push((s.clone(), 1)), // set the new value | |
| None => (), // nothing to do, we've already retraced the previous output | |
| } | |
| } | |
| }) | |
| .as_collection(|k, v| (k.clone(), v.clone())) | |
| .inspect(|x| println!("Observed: {:?}", x)) | |
| .probe_with(&mut probe); | |
| }); | |
| // Simulate what AltNeu does with even/odd timestamps | |
| // Each update is added in odd times and retracted in even. | |
| // | |
| // This is to keep the input arrangement of reduce_core pratically empty | |
| // Upsert(1, petros) | |
| input.send(((1, Some("petros".to_string())), 1, 1)); | |
| input.send(((1, Some("petros".to_string())), 2, -1)); | |
| // Upsert(1, petros2) | |
| input.send(((1, Some("petros2".to_string())), 3, 1)); | |
| input.send(((1, Some("petros2".to_string())), 4, -1)); | |
| // Delete(1) | |
| input.send(((1, None), 5, 1)); | |
| input.send(((1, None), 6, -1)); | |
| }).expect("Computation terminated abnormally"); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment