Skip to content

Commit

Permalink
chore: Refactor Source to Avoid Implementing Forwarder Interface (#1708)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 authored May 5, 2024
1 parent 3d9358f commit e9810eb
Show file tree
Hide file tree
Showing 19 changed files with 401 additions and 1,066 deletions.
3 changes: 3 additions & 0 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ const (
DefaultPnfBatchSize = 100 // Default flush batch size for pnf
DefaultPnfFlushDuration = time.Second // Default flush duration for pnf

// DefaultKafkaHandlerChannelSize is the default channel size for kafka handler
DefaultKafkaHandlerChannelSize = 100

// DefaultKeyForNonKeyedData Default key for non keyed stream
DefaultKeyForNonKeyedData = "NON_KEYED_STREAM"

Expand Down
9 changes: 5 additions & 4 deletions pkg/sources/forward/data_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ func NewDataForward(
toVertexWmStores map[string]store.WatermarkStore,
idleManager wmb.IdleManager,
opts ...Option) (*DataForward, error) {
defaultOptions := DefaultOptions()
dOpts := defaultOptions()
for _, o := range opts {
if err := o(defaultOptions); err != nil {
if err := o(dOpts); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -115,10 +115,10 @@ func NewDataForward(
Shutdown: Shutdown{
rwlock: new(sync.RWMutex),
},
opts: *defaultOptions,
opts: *dOpts,
}
// add logger from parent ctx to child context.
isdf.ctx = logging.WithLogger(ctx, defaultOptions.logger)
isdf.ctx = logging.WithLogger(ctx, dOpts.logger)
return &isdf, nil
}

Expand Down Expand Up @@ -270,6 +270,7 @@ func (df *DataForward) forwardAChunk(ctx context.Context) {
messageToStep[toVertex] = make([][]isb.Message, len(df.toBuffers[toVertex]))
}

// FIXME: when the transformer is not defined, we should avoid doing this.
// user-defined transformer concurrent processing request channel
transformerCh := make(chan *readWriteMessagePair)
// transformerResults stores the results after user-defined transformer processing for all read messages. It indexes
Expand Down
5 changes: 4 additions & 1 deletion pkg/sources/forward/data_forward_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ type SimpleSource struct {
buffer *simplebuffer.InMemoryBuffer
}

func (s *SimpleSource) Pending(ctx context.Context) (int64, error) {
return 0, nil
}

func NewSimpleSource(buffer *simplebuffer.InMemoryBuffer) *SimpleSource {
return &SimpleSource{buffer: buffer}
}
Expand Down Expand Up @@ -158,7 +162,6 @@ func TestNewDataForward(t *testing.T) {
defer cancel()

writeMessages := testutils.BuildTestWriteMessages(4*batchSize, testStartTime, nil)

fetchWatermark, _ := generic.BuildNoOpSourceWatermarkProgressorsFromBufferMap(toSteps)
noOpStores := buildNoOpToVertexStores(toSteps)
idleManager, _ := wmb.NewIdleManager(1, len(toSteps))
Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/forward/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type options struct {

type Option func(*options) error

func DefaultOptions() *options {
func defaultOptions() *options {
return &options{
readBatchSize: dfv1.DefaultReadBatchSize,
transformerConcurrency: dfv1.DefaultReadBatchSize,
Expand Down
192 changes: 54 additions & 138 deletions pkg/sources/generator/tickgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,12 @@ import (
"go.uber.org/zap"

dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/forwarder"
"github.com/numaproj/numaflow/pkg/isb"
"github.com/numaproj/numaflow/pkg/metrics"
"github.com/numaproj/numaflow/pkg/shared/logging"
sourceforward "github.com/numaproj/numaflow/pkg/sources/forward"
applier2 "github.com/numaproj/numaflow/pkg/sources/forward/applier"
"github.com/numaproj/numaflow/pkg/sources/sourcer"
"github.com/numaproj/numaflow/pkg/watermark/fetch"
"github.com/numaproj/numaflow/pkg/watermark/publish"
"github.com/numaproj/numaflow/pkg/watermark/store"
"github.com/numaproj/numaflow/pkg/watermark/wmb"
)

var log = logging.NewLogger()

type Data struct {
Value uint64 `json:"value,omitempty"`
// only to ensure a desired message size
Expand All @@ -66,8 +57,7 @@ type record struct {
ts int64
}

var recordGenerator = func(size int32, value *uint64, createdTS int64) []byte {

var recordGenerator = func(size int32, value *uint64, createdTS int64) ([]byte, error) {
data := Data{}
if value != nil {
data.Value = *value
Expand All @@ -78,68 +68,42 @@ var recordGenerator = func(size int32, value *uint64, createdTS int64) []byte {
if size > 0 {
// padding to guarantee the size of the message
b := make([]byte, size)
_, err := rand.Read(b) // we do not care about failures here.
if err != nil {
log.Warn("error while generating random bytes", err)
}
_, _ = rand.Read(b) // we do not care about failures here.
data.Padding = b
}

r := payload{Data: data, Createdts: createdTS}
marshalled, err := json.Marshal(r)
if err != nil {
log.Errorf("Error marshalling the record [%v]", r)
}
return marshalled
return json.Marshal(r)
}

type memGen struct {
srcChan chan record // srcChan provides a go channel that supplies generated data
rpu int // rpu - records per time unit
keyCount int32 // keyCount is the number of unique keys in the payload
value *uint64 // value is the optional uint64 number that can be set in the payload
msgSize int32 // msgSize is the size of each generated message
timeunit time.Duration // timeunit - ticker will fire once per timeunit
genFn func(int32, *uint64, int64) []byte // genFn function that generates a payload as a byte array
vertexName string // name is the name of the source vertex
pipelineName string // pipelineName is the name of the pipeline
cancelFn context.CancelFunc // cancelFn terminates the source will not generate any more records.
forwarder *sourceforward.DataForward // forwarder to read from the source and write to the inter step buffer.
lifecycleCtx context.Context // lifecycleCtx context is used to control the lifecycle of this instance.
readTimeout time.Duration // read timeout for the reader
vertexInstance *dfv1.VertexInstance // vertex instance
srcChan chan record // srcChan provides a go channel that supplies generated data
rpu int // rpu - records per time unit
keyCount int32 // keyCount is the number of unique keys in the payload
value *uint64 // value is the optional uint64 number that can be set in the payload
msgSize int32 // msgSize is the size of each generated message
timeunit time.Duration // timeunit - ticker will fire once per timeunit
genFn func(int32, *uint64, int64) ([]byte, error) // genFn function that generates a payload as a byte array
vertexName string // name is the name of the source vertex
pipelineName string // pipelineName is the name of the pipeline
readTimeout time.Duration // read timeout for the reader
vertexInstance *dfv1.VertexInstance // vertex instance
jitter time.Duration
logger *zap.SugaredLogger
}

type Option func(*memGen) error

// WithLogger is used to return logger information
func WithLogger(l *zap.SugaredLogger) Option {
return func(o *memGen) error {
o.logger = l
return nil
}
}

// WithReadTimeout sets the read timeout for the reader.
func WithReadTimeout(timeout time.Duration) Option {
return func(o *memGen) error {
o.readTimeout = timeout
return nil
}
}

// NewMemGen function creates an instance of generator.
func NewMemGen(
vertexInstance *dfv1.VertexInstance,
writers map[string][]isb.BufferWriter,
fsd forwarder.ToWhichStepDecider,
transformerApplier applier2.SourceTransformApplier,
fetchWM fetch.SourceFetcher,
toVertexPublisherStores map[string]store.WatermarkStore,
publishWMStores store.WatermarkStore,
idleManager wmb.IdleManager,
opts ...Option) (sourcer.Sourcer, error) {
// NewMemGen function creates an instance of generator source reader.
func NewMemGen(ctx context.Context, vertexInstance *dfv1.VertexInstance, opts ...Option) (sourcer.SourceReader, error) {

// minimal CRDs don't have defaults
rpu := 5
Expand Down Expand Up @@ -181,40 +145,17 @@ func NewMemGen(
srcChan: make(chan record, rpu*int(keyCount)*5),
readTimeout: 3 * time.Second, // default timeout
jitter: jitter,
logger: logging.FromContext(ctx),
}

for _, o := range opts {
if err := o(genSrc); err != nil {
return nil, err
}
}
if genSrc.logger == nil {
genSrc.logger = logging.NewLogger()
}

// this context is to be used internally for controlling the lifecycle of generator
ctx, cancel := context.WithCancel(context.Background())

genSrc.lifecycleCtx = ctx
genSrc.cancelFn = cancel

forwardOpts := []sourceforward.Option{sourceforward.WithLogger(genSrc.logger)}
if x := vertexInstance.Vertex.Spec.Limits; x != nil {
if x.ReadBatchSize != nil {
forwardOpts = append(forwardOpts, sourceforward.WithReadBatchSize(int64(*x.ReadBatchSize)))
}
}

// create a source watermark publisher
sourceWmPublisher := publish.NewSourcePublish(ctx, genSrc.pipelineName, genSrc.vertexName, publishWMStores,
publish.WithDelay(vertexInstance.Vertex.Spec.Watermark.GetMaxDelay()), publish.WithDefaultPartitionIdx(vertexInstance.Replica))

// we pass in the context to sourceForwarder as well so that it can shut down when we cancelFn the context
sourceForwarder, err := sourceforward.NewDataForward(vertexInstance, genSrc, writers, fsd, transformerApplier, fetchWM, sourceWmPublisher, toVertexPublisherStores, idleManager, forwardOpts...)
if err != nil {
return nil, err
}
genSrc.forwarder = sourceForwarder
// start the generator
go genSrc.generator(ctx, genSrc.rpu, genSrc.timeunit)

return genSrc, nil
}
Expand All @@ -229,10 +170,6 @@ func (mg *memGen) Partitions(context.Context) []int32 {
return []int32{mg.vertexInstance.Replica}
}

func (mg *memGen) IsEmpty() bool {
return len(mg.srcChan) == 0
}

func (mg *memGen) Read(_ context.Context, count int64) ([]*isb.ReadMessage, error) {
msgs := make([]*isb.ReadMessage, 0, count)
// timeout should not be re-triggered for every run of the for loop. it is for the entire Read() call.
Expand All @@ -246,7 +183,7 @@ loop:
tickgenSourceReadCount.With(map[string]string{metrics.LabelVertex: mg.vertexName, metrics.LabelPipeline: mg.pipelineName}).Inc()
msgs = append(msgs, mg.newReadMessage(r.key, r.data, r.offset, r.ts))
case <-timeout:
mg.logger.Debugw("Timed out waiting for messages to read.", zap.Duration("waited", mg.readTimeout))
mg.logger.Infow("Timed out waiting for messages to read.", zap.Duration("waited", mg.readTimeout))
break loop
}
}
Expand All @@ -266,32 +203,12 @@ func (mg *memGen) Close() error {
return nil
}

func (mg *memGen) Stop() {
mg.cancelFn()
mg.forwarder.Stop()
}

func (mg *memGen) ForceStop() {
mg.Stop()
mg.forwarder.ForceStop()

}

// Start starts reading from the source
// context is used to control the lifecycle of this component.
// this context will be used to shut down the vertex once an os.signal is received.
func (mg *memGen) Start() <-chan struct{} {
mg.generator(mg.lifecycleCtx, mg.rpu, mg.timeunit)
return mg.forwarder.Start()
}

func (mg *memGen) NewWorker(ctx context.Context, rate int) func(chan time.Time, chan struct{}) {

func (mg *memGen) newWorker(ctx context.Context, rate int) func(chan time.Time, chan struct{}) {
return func(tickChan chan time.Time, done chan struct{}) {
defer func() {
// empty any pending ticks
if len(tickChan) > 0 {
log.Info("emptying any pending ticks")
mg.logger.Info("emptying any pending ticks")
for len(tickChan) > 0 {
<-tickChan
}
Expand All @@ -313,11 +230,15 @@ func (mg *memGen) NewWorker(ctx context.Context, rate int) func(chan time.Time,
for i := 0; i < rate; i++ {
for k := int32(0); k < mg.keyCount; k++ {
key := fmt.Sprintf("key-%d-%d", mg.vertexInstance.Replica, k)
d := mg.genFn(mg.msgSize, mg.value, t)
d, err := mg.genFn(mg.msgSize, mg.value, t)
if err != nil {
mg.logger.Errorw("Error while generating the record, skipping the record", zap.Error(err))
continue
}
r := record{data: d, offset: time.Now().UTC().UnixNano(), key: key, ts: t}
select {
case <-ctx.Done():
log.Info("Context.Done is called. returning from the inner function")
mg.logger.Info("Context.Done is called. returning from the inner function")
return
case mg.srcChan <- r:
}
Expand All @@ -330,41 +251,36 @@ func (mg *memGen) NewWorker(ctx context.Context, rate int) func(chan time.Time,

// generator fires once per time unit and generates records and writes them to the channel
func (mg *memGen) generator(ctx context.Context, rate int, timeunit time.Duration) {
go func() {
// capping the rate to 10000 msgs/sec
if rate > 10000 {
log.Infow("Capping the rate to 10000 msg/sec. rate has been changed from %d to 10000", rate)
rate = 10000
}
// capping the rate to 10000 msgs/sec
if rate > 10000 {
mg.logger.Infow("Capping the rate to 10000 msg/sec. rate has been changed from %d to 10000", rate)
rate = 10000
}

tickChan := make(chan time.Time, 1000)
doneChan := make(chan struct{})
childCtx, childCancel := context.WithCancel(ctx)
tickChan := make(chan time.Time, 1000)
doneChan := make(chan struct{})

defer childCancel()
// make sure that there is only one worker all the time.
// even when there is back pressure, max number of go routines inflight should be 1.
// at the same time, we don't want to miss any ticks that cannot be processed.
worker := mg.newWorker(ctx, rate)
go worker(tickChan, doneChan)

// make sure that there is only one worker all the time.
// even when there is back pressure, max number of go routines inflight should be 1.
// at the same time, we don't want to miss any ticks that cannot be processed.
worker := mg.NewWorker(childCtx, rate)
go worker(tickChan, doneChan)
ticker := time.NewTicker(timeunit)
defer ticker.Stop()

ticker := time.NewTicker(timeunit)
defer ticker.Stop()
for {
select {
// we don't need to wait for ticker to fire to return
// when the context closes
case <-ctx.Done():
log.Info("Context.Done is called. exiting generator loop.")
childCancel()
<-doneChan
return
case ts := <-ticker.C:
tickChan <- ts
}
for {
select {
// we don't need to wait for ticker to fire to return
// when the context closes
case <-ctx.Done():
mg.logger.Info("Context.Done is called. exiting generator loop.")
<-doneChan
return
case ts := <-ticker.C:
tickChan <- ts
}
}()
}
}

func (mg *memGen) newReadMessage(key string, payload []byte, offset int64, et int64) *isb.ReadMessage {
Expand Down
Loading

0 comments on commit e9810eb

Please sign in to comment.