Skip to content

Commit

Permalink
v5 init
Browse files Browse the repository at this point in the history
  • Loading branch information
urlesistiana committed Nov 29, 2022
1 parent f130472 commit 89af225
Show file tree
Hide file tree
Showing 160 changed files with 5,399 additions and 7,286 deletions.
18 changes: 6 additions & 12 deletions coremain/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,15 @@
package coremain

import (
"github.com/IrineSistiana/mosdns/v4/mlog"
"github.com/IrineSistiana/mosdns/v4/pkg/data_provider"
"github.com/IrineSistiana/mosdns/v4/pkg/utils"
"github.com/IrineSistiana/mosdns/v5/mlog"
"github.com/IrineSistiana/mosdns/v5/pkg/utils"
)

type Config struct {
Log mlog.LogConfig `yaml:"log"`
Include []string `yaml:"include"`
DataProviders []data_provider.DataProviderConfig `yaml:"data_providers"`
Plugins []PluginConfig `yaml:"plugins"`
Servers []ServerConfig `yaml:"servers"`
API APIConfig `yaml:"api"`

// Experimental
Security SecurityConfig `yaml:"security"`
Log mlog.LogConfig `yaml:"log"`
Include []string `yaml:"include"`
Plugins []PluginConfig `yaml:"plugins"`
API APIConfig `yaml:"api"`
}

// PluginConfig represents a plugin config
Expand Down
25 changes: 6 additions & 19 deletions coremain/interface.go → coremain/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,12 @@
package coremain

import (
"github.com/IrineSistiana/mosdns/v4/pkg/executable_seq"
"io"
"fmt"
"net/http"
"runtime/debug"
)

// Plugin represents the basic plugin.
type Plugin interface {
Tag() string
Type() string
io.Closer
}

// ExecutablePlugin represents a Plugin that is Executable.
type ExecutablePlugin interface {
Plugin
executable_seq.Executable
}

// MatcherPlugin represents a Plugin that is a Matcher.
type MatcherPlugin interface {
Plugin
executable_seq.Matcher
func runGC(w http.ResponseWriter, _ *http.Request) {
debug.FreeOSMemory()
_, _ = fmt.Fprint(w, "done")
}
241 changes: 123 additions & 118 deletions coremain/mosdns.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,129 +20,174 @@
package coremain

import (
"errors"
"bytes"
"context"
"fmt"
"github.com/IrineSistiana/mosdns/v4/mlog"
"github.com/IrineSistiana/mosdns/v4/pkg/data_provider"
"github.com/IrineSistiana/mosdns/v4/pkg/executable_seq"
"github.com/IrineSistiana/mosdns/v4/pkg/safe_close"
"github.com/IrineSistiana/mosdns/v5/mlog"
"github.com/IrineSistiana/mosdns/v5/pkg/safe_close"
"github.com/go-chi/chi/v5"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
"io"
"net/http"
"net/http/pprof"
"runtime"
"os"
"os/signal"
"runtime/debug"
"syscall"
"time"
)

type Mosdns struct {
ctx context.Context
logger *zap.Logger

// Data
dataManager *data_provider.DataManager

// Plugins
execs map[string]executable_seq.Executable
matchers map[string]executable_seq.Matcher
plugins map[string]Plugin

httpAPIMux *http.ServeMux
httpAPIServer *http.Server
httpMux *chi.Mux

metricsReg *prometheus.Registry

sc *safe_close.SafeClose
}

func RunMosdns(cfg *Config) error {
m, err := NewMosdns(cfg)
if err != nil {
return err
}

go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, os.Kill, syscall.SIGTERM)
sig := <-c
m.logger.Warn("mosdns is closing", zap.Stringer("sig", sig))
m.sc.SendCloseSignal(nil)
}()
<-m.sc.ReceiveCloseSignal()
m.sc.Done()
for _, plugin := range m.plugins {
if closer, _ := plugin.(io.Closer); closer != nil {
m.logger.Info("closing plugin", zap.String("tag", plugin.Tag()))
_ = closer.Close()
}
}
m.sc.CloseWait()
return m.sc.Err()
}

func (m *Mosdns) GetSafeClose() *safe_close.SafeClose {
return m.sc
}

func (m *Mosdns) GetPlugins(tag string) Plugin {
return m.plugins[tag]
}

