Skip to content

Commit

Permalink
Session tear down logic for fast verification (livepeer#2381)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberj0g authored Aug 19, 2022
1 parent 0602f81 commit a1fb761
Show file tree
Hide file tree
Showing 15 changed files with 462 additions and 205 deletions.
2 changes: 1 addition & 1 deletion cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1120,7 +1120,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
orch := core.NewOrchestrator(s.LivepeerNode, timeWatcher)

go func() {
err = server.StartTranscodeServer(orch, *cfg.HttpAddr, s.HTTPMux, n.WorkDir, n.TranscoderManager != nil)
err = server.StartTranscodeServer(orch, *cfg.HttpAddr, s.HTTPMux, n.WorkDir, n.TranscoderManager != nil, n)
if err != nil {
glog.Fatalf("Error starting Transcoder node: err=%q", err)
}
Expand Down
24 changes: 22 additions & 2 deletions core/lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ type LoadBalancingTranscoder struct {
idx int // Ensures a non-tapered work distribution
}

// EndTranscodingSession signals the transcode loop of the session registered
// under sessionId to stop, by closing that session's stop channel. A session
// ID that is not present in lb.sessions is only logged.
//
// The write lock is taken (instead of the read lock) and the channel is
// checked before closing so that repeated or concurrent calls for a session
// that is still registered cannot close the channel twice, which would panic.
func (lb *LoadBalancingTranscoder) EndTranscodingSession(sessionId string) {
	lb.mu.Lock()
	defer lb.mu.Unlock()

	session, exists := lb.sessions[sessionId]
	if !exists {
		clog.V(common.DEBUG).Infof(context.TODO(), "LB: Transcode session id=%s already finished", sessionId)
		return
	}

	select {
	case <-session.stop:
		// stop was already closed by an earlier call; nothing left to signal
		clog.V(common.DEBUG).Infof(context.TODO(), "LB: Transcode session id=%s already finished", sessionId)
	default:
		// signal transcode loop finish for this session
		close(session.stop)
		clog.V(common.DEBUG).Infof(context.TODO(), "LB: Transcode session id=%s teared down", session.key)
	}
}

func NewLoadBalancingTranscoder(devices []string, newTranscoderFn newTranscoderFn,
newTranscoderWithDetectorFn newTranscoderWithDetectorFn) Transcoder {
return &LoadBalancingTranscoder{
Expand Down Expand Up @@ -102,6 +114,7 @@ func (lb *LoadBalancingTranscoder) createSession(ctx context.Context, md *SegTra
transcoder: lpmsSession,
key: key,
done: make(chan struct{}),
stop: make(chan struct{}),
sender: make(chan *transcoderParams, maxSegmentChannels),
makeContext: transcodeLoopContext,
}
Expand Down Expand Up @@ -158,8 +171,11 @@ type transcoderSession struct {
transcoder TranscoderSession
key string

sender chan *transcoderParams
done chan struct{}
sender chan *transcoderParams
// channel to handle Orchestrator error or shutdown during transcoding
done chan struct{}
// channel to signal transcoding loop stop, done channel is not used when not transcoding
stop chan struct{}
makeContext func() (context.Context, context.CancelFunc)
}

Expand All @@ -176,6 +192,10 @@ func (sess *transcoderSession) loop(logCtx context.Context) {
for {
ctx, cancel := sess.makeContext()
select {
case <-sess.stop:
// Stop was requested for this session (stop channel closed); exit the loop
clog.V(common.DEBUG).Infof(logCtx, "LB: Transcode loop stopped for key=%s", sess.key)
return
case <-ctx.Done():
// Terminate the session after a period of inactivity
clog.V(common.DEBUG).Infof(logCtx, "LB: Transcode loop timed out for key=%s", sess.key)
Expand Down
7 changes: 4 additions & 3 deletions core/livepeernode.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,10 @@ type LivepeerNode struct {
// Thread safety for config fields
mu sync.RWMutex
// Transcoder private fields
priceInfo *big.Rat
serviceURI url.URL
segmentMutex *sync.RWMutex
priceInfo *big.Rat
serviceURI url.URL
segmentMutex *sync.RWMutex
StorageConfig *transcodeConfig
}

//NewLivepeerNode creates a new Livepeer Node. Eth can be nil.
Expand Down
4 changes: 4 additions & 0 deletions core/livepeernode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func stubTranscoderWithProfiles(profiles []ffmpeg.VideoProfile) *StubTranscoder
return &StubTranscoder{Profiles: profiles}
}

// EndTranscodingSession is a no-op on the stub transcoder; sessionId is ignored.
// Presumably it exists so StubTranscoder keeps satisfying the Transcoder
// interface — confirm against the interface definition.
func (t *StubTranscoder) EndTranscodingSession(sessionId string) {

}

func (t *StubTranscoder) Transcode(ctx context.Context, md *SegTranscodingMetadata) (*TranscodeData, error) {
if t.FailTranscode {
return nil, ErrTranscode
Expand Down
73 changes: 50 additions & 23 deletions core/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,8 +624,7 @@ func (n *LivepeerNode) transcodeSegmentLoop(logCtx context.Context, md *SegTrans
// no preference (or unknown pref), so use our own
os = los
}

config := transcodeConfig{
n.StorageConfig = &transcodeConfig{
OS: os,
LocalOS: los,
}
Expand All @@ -635,36 +634,55 @@ func (n *LivepeerNode) transcodeSegmentLoop(logCtx context.Context, md *SegTrans
ctx, cancel := context.WithTimeout(context.Background(), transcodeLoopTimeout)
select {
case <-ctx.Done():
// timeout; clean up goroutine here
os.EndSession()
los.EndSession()
// check to avoid nil pointer caused by garbage collection while this go routine is still running
if n.TranscoderManager != nil {
n.TranscoderManager.RTmutex.Lock()
n.TranscoderManager.completeStreamSession(md.AuthToken.SessionId)
n.TranscoderManager.RTmutex.Unlock()
}
clog.V(common.DEBUG).Infof(logCtx, "Segment loop timed out; closing ")
n.segmentMutex.Lock()
mid := ManifestID(md.AuthToken.SessionId)
if _, ok := n.SegmentChans[mid]; ok {
close(n.SegmentChans[mid])
delete(n.SegmentChans, mid)
if lpmon.Enabled {
lpmon.CurrentSessions(len(n.SegmentChans))
}
}
n.segmentMutex.Unlock()
n.endTranscodingSession(md.AuthToken.SessionId, logCtx)
return
case chanData := <-segChan:
chanData.res <- n.transcodeSeg(chanData.ctx, config, chanData.seg, chanData.md)
// nil means channel is closed by endTranscodingSession called by B
if chanData != nil {
chanData.res <- n.transcodeSeg(chanData.ctx, *n.StorageConfig, chanData.seg, chanData.md)
}
}
cancel()
}
}()
return nil
}

// endTranscodingSession releases what this node holds for sessionId:
//
//   - ends the OS and LocalOS storage sessions in n.StorageConfig, if set;
//   - if a TranscoderManager is present, sends an empty segment notification
//     on the stream session registered for sessionId (when there is one) and
//     then completes that stream session;
//   - closes and removes the segment channel stored under ManifestID(sessionId).
//
// logCtx is used for logging only. A failure to send the teardown notification
// is logged and otherwise ignored, so the rest of the cleanup always runs.
func (n *LivepeerNode) endTranscodingSession(sessionId string, logCtx context.Context) {
	// NOTE(review): StorageConfig is a single node-level field that
	// transcodeSegmentLoop overwrites; confirm that sharing it across
	// sessions (and reading it here without a lock) is intended.
	if cfg := n.StorageConfig; cfg != nil {
		cfg.OS.EndSession()
		cfg.LocalOS.EndSession()
	}

	// TranscoderManager may be nil; read it once so the nil check and the
	// uses below refer to the same value.
	if rtm := n.TranscoderManager; rtm != nil {
		rtm.RTmutex.Lock()
		// send empty segment to signal transcoder internal session teardown if session exist
		if sess, exists := rtm.streamSessions[sessionId]; exists {
			msg := &net.NotifySegment{
				SegData: &net.SegData{
					AuthToken: &net.AuthToken{SessionId: sessionId},
				},
			}
			// Best effort: the session is completed below regardless, but a
			// failed notification should at least be visible in the logs.
			if err := sess.stream.Send(msg); err != nil {
				clog.V(common.DEBUG).Infof(logCtx, "Could not send session teardown to transcoder sessionId=%s err=%q", sessionId, err)
			}
		}
		rtm.completeStreamSession(sessionId)
		rtm.RTmutex.Unlock()
	}

	n.segmentMutex.Lock()
	defer n.segmentMutex.Unlock()
	mid := ManifestID(sessionId)
	if segChan, ok := n.SegmentChans[mid]; ok {
		close(segChan)
		delete(n.SegmentChans, mid)
		if lpmon.Enabled {
			lpmon.CurrentSessions(len(n.SegmentChans))
		}
	}
}

func (n *LivepeerNode) serveTranscoder(stream net.Transcoder_RegisterTranscoderServer, capacity int, capabilities *net.Capabilities) {
from := common.GetConnectionAddr(stream.Context())
coreCaps := CapabilitiesFromNetCapabilities(capabilities)
Expand Down Expand Up @@ -959,7 +977,16 @@ func (rtm *RemoteTranscoderManager) selectTranscoder(sessionId string, caps *Cap
return nil, ErrNoTranscodersAvailable
}

// compleStreamSessions end a stream session for a remote transcoder and decrements its laod
// EndTranscodingSession ends the transcoding session identified by sessionId
// and releases the resources held for it. It delegates to
// endTranscodingSession, passing context.TODO() as the logging context.
func (n *LivepeerNode) EndTranscodingSession(sessionId string) {
	logCtx := context.TODO()
	n.endTranscodingSession(sessionId, logCtx)
}

// EndTranscodingSession is not supported on RemoteTranscoderManager: it
// always panics, whatever sessionId is passed.
func (rtm *RemoteTranscoderManager) EndTranscodingSession(sessionId string) {
	const msg = "shouldn't be called on RemoteTranscoderManager"
	panic(msg)
}

// completeStreamSessions end a stream session for a remote transcoder and decrements its load
// caller should hold the mutex lock
func (rtm *RemoteTranscoderManager) completeStreamSession(sessionId string) {
t, ok := rtm.streamSessions[sessionId]
Expand Down
17 changes: 17 additions & 0 deletions core/transcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

// Transcoder is the interface shared by the transcoder implementations in
// this package.
type Transcoder interface {
// Transcode processes the segment described by md and returns the resulting
// TranscodeData, or an error.
Transcode(ctx context.Context, md *SegTranscodingMetadata) (*TranscodeData, error)
// EndTranscodingSession tells the transcoder that the session identified by
// sessionId is over. What happens is implementation specific: it may stop
// an internal transcoding session or do nothing at all.
EndTranscodingSession(sessionId string)
}

type LocalTranscoder struct {
Expand Down Expand Up @@ -74,6 +75,10 @@ func (lt *LocalTranscoder) Transcode(ctx context.Context, md *SegTranscodingMeta
return resToTranscodeData(ctx, res, opts)
}

// EndTranscodingSession does nothing for LocalTranscoder; sessionId is ignored.
func (lt *LocalTranscoder) EndTranscodingSession(sessionId string) {
// no-op for software transcoder
}

// NewLocalTranscoder returns a Transcoder backed by a new LocalTranscoder
// whose workDir field is set to workDir.
func NewLocalTranscoder(workDir string) Transcoder {
	lt := new(LocalTranscoder)
	lt.workDir = workDir
	return lt
}
Expand Down Expand Up @@ -122,6 +127,10 @@ func (nv *NetintTranscoder) Transcode(ctx context.Context, md *SegTranscodingMet
return resToTranscodeData(ctx, res, out)
}

// Stop does nothing for LocalTranscoder.
func (lt *LocalTranscoder) Stop() {
// no-op for software transcoder
}

func (nv *NvidiaTranscoder) Transcode(ctx context.Context, md *SegTranscodingMetadata) (td *TranscodeData, retErr error) {
// Returns UnrecoverableError instead of panicking to gracefully notify orchestrator about transcoder's failure
defer recoverFromPanic(&retErr)
Expand Down Expand Up @@ -156,6 +165,14 @@ func (nv *NvidiaTranscoder) Transcode(ctx context.Context, md *SegTranscodingMet
return resToTranscodeData(ctx, res, out)
}

// EndTranscodingSession ends the session by calling nv.Stop(). The sessionId
// argument is not used.
func (nv *NvidiaTranscoder) EndTranscodingSession(sessionId string) {
nv.Stop()
}

// EndTranscodingSession ends the session by calling nt.Stop(). The sessionId
// argument is not used.
func (nt *NetintTranscoder) EndTranscodingSession(sessionId string) {
nt.Stop()
}

type transcodeTestParams struct {
TestAvailable bool
Cap Capability
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,3 @@ require (
google.golang.org/grpc v1.47.0
pgregory.net/rapid v0.4.0
)

Loading

0 comments on commit a1fb761

Please sign in to comment.