Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait expiration changes #1302

Merged
merged 3 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/flowrunner/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ func TestPrintEvent(t *testing.T) {
{events.NewContactRefreshed(session.Contact()), `👤 contact refreshed on resume`},
{events.NewContactTimezoneChanged(session.Environment().Timezone()), `🕑 timezone changed to 'America/Guayaquil'`},
{events.NewDialEnded(flows.NewDial(flows.DialStatusBusy, 3)), `☎️ dial ended with 'busy'`},
{events.NewDialWait(urns.URN(`tel:+1234567890`), 20, 120, nil), `⏳ waiting for dial (type /dial <answered|no_answer|busy|failed>)...`},
{events.NewDialWait(urns.URN(`tel:+1234567890`), 20, 120, expiresOn), `⏳ waiting for dial (type /dial <answered|no_answer|busy|failed>)...`},
{events.NewEmailSent([]string{"[email protected]"}, "Hi", "What up?"), `✉️ email sent with subject 'Hi'`},
{events.NewEnvironmentRefreshed(session.Environment()), `⚙️ environment refreshed on resume`},
{events.NewErrorf("this didn't work"), `⚠️ this didn't work`},
{events.NewFailure(errors.New("this really didn't work")), `🛑 this really didn't work`},
{events.NewFlowEntered(flow.Reference(false), "", false), `↪️ entered flow 'Registration'`},
{events.NewInputLabelsAdded("2a786bbc-2314-4d57-a0c9-b66e1642e5e2", []*flows.Label{sa.Labels().FindByName("Spam")}), `🏷️ labeled with 'Spam'`},
{events.NewMsgWait(nil, nil, nil), `⏳ waiting for message...`},
{events.NewMsgWait(&timeout, &expiresOn, nil), `⏳ waiting for message (3 sec timeout, type /timeout to simulate)...`},
{events.NewMsgWait(nil, expiresOn, nil), `⏳ waiting for message...`},
{events.NewMsgWait(&timeout, expiresOn, nil), `⏳ waiting for message (3 sec timeout, type /timeout to simulate)...`},
}