// GetMetricsReg returns a prometheus.Registerer with a prefix of "mosdns_"
func (m *Mosdns) GetMetricsReg() prometheus.Registerer {
return prometheus.WrapRegistererWithPrefix("mosdns_", m.metricsReg)
}

func (m *Mosdns) GetAPIRouter() *chi.Mux {
return m.httpMux
}

func (m *Mosdns) RegPluginAPI(tag string, mux *chi.Mux) {
m.httpMux.Mount("/plugins/"+tag, mux)
}

func newMetricsReg() *prometheus.Registry {
reg := prometheus.NewRegistry()
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
reg.MustRegister(collectors.NewGoCollector())
return reg
}

func NewMosdns(cfg *Config) (*Mosdns, error) {
// Init logger.
lg, err := mlog.NewLogger(&cfg.Log)
if err != nil {
return fmt.Errorf("failed to init logger: %w", err)
return nil, fmt.Errorf("failed to init logger: %w", err)
}

m := &Mosdns{
logger: lg,
dataManager: data_provider.NewDataManager(),
execs: make(map[string]executable_seq.Executable),
matchers: make(map[string]executable_seq.Matcher),
httpAPIMux: http.NewServeMux(),
metricsReg: newMetricsReg(),
sc: safe_close.NewSafeClose(),
logger: lg,
plugins: make(map[string]Plugin),
httpMux: chi.NewRouter(),
metricsReg: newMetricsReg(),
sc: safe_close.NewSafeClose(),
}

m.httpAPIMux.Handle("/metrics", promhttp.HandlerFor(m.metricsReg, promhttp.HandlerOpts{}))
m.httpAPIMux.HandleFunc("/debug/pprof/", pprof.Index)
m.httpAPIMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
m.httpAPIMux.HandleFunc("/debug/pprof/profile", pprof.Profile)
m.httpAPIMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
m.httpAPIMux.HandleFunc("/debug/pprof/trace", pprof.Trace)

// Init data manager
dupTag := make(map[string]struct{})
for _, dpc := range cfg.DataProviders {
if len(dpc.Tag) == 0 {
continue
}
if _, ok := dupTag[dpc.Tag]; ok {
return fmt.Errorf("duplicated provider tag %s", dpc.Tag)
}
dupTag[dpc.Tag] = struct{}{}
// Register metrics.
m.httpMux.Method(http.MethodGet, "/metrics", promhttp.HandlerFor(m.metricsReg, promhttp.HandlerOpts{}))

dp, err := data_provider.NewDataProvider(lg, dpc)
if err != nil {
return fmt.Errorf("failed to init data provider %s, %w", dpc.Tag, err)
}
m.dataManager.AddDataProvider(dpc.Tag, dp)
// Register pprof.
m.httpMux.Route("/debug/pprof", func(r chi.Router) {
r.Get("/*", pprof.Index)
r.Get("/cmdline", pprof.Cmdline)
r.Get("/profile", pprof.Profile)
r.Get("/symbol", pprof.Symbol)
r.Get("/trace", pprof.Trace)
})

// A helper page for 404.
invalidApiReqHelper := func(w http.ResponseWriter, req *http.Request) {
b := new(bytes.Buffer)
_, _ = fmt.Fprintf(b, "Invalid request %s %s\n\n", req.Method, req.RequestURI)
b.WriteString("Available api urls:\n")
_ = chi.Walk(m.httpMux, func(method string, route string, handler http.Handler, middlewares ...func(http.Handler) http.Handler) error {
b.WriteString(method)
b.WriteByte(' ')
b.WriteString(route)
b.WriteByte('\n')
return nil
})
_, _ = w.Write(b.Bytes())
}
m.httpMux.NotFound(invalidApiReqHelper)
m.httpMux.MethodNotAllowed(invalidApiReqHelper)

// Init preset plugins
for tag, f := range LoadNewPersetPluginFuncs() {
p, err := f(NewBP(tag, "preset", m.logger, m))
bpOpts := BPOpts{
Logger: m.logger.Named(tag),
Mosdns: m,
}
p, err := f(NewBP(tag, "preset", bpOpts))
if err != nil {
return fmt.Errorf("failed to init preset plugin %s, %w", tag, err)
return nil, fmt.Errorf("failed to init preset plugin %s, %w", tag, err)
}
m.addPlugin(p)
m.plugins[tag] = p
}

// Init plugins
dupTag = make(map[string]struct{})
for i, pc := range cfg.Plugins {
if len(pc.Type) == 0 || len(pc.Tag) == 0 {
continue
}
if _, dup := dupTag[pc.Tag]; dup {
return fmt.Errorf("duplicated plugin tag %s", pc.Tag)

if _, dup := m.plugins[pc.Tag]; dup {
return nil, fmt.Errorf("duplicated plugin tag %s", pc.Tag)
}
dupTag[pc.Tag] = struct{}{}

m.logger.Info("loading plugin", zap.String("tag", pc.Tag), zap.String("type", pc.Type))
p, err := NewPlugin(&pc, m.logger, m)
p, err := NewPlugin(&pc, m)
if err != nil {
return fmt.Errorf("failed to init plugin #%d, %w", i, err)
}

m.addPlugin(p)
// Also add it to api mux if plugin implements http.Handler.
if h, ok := p.(http.Handler); ok {
m.httpAPIMux.Handle(fmt.Sprintf("/plugins/%s/", p.Tag()), h)
}
}

if len(cfg.Servers) == 0 {
return errors.New("no server is configured")
}
for i, sc := range cfg.Servers {
if err := m.startServers(&sc); err != nil {
return fmt.Errorf("failed to start server #%d, %w", i, err)
return nil, fmt.Errorf("failed to init plugin #%d %s, %w", i, pc.Tag, err)
}
m.plugins[pc.Tag] = p
}
m.logger.Info("all plugins are loaded")

// Start http api server
if httpAddr := cfg.API.HTTP; len(httpAddr) > 0 {
httpServer := &http.Server{
Addr: httpAddr,
Handler: m.httpAPIMux,
Handler: m.httpMux,
}
m.sc.Attach(func(done func(), closeSignal <-chan struct{}) {
defer done()
Expand All @@ -155,64 +200,24 @@ func RunMosdns(cfg *Config) error {
case err := <-errChan:
m.sc.SendCloseSignal(err)
case <-closeSignal:
httpServer.Close()
_ = httpServer.Close()
}
})
}

// Run GC to release memory asap.
time.AfterFunc(time.Second*1, func() {
runtime.GC()
debug.FreeOSMemory()
})
<-m.sc.ReceiveCloseSignal()
m.sc.Done()
m.sc.CloseWait()
return m.sc.Err()
return m, nil
}

func (m *Mosdns) addPlugin(p Plugin) {
t := p.Tag()
if p, ok := p.(ExecutablePlugin); ok {
m.execs[t] = p
}
if p, ok := p.(MatcherPlugin); ok {
m.matchers[p.Tag()] = p
func NewTestMosdns(plugins map[string]Plugin) *Mosdns {
return &Mosdns{
logger: mlog.Nop(),
plugins: plugins,
httpMux: chi.NewRouter(),
metricsReg: newMetricsReg(),
sc: safe_close.NewSafeClose(),
}
}

func (m *Mosdns) GetDataManager() *data_provider.DataManager {
return m.dataManager
}

func (m *Mosdns) GetSafeClose() *safe_close.SafeClose {
return m.sc
}

func (m *Mosdns) GetExecutables() map[string]executable_seq.Executable {
return m.execs
}

func (m *Mosdns) GetMatchers() map[string]executable_seq.Matcher {
return m.matchers
}

// GetMetricsReg returns a prometheus.Registerer with a prefix of "mosdns_"
func (m *Mosdns) GetMetricsReg() prometheus.Registerer {
return prometheus.WrapRegistererWithPrefix("mosdns_", m.metricsReg)
}

// GetHTTPAPIMux returns the api http.ServeMux.
// The pattern "/plugins/plugin_tag/" has been registered if
// Plugin implements http.Handler interface.
// Plugin caller should register path that has "/plugins/plugin_tag/"
// prefix only.
func (m *Mosdns) GetHTTPAPIMux() *http.ServeMux {
return m.httpAPIMux
}

func newMetricsReg() *prometheus.Registry {
reg := prometheus.NewRegistry()
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
reg.MustRegister(collectors.NewGoCollector())
return reg
}
Loading

0 comments on commit 89af225

Please sign in to comment.