forked from dunglas/frankenphp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththreadregular.go
139 lines (121 loc) · 3.42 KB
/
threadregular.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package frankenphp
import (
"net/http"
"sync"
)
// regularThread is the representation of a non-worker PHP thread.
// It executes PHP scripts in a web context and
// implements the threadHandler interface.
type regularThread struct {
// state is the thread's state; convertToRegularThread initializes it with
// the same pointer as thread.state.
state *threadState
// thread is the PHP thread this handler is installed on.
thread *phpThread
// activeRequest is the request currently being handled. It is assigned in
// waitForRequest and reset to nil in afterRequest.
activeRequest *http.Request
}
var (
// regularThreads holds the threads currently registered as regular threads;
// entries are added by attachRegularThread and removed by detachRegularThread.
regularThreads []*phpThread
// regularThreadMu is locked while regularThreads is modified.
regularThreadMu = &sync.RWMutex{}
// regularRequestChan carries incoming requests from
// handleRequestWithRegularPHPThreads to a thread blocked in waitForRequest.
regularRequestChan chan *http.Request
)
func convertToRegularThread(thread *phpThread) {
thread.setHandler(®ularThread{
thread: thread,
state: thread.state,
})
attachRegularThread(thread)
}
// beforeScriptExecution inspects the thread state and decides what to run next.
// It returns the name of the script to execute, or an empty string on shutdown.
// It panics when the thread is in a state this handler does not expect.
func (handler *regularThread) beforeScriptExecution() string {
	switch current := handler.state.get(); current {
	case stateReady, stateTransitionComplete:
		// a freshly transitioned thread becomes ready before it starts waiting
		if current == stateTransitionComplete {
			handler.state.set(stateReady)
		}
		return handler.waitForRequest()
	case stateTransitionRequested:
		// hand the thread over to its next handler
		detachRegularThread(handler.thread)
		return handler.thread.transitionToNewHandler()
	case stateShuttingDown:
		detachRegularThread(handler.thread)
		// the empty script name is the signal to stop
		return ""
	default:
		panic("unexpected state: " + handler.state.name())
	}
}
// afterScriptExecution forwards the script's exit status to afterRequest,
// which records it on the request context and clears the active request.
// It does not return a value.
func (handler *regularThread) afterScriptExecution(exitStatus int) {
handler.afterRequest(exitStatus)
}
// getActiveRequest returns the request stored in activeRequest. The value is
// nil once afterRequest has run and no new request has been picked up.
func (handler *regularThread) getActiveRequest() *http.Request {
return handler.activeRequest
}
// name returns the fixed, human-readable name of this handler type.
func (handler *regularThread) name() string {
return "Regular PHP Thread"
}
// waitForRequest marks the thread as waiting and blocks until a request
// arrives on regularRequestChan or the thread's drainChan fires.
//
// On drain it defers to beforeScriptExecution so the thread state is
// re-evaluated. On a request it stores it as the active request, updates the
// server context and returns the script filename to execute. If the server
// context cannot be updated, the request is rejected and finished with exit
// status 0, the thread is unpinned, and control returns to
// beforeScriptExecution.
func (handler *regularThread) waitForRequest() string {
	handler.state.markAsWaiting(true)

	var incoming *http.Request
	select {
	case incoming = <-regularRequestChan:
		// a request was handed to this thread
	case <-handler.thread.drainChan:
		// go back to beforeScriptExecution
		return handler.beforeScriptExecution()
	}

	handler.activeRequest = incoming
	handler.state.markAsWaiting(false)

	requestCtx := incoming.Context().Value(contextKey).(*FrankenPHPContext)
	err := updateServerContext(handler.thread, incoming, true, false)
	if err == nil {
		// the scriptFilename that should be executed
		return requestCtx.scriptFilename
	}

	rejectRequest(requestCtx.responseWriter, err.Error())
	handler.afterRequest(0)
	handler.thread.Unpin()

	// go back to beforeScriptExecution
	return handler.beforeScriptExecution()
}
// afterRequest finishes the active request: it stores exitStatus on the
// request's FrankenPHPContext, passes that context to maybeCloseContext and
// then clears activeRequest.
func (handler *regularThread) afterRequest(exitStatus int) {
	stored := handler.activeRequest.Context().Value(contextKey)
	requestCtx := stored.(*FrankenPHPContext)

	requestCtx.exitStatus = exitStatus
	maybeCloseContext(requestCtx)

	handler.activeRequest = nil
}
// handleRequestWithRegularPHPThreads hands r to a regular PHP thread and
// blocks until fc.done signals that the request has been completed.
//
// If no thread takes the request immediately, it is counted as queued and the
// function keeps offering it on regularRequestChan while also offering fc on
// scaleChan; a successful send on scaleChan does not end the wait.
func handleRequestWithRegularPHPThreads(r *http.Request, fc *FrankenPHPContext) {
	metrics.StartRequest()

	select {
	case regularRequestChan <- r:
		// a thread was available to handle the request immediately
	default:
		// no thread was available: mark the request as queued and keep
		// offering it until a thread picks it up
		metrics.QueuedRequest()
	dispatch:
		for {
			select {
			case regularRequestChan <- r:
				metrics.DequeuedRequest()
				break dispatch
			case scaleChan <- fc:
				// the request has triggered scaling, continue to wait for a thread
			}
		}
	}

	// a thread owns the request now; wait for it to finish
	<-fc.done
	metrics.StopRequest()
}
// attachRegularThread appends thread to regularThreads while holding
// regularThreadMu.
func attachRegularThread(thread *phpThread) {
	regularThreadMu.Lock()
	defer regularThreadMu.Unlock()

	regularThreads = append(regularThreads, thread)
}
// detachRegularThread removes the first occurrence of thread from
// regularThreads while holding regularThreadMu. It is a no-op when the thread
// is not registered. The order of the remaining threads is preserved.
func detachRegularThread(thread *phpThread) {
	regularThreadMu.Lock()
	defer regularThreadMu.Unlock()

	for i, t := range regularThreads {
		if t != thread {
			continue
		}

		last := len(regularThreads) - 1
		// shift the tail left over the removed entry
		copy(regularThreads[i:], regularThreads[i+1:])
		// Clear the vacated slot: otherwise the backing array keeps a stale
		// pointer that holds the last thread reachable after it is detached.
		regularThreads[last] = nil
		regularThreads = regularThreads[:last]
		return
	}
}