for _, tc := range tests {
Expand Down
14 changes: 7 additions & 7 deletions flows/actions/testdata/TestResthookPayload_resthook_payload.snap
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@
"uuid": "970b8069-50f5-4f6f-8f41-6b2d9f33d623"
},
{
"arrived_on": "2018-07-06T12:30:23.123456Z",
"arrived_on": "2018-07-06T12:30:24.123456Z",
"exit_uuid": "d898f9a4-f0fc-4ac4-a639-c98c602bb511",
"node_uuid": "f5bb9b7a-7b5e-45c3-8f0e-61b4e95edf03",
"uuid": "5ecda5fc-951c-437b-a17e-f85e49829fb9"
},
{
"arrived_on": "2018-07-06T12:30:51.123456Z",
"arrived_on": "2018-07-06T12:30:52.123456Z",
"exit_uuid": "9fc5f8b4-2247-43db-b899-ab1ac50ba06c",
"node_uuid": "c0781400-737f-4940-9a6c-1ec1c3df0325",
"uuid": "312d3af0-a565-4c96-ba00-bd7f0d08e671"
Expand All @@ -71,7 +71,7 @@
"2factor": {
"category": "",
"category_localized": "",
"created_on": "2018-07-06T12:30:32.123456Z",
"created_on": "2018-07-06T12:30:33.123456Z",
"input": "",
"name": "2Factor",
"node_uuid": "f5bb9b7a-7b5e-45c3-8f0e-61b4e95edf03",
Expand All @@ -80,7 +80,7 @@
"favorite_color": {
"category": "Red",
"category_localized": "Red",
"created_on": "2018-07-06T12:30:28.123456Z",
"created_on": "2018-07-06T12:30:29.123456Z",
"input": "",
"name": "Favorite Color",
"node_uuid": "f5bb9b7a-7b5e-45c3-8f0e-61b4e95edf03",
Expand All @@ -89,7 +89,7 @@
"intent": {
"category": "Success",
"category_localized": "Success",
"created_on": "2018-07-06T12:30:46.123456Z",
"created_on": "2018-07-06T12:30:47.123456Z",
"input": "Hi there",
"name": "Intent",
"node_uuid": "f5bb9b7a-7b5e-45c3-8f0e-61b4e95edf03",
Expand All @@ -98,7 +98,7 @@
"phone_number": {
"category": "",
"category_localized": "",
"created_on": "2018-07-06T12:30:24.123456Z",
"created_on": "2018-07-06T12:30:25.123456Z",
"input": "",
"name": "Phone Number",
"node_uuid": "f5bb9b7a-7b5e-45c3-8f0e-61b4e95edf03",
Expand All @@ -107,7 +107,7 @@
"webhook": {
"category": "Success",
"category_localized": "Success",
"created_on": "2018-07-06T12:30:40.123456Z",
"created_on": "2018-07-06T12:30:41.123456Z",
"input": "GET http://127.0.0.1:49999/?content=%7B%22results%22%3A%5B%7B%22state%22%3A%22WA%22%7D%2C%7B%22state%22%3A%22IN%22%7D%5D%7D",
"name": "webhook",
"node_uuid": "f5bb9b7a-7b5e-45c3-8f0e-61b4e95edf03",
Expand Down
61 changes: 36 additions & 25 deletions flows/definition/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"maps"
"slices"
"time"

"github.com/Masterminds/semver"
"github.com/nyaruka/gocommon/i18n"
Expand All @@ -31,15 +32,15 @@ func IsVersionSupported(v *semver.Version) bool {

type flow struct {
// spec properties
uuid assets.FlowUUID
name string
specVersion *semver.Version
language i18n.Language
flowType flows.FlowType
revision int
expireAfterMinutes int
localization flows.Localization
nodes []flows.Node
uuid assets.FlowUUID
name string
specVersion *semver.Version
language i18n.Language
flowType flows.FlowType
revision int
expireAfter time.Duration
localization flows.Localization
nodes []flows.Node

// optional properties not used by engine itself
ui json.RawMessage
Expand All @@ -52,20 +53,20 @@ type flow struct {
}

// NewFlow creates a new flow
func NewFlow(uuid assets.FlowUUID, name string, language i18n.Language, flowType flows.FlowType, revision int, expireAfterMinutes int, localization flows.Localization, nodes []flows.Node, ui json.RawMessage, a assets.Flow) (flows.Flow, error) {
func NewFlow(uuid assets.FlowUUID, name string, language i18n.Language, flowType flows.FlowType, revision int, expireAfter time.Duration, localization flows.Localization, nodes []flows.Node, ui json.RawMessage, a assets.Flow) (flows.Flow, error) {
f := &flow{
uuid: uuid,
name: name,
specVersion: CurrentSpecVersion,
language: language,
flowType: flowType,
revision: revision,
expireAfterMinutes: expireAfterMinutes,
localization: localization,
nodes: nodes,
nodeMap: make(map[flows.NodeUUID]flows.Node, len(nodes)),
ui: ui,
asset: a,
uuid: uuid,
name: name,
specVersion: CurrentSpecVersion,
language: language,
flowType: flowType,
revision: revision,
expireAfter: expireAfter,
localization: localization,
nodes: nodes,
nodeMap: make(map[flows.NodeUUID]flows.Node, len(nodes)),
ui: ui,
asset: a,
}

for _, node := range f.nodes {
Expand All @@ -85,12 +86,22 @@ func (f *flow) SpecVersion() *semver.Version { return f.specVersion }
func (f *flow) Revision() int { return f.revision }
func (f *flow) Language() i18n.Language { return f.language }
func (f *flow) Type() flows.FlowType { return f.flowType }
func (f *flow) ExpireAfterMinutes() int { return f.expireAfterMinutes }
func (f *flow) Nodes() []flows.Node { return f.nodes }
func (f *flow) Localization() flows.Localization { return f.localization }
func (f *flow) UI() json.RawMessage { return f.ui }
func (f *flow) GetNode(uuid flows.NodeUUID) flows.Node { return f.nodeMap[uuid] }

func (f *flow) ExpireAfter() time.Duration {
if f.expireAfter == 0 {
if f.flowType == flows.FlowTypeMessaging {
return 10080 * time.Minute
} else if f.flowType == flows.FlowTypeVoice {
return 5 * time.Minute
}
}
return f.expireAfter
}

func (f *flow) validate() error {
// track UUIDs used by nodes and actions to ensure that they are unique
seenUUIDs := make(map[uuids.UUID]bool)
Expand Down Expand Up @@ -348,7 +359,7 @@ func readFlow(data json.RawMessage, mc *migrations.Config, a assets.Flow) (flows
e.Localization = make(localization)
}

return NewFlow(e.UUID, e.Name, e.Language, e.Type, e.Revision, e.ExpireAfterMinutes, e.Localization, nodes, e.UI, a)
return NewFlow(e.UUID, e.Name, e.Language, e.Type, e.Revision, time.Duration(e.ExpireAfterMinutes)*time.Minute, e.Localization, nodes, e.UI, a)
}

// MarshalJSON marshals this flow into JSON
Expand All @@ -362,7 +373,7 @@ func (f *flow) MarshalJSON() ([]byte, error) {
Language: f.language,
Type: f.flowType,
Revision: f.revision,
ExpireAfterMinutes: f.expireAfterMinutes,
ExpireAfterMinutes: int(f.expireAfter / time.Minute),
Localization: f.localization.(localization),
Nodes: make([]*node, len(f.nodes)),
UI: f.ui,
Expand Down
5 changes: 3 additions & 2 deletions flows/definition/flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"strings"
"testing"
"time"

"github.com/Masterminds/semver"
"github.com/nyaruka/gocommon/i18n"
Expand Down Expand Up @@ -219,8 +220,8 @@ func TestNewFlow(t *testing.T) {
"Test Flow", // name
i18n.Language("eng"), // base language
flows.FlowTypeMessaging,
123, // revision
30, // expires after minutes
123, // revision
30*time.Minute, // expires after minutes
definition.NewLocalization(),
[]flows.Node{
definition.NewNode(
Expand Down
5 changes: 2 additions & 3 deletions flows/events/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ func TestEventMarshaling(t *testing.T) {

tz, _ := time.LoadLocation("Africa/Kigali")
timeout := 500
expiresOn := time.Date(2022, 2, 3, 13, 45, 30, 0, time.UTC)
gender := session.Assets().Fields().Get("gender")
jotd := session.Assets().OptIns().Get("248be71d-78e9-4d71-a6c4-9981d369e5cb")
weather := session.Assets().Topics().Get("472a7a73-96cb-4736-b567-056d987cc5b4")
Expand Down Expand Up @@ -504,7 +503,7 @@ func TestEventMarshaling(t *testing.T) {
}`,
},
{
events.NewMsgWait(&timeout, &expiresOn, hints.NewImageHint()),
events.NewMsgWait(&timeout, time.Date(2022, 2, 3, 13, 45, 30, 0, time.UTC), hints.NewImageHint()),
`{
"type": "msg_wait",
"created_on": "2018-10-18T14:20:30.000123456Z",
Expand Down Expand Up @@ -532,7 +531,7 @@ func TestEventMarshaling(t *testing.T) {
}`,
},
{
events.NewDialWait(urns.URN("tel:+1234567890"), 20, 120, &expiresOn),
events.NewDialWait(urns.URN("tel:+1234567890"), 20, 120, time.Date(2022, 2, 3, 13, 45, 30, 0, time.UTC)),
`{
"type": "dial_wait",
"created_on": "2018-10-18T14:20:30.000123456Z",
Expand Down
4 changes: 2 additions & 2 deletions flows/events/dial_wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ type DialWaitEvent struct {
CallLimitSeconds int `json:"call_limit_seconds"`

// when this wait expires and the whole run can be expired
ExpiresOn *time.Time `json:"expires_on,omitempty"`
ExpiresOn time.Time `json:"expires_on,omitempty"`
}

// NewDialWait returns a new dial wait with the passed in URN
func NewDialWait(urn urns.URN, dialLimitSeconds, callLimitSeconds int, expiresOn *time.Time) *DialWaitEvent {
func NewDialWait(urn urns.URN, dialLimitSeconds, callLimitSeconds int, expiresOn time.Time) *DialWaitEvent {
return &DialWaitEvent{
BaseEvent: NewBaseEvent(TypeDialWait),
URN: urn,
Expand Down
6 changes: 3 additions & 3 deletions flows/events/msg_wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ type MsgWaitEvent struct {
TimeoutSeconds *int `json:"timeout_seconds,omitempty"`

// When this wait expires and the whole run can be expired
ExpiresOn *time.Time `json:"expires_on,omitempty"`
ExpiresOn time.Time `json:"expires_on,omitempty"`

Hint flows.Hint `json:"hint,omitempty"`
}

// NewMsgWait returns a new msg wait with the passed in timeout
func NewMsgWait(timeoutSeconds *int, expiresOn *time.Time, hint flows.Hint) *MsgWaitEvent {
func NewMsgWait(timeoutSeconds *int, expiresOn time.Time, hint flows.Hint) *MsgWaitEvent {
return &MsgWaitEvent{
BaseEvent: NewBaseEvent(TypeMsgWait),
TimeoutSeconds: timeoutSeconds,
Expand All @@ -63,7 +63,7 @@ type msgWaitEnvelope struct {
BaseEvent

TimeoutSeconds *int `json:"timeout_seconds,omitempty"`
ExpiresOn *time.Time `json:"expires_on,omitempty"`
ExpiresOn time.Time `json:"expires_on,omitempty"`
Hint json.RawMessage `json:"hint,omitempty"`
}

Expand Down
2 changes: 1 addition & 1 deletion flows/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ type Flow interface {
Revision() int
Language() i18n.Language
Type() FlowType
ExpireAfterMinutes() int
ExpireAfter() time.Duration
Localization() Localization
UI() json.RawMessage
Nodes() []Node
Expand Down
9 changes: 2 additions & 7 deletions flows/routers/waits/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,8 @@ func (w *baseWait) Timeout() flows.Timeout {
return w.timeout
}

func (w *baseWait) expiresOn(run flows.Run) *time.Time {
expiresAfterMins := run.Flow().ExpireAfterMinutes()
if expiresAfterMins > 0 {
dt := dates.Now().Add(time.Duration(int64(expiresAfterMins) * int64(time.Minute)))
return &dt
}
return nil
func (w *baseWait) expiresOn(run flows.Run) time.Time {
return dates.Now().Add(run.Flow().ExpireAfter())
}

//------------------------------------------------------------------------------------------
Expand Down
2 changes: 1 addition & 1 deletion flows/routers/waits/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (w *DialWait) Begin(run flows.Run, log flows.EventCallback) bool {
// flow so calculate an expiry guaranteed to be after the wait returns
expiresOn := dates.Now().Add(w.dialLimit + w.callLimit + time.Second*30)

log(events.NewDialWait(urn, int(w.dialLimit/time.Second), int(w.callLimit/time.Second), &expiresOn))
log(events.NewDialWait(urn, int(w.dialLimit/time.Second), int(w.callLimit/time.Second), expiresOn))

return true
}
Expand Down
Loading
Loading