Skip to content

Commit

Permalink
the store cleanup will now run in transceivable and has a return bool…
Browse files Browse the repository at this point in the history
… to clodeBind... Eg.: If enquire_link expires, the bind should be closed (still handled by user)
  • Loading branch information
laduchesneau committed Mar 3, 2024
1 parent 4fb5738 commit e7f630f
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 44 deletions.
27 changes: 15 additions & 12 deletions example/transeiver_with_custom_store/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,39 +91,42 @@ func sendingAndReceiveSMS(wg *sync.WaitGroup) {
time.Sleep(1 * time.Second)
}

func handleExpirePduRequest() func(pdu.PDU) {
return func(p pdu.PDU) {
func handleExpirePduRequest() func(pdu.PDU) bool {
return func(p pdu.PDU) bool {
switch p.(type) {
case *pdu.Unbind:
fmt.Printf("Expired Unbind:%+v\n", p)
fmt.Println("Unbind Expired")

case *pdu.SubmitSM:
fmt.Printf("Expired SubmitSM:%+v\n", p)
fmt.Printf("Expired SubmitSM: %+v\n", p)

case *pdu.EnquireLink:
fmt.Printf("Expired EnquireLink:%+v\n", p)
fmt.Printf("Expired EnquireLink: %+v\n", p)
return true // if the enquire_link expired, usually means the bind is stale

case *pdu.DataSM:
fmt.Printf("Expired DataSM:%+v\n", p)
fmt.Printf("Expired DataSM: %+v\n", p)

default:
fmt.Printf("Expired PDU: %+v\n", p)
}

return false
}
}

func handleOnClosePduRequest() func(pdu.PDU) {
return func(p pdu.PDU) {
switch p.(type) {
case *pdu.Unbind:
fmt.Printf("OnClose Unbind:%+v\n", p)
fmt.Printf("OnClose Unbind: %+v\n", p)

case *pdu.SubmitSM:
fmt.Printf("OnClose SubmitSM:%+v\n", p)
fmt.Printf("OnClose SubmitSM: %+v\n", p)

case *pdu.EnquireLink:
fmt.Printf("OnClose EnquireLink:%+v\n", p)
fmt.Printf("OnClose EnquireLink: %+v\n", p)

case *pdu.DataSM:
fmt.Printf("OnClose DataSM:%+v\n", p)
fmt.Printf("OnClose DataSM: %+v\n", p)
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ type RequestWindowConfig struct {
// Mandatory: the PduExpireTimeOut must be set
// Handle is optional
// If not set, expired PDU will be removed from cache
OnExpiredPduRequest func(pdu.PDU)
// the bind can be closed by retuning true on closeBind.
OnExpiredPduRequest func(pdu.PDU) (closeBind bool)

// OnClosePduRequest will return all PDU request found in the store when the bind closes
OnClosePduRequest func(pdu.PDU)
Expand Down
28 changes: 0 additions & 28 deletions receivable.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,41 +59,13 @@ func (t *receivable) closing(state State) {
}

func (t *receivable) start() {
if t.settings.RequestWindowConfig != nil && t.settings.ExpireCheckTimer > 0 {
t.wg.Add(1)
go func() {
t.windowCleanup()
t.wg.Done()
}()

}
t.wg.Add(1)
go func() {
t.loop()
t.wg.Done()
}()
}

func (t *receivable) windowCleanup() {
ticker := time.NewTicker(t.settings.ExpireCheckTimer)
defer ticker.Stop()
for {
select {
case <-t.ctx.Done():
return
case <-ticker.C:
for _, request := range t.settings.RequestStore.List(context.TODO()) {
if time.Since(request.TimeSent) > t.settings.PduExpireTimeOut {
t.settings.RequestStore.Delete(context.TODO(), request.GetSequenceNumber())
if t.settings.OnExpiredPduRequest != nil {
t.settings.OnExpiredPduRequest(request.PDU)
}
}
}
}
}
}

// check error and do closing if needed
func (t *receivable) check(err error) (closing bool) {
if err == nil {
Expand Down
44 changes: 41 additions & 3 deletions transceivable.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
package gosmpp

import (
"context"
"github.com/linxGnu/gosmpp/pdu"
"sync"
"sync/atomic"
"time"
)

type transceivable struct {
settings Settings

conn *Connection
in *receivable
out *transmittable
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
conn *Connection
in *receivable
out *transmittable

aliveState int32
}
Expand All @@ -21,6 +27,7 @@ func newTransceivable(conn *Connection, settings Settings) *transceivable {
settings: settings,
conn: conn,
}
t.ctx, t.cancel = context.WithCancel(context.Background())

t.out = newTransmittable(conn, Settings{
WriteTimeout: settings.WriteTimeout,
Expand Down Expand Up @@ -79,6 +86,14 @@ func newTransceivable(conn *Connection, settings Settings) *transceivable {
}

func (t *transceivable) start() {
if t.settings.RequestWindowConfig != nil && t.settings.ExpireCheckTimer > 0 {
t.wg.Add(1)
go func() {
t.windowCleanup()
t.wg.Done()
}()

}
t.out.start()
t.in.start()
}
Expand Down Expand Up @@ -114,3 +129,26 @@ func (t *transceivable) Submit(p pdu.PDU) error {
func (t *transceivable) GetWindowSize() int {
return t.out.GetWindowSize()
}

func (t *transceivable) windowCleanup() {
ticker := time.NewTicker(t.settings.ExpireCheckTimer)
defer ticker.Stop()
for {
select {
case <-t.ctx.Done():
return
case <-ticker.C:
for _, request := range t.settings.RequestStore.List(context.TODO()) {
if time.Since(request.TimeSent) > t.settings.PduExpireTimeOut {
t.settings.RequestStore.Delete(context.TODO(), request.GetSequenceNumber())
if t.settings.OnExpiredPduRequest != nil {
bindClose := t.settings.OnExpiredPduRequest(request.PDU)
if bindClose {
_ = t.Close()
}
}
}
}
}
}
}

0 comments on commit e7f630f

Please sign in to comment.