Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove redundant elements calls. #377

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,8 +542,8 @@ pub mod source {
let capability =
capability.expect("Changes occurred, without surfacing a capability");
let mut changes = ChangeBatch::new();
changes.extend(lower_bound.elements().iter().map(|t| (t.clone(), 1)));
changes.extend(reported_frontier.elements().iter().map(|t| (t.clone(), -1)));
changes.extend(lower_bound.iter().map(|t| (t.clone(), 1)));
changes.extend(reported_frontier.iter().map(|t| (t.clone(), -1)));
let mut frontier_session = frontier.session(&capability);
for peer in 0..workers {
frontier_session.give((peer, changes.clone()));
Expand Down Expand Up @@ -724,7 +724,7 @@ pub mod sink {

// Announce the lower bound, upper bound, and timestamp counts.
let progress = Progress {
lower: frontier.elements().to_vec(),
lower: frontier.to_vec(),
upper: new_frontier.to_vec(),
counts: announce,
};
Expand Down
14 changes: 7 additions & 7 deletions src/lattice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ implement_lattice!((), ());
/// let f1 = &[Product::new(3, 7), Product::new(5, 6)];
/// let f2 = &[Product::new(4, 6)];
/// let join = antichain_join(f1, f2);
/// assert_eq!(&*join.elements(), &[Product::new(4, 7), Product::new(5, 6)]);
/// assert_eq!(&*join, &[Product::new(4, 7), Product::new(5, 6)]);
/// # }
/// ```
pub fn antichain_join<T: Lattice>(one: &[T], other: &[T]) -> Antichain<T> {
Expand Down Expand Up @@ -302,7 +302,7 @@ pub fn antichain_join<T: Lattice>(one: &[T], other: &[T]) -> Antichain<T> {
/// let f1 = &[Product::new(3, 7), Product::new(5, 6)];
/// let f2 = &[Product::new(4, 6)];
/// antichain_join_into(f1, f2, &mut join);
/// assert_eq!(&*join.elements(), &[Product::new(4, 7), Product::new(5, 6)]);
/// assert_eq!(&*join, &[Product::new(4, 7), Product::new(5, 6)]);
/// # }
/// ```
pub fn antichain_join_into<T: Lattice>(one: &[T], other: &[T], upper: &mut Antichain<T>) {
Expand Down Expand Up @@ -334,7 +334,7 @@ pub fn antichain_join_into<T: Lattice>(one: &[T], other: &[T], upper: &mut Antic
/// let f1 = &[Product::new(3, 7), Product::new(5, 6)];
/// let f2 = &[Product::new(4, 6)];
/// let meet = antichain_meet(f1, f2);
/// assert_eq!(&*meet.elements(), &[Product::new(3, 7), Product::new(4, 6)]);
/// assert_eq!(&*meet, &[Product::new(3, 7), Product::new(4, 6)]);
/// # }
/// ```
pub fn antichain_meet<T: Lattice+Clone>(one: &[T], other: &[T]) -> Antichain<T> {
Expand All @@ -351,19 +351,19 @@ pub fn antichain_meet<T: Lattice+Clone>(one: &[T], other: &[T]) -> Antichain<T>
impl<T: Lattice+Clone> Lattice for Antichain<T> {
fn join(&self, other: &Self) -> Self {
let mut upper = Antichain::new();
for time1 in self.elements().iter() {
for time2 in other.elements().iter() {
for time1 in self.iter() {
for time2 in other.iter() {
upper.insert(time1.join(time2));
}
}
upper
}
fn meet(&self, other: &Self) -> Self {
let mut upper = Antichain::new();
for time1 in self.elements().iter() {
for time1 in self.iter() {
upper.insert(time1.clone());
}
for time2 in other.elements().iter() {
for time2 in other.iter() {
upper.insert(time2.clone());
}
upper
Expand Down
10 changes: 5 additions & 5 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,12 +615,12 @@ where
// and feed this to the trace agent (but not along the timely output).

// If there is at least one capability not in advance of the input frontier ...
if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {
if capabilities.iter().any(|c| !input.frontier().less_equal(c.time())) {

let mut upper = Antichain::new(); // re-used allocation for sealing batches.

// For each capability not in advance of the input frontier ...
for (index, capability) in capabilities.elements().iter().enumerate() {
for (index, capability) in capabilities.iter().enumerate() {

if !input.frontier().less_equal(capability.time()) {

Expand All @@ -631,7 +631,7 @@ where
for time in input.frontier().frontier().iter() {
upper.insert(time.clone());
}
for other_capability in &capabilities.elements()[(index + 1) .. ] {
for other_capability in &capabilities[(index + 1) .. ] {
upper.insert(other_capability.time().clone());
}

Expand All @@ -641,7 +641,7 @@ where
writer.insert(batch.clone(), Some(capability.time().clone()));

// send the batch to downstream consumers, empty or not.
output.session(&capabilities.elements()[index]).give(batch);
output.session(&capabilities[index]).give(batch);
}
}

Expand All @@ -652,7 +652,7 @@ where

let mut new_capabilities = Antichain::new();
for time in batcher.frontier().iter() {
if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
if let Some(capability) = capabilities.iter().find(|c| c.time().less_equal(time)) {
new_capabilities.insert(capability.delayed(time));
}
else {
Expand Down
10 changes: 5 additions & 5 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,12 @@ where
if prev_frontier.borrow() != input.frontier().frontier() {

// If there is at least one capability not in advance of the input frontier ...
if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {
if capabilities.iter().any(|c| !input.frontier().less_equal(c.time())) {

let mut upper = Antichain::new(); // re-used allocation for sealing batches.

// For each capability not in advance of the input frontier ...
for (index, capability) in capabilities.elements().iter().enumerate() {
for (index, capability) in capabilities.iter().enumerate() {

if !input.frontier().less_equal(capability.time()) {

Expand All @@ -226,7 +226,7 @@ where
for time in input.frontier().frontier().iter() {
upper.insert(time.clone());
}
for other_capability in &capabilities.elements()[(index + 1) .. ] {
for other_capability in &capabilities[(index + 1) .. ] {
upper.insert(other_capability.time().clone());
}

Expand Down Expand Up @@ -299,7 +299,7 @@ where

// Communicate `batch` to the arrangement and the stream.
writer.insert(batch.clone(), Some(capability.time().clone()));
output.session(&capabilities.elements()[index]).give(batch);
output.session(&capabilities[index]).give(batch);
}
}

Expand All @@ -310,7 +310,7 @@ where

let mut new_capabilities = Antichain::new();
if let Some(std::cmp::Reverse((time, _, _))) = priority_queue.peek() {
if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
if let Some(capability) = capabilities.iter().find(|c| c.time().less_equal(time)) {
new_capabilities.insert(capability.delayed(time));
}
else {
Expand Down
2 changes: 1 addition & 1 deletion src/trace/description.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub struct Description<Time> {
impl<Time: PartialOrder+Clone> Description<Time> {
/// Returns a new description from its component parts.
pub fn new(lower: Antichain<Time>, upper: Antichain<Time>, since: Antichain<Time>) -> Self {
assert!(lower.elements().len() > 0); // this should always be true.
assert!(lower.len() > 0); // this should always be true.
// assert!(upper.len() > 0); // this may not always be true.
Description {
lower,
Expand Down
6 changes: 3 additions & 3 deletions src/trace/wrappers/enter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ where
{
/// Makes a new batch wrapper
pub fn make_from(batch: B) -> Self {
let lower: Vec<_> = batch.description().lower().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
let upper: Vec<_> = batch.description().upper().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
let since: Vec<_> = batch.description().since().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
let lower: Vec<_> = batch.description().lower().iter().map(|x| TInner::to_inner(x.clone())).collect();
let upper: Vec<_> = batch.description().upper().iter().map(|x| TInner::to_inner(x.clone())).collect();
let since: Vec<_> = batch.description().since().iter().map(|x| TInner::to_inner(x.clone())).collect();

BatchEnter {
batch: batch,
Expand Down
6 changes: 3 additions & 3 deletions src/trace/wrappers/enter_at.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ where
{
/// Makes a new batch wrapper
pub fn make_from(batch: B, logic: F) -> Self {
let lower: Vec<_> = batch.description().lower().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
let upper: Vec<_> = batch.description().upper().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
let since: Vec<_> = batch.description().since().elements().iter().map(|x| TInner::to_inner(x.clone())).collect();
let lower: Vec<_> = batch.description().lower().iter().map(|x| TInner::to_inner(x.clone())).collect();
let upper: Vec<_> = batch.description().upper().iter().map(|x| TInner::to_inner(x.clone())).collect();
let since: Vec<_> = batch.description().since().iter().map(|x| TInner::to_inner(x.clone())).collect();

BatchEnter {
batch,
Expand Down