Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
Don't return OK from status until all parts are ready
Browse files Browse the repository at this point in the history
  • Loading branch information
bboreham committed May 11, 2017
1 parent 3e0eb4a commit 7611cd9
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 15 deletions.
8 changes: 5 additions & 3 deletions api/weaveapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,12 @@ func (client *Client) Connect(remote string) error {
}

// IsReady returns true if the API server is up and running
// (note it returns StatusServiceUnavailable until all parts are ready,
// but callers of this function are expected to be ok with a partial service)
func (client *Client) IsReady() bool {
_, err := client.httpVerb("GET", "/status", nil)

return err == nil
resp, err := http.Get(client.baseURL + "/status")
return err == nil &&
(resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusServiceUnavailable)
}

// WaitAPIServer waits until the API server is ready to serve.
Expand Down
25 changes: 25 additions & 0 deletions common/wait.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package common

import (
"sync"
"sync/atomic"
)

type WaitGroup struct {
wg sync.WaitGroup
count int32
}

// Increase the count of things waiting and return a closure which calls Done
func (w *WaitGroup) Add() func() {
atomic.AddInt32(&w.count, 1)
w.wg.Add(1)
return func() {
w.wg.Done()
atomic.AddInt32(&w.count, -1)
}
}

func (w *WaitGroup) IsDone() bool {
return atomic.LoadInt32(&w.count) == 0
}
7 changes: 4 additions & 3 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,19 @@ const (

var Log = common.Log

func Start(weaveAPIAddr string, dockerClient *docker.Client, address string, meshAddress string, dns bool, isPluginV2, forceMulticast bool, defaultSubnet string) {
func Start(weaveAPIAddr string, dockerClient *docker.Client, address string, meshAddress string, dns bool, isPluginV2, forceMulticast bool, defaultSubnet string, ready func()) {
weave := weaveapi.NewClient(weaveAPIAddr, Log)

Log.Info("Waiting for Weave API Server...")
weave.WaitAPIServer(30)
Log.Info("Finished waiting for Weave API Server")

if err := run(dockerClient, weave, address, meshAddress, dns, isPluginV2, forceMulticast, defaultSubnet); err != nil {
if err := run(dockerClient, weave, address, meshAddress, dns, isPluginV2, forceMulticast, defaultSubnet, ready); err != nil {
Log.Fatal(err)
}
}

func run(dockerClient *docker.Client, weave *weaveapi.Client, address, meshAddress string, dns, isPluginV2, forceMulticast bool, defaultSubnet string) error {
func run(dockerClient *docker.Client, weave *weaveapi.Client, address, meshAddress string, dns, isPluginV2, forceMulticast bool, defaultSubnet string, ready func()) error {
endChan := make(chan error, 1)

if address != "" {
Expand All @@ -60,6 +60,7 @@ func run(dockerClient *docker.Client, weave *weaveapi.Client, address, meshAddre
// TODO: the driver name should be extracted from pluginMeshSocket
dockerClient.EnsureNetwork("weave", "weavemesh", defaultSubnet, options)
}
ready()

return <-endChan
}
Expand Down
11 changes: 9 additions & 2 deletions prog/weaver/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/weaveworks/mesh"

"github.com/weaveworks/go-checkpoint"
"github.com/weaveworks/weave/common"
"github.com/weaveworks/weave/ipam"
"github.com/weaveworks/weave/nameserver"
"github.com/weaveworks/weave/net/address"
Expand Down Expand Up @@ -273,6 +274,7 @@ func (v *VersionCheck) String() string {
}

type WeaveStatus struct {
Ready bool
Version string
VersionCheck *VersionCheck `json:"VersionCheck,omitempty"`
Router *weave.NetworkRouterStatus `json:"Router,omitempty"`
Expand All @@ -281,9 +283,10 @@ type WeaveStatus struct {
}

// Read-only functions, suitable for exposing on an unprotected socket
func HandleHTTP(muxRouter *mux.Router, version string, router *weave.NetworkRouter, allocator *ipam.Allocator, defaultSubnet address.CIDR, ns *nameserver.Nameserver, dnsserver *nameserver.DNSServer) {
func HandleHTTP(muxRouter *mux.Router, version string, router *weave.NetworkRouter, allocator *ipam.Allocator, defaultSubnet address.CIDR, ns *nameserver.Nameserver, dnsserver *nameserver.DNSServer, waitReady *common.WaitGroup) {
status := func() WeaveStatus {
return WeaveStatus{
waitReady.IsDone(),
version,
versionCheck(),
weave.NewNetworkRouterStatus(router),
Expand Down Expand Up @@ -324,7 +327,11 @@ func HandleHTTP(muxRouter *mux.Router, version string, router *weave.NetworkRout
defHandler := func(path string, template *template.Template) {
muxRouter.Methods("GET").Path(path).HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
if err := template.Execute(w, status()); err != nil {
s := status()
if !s.Ready {
w.WriteHeader(http.StatusServiceUnavailable)
}
if err := template.Execute(w, s); err != nil {
http.Error(w, "error during template execution", http.StatusInternalServerError)
Log.Error(err)
}
Expand Down
15 changes: 9 additions & 6 deletions prog/weaver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,8 @@ func main() {
}
config.ProtocolMinVersion = byte(protocolMinVersion)

var waitReady common.WaitGroup

if proxyConfig.Enabled {
// Start Weave Proxy:
proxy, err := weaveproxy.NewProxy(*proxyConfig)
Expand All @@ -264,7 +266,7 @@ func main() {
defer proxy.Stop()
listeners := proxy.Listen()
proxy.AttachExistingContainers()
go proxy.Serve(listeners)
go proxy.Serve(listeners, waitReady.Add())
go proxy.ListenAndServeStatus("/home/weave/status.sock")
}

Expand Down Expand Up @@ -432,7 +434,7 @@ func main() {
ns.HandleHTTP(muxRouter, dockerCli)
}
router.HandleHTTP(muxRouter)
HandleHTTP(muxRouter, version, router, allocator, defaultSubnet, ns, dnsserver)
HandleHTTP(muxRouter, version, router, allocator, defaultSubnet, ns, dnsserver, &waitReady)
HandleHTTPPeer(muxRouter, allocator, discoveryEndpoint, token, name.String())
muxRouter.Methods("GET").Path("/metrics").Handler(metricsHandler(router, allocator, ns, dnsserver))
http.Handle("/", common.LoggingHTTPHandler(muxRouter))
Expand All @@ -442,7 +444,7 @@ func main() {

if statusAddr != "" {
muxRouter := mux.NewRouter()
HandleHTTP(muxRouter, version, router, allocator, defaultSubnet, ns, dnsserver)
HandleHTTP(muxRouter, version, router, allocator, defaultSubnet, ns, dnsserver, &waitReady)
muxRouter.Methods("GET").Path("/metrics").Handler(metricsHandler(router, allocator, ns, dnsserver))
statusMux := http.NewServeMux()
statusMux.Handle("/", muxRouter)
Expand All @@ -451,26 +453,27 @@ func main() {
}

if enablePlugin || enablePluginV2 {
go plugin.Start(httpAddr, dockerCli, pluginSocket, pluginMeshSocket, !noDNS, enablePluginV2, enablePluginV2Multicast, defaultSubnet.String())
go plugin.Start(httpAddr, dockerCli, pluginSocket, pluginMeshSocket, !noDNS, enablePluginV2, enablePluginV2Multicast, defaultSubnet.String(), waitReady.Add())
}

if bridgeConfig.AWSVPC {
// Run this on its own goroutine because the allocator can block
// We remove the default route installed by the kernel,
// because awsvpc has installed it as well
go expose(allocator, defaultSubnet, bridgeConfig.WeaveBridgeName, bridgeConfig.AWSVPC)
go expose(allocator, defaultSubnet, bridgeConfig.WeaveBridgeName, bridgeConfig.AWSVPC, waitReady.Add())
}

signals.SignalHandlerLoop(common.Log, router)
}

func expose(alloc *ipam.Allocator, subnet address.CIDR, bridgeName string, removeDefaultRoute bool) {
func expose(alloc *ipam.Allocator, subnet address.CIDR, bridgeName string, removeDefaultRoute bool, ready func()) {
addr, err := alloc.Allocate("weave:expose", subnet, false, func() bool { return false })
checkFatal(err)
cidr := address.MakeCIDR(subnet, addr)
err = weavenet.AddBridgeAddr(bridgeName, cidr.IPNet(), removeDefaultRoute)
checkFatal(err)
Log.Printf("Bridge %q exposed on address %v", bridgeName, cidr)
ready()
}

func options() map[string]string {
Expand Down
6 changes: 5 additions & 1 deletion proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,13 +280,17 @@ func (proxy *Proxy) Listen() []net.Listener {
return listeners
}

func (proxy *Proxy) Serve(listeners []net.Listener) {
func (proxy *Proxy) Serve(listeners []net.Listener, ready func()) {
errs := make(chan error)
for _, listener := range listeners {
go func(listener net.Listener) {
errs <- (&http.Server{Handler: proxy}).Serve(listener)
}(listener)
}
// It would be better if we could delay calling Done() until all
// the listeners are ready, but it doesn't seem to be possible to
// hook the right point in http.Server
ready()
for range listeners {
err := <-errs
if err != nil {
Expand Down

0 comments on commit 7611cd9

Please sign in to comment.