Skip to content

Commit

Permalink
rebase and update
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Oct 1, 2024
1 parent 101ef07 commit f46d8a9
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 55 deletions.
2 changes: 1 addition & 1 deletion timely/src/progress/frontier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl<T> Antichain<T> {
/// let mut frontier = Antichain::from_elem(2);
/// assert_eq!(frontier.elements(), &[2]);
///```
#[inline] pub fn elements(&self) -> &Vec<T> { &self.elements }
#[inline] pub fn elements(&self) -> &[T] { &self[..] }

/// Reveals the elements in the antichain.
///
Expand Down
96 changes: 42 additions & 54 deletions timely/src/progress/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@
use std::collections::{BinaryHeap, HashMap, VecDeque};
use std::cmp::Reverse;

use columnar::{Columnar, Len, Index};
use columnar::ColumnVec;
use columnar::{Len, Index};
use columnar::Vecs;

use crate::progress::Timestamp;
use crate::progress::{Source, Target};
Expand All @@ -87,56 +87,43 @@ use crate::progress::frontier::{Antichain, MutableAntichain};
use crate::progress::timestamp::PathSummary;


use vec_antichain::VecAntichain;
use antichains::Antichains;

/// A stand-in for `Vec<Antichain<T>>`.
mod vec_antichain {
mod antichains {

use columnar::{Columnar, Len, Index, IndexMut};
use columnar::{ColumnVec, Slice};
use columnar::{Len, Index, Push};
use columnar::Vecs;

use crate::progress::Antichain;

#[derive(Clone, Debug)]
pub struct VecAntichain<T> (ColumnVec<T>);
pub struct Antichains<T> (Vecs<Vec<T>>);

impl<TC: Default> Default for VecAntichain<TC> {
impl<T> Default for Antichains<T> {
fn default() -> Self {
Self (Default::default())
}
}

impl<TC> Len for VecAntichain<TC> {
impl<T> Len for Antichains<T> {
#[inline(always)] fn len(&self) -> usize { self.0.len() }
}

impl<TC> Index for VecAntichain<TC> {
type Index<'a> = Slice<&'a TC> where TC: 'a;

impl<T> Push<Antichain<T>> for Antichains<T> {
#[inline(always)]
fn index(&self, index: usize) -> Self::Index<'_> {
self.0.index(index)
fn push(&mut self, item: Antichain<T>) {
columnar::Push::extend(&mut self.0.values, item);
self.0.bounds.push(self.0.values.len());
}
}
impl<TC> IndexMut for VecAntichain<TC> {
type IndexMut<'a> = Slice<&'a mut TC> where TC: 'a;

#[inline(always)]
fn index_mut(&mut self, index: usize) -> Self::IndexMut<'_> {
self.0.index_mut(index)
}
}
impl<'a, T> Index for &'a Antichains<T> {
type Ref = <&'a Vecs<Vec<T>> as Index>::Ref;

impl<T, TC: Columnar<T>> Columnar<Antichain<T>> for VecAntichain<TC> {
#[inline(always)]
fn copy(&mut self, item: &Antichain<T>) {
self.0.copy(item.elements());
}
fn clear(&mut self) {
unimplemented!()
}
fn heap_size(&self) -> (usize, usize) {
unimplemented!()
fn get(&self, index: usize) -> Self::Ref {
(&self.0).get(index)
}
}
}
Expand Down Expand Up @@ -191,7 +178,7 @@ pub struct Builder<T: Timestamp> {
/// Indexed by operator index, then input port, then output port. This is the
/// same format returned by `get_internal_summary`, as if we simply appended
/// all of the summaries for the hosted nodes.
nodes: ColumnVec<ColumnVec<VecAntichain<Vec<T::Summary>>>>,
nodes: Vecs<Vecs<Antichains<T::Summary>>>,
/// Direct connections from sources to targets.
///
/// Edges do not affect timestamps, so we only need to know the connectivity.
Expand Down Expand Up @@ -223,6 +210,7 @@ impl<T: Timestamp> Builder<T> {

assert_eq!(self.nodes.len(), index);

use columnar::Push;
self.nodes.push(summary);
self.edges.push(Vec::new());
self.shape.push((0, 0));
Expand Down Expand Up @@ -328,7 +316,7 @@ impl<T: Timestamp> Builder<T> {

// Load edges as default summaries.
for (index, ports) in self.edges.iter().enumerate() {
for (output, targets) in ports.iter().enumerate() {
for (output, targets) in (*ports).iter().enumerate() {
let source = Location::new_source(index, output);
in_degree.entry(source).or_insert(0);
for &target in targets.iter() {
Expand All @@ -339,13 +327,13 @@ impl<T: Timestamp> Builder<T> {
}

// Load default intra-node summaries.
for (index, summary) in self.nodes.iter().enumerate() {
for (input, outputs) in summary.iter().enumerate() {
for (index, summary) in (&self.nodes).into_iter().enumerate() {
for (input, outputs) in summary.into_iter().enumerate() {
let target = Location::new_target(index, input);
in_degree.entry(target).or_insert(0);
for (output, summaries) in outputs.iter().enumerate() {
for (output, summaries) in outputs.into_iter().enumerate() {
let source = Location::new_source(index, output);
for summary in summaries.iter() {
for summary in summaries.into_iter() {
if summary == &Default::default() {
*in_degree.entry(source).or_insert(0) += 1;
}
Expand Down Expand Up @@ -380,9 +368,9 @@ impl<T: Timestamp> Builder<T> {
}
},
Port::Target(port) => {
for (output, summaries) in self.nodes.index(node).index(port).iter().enumerate() {
for (output, summaries) in (&self.nodes).get(node).get(port).into_iter().enumerate() {
let source = Location::new_source(node, output);
for summary in summaries.iter() {
for summary in summaries.into_iter() {
if summary == &Default::default() {
*in_degree.get_mut(&source).unwrap() -= 1;
if in_degree[&source] == 0 {
Expand Down Expand Up @@ -419,12 +407,12 @@ pub struct Tracker<T:Timestamp> {
/// Indexed by operator index, then input port, then output port. This is the
/// same format returned by `get_internal_summary`, as if we simply appended
/// all of the summaries for the hosted nodes.
nodes: ColumnVec<ColumnVec<VecAntichain<Vec<T::Summary>>>>,
nodes: Vecs<Vecs<Antichains<T::Summary>>>,
/// Direct connections from sources to targets.
///
/// Edges do not affect timestamps, so we only need to know the connectivity.
/// Indexed by operator index then output port.
edges: ColumnVec<ColumnVec<Vec<Target>>>,
edges: Vecs<Vecs<Vec<Target>>>,

// TODO: All of the sizes of these allocations are static (except internal to `ChangeBatch`).
// It seems we should be able to flatten most of these so that there are a few allocations
Expand Down Expand Up @@ -602,7 +590,8 @@ impl<T:Timestamp> Tracker<T> {
let scope_outputs = builder.shape[0].0;
let output_changes = vec![ChangeBatch::new(); scope_outputs];

let mut edges: ColumnVec<ColumnVec<Vec<Target>>> = Default::default();
use columnar::Push;
let mut edges: Vecs<Vecs<Vec<Target>>> = Default::default();
for edge in builder.edges {
edges.push(edge);
}
Expand Down Expand Up @@ -726,10 +715,10 @@ impl<T:Timestamp> Tracker<T> {
.update_iter(Some((time, diff)));

for (time, diff) in changes {
let nodes = &self.nodes.index(location.node).index(port_index);
for (output_port, summaries) in nodes.iter().enumerate() {
let nodes = &(&self.nodes).get(location.node).get(port_index);
for (output_port, summaries) in nodes.into_iter().enumerate() {
let source = Location { node: location.node, port: Port::Source(output_port) };
for summary in summaries.iter() {
for summary in summaries.into_iter() {
if let Some(new_time) = summary.results_in(&time) {
self.worklist.push(Reverse((new_time, source, diff)));
}
Expand All @@ -749,7 +738,7 @@ impl<T:Timestamp> Tracker<T> {
.update_iter(Some((time, diff)));

for (time, diff) in changes {
for new_target in self.edges.index(location.node).index(port_index).iter() {
for new_target in (&self.edges).get(location.node).get(port_index).into_iter() {
self.worklist.push(Reverse((
time.clone(),
Location::from(*new_target),
Expand Down Expand Up @@ -801,14 +790,14 @@ impl<T:Timestamp> Tracker<T> {
/// Graph locations may be missing from the output, in which case they have no
/// paths to scope outputs.
fn summarize_outputs<T: Timestamp>(
nodes: &ColumnVec<ColumnVec<VecAntichain<Vec<T::Summary>>>>,
nodes: &Vecs<Vecs<Antichains<T::Summary>>>,
edges: &Vec<Vec<Vec<Target>>>,
) -> HashMap<Location, Vec<Antichain<T::Summary>>>
{
// A reverse edge map, to allow us to walk back up the dataflow graph.
let mut reverse = HashMap::new();
for (node, outputs) in edges.iter().enumerate() {
for (output, targets) in outputs.iter().enumerate() {
for (node, outputs) in columnar::Index::into_iter(edges).enumerate() {
for (output, targets) in columnar::Index::into_iter(outputs).enumerate() {
for target in targets.iter() {
reverse.insert(
Location::from(*target),
Expand All @@ -822,10 +811,9 @@ fn summarize_outputs<T: Timestamp>(
let mut worklist = VecDeque::<(Location, usize, T::Summary)>::new();

let outputs =
edges
.iter()
.flat_map(|x| x.iter())
.flat_map(|x| x.iter())
columnar::Index::into_iter(edges)
.flat_map(|x| columnar::Index::into_iter(x))
.flat_map(|x| columnar::Index::into_iter(x))
.filter(|target| target.node == 0);

// The scope may have no outputs, in which case we can do no work.
Expand All @@ -843,7 +831,7 @@ fn summarize_outputs<T: Timestamp>(
Port::Source(output_port) => {

// Consider each input port of the associated operator.
for (input_port, summaries) in nodes.index(location.node).iter().enumerate() {
for (input_port, summaries) in nodes.get(location.node).into_iter().enumerate() {

// Determine the current path summaries from the input port.
let location = Location { node: location.node, port: Port::Target(input_port) };
Expand All @@ -855,7 +843,7 @@ fn summarize_outputs<T: Timestamp>(
while antichains.len() <= output { antichains.push(Antichain::new()); }

// Combine each operator-internal summary to the output with `summary`.
for operator_summary in summaries.index(output_port).iter() {
for operator_summary in summaries.get(output_port).into_iter() {
if let Some(combined) = operator_summary.followed_by(&summary) {
if antichains[output].insert(combined.clone()) {
worklist.push_back((location, output, combined));
Expand Down

0 comments on commit f46d8a9

Please sign in to comment.