Skip to content

Commit

Permalink
First attempt to fix #198
Browse files Browse the repository at this point in the history
  • Loading branch information
vqhuy committed Jan 10, 2018
1 parent 295c4ad commit 536d5cb
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 95 deletions.
5 changes: 1 addition & 4 deletions application/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,5 @@ func SaveConfig(file string, conf AppConfig) error {
if err := e.Encode(conf); err != nil {
return err
}
if err := utils.WriteFile(file, confBuf.Bytes(), 0644); err != nil {
return err
}
return nil
return utils.WriteFile(file, confBuf.Bytes(), 0644)
}
76 changes: 18 additions & 58 deletions application/server/server.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package server

import (
"time"

"github.com/coniks-sys/coniks-go/application"
"github.com/coniks-sys/coniks-go/protocol"
"github.com/coniks-sys/coniks-go/protocol/directory"
Expand Down Expand Up @@ -32,12 +30,9 @@ type Address struct {
// at regular time intervals.
type ConiksServer struct {
*application.ServerBase
dir *directory.ConiksDirectory
epochTimer *time.Timer
dir *directory.ConiksDirectory
}

var _ application.Server = (*ConiksServer)(nil)

// NewConiksServer creates a new reference implementation of
// a CONIKS key server.
func NewConiksServer(conf *Config) *ConiksServer {
Expand Down Expand Up @@ -65,25 +60,11 @@ func NewConiksServer(conf *Config) *ConiksServer {
conf.Policies.signKey,
conf.LoadedHistoryLength,
true),
epochTimer: time.NewTimer(time.Duration(conf.Policies.EpochDeadline) * time.Second),
}

return server
}

// EpochUpdate runs a CONIKS key server's directory epoch update procedure.
func (server *ConiksServer) EpochUpdate() {
server.epochUpdate()
server.WaitStopDone()
}

// ConfigHotReload implements hot-reloading the configuration by
// listening for SIGUSR2 signal.
func (server *ConiksServer) ConfigHotReload() {
server.updatePolicies()
server.WaitStopDone()
}

// HandleRequests validates the request message and passes it to the
// appropriate operation handler according to the request type.
func (server *ConiksServer) HandleRequests(req *protocol.Request) *protocol.Response {
Expand Down Expand Up @@ -113,8 +94,9 @@ func (server *ConiksServer) HandleRequests(req *protocol.Request) *protocol.Resp
// It listens for all declared connections with corresponding
// permissions.
func (server *ConiksServer) Run(addrs []*Address) {
server.WaitStopAdd()
go server.EpochUpdate()
server.RunInBackground(func() {
server.EpochUpdate(server.dir.Update)
})

hasRegistrationPerm := false
for i := 0; i < len(addrs); i++ {
Expand All @@ -131,44 +113,22 @@ func (server *ConiksServer) Run(addrs []*Address) {
server.Logger().Warn("None of the addresses permit registration")
}

server.WaitStopAdd()
go server.ConfigHotReload()
}

func (server *ConiksServer) epochUpdate() {
for {
select {
case <-server.Stop():
return
case <-server.epochTimer.C:
server.Lock()
server.dir.Update()
server.epochTimer.Reset(time.Duration(server.dir.EpochDeadline()) * time.Second)
server.Unlock()
}
}
server.RunInBackground(func() {
server.HotReload(server.updatePolicies)
})
}

func (server *ConiksServer) updatePolicies() {
for {
select {
case <-server.Stop():
return
case <-server.ReloadChan():
// read server policies from config file
tmp, err := application.LoadConfig(server.ConfigFilePath())
if err != nil {
// error occured while reading server config
// simply abort the reloading policies
// process
server.Logger().Error(err.Error())
return
}
conf := tmp.(*Config)
server.Lock()
server.dir.SetPolicies(conf.Policies.EpochDeadline)
server.Unlock()
server.Logger().Info("Policies reloaded!")
}
// read server policies from config file
tmp, err := application.LoadConfig(server.ConfigFilePath())
if err != nil {
// error occured while reading server config
// simply abort the reloading policies
// process
server.Logger().Error(err.Error())
return
}
conf := tmp.(*Config)
server.dir.SetPolicies(conf.Policies.EpochDeadline)
server.Logger().Info("Policies reloaded!")
}
7 changes: 5 additions & 2 deletions application/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"bytes"
"encoding/json"
"math/rand"
"path"
"syscall"
"testing"
Expand Down Expand Up @@ -80,6 +81,7 @@ func newTestServer(t *testing.T, epDeadline protocol.Timestamp, useBot bool,
Environment: "development",
Path: path.Join(dir, "coniksserver.log"),
},
EpochDeadline: epDeadline,
},
LoadedHistoryLength: 100,
Addresses: addrs,
Expand Down Expand Up @@ -107,10 +109,11 @@ func TestServerStartStop(t *testing.T) {
}

func TestServerReloadPoliciesWithError(t *testing.T) {
server, teardown := startServer(t, 1, true, "")
deadline := protocol.Timestamp(rand.Int())
server, teardown := startServer(t, deadline, true, "")
defer teardown()
syscall.Kill(syscall.Getpid(), syscall.SIGUSR2)
if server.dir.EpochDeadline() != 1 {
if server.dir.EpochDeadline() != deadline {
t.Fatal("Expect the server's policies not change")
}
// just to make sure the server's still running normally
Expand Down
79 changes: 49 additions & 30 deletions application/serverbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,19 @@ import (
"github.com/coniks-sys/coniks-go/protocol"
)

// A Server is a generic interface used to implement CONIKS key servers
// and auditors.
// CONIKS server's must implement a request handler, an epoch update
// procedure, and hot-reloading the configuration.
type Server interface {
EpochUpdate()
ConfigHotReload()
HandleRequests(*protocol.Request) *protocol.Response
}

// A ServerBaseConfig contains configuration values
// which are read at initialization time from
// a TOML format configuration file.
type ServerBaseConfig struct {
Logger *LoggerConfig `toml:"logger"`
ConfigFilePath string
Logger *LoggerConfig `toml:"logger"`
ConfigFilePath string `toml:"config_file_path"`
EpochDeadline protocol.Timestamp `toml:"epoch_deadline"`
}

// EpochTimer consists of a `time.Timer` and the epoch deadline value.
type EpochTimer struct {
*time.Timer
duration time.Duration
}

// A ServerBase represents the base features needed to implement
Expand All @@ -42,6 +39,8 @@ type ServerBase struct {
Verb string
acceptableReqs map[*ServerAddress]map[int]bool

epochTimer *EpochTimer

logger *Logger
sync.RWMutex

Expand Down Expand Up @@ -78,6 +77,10 @@ func NewServerBase(conf *ServerBaseConfig, listenVerb string,
sb := new(ServerBase)
sb.Verb = listenVerb
sb.acceptableReqs = perms
sb.epochTimer = &EpochTimer{
Timer: time.NewTimer(time.Duration(conf.EpochDeadline) * time.Second),
duration: time.Duration(conf.EpochDeadline) * time.Second,
}
sb.logger = NewLogger(conf.Logger)
sb.stop = make(chan struct{})
sb.configFilePath = conf.ConfigFilePath
Expand Down Expand Up @@ -250,29 +253,45 @@ func (sb *ServerBase) acceptClient(addr *ServerAddress, conn net.Conn,
}
}

// TODO: Remove/refactor these getters. We would be happier if we didn't
// have to expose the WaitGroup to the server/auditor at all, and maybe
// we can export some of these other fields.

// WaitStopAdd increments the server base's waitgroup counter.
func (sb *ServerBase) WaitStopAdd() {
// RunInBackground creates a new goroutine that calls function `f`.
// It automatically increments the counter `sync.WaitGroup` of the `ServerBase`
// and calls `Done` when the function execution is finished.
func (sb *ServerBase) RunInBackground(f func()) {
sb.waitStop.Add(1)
go func() {
f()
sb.waitStop.Done()
}()
}

// WaitStopDone is a wrapper around waitgroup's Done(), which
// decrements the WaitGroup counter by one.
func (sb *ServerBase) WaitStopDone() {
sb.waitStop.Done()
}

// Stop returns the server base's stop channel.
func (sb *ServerBase) Stop() chan struct{} {
return sb.stop
// EpochUpdate runs function `f` supposed to be a CONIK's epoch update procedure
// every epoch.
func (sb *ServerBase) EpochUpdate(f func()) {
for {
select {
case <-sb.stop:
return
case <-sb.epochTimer.C:
sb.Lock()
f()
sb.epochTimer.Reset(sb.epochTimer.duration)
sb.Unlock()
}
}
}

// ReloadChan returns the server base's configuration reload channel.
func (sb *ServerBase) ReloadChan() chan os.Signal {
return sb.reloadChan
// HotReload implements hot-reloading by listening for SIGUSR2 signal.
func (sb *ServerBase) HotReload(f func()) {
for {
select {
case <-sb.stop:
return
case <-sb.reloadChan:
sb.Lock()
f()
sb.Unlock()
}
}
}

// Logger returns the server base's logger instance.
Expand Down
2 changes: 1 addition & 1 deletion cli/coniksserver/internal/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func run(cmd *cobra.Command, args []string) {
if pid {
writePID()
}
var conf *server.Config = &server.Config{}
conf := &server.Config{}
err := conf.Load(confPath)
if err != nil {
log.Fatal(err)
Expand Down

0 comments on commit 536d5cb

Please sign in to comment.