Skip to content

Commit

Permalink
Support Parallel Trace Expansion
Browse files Browse the repository at this point in the history
This puts in place a relatively straightforward parallel algorithm for
trace expansion using goroutines.  Given all the previous refactoring
to enable trace expansion to occur out-of-order, this was relatively
easy to implement.  Performance improvements are not substantial, but
this is perhaps to be expected given that checking is more compute
heavy.
  • Loading branch information
DavePearce committed Jul 31, 2024
1 parent 8ee5b52 commit 8563f40
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 27 deletions.
6 changes: 5 additions & 1 deletion pkg/cmd/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ var checkCmd = &cobra.Command{
cfg.strict = !getFlag(cmd, "warn")
cfg.quiet = getFlag(cmd, "quiet")
cfg.padding.Right = getUint(cmd, "padding")
cfg.parallelExpansion = !getFlag(cmd, "sequential")
// TODO: support true ranges
cfg.padding.Left = cfg.padding.Right
// Parse constraints
Expand Down Expand Up @@ -74,6 +75,8 @@ type checkConfig struct {
// Specifies whether or not to report details of the failure (e.g. for
// debugging purposes).
report bool
// Perform trace expansion in parallel (or not)
parallelExpansion bool
}

// Check a given trace is consistently accepted (or rejected) at the different
Expand Down Expand Up @@ -153,7 +156,7 @@ func checkTraceWithLoweringDefault(cols []trace.RawColumn, hirSchema *hir.Schema
}

func checkTrace(cols []trace.RawColumn, schema sc.Schema, cfg checkConfig) (trace.Trace, []error) {
builder := sc.NewTraceBuilder(schema).Expand(cfg.expand)
builder := sc.NewTraceBuilder(schema).Expand(cfg.expand).Parallel(cfg.parallelExpansion)
//
for n := cfg.padding.Left; n <= cfg.padding.Right; n++ {
tr, errs := builder.Padding(n).Build(cols)
Expand Down Expand Up @@ -239,6 +242,7 @@ func init() {
checkCmd.Flags().BoolP("warn", "w", false, "report warnings instead of failing for certain errors"+
"(e.g. unknown columns in the trace)")
checkCmd.Flags().BoolP("quiet", "q", false, "suppress output (e.g. warnings)")
checkCmd.Flags().Bool("sequential", false, "perform sequential trace expansion")
checkCmd.Flags().Uint("padding", 0, "specify amount of (front) padding to apply")
checkCmd.Flags().Int("spillage", -1,
"specify amount of splillage to account for (where -1 indicates this should be inferred)")
Expand Down
158 changes: 133 additions & 25 deletions pkg/schema/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type TraceBuilder struct {
// NewTraceBuilder constructs a default trace builder. The idea is that this
// could then be customized as needed following the builder pattern.
//
// The positional fields are, in order: schema, expand, padding and parallel
// (the same order used by the Padding and Parallel builder methods).
//
// NOTE(review): two return statements appear here, differing only in the final
// (parallel) field; presumably they are the removed and added sides of a diff.
// As written only the first return is reachable -- confirm which default is
// intended.
func NewTraceBuilder(schema Schema) TraceBuilder {
return TraceBuilder{schema, true, 0, false}
return TraceBuilder{schema, true, 0, true}
}

// Expand updates a given builder configuration to perform trace expansion (or
Expand All @@ -48,6 +48,12 @@ func (tb TraceBuilder) Padding(padding uint) TraceBuilder {
return TraceBuilder{tb.schema, tb.expand, padding, tb.parallel}
}

// Parallel returns a copy of this builder configuration in which concurrent
// trace expansion is switched on (true) or off (false).  The schema, expand
// and padding settings are carried over unchanged.
func (tb TraceBuilder) Parallel(parallel bool) TraceBuilder {
	// The receiver is a value, so this only modifies the local copy which is
	// then handed back to the caller.
	tb.parallel = parallel
	return tb
}

// Build takes the given builder configuration, along with a given set of input
// columns and constructs a trace.
func (tb TraceBuilder) Build(columns []trace.RawColumn) (trace.Trace, []error) {
Expand All @@ -61,7 +67,10 @@ func (tb TraceBuilder) Build(columns []trace.RawColumn) (trace.Trace, []error) {
padColumns(tr, requiredSpillage(tb.schema))
// Expand trace
if tb.parallel {
panic("parallel trace expansion not supported")
// Run (parallel) trace expansion
if err := parallelTraceExpansion(tb.schema, tr); err != nil {
return nil, append(errs, err)
}
} else if err := sequentialTraceExpansion(tb.schema, tr); err != nil {
// Expansion errors are fatal as well
return nil, append(errs, err)
Expand Down Expand Up @@ -205,49 +214,148 @@ func requiredSpillage(schema Schema) uint {
return mx
}

// padColumns applies the given amount of padding to every module of the given
// trace, visiting the modules in index order.
func padColumns(tr *trace.ArrayTrace, padding uint) {
	// The module count is read once, up front.
	for mid, count := uint(0), tr.Modules().Count(); mid < count; mid++ {
		tr.Pad(mid, padding)
	}
}

// sequentialTraceExpansion expands a given trace according to a given schema.
// More specifically, that means computing the actual values for any
// assignments. This is done using a straightforward sequential algorithm.
func sequentialTraceExpansion(schema Schema, trace *tr.ArrayTrace) error {
var err error
// Column identifiers for computed columns start immediately following the
// designated input columns.
cid := schema.InputColumns().Count()
// Compute each assignment in turn
for i, j := schema.Assignments(), uint(0); i.HasNext(); j++ {
var cols []tr.ArrayColumn
// Get ith assignment
ith := i.Next()
// Compute ith assignment(s)
cols, err := ith.ComputeColumns(trace)
// Check error
if err != nil {
if cols, err = ith.ComputeColumns(trace); err != nil {
return err
}
// Add all columns
for _, col := range cols {
dst := trace.Column(cid)
// Sanity checks
if dst.Context() != col.Context() || dst.Name() != col.Name() {
mod := schema.Modules().Nth(col.Context().Module()).name
return fmt.Errorf("misaligned computed column %s.%s during trace expansion", mod, col.Name())
} else if dst.Data() != nil {
mod := schema.Modules().Nth(col.Context().Module()).name
return fmt.Errorf("computed column %s.%s already exists in trace", mod, col.Name())
// Fill all computed columns
fillComputedColumns(cid, cols, trace)
// Advance column id past this assignment
cid += ith.Columns().Count()
}
// Done
return nil
}

// parallelTraceExpansion performs trace expansion using concurrently executing
// jobs.  The chosen algorithm operates in waves, rather than using a
// continuous approach.  This is for two reasons: firstly, the latter would
// require locks that would slow down evaluation performance; secondly, the
// vast majority of jobs are run in the very first wave.
//
// In each wave every assignment which is not yet computed, and whose
// dependencies are all available, is dispatched on its own goroutine.  The
// results of the whole wave are then collected before any of them is written
// into the trace, so the trace is only ever modified while no worker is
// running.
//
// An error is returned if any assignment fails (the first error received in
// the wave is reported), or if a wave dispatches nothing whilst assignments
// remain outstanding (e.g. because of cyclic or unavailable dependencies),
// since no further progress would otherwise be possible.
func parallelTraceExpansion(schema Schema, trace *tr.ArrayTrace) error {
	// Channel over which workers report their results (including errors).
	ch := make(chan columnBatch, 100)
	// Determine number of input columns
	ninputs := schema.InputColumns().Count()
	// Determine number of assignments to compute
	ntodo := schema.Assignments().Count()
	// Iterate until all assignments completed.
	for ntodo > 0 {
		// Dispatch next wave of assignments.
		n := dispatchReadyAssignments(ninputs, schema, trace, ch)
		// Without this check, a wave which dispatches nothing would leave the
		// loop spinning forever.
		if n == 0 {
			return fmt.Errorf("trace expansion stalled with %d assignment(s) outstanding", ntodo)
		}
		//
		batches := make([]columnBatch, n)
		// Collect *all* results of this wave, even after a failure.  Returning
		// early could leave workers blocked forever on a full channel.
		var err error
		for i := uint(0); i < n; i++ {
			batches[i] = <-ch
			// Remember the first error only
			if err == nil {
				err = batches[i].err
			}
		}
		// Fail before touching the trace
		if err != nil {
			return err
		}
		// Once we get here, all goroutines are complete and we are sequential
		// again.
		for _, r := range batches {
			fillComputedColumns(r.index, r.columns, trace)
			//
			ntodo--
		}
	}
	// Done
	return nil
}

// PadColumns pads every column in a given trace with a given amount of padding.
func padColumns(tr *trace.ArrayTrace, padding uint) {
n := tr.Modules().Count()
// Iterate over modules
for i := uint(0); i < n; i++ {
tr.Pad(i, padding)
// dispatchReadyAssignments walks over all assignments of the schema and starts
// a goroutine for each one which has not been computed yet (its first column
// holds no data) and whose dependencies are all available.  Every goroutine
// sends exactly one columnBatch (tagged with the index of the assignment's
// first column) into the shared channel.  The number of goroutines started is
// returned, so that the caller knows how many results to expect.
func dispatchReadyAssignments(ninputs uint, schema Schema, trace *tr.ArrayTrace, ch chan columnBatch) uint {
	var dispatched uint
	// Computed columns are numbered consecutively after the input columns.
	next := ninputs
	//
	for assignments := schema.Assignments(); assignments.HasNext(); {
		job := assignments.Next()
		// Skip anything already computed, or not yet computable.
		if pending := trace.Column(next).Data() == nil; pending && isReady(job, trace) {
			// The starting index is passed by value since it advances below.
			go func(first uint) {
				computed, err := job.ComputeColumns(trace)
				// Report outcome (successful or otherwise)
				ch <- columnBatch{index: first, columns: computed, err: err}
			}(next)
			//
			dispatched++
		}
		// Move on to the first column of the following assignment
		next += job.Columns().Count()
	}
	//
	return dispatched
}

// isReady reports whether every column the given assignment depends upon
// already holds data in the given trace.  An assignment without dependencies
// is always ready.
func isReady(assignment Assignment, trace *tr.ArrayTrace) bool {
	ready := true
	//
	for _, dep := range assignment.Dependencies() {
		if trace.Column(dep).Data() == nil {
			// One missing dependency is enough to rule it out.
			ready = false
			break
		}
	}
	//
	return ready
}

// columnBatch is the outcome of computing a single assignment, as sent over
// the results channel by a worker goroutine during parallel trace expansion.
type columnBatch struct {
// The column index of the first computed column in this batch.  Subsequent
// columns of the batch occupy the consecutive indices which follow.
index uint
// The computed columns in this batch.
columns []trace.ArrayColumn
// An error (should one arise) from computing the columns; nil otherwise.
err error
}

// fillComputedColumns writes a sequence of computed columns into the trace.
// The first column is written at index cid, and each subsequent column at the
// next consecutive index.  Before writing, each destination is checked: this
// panics if the destination's context or name differs from that of the
// computed column, or if the destination already holds data.
func fillComputedColumns(cid uint, cols []tr.ArrayColumn, trace *tr.ArrayTrace) {
	for offset, computed := range cols {
		target := cid + uint(offset)
		existing := trace.Column(target)
		// Sanity checks
		switch {
		case existing.Context() != computed.Context() || existing.Name() != computed.Name():
			module := trace.Modules().Nth(computed.Context().Module()).Name()
			panic(fmt.Sprintf("misaligned computed column %s.%s during trace expansion", module, computed.Name()))
		case existing.Data() != nil:
			module := trace.Modules().Nth(computed.Context().Module()).Name()
			panic(fmt.Sprintf("computed column %s.%s already exists in trace", module, computed.Name()))
		}
		// Destination is as expected and still empty
		trace.FillColumn(target, computed.Data(), computed.Padding())
	}
}
2 changes: 1 addition & 1 deletion pkg/test/ir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ func CheckTraces(t *testing.T, test string, expected bool, expand bool,

func checkTrace(t *testing.T, inputs []trace.RawColumn, expand bool, id traceId, schema sc.Schema) {
// Construct the trace
tr, errs := sc.NewTraceBuilder(schema).Expand(expand).Padding(id.padding).Build(inputs)
tr, errs := sc.NewTraceBuilder(schema).Expand(expand).Padding(id.padding).Parallel(true).Build(inputs)
// Sanity check construction
if len(errs) > 0 {
for _, err := range errs {
Expand Down
5 changes: 5 additions & 0 deletions pkg/trace/array_trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ func EmptyArrayModule(name string) ArrayModule {
return ArrayModule{name, math.MaxUint}
}

// Name returns the name of this module, i.e. the value held in its name field
// (as supplied, for example, to EmptyArrayModule).
func (p ArrayModule) Name() string {
return p.name
}

// ----------------------------------------------------------------------------

// ArrayColumn describes an individual column of data within a trace table.
Expand Down

0 comments on commit 8563f40

Please sign in to comment.