diff --git a/Makefile b/Makefile index f54c6ea8..508c9fb7 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ TARGET = ./cc-backend VAR = ./var CFG = config.json .env FRONTEND = ./web/frontend -VERSION = 1.3.1 +VERSION = 1.4.0 GIT_HASH := $(shell git rev-parse --short HEAD || echo 'development') CURRENT_TIME = $(shell date +"%Y-%m-%d:T%H:%M:%S") LD_FLAGS = '-s -X main.date=${CURRENT_TIME} -X main.version=${VERSION} -X main.commit=${GIT_HASH}' diff --git a/cmd/cc-backend/cli.go b/cmd/cc-backend/cli.go new file mode 100644 index 00000000..f828a249 --- /dev/null +++ b/cmd/cc-backend/cli.go @@ -0,0 +1,33 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package main + +import "flag" + +var ( + flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagRevertDB, flagForceDB, flagDev, flagVersion, flagLogDateTime bool + flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string +) + +func cliInit() { + flag.BoolVar(&flagInit, "init", false, "Setup var directory, initialize swlite database file, config.json and .env") + flag.BoolVar(&flagReinitDB, "init-db", false, "Go through job-archive and re-initialize the 'job', 'tag', and 'jobtag' tables (all running jobs will be lost!)") + flag.BoolVar(&flagSyncLDAP, "sync-ldap", false, "Sync the 'user' table with ldap") + flag.BoolVar(&flagServer, "server", false, "Start a server, continues listening on port after initialization and argument handling") + flag.BoolVar(&flagGops, "gops", false, "Listen via github.com/google/gops/agent (for debugging)") + flag.BoolVar(&flagDev, "dev", false, "Enable development components: GraphQL Playground and Swagger UI") + flag.BoolVar(&flagVersion, "version", false, "Show version information and exit") + flag.BoolVar(&flagMigrateDB, "migrate-db", false, "Migrate database to supported version and exit") + flag.BoolVar(&flagRevertDB, "revert-db", false, "Migrate database to previous version and exit") + flag.BoolVar(&flagForceDB, "force-db", false, "Force database version, clear dirty flag and exit") + flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages") + flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`") + flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: `:[admin,support,manager,api,user]:`") + flag.StringVar(&flagDelUser, "del-user", "", "Remove user by `username`") + flag.StringVar(&flagGenJWT, "jwt", "", "Generate and print a JWT for the user specified by its `username`") + flag.StringVar(&flagImportJob, "import-job", "", "Import a job. Argument format: `:,...`") + flag.StringVar(&flagLogLevel, "loglevel", "warn", "Sets the logging level: `[debug,info,warn (default),err,fatal,crit]`") + flag.Parse() +} diff --git a/cmd/cc-backend/init.go b/cmd/cc-backend/init.go new file mode 100644 index 00000000..5a00a113 --- /dev/null +++ b/cmd/cc-backend/init.go @@ -0,0 +1,85 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package main + +import ( + "fmt" + "os" + + "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/internal/util" + "github.com/ClusterCockpit/cc-backend/pkg/log" +) + +const envString = ` +# Base64 encoded Ed25519 keys (DO NOT USE THESE TWO IN PRODUCTION!) +# You can generate your own keypair using the gen-keypair tool +JWT_PUBLIC_KEY="kzfYrYy+TzpanWZHJ5qSdMj5uKUWgq74BWhQG6copP0=" +JWT_PRIVATE_KEY="dtPC/6dWJFKZK7KZ78CvWuynylOmjBFyMsUWArwmodOTN9itjL5POlqdZkcnmpJ0yPm4pRaCrvgFaFAbpyik/Q==" + +# Some random bytes used as secret for cookie-based sessions (DO NOT USE THIS ONE IN PRODUCTION) +SESSION_KEY="67d829bf61dc5f87a73fd814e2c9f629" +` + +const configString = ` +{ + "addr": "127.0.0.1:8080", + "archive": { + "kind": "file", + "path": "./var/job-archive" + }, + "jwts": { + "max-age": "2000h" + }, + "clusters": [ + { + "name": "name", + "metricDataRepository": { + "kind": "cc-metric-store", + "url": "http://localhost:8082", + "token": "" + }, + "filterRanges": { + "numNodes": { + "from": 1, + "to": 64 + }, + "duration": { + "from": 0, + "to": 86400 + }, + "startTime": { + "from": "2023-01-01T00:00:00Z", + "to": null + } + } + } + ] +} +` + +func initEnv() { + if util.CheckFileExists("var") { + fmt.Print("Directory ./var already exists. Exiting!\n") + os.Exit(0) + } + + if err := os.WriteFile("config.json", []byte(configString), 0o666); err != nil { + log.Fatalf("Writing config.json failed: %s", err.Error()) + } + + if err := os.WriteFile(".env", []byte(envString), 0o666); err != nil { + log.Fatalf("Writing .env failed: %s", err.Error()) + } + + if err := os.Mkdir("var", 0o777); err != nil { + log.Fatalf("Mkdir var failed: %s", err.Error()) + } + + err := repository.MigrateDB("sqlite3", "./var/job.db") + if err != nil { + log.Fatalf("Initialize job.db failed: %s", err.Error()) + } +} diff --git a/cmd/cc-backend/main.go b/cmd/cc-backend/main.go index abe453a3..e1546c7a 100644 --- a/cmd/cc-backend/main.go +++ b/cmd/cc-backend/main.go @@ -5,157 +5,47 @@ package main import ( - "context" - "crypto/tls" - "encoding/json" - "errors" - "flag" "fmt" - "io" - "net" - "net/http" "os" "os/signal" "runtime/debug" "strings" "sync" "syscall" - "time" - "github.com/99designs/gqlgen/graphql/handler" - "github.com/99designs/gqlgen/graphql/playground" - "github.com/ClusterCockpit/cc-backend/internal/api" "github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/config" - "github.com/ClusterCockpit/cc-backend/internal/graph" - "github.com/ClusterCockpit/cc-backend/internal/graph/generated" "github.com/ClusterCockpit/cc-backend/internal/importer" "github.com/ClusterCockpit/cc-backend/internal/metricdata" "github.com/ClusterCockpit/cc-backend/internal/repository" - "github.com/ClusterCockpit/cc-backend/internal/routerConfig" "github.com/ClusterCockpit/cc-backend/internal/taskManager" - "github.com/ClusterCockpit/cc-backend/internal/util" "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/ClusterCockpit/cc-backend/pkg/runtimeEnv" "github.com/ClusterCockpit/cc-backend/pkg/schema" - "github.com/ClusterCockpit/cc-backend/web" "github.com/google/gops/agent" - "github.com/gorilla/handlers" - "github.com/gorilla/mux" - httpSwagger "github.com/swaggo/http-swagger" _ "github.com/go-sql-driver/mysql" _ "github.com/mattn/go-sqlite3" ) const logoString = ` - ____ _ _ ____ _ _ _ -/ ___| |_ _ ___| |_ ___ _ __ / ___|___ ___| | ___ __ (_) |_ + _____ _ _ ____ _ _ _ +/ ___| |_ _ ___| |_ ___ _ __ / ___|___ ___| | ___ __ (_) |_ | | | | | | / __| __/ _ \ '__| | / _ \ / __| |/ / '_ \| | __| | |___| | |_| \__ \ || __/ | | |__| (_) | (__| <| |_) | | |_ -\____|_|\__,_|___/\__\___|_| \____\___/ \___|_|\_\ .__/|_|\__| +\_____|_|\__,_|___/\__\___|_| \____\___/ \___|_|\_\ .__/|_|\__| |_| ` -const envString = ` -# Base64 encoded Ed25519 keys (DO NOT USE THESE TWO IN PRODUCTION!) -# You can generate your own keypair using the gen-keypair tool -JWT_PUBLIC_KEY="kzfYrYy+TzpanWZHJ5qSdMj5uKUWgq74BWhQG6copP0=" -JWT_PRIVATE_KEY="dtPC/6dWJFKZK7KZ78CvWuynylOmjBFyMsUWArwmodOTN9itjL5POlqdZkcnmpJ0yPm4pRaCrvgFaFAbpyik/Q==" - -# Some random bytes used as secret for cookie-based sessions (DO NOT USE THIS ONE IN PRODUCTION) -SESSION_KEY="67d829bf61dc5f87a73fd814e2c9f629" -` - -const configString = ` -{ - "addr": "127.0.0.1:8080", - "archive": { - "kind": "file", - "path": "./var/job-archive" - }, - "jwts": { - "max-age": "2000h" - }, - "clusters": [ - { - "name": "name", - "metricDataRepository": { - "kind": "cc-metric-store", - "url": "http://localhost:8082", - "token": "" - }, - "filterRanges": { - "numNodes": { - "from": 1, - "to": 64 - }, - "duration": { - "from": 0, - "to": 86400 - }, - "startTime": { - "from": "2023-01-01T00:00:00Z", - "to": null - } - } - } - ] -} -` - var ( date string commit string version string ) -func initEnv() { - if util.CheckFileExists("var") { - fmt.Print("Directory ./var already exists. Exiting!\n") - os.Exit(0) - } - - if err := os.WriteFile("config.json", []byte(configString), 0o666); err != nil { - log.Fatalf("Writing config.json failed: %s", err.Error()) - } - - if err := os.WriteFile(".env", []byte(envString), 0o666); err != nil { - log.Fatalf("Writing .env failed: %s", err.Error()) - } - - if err := os.Mkdir("var", 0o777); err != nil { - log.Fatalf("Mkdir var failed: %s", err.Error()) - } - - err := repository.MigrateDB("sqlite3", "./var/job.db") - if err != nil { - log.Fatalf("Initialize job.db failed: %s", err.Error()) - } -} - func main() { - var flagReinitDB, flagInit, flagServer, flagSyncLDAP, flagGops, flagMigrateDB, flagRevertDB, flagForceDB, flagDev, flagVersion, flagLogDateTime bool - var flagNewUser, flagDelUser, flagGenJWT, flagConfigFile, flagImportJob, flagLogLevel string - flag.BoolVar(&flagInit, "init", false, "Setup var directory, initialize swlite database file, config.json and .env") - flag.BoolVar(&flagReinitDB, "init-db", false, "Go through job-archive and re-initialize the 'job', 'tag', and 'jobtag' tables (all running jobs will be lost!)") - flag.BoolVar(&flagSyncLDAP, "sync-ldap", false, "Sync the 'user' table with ldap") - flag.BoolVar(&flagServer, "server", false, "Start a server, continues listening on port after initialization and argument handling") - flag.BoolVar(&flagGops, "gops", false, "Listen via github.com/google/gops/agent (for debugging)") - flag.BoolVar(&flagDev, "dev", false, "Enable development components: GraphQL Playground and Swagger UI") - flag.BoolVar(&flagVersion, "version", false, "Show version information and exit") - flag.BoolVar(&flagMigrateDB, "migrate-db", false, "Migrate database to supported version and exit") - flag.BoolVar(&flagRevertDB, "revert-db", false, "Migrate database to previous version and exit") - flag.BoolVar(&flagForceDB, "force-db", false, "Force database version, clear dirty flag and exit") - flag.BoolVar(&flagLogDateTime, "logdate", false, "Set this flag to add date and time to log messages") - flag.StringVar(&flagConfigFile, "config", "./config.json", "Specify alternative path to `config.json`") - flag.StringVar(&flagNewUser, "add-user", "", "Add a new user. Argument format: `:[admin,support,manager,api,user]:`") - flag.StringVar(&flagDelUser, "del-user", "", "Remove user by `username`") - flag.StringVar(&flagGenJWT, "jwt", "", "Generate and print a JWT for the user specified by its `username`") - flag.StringVar(&flagImportJob, "import-job", "", "Import a job. Argument format: `:,...`") - flag.StringVar(&flagLogLevel, "loglevel", "warn", "Sets the logging level: `[debug,info,warn (default),err,fatal,crit]`") - flag.Parse() + cliInit() if flagVersion { fmt.Print(logoString) @@ -170,14 +60,6 @@ func main() { // Apply config flags for pkg/log log.Init(flagLogLevel, flagLogDateTime) - if flagInit { - initEnv() - fmt.Print("Succesfully setup environment!\n") - fmt.Print("Please review config.json and .env and adjust it to your needs.\n") - fmt.Print("Add your job-archive at ./var/job-archive.\n") - os.Exit(0) - } - // See https://github.com/google/gops (Runtime overhead is almost zero) if flagGops { if err := agent.Listen(agent.Options{}); err != nil { @@ -201,6 +83,8 @@ func main() { config.Keys.DB = os.Getenv(envvar) } + repository.Connect(config.Keys.DBDriver, config.Keys.DB) + if flagMigrateDB { err := repository.MigrateDB(config.Keys.DBDriver, config.Keys.DB) if err != nil { @@ -225,19 +109,17 @@ func main() { os.Exit(0) } - repository.Connect(config.Keys.DBDriver, config.Keys.DB) - db := repository.GetConnection() + if flagInit { + initEnv() + fmt.Print("Succesfully setup environment!\n") + fmt.Print("Please review config.json and .env and adjust it to your needs.\n") + fmt.Print("Add your job-archive at ./var/job-archive.\n") + os.Exit(0) + } - var authentication *auth.Authentication if !config.Keys.DisableAuthentication { - var err error - if authentication, err = auth.Init(); err != nil { - log.Fatalf("auth initialization failed: %v", err) - } - if d, err := time.ParseDuration(config.Keys.SessionMaxAge); err != nil { - authentication.SessionMaxAge = d - } + auth.Init() if flagNewUser != "" { parts := strings.SplitN(flagNewUser, ":", 3) @@ -259,12 +141,14 @@ func main() { } } + authHandle := auth.GetAuthInstance() + if flagSyncLDAP { - if authentication.LdapAuth == nil { + if authHandle.LdapAuth == nil { log.Fatal("cannot sync: LDAP authentication is not configured") } - if err := authentication.LdapAuth.Sync(); err != nil { + if err := authHandle.LdapAuth.Sync(); err != nil { log.Fatalf("LDAP sync failed: %v", err) } log.Info("LDAP sync successfull") @@ -281,7 +165,7 @@ func main() { log.Warnf("user '%s' does not have the API role", user.Username) } - jwt, err := authentication.JwtAuth.ProvideJWT(user) + jwt, err := authHandle.JwtAuth.ProvideJWT(user) if err != nil { log.Fatalf("failed to provide JWT to user '%s': %v", user.Username, err) } @@ -317,299 +201,22 @@ func main() { return } - // Setup the http.Handler/Router used by the server - jobRepo := repository.GetJobRepository() - resolver := &graph.Resolver{DB: db.DB, Repo: jobRepo} - graphQLEndpoint := handler.NewDefaultServer(generated.NewExecutableSchema(generated.Config{Resolvers: resolver})) - if os.Getenv("DEBUG") != "1" { - // Having this handler means that a error message is returned via GraphQL instead of the connection simply beeing closed. - // The problem with this is that then, no more stacktrace is printed to stderr. - graphQLEndpoint.SetRecoverFunc(func(ctx context.Context, err interface{}) error { - switch e := err.(type) { - case string: - return fmt.Errorf("MAIN > Panic: %s", e) - case error: - return fmt.Errorf("MAIN > Panic caused by: %w", e) - } - - return errors.New("MAIN > Internal server error (panic)") - }) - } - - api := &api.RestApi{ - JobRepository: jobRepo, - Resolver: resolver, - MachineStateDir: config.Keys.MachineStateDir, - Authentication: authentication, - } - - r := mux.NewRouter() - buildInfo := web.Build{Version: version, Hash: commit, Buildtime: date} - - info := map[string]interface{}{} - info["hasOpenIDConnect"] = false - - if config.Keys.OpenIDConfig != nil { - openIDConnect := auth.NewOIDC(authentication) - openIDConnect.RegisterEndpoints(r) - info["hasOpenIDConnect"] = true - } - - r.HandleFunc("/login", func(rw http.ResponseWriter, r *http.Request) { - rw.Header().Add("Content-Type", "text/html; charset=utf-8") - log.Debugf("##%v##", info) - web.RenderTemplate(rw, "login.tmpl", &web.Page{Title: "Login", Build: buildInfo, Infos: info}) - }).Methods(http.MethodGet) - r.HandleFunc("/imprint", func(rw http.ResponseWriter, r *http.Request) { - rw.Header().Add("Content-Type", "text/html; charset=utf-8") - web.RenderTemplate(rw, "imprint.tmpl", &web.Page{Title: "Imprint", Build: buildInfo}) - }) - r.HandleFunc("/privacy", func(rw http.ResponseWriter, r *http.Request) { - rw.Header().Add("Content-Type", "text/html; charset=utf-8") - web.RenderTemplate(rw, "privacy.tmpl", &web.Page{Title: "Privacy", Build: buildInfo}) - }) - - secured := r.PathPrefix("/").Subrouter() - securedapi := r.PathPrefix("/api").Subrouter() - userapi := r.PathPrefix("/userapi").Subrouter() - configapi := r.PathPrefix("/config").Subrouter() - frontendapi := r.PathPrefix("/frontend").Subrouter() - - if !config.Keys.DisableAuthentication { - r.Handle("/login", authentication.Login( - // On success: - http.RedirectHandler("/", http.StatusTemporaryRedirect), - - // On failure: - func(rw http.ResponseWriter, r *http.Request, err error) { - rw.Header().Add("Content-Type", "text/html; charset=utf-8") - rw.WriteHeader(http.StatusUnauthorized) - web.RenderTemplate(rw, "login.tmpl", &web.Page{ - Title: "Login failed - ClusterCockpit", - MsgType: "alert-warning", - Message: err.Error(), - Build: buildInfo, - Infos: info, - }) - })).Methods(http.MethodPost) - - r.Handle("/jwt-login", authentication.Login( - // On success: - http.RedirectHandler("/", http.StatusTemporaryRedirect), - - // On failure: - func(rw http.ResponseWriter, r *http.Request, err error) { - rw.Header().Add("Content-Type", "text/html; charset=utf-8") - rw.WriteHeader(http.StatusUnauthorized) - web.RenderTemplate(rw, "login.tmpl", &web.Page{ - Title: "Login failed - ClusterCockpit", - MsgType: "alert-warning", - Message: err.Error(), - Build: buildInfo, - Infos: info, - }) - })) - - r.Handle("/logout", authentication.Logout( - http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - rw.Header().Add("Content-Type", "text/html; charset=utf-8") - rw.WriteHeader(http.StatusOK) - web.RenderTemplate(rw, "login.tmpl", &web.Page{ - Title: "Bye - ClusterCockpit", - MsgType: "alert-info", - Message: "Logout successful", - Build: buildInfo, - Infos: info, - }) - }))).Methods(http.MethodPost) - - secured.Use(func(next http.Handler) http.Handler { - return authentication.Auth( - // On success; - next, - - // On failure: - func(rw http.ResponseWriter, r *http.Request, err error) { - rw.WriteHeader(http.StatusUnauthorized) - web.RenderTemplate(rw, "login.tmpl", &web.Page{ - Title: "Authentication failed - ClusterCockpit", - MsgType: "alert-danger", - Message: err.Error(), - Build: buildInfo, - Infos: info, - }) - }) - }) - - securedapi.Use(func(next http.Handler) http.Handler { - return authentication.AuthApi( - // On success; - next, - - // On failure: JSON Response - func(rw http.ResponseWriter, r *http.Request, err error) { - rw.Header().Add("Content-Type", "application/json") - rw.WriteHeader(http.StatusUnauthorized) - json.NewEncoder(rw).Encode(map[string]string{ - "status": http.StatusText(http.StatusUnauthorized), - "error": err.Error(), - }) - }) - }) - - userapi.Use(func(next http.Handler) http.Handler { - return authentication.AuthUserApi( - // On success; - next, - - // On failure: JSON Response - func(rw http.ResponseWriter, r *http.Request, err error) { - rw.Header().Add("Content-Type", "application/json") - rw.WriteHeader(http.StatusUnauthorized) - json.NewEncoder(rw).Encode(map[string]string{ - "status": http.StatusText(http.StatusUnauthorized), - "error": err.Error(), - }) - }) - }) - - configapi.Use(func(next http.Handler) http.Handler { - return authentication.AuthConfigApi( - // On success; - next, - - // On failure: JSON Response - func(rw http.ResponseWriter, r *http.Request, err error) { - rw.Header().Add("Content-Type", "application/json") - rw.WriteHeader(http.StatusUnauthorized) - json.NewEncoder(rw).Encode(map[string]string{ - "status": http.StatusText(http.StatusUnauthorized), - "error": err.Error(), - }) - }) - }) - - frontendapi.Use(func(next http.Handler) http.Handler { - return authentication.AuthFrontendApi( - // On success; - next, - - // On failure: JSON Response - func(rw http.ResponseWriter, r *http.Request, err error) { - rw.Header().Add("Content-Type", "application/json") - rw.WriteHeader(http.StatusUnauthorized) - json.NewEncoder(rw).Encode(map[string]string{ - "status": http.StatusText(http.StatusUnauthorized), - "error": err.Error(), - }) - }) - }) - } - - if flagDev { - r.Handle("/playground", playground.Handler("GraphQL playground", "/query")) - r.PathPrefix("/swagger/").Handler(httpSwagger.Handler( - httpSwagger.URL("http://" + config.Keys.Addr + "/swagger/doc.json"))).Methods(http.MethodGet) - } - secured.Handle("/query", graphQLEndpoint) - - // Send a searchId and then reply with a redirect to a user, or directly send query to job table for jobid and project. - secured.HandleFunc("/search", func(rw http.ResponseWriter, r *http.Request) { - routerConfig.HandleSearchBar(rw, r, buildInfo) - }) - - // Mount all /monitoring/... and /api/... routes. - routerConfig.SetupRoutes(secured, buildInfo) - api.MountApiRoutes(securedapi) - api.MountUserApiRoutes(userapi) - api.MountConfigApiRoutes(configapi) - api.MountFrontendApiRoutes(frontendapi) - - if config.Keys.EmbedStaticFiles { - if i, err := os.Stat("./var/img"); err == nil { - if i.IsDir() { - log.Info("Use local directory for static images") - r.PathPrefix("/img/").Handler(http.StripPrefix("/img/", http.FileServer(http.Dir("./var/img")))) - } - } - r.PathPrefix("/").Handler(web.ServeFiles()) - } else { - r.PathPrefix("/").Handler(http.FileServer(http.Dir(config.Keys.StaticFiles))) - } - - r.Use(handlers.CompressHandler) - r.Use(handlers.RecoveryHandler(handlers.PrintRecoveryStack(true))) - r.Use(handlers.CORS( - handlers.AllowCredentials(), - handlers.AllowedHeaders([]string{"X-Requested-With", "Content-Type", "Authorization", "Origin"}), - handlers.AllowedMethods([]string{"GET", "POST", "HEAD", "OPTIONS"}), - handlers.AllowedOrigins([]string{"*"}))) - handler := handlers.CustomLoggingHandler(io.Discard, r, func(_ io.Writer, params handlers.LogFormatterParams) { - if strings.HasPrefix(params.Request.RequestURI, "/api/") { - log.Debugf("%s %s (%d, %.02fkb, %dms)", - params.Request.Method, params.URL.RequestURI(), - params.StatusCode, float32(params.Size)/1024, - time.Since(params.TimeStamp).Milliseconds()) - } else { - log.Debugf("%s %s (%d, %.02fkb, %dms)", - params.Request.Method, params.URL.RequestURI(), - params.StatusCode, float32(params.Size)/1024, - time.Since(params.TimeStamp).Milliseconds()) - } - }) - - var wg sync.WaitGroup - server := http.Server{ - ReadTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - Handler: handler, - Addr: config.Keys.Addr, - } - - // Start http or https server - listener, err := net.Listen("tcp", config.Keys.Addr) - if err != nil { - log.Fatalf("starting http listener failed: %v", err) - } - - if !strings.HasSuffix(config.Keys.Addr, ":80") && config.Keys.RedirectHttpTo != "" { - go func() { - http.ListenAndServe(":80", http.RedirectHandler(config.Keys.RedirectHttpTo, http.StatusMovedPermanently)) - }() - } - - if config.Keys.HttpsCertFile != "" && config.Keys.HttpsKeyFile != "" { - cert, err := tls.LoadX509KeyPair(config.Keys.HttpsCertFile, config.Keys.HttpsKeyFile) - if err != nil { - log.Fatalf("loading X509 keypair failed: %v", err) - } - listener = tls.NewListener(listener, &tls.Config{ - Certificates: []tls.Certificate{cert}, - CipherSuites: []uint16{ - tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, - }, - MinVersion: tls.VersionTLS12, - PreferServerCipherSuites: true, - }) - fmt.Printf("HTTPS server listening at %s...", config.Keys.Addr) - } else { - fmt.Printf("HTTP server listening at %s...", config.Keys.Addr) - } + taskManager.Start() + serverInit() // Because this program will want to bind to a privileged port (like 80), the listener must // be established first, then the user can be changed, and after that, // the actual http server can be started. - if err = runtimeEnv.DropPrivileges(config.Keys.Group, config.Keys.User); err != nil { + if err := runtimeEnv.DropPrivileges(config.Keys.Group, config.Keys.User); err != nil { log.Fatalf("error while preparing server start: %s", err.Error()) } + var wg sync.WaitGroup + wg.Add(1) go func() { defer wg.Done() - if err = server.Serve(listener); err != nil && err != http.ErrServerClosed { - log.Fatalf("starting server failed: %v", err) - } + serverStart() }() wg.Add(1) @@ -620,55 +227,9 @@ func main() { <-sigs runtimeEnv.SystemdNotifiy(false, "Shutting down ...") - // First shut down the server gracefully (waiting for all ongoing requests) - server.Shutdown(context.Background()) - - // Then, wait for any async archivings still pending... - api.JobRepository.WaitForArchiving() + taskManager.Shutdown() }() - if config.Keys.StopJobsExceedingWalltime > 0 { - taskManager.RegisterStopJobsExceedTime() - } - - var cfg struct { - Retention schema.Retention `json:"retention"` - Compression int `json:"compression"` - } - cfg.Retention.IncludeDB = true - - if err = json.Unmarshal(config.Keys.Archive, &cfg); err != nil { - log.Warn("Error while unmarshaling raw config json") - } - - taskManager.RegisterRetentionService(cfg.Retention) - - if cfg.Compression > 0 { - log.Info("Register compression service") - - s.Every(1).Day().At("5:00").Do(func() { - var jobs []*schema.Job - - ar := archive.GetHandle() - startTime := time.Now().Unix() - int64(cfg.Compression*24*3600) - lastTime := ar.CompressLast(startTime) - if startTime == lastTime { - log.Info("Compression Service - Complete archive run") - jobs, err = jobRepo.FindJobsBetween(0, startTime) - - } else { - jobs, err = jobRepo.FindJobsBetween(lastTime, startTime) - } - - if err != nil { - log.Warnf("Error while looking for compression jobs: %v", err) - } - ar.Compress(jobs) - }) - } - - s.StartAsync() - if os.Getenv("GOGC") == "" { debug.SetGCPercent(25) } diff --git a/cmd/cc-backend/server.go b/cmd/cc-backend/server.go new file mode 100644 index 00000000..5531415f --- /dev/null +++ b/cmd/cc-backend/server.go @@ -0,0 +1,335 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package main + +import ( + "context" + "crypto/tls" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "net/http" + "os" + "strings" + "time" + + "github.com/99designs/gqlgen/graphql/handler" + "github.com/99designs/gqlgen/graphql/playground" + "github.com/ClusterCockpit/cc-backend/internal/api" + "github.com/ClusterCockpit/cc-backend/internal/auth" + "github.com/ClusterCockpit/cc-backend/internal/config" + "github.com/ClusterCockpit/cc-backend/internal/graph" + "github.com/ClusterCockpit/cc-backend/internal/graph/generated" + "github.com/ClusterCockpit/cc-backend/internal/routerConfig" + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/web" + "github.com/gorilla/handlers" + "github.com/gorilla/mux" + httpSwagger "github.com/swaggo/http-swagger" +) + +var ( + router *mux.Router + server *http.Server + apiHandle *api.RestApi +) + +func serverInit() { + // Setup the http.Handler/Router used by the server + graph.Init() + resolver := graph.GetResolverInstance() + graphQLEndpoint := handler.NewDefaultServer( + generated.NewExecutableSchema(generated.Config{Resolvers: resolver})) + + if os.Getenv("DEBUG") != "1" { + // Having this handler means that a error message is returned via GraphQL instead of the connection simply beeing closed. + // The problem with this is that then, no more stacktrace is printed to stderr. + graphQLEndpoint.SetRecoverFunc(func(ctx context.Context, err interface{}) error { + switch e := err.(type) { + case string: + return fmt.Errorf("MAIN > Panic: %s", e) + case error: + return fmt.Errorf("MAIN > Panic caused by: %w", e) + } + + return errors.New("MAIN > Internal server error (panic)") + }) + } + + authHandle := auth.GetAuthInstance() + + apiHandle = api.New() + + router = mux.NewRouter() + buildInfo := web.Build{Version: version, Hash: commit, Buildtime: date} + + info := map[string]interface{}{} + info["hasOpenIDConnect"] = false + + if config.Keys.OpenIDConfig != nil { + openIDConnect := auth.NewOIDC(authHandle) + openIDConnect.RegisterEndpoints(router) + info["hasOpenIDConnect"] = true + } + + router.HandleFunc("/login", func(rw http.ResponseWriter, r *http.Request) { + rw.Header().Add("Content-Type", "text/html; charset=utf-8") + log.Debugf("##%v##", info) + web.RenderTemplate(rw, "login.tmpl", &web.Page{Title: "Login", Build: buildInfo, Infos: info}) + }).Methods(http.MethodGet) + router.HandleFunc("/imprint", func(rw http.ResponseWriter, r *http.Request) { + rw.Header().Add("Content-Type", "text/html; charset=utf-8") + web.RenderTemplate(rw, "imprint.tmpl", &web.Page{Title: "Imprint", Build: buildInfo}) + }) + router.HandleFunc("/privacy", func(rw http.ResponseWriter, r *http.Request) { + rw.Header().Add("Content-Type", "text/html; charset=utf-8") + web.RenderTemplate(rw, "privacy.tmpl", &web.Page{Title: "Privacy", Build: buildInfo}) + }) + + secured := router.PathPrefix("/").Subrouter() + securedapi := router.PathPrefix("/api").Subrouter() + userapi := router.PathPrefix("/userapi").Subrouter() + configapi := router.PathPrefix("/config").Subrouter() + frontendapi := router.PathPrefix("/frontend").Subrouter() + + if !config.Keys.DisableAuthentication { + router.Handle("/login", authHandle.Login( + // On success: + http.RedirectHandler("/", http.StatusTemporaryRedirect), + + // On failure: + func(rw http.ResponseWriter, r *http.Request, err error) { + rw.Header().Add("Content-Type", "text/html; charset=utf-8") + rw.WriteHeader(http.StatusUnauthorized) + web.RenderTemplate(rw, "login.tmpl", &web.Page{ + Title: "Login failed - ClusterCockpit", + MsgType: "alert-warning", + Message: err.Error(), + Build: buildInfo, + Infos: info, + }) + })).Methods(http.MethodPost) + + router.Handle("/jwt-login", authHandle.Login( + // On success: + http.RedirectHandler("/", http.StatusTemporaryRedirect), + + // On failure: + func(rw http.ResponseWriter, r *http.Request, err error) { + rw.Header().Add("Content-Type", "text/html; charset=utf-8") + rw.WriteHeader(http.StatusUnauthorized) + web.RenderTemplate(rw, "login.tmpl", &web.Page{ + Title: "Login failed - ClusterCockpit", + MsgType: "alert-warning", + Message: err.Error(), + Build: buildInfo, + Infos: info, + }) + })) + + router.Handle("/logout", authHandle.Logout( + http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + rw.Header().Add("Content-Type", "text/html; charset=utf-8") + rw.WriteHeader(http.StatusOK) + web.RenderTemplate(rw, "login.tmpl", &web.Page{ + Title: "Bye - ClusterCockpit", + MsgType: "alert-info", + Message: "Logout successful", + Build: buildInfo, + Infos: info, + }) + }))).Methods(http.MethodPost) + + secured.Use(func(next http.Handler) http.Handler { + return authHandle.Auth( + // On success; + next, + + // On failure: + func(rw http.ResponseWriter, r *http.Request, err error) { + rw.WriteHeader(http.StatusUnauthorized) + web.RenderTemplate(rw, "login.tmpl", &web.Page{ + Title: "Authentication failed - ClusterCockpit", + MsgType: "alert-danger", + Message: err.Error(), + Build: buildInfo, + Infos: info, + }) + }) + }) + + securedapi.Use(func(next http.Handler) http.Handler { + return authHandle.AuthApi( + // On success; + next, + + // On failure: JSON Response + func(rw http.ResponseWriter, r *http.Request, err error) { + rw.Header().Add("Content-Type", "application/json") + rw.WriteHeader(http.StatusUnauthorized) + json.NewEncoder(rw).Encode(map[string]string{ + "status": http.StatusText(http.StatusUnauthorized), + "error": err.Error(), + }) + }) + }) + + userapi.Use(func(next http.Handler) http.Handler { + return authHandle.AuthUserApi( + // On success; + next, + + // On failure: JSON Response + func(rw http.ResponseWriter, r *http.Request, err error) { + rw.Header().Add("Content-Type", "application/json") + rw.WriteHeader(http.StatusUnauthorized) + json.NewEncoder(rw).Encode(map[string]string{ + "status": http.StatusText(http.StatusUnauthorized), + "error": err.Error(), + }) + }) + }) + + configapi.Use(func(next http.Handler) http.Handler { + return authHandle.AuthConfigApi( + // On success; + next, + + // On failure: JSON Response + func(rw http.ResponseWriter, r *http.Request, err error) { + rw.Header().Add("Content-Type", "application/json") + rw.WriteHeader(http.StatusUnauthorized) + json.NewEncoder(rw).Encode(map[string]string{ + "status": http.StatusText(http.StatusUnauthorized), + "error": err.Error(), + }) + }) + }) + + frontendapi.Use(func(next http.Handler) http.Handler { + return authHandle.AuthFrontendApi( + // On success; + next, + + // On failure: JSON Response + func(rw http.ResponseWriter, r *http.Request, err error) { + rw.Header().Add("Content-Type", "application/json") + rw.WriteHeader(http.StatusUnauthorized) + json.NewEncoder(rw).Encode(map[string]string{ + "status": http.StatusText(http.StatusUnauthorized), + "error": err.Error(), + }) + }) + }) + } + + if flagDev { + router.Handle("/playground", playground.Handler("GraphQL playground", "/query")) + router.PathPrefix("/swagger/").Handler(httpSwagger.Handler( + httpSwagger.URL("http://" + config.Keys.Addr + "/swagger/doc.json"))).Methods(http.MethodGet) + } + secured.Handle("/query", graphQLEndpoint) + + // Send a searchId and then reply with a redirect to a user, or directly send query to job table for jobid and project. + secured.HandleFunc("/search", func(rw http.ResponseWriter, r *http.Request) { + routerConfig.HandleSearchBar(rw, r, buildInfo) + }) + + // Mount all /monitoring/... and /api/... routes. + routerConfig.SetupRoutes(secured, buildInfo) + apiHandle.MountApiRoutes(securedapi) + apiHandle.MountUserApiRoutes(userapi) + apiHandle.MountConfigApiRoutes(configapi) + apiHandle.MountFrontendApiRoutes(frontendapi) + + if config.Keys.EmbedStaticFiles { + if i, err := os.Stat("./var/img"); err == nil { + if i.IsDir() { + log.Info("Use local directory for static images") + router.PathPrefix("/img/").Handler(http.StripPrefix("/img/", http.FileServer(http.Dir("./var/img")))) + } + } + router.PathPrefix("/").Handler(web.ServeFiles()) + } else { + router.PathPrefix("/").Handler(http.FileServer(http.Dir(config.Keys.StaticFiles))) + } + + router.Use(handlers.CompressHandler) + router.Use(handlers.RecoveryHandler(handlers.PrintRecoveryStack(true))) + router.Use(handlers.CORS( + handlers.AllowCredentials(), + handlers.AllowedHeaders([]string{"X-Requested-With", "Content-Type", "Authorization", "Origin"}), + handlers.AllowedMethods([]string{"GET", "POST", "HEAD", "OPTIONS"}), + handlers.AllowedOrigins([]string{"*"}))) +} + +func serverStart() { + handler := handlers.CustomLoggingHandler(io.Discard, router, func(_ io.Writer, params handlers.LogFormatterParams) { + if strings.HasPrefix(params.Request.RequestURI, "/api/") { + log.Debugf("%s %s (%d, %.02fkb, %dms)", + params.Request.Method, params.URL.RequestURI(), + params.StatusCode, float32(params.Size)/1024, + time.Since(params.TimeStamp).Milliseconds()) + } else { + log.Debugf("%s %s (%d, %.02fkb, %dms)", + params.Request.Method, params.URL.RequestURI(), + params.StatusCode, float32(params.Size)/1024, + time.Since(params.TimeStamp).Milliseconds()) + } + }) + + server = &http.Server{ + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + Handler: handler, + Addr: config.Keys.Addr, + } + + // Start http or https server + listener, err := net.Listen("tcp", config.Keys.Addr) + if err != nil { + log.Fatalf("starting http listener failed: %v", err) + } + + if !strings.HasSuffix(config.Keys.Addr, ":80") && config.Keys.RedirectHttpTo != "" { + go func() { + http.ListenAndServe(":80", http.RedirectHandler(config.Keys.RedirectHttpTo, http.StatusMovedPermanently)) + }() + } + + if config.Keys.HttpsCertFile != "" && config.Keys.HttpsKeyFile != "" { + cert, err := tls.LoadX509KeyPair( + config.Keys.HttpsCertFile, config.Keys.HttpsKeyFile) + if err != nil { + log.Fatalf("loading X509 keypair failed: %v", err) + } + listener = tls.NewListener(listener, &tls.Config{ + Certificates: []tls.Certificate{cert}, + CipherSuites: []uint16{ + tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + }, + MinVersion: tls.VersionTLS12, + PreferServerCipherSuites: true, + }) + fmt.Printf("HTTPS server listening at %s...", config.Keys.Addr) + } else { + fmt.Printf("HTTP server listening at %s...", config.Keys.Addr) + } + + if err = server.Serve(listener); err != nil && err != http.ErrServerClosed { + log.Fatalf("starting server failed: %v", err) + } +} + +func serverShutdown() { + // First shut down the server gracefully (waiting for all ongoing requests) + server.Shutdown(context.Background()) + + // Then, wait for any async archivings still pending... + apiHandle.JobRepository.WaitForArchiving() +} diff --git a/internal/api/api_test.go b/internal/api/api_test.go index 0354a0f8..80a7e644 100644 --- a/internal/api/api_test.go +++ b/internal/api/api_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/ClusterCockpit/cc-backend/internal/api" + "github.com/ClusterCockpit/cc-backend/internal/auth" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/graph" "github.com/ClusterCockpit/cc-backend/internal/metricdata" @@ -144,7 +145,6 @@ func setup(t *testing.T) *api.RestApi { archiveCfg := fmt.Sprintf("{\"kind\": \"file\",\"path\": \"%s\"}", jobarchive) repository.Connect("sqlite3", dbfilepath) - db := repository.GetConnection() if err := archive.Init(json.RawMessage(archiveCfg), config.Keys.DisableArchive); err != nil { t.Fatal(err) @@ -154,13 +154,10 @@ func setup(t *testing.T) *api.RestApi { t.Fatal(err) } - jobRepo := repository.GetJobRepository() - resolver := &graph.Resolver{DB: db.DB, Repo: jobRepo} + auth.Init() + graph.Init() - return &api.RestApi{ - JobRepository: resolver.Repo, - Resolver: resolver, - } + return api.New() } func cleanup() { @@ -253,12 +250,13 @@ func TestRestApi(t *testing.T) { t.Fatal(err) } - job, err := restapi.Resolver.Query().Job(ctx, strconv.Itoa(int(res.DBID))) + resolver := graph.GetResolverInstance() + job, err := resolver.Query().Job(ctx, strconv.Itoa(int(res.DBID))) if err != nil { t.Fatal(err) } - job.Tags, err = restapi.Resolver.Job().Tags(ctx, job) + job.Tags, err = resolver.Job().Tags(ctx, job) if err != nil { t.Fatal(err) } @@ -314,7 +312,8 @@ func TestRestApi(t *testing.T) { } restapi.JobRepository.WaitForArchiving() - job, err := restapi.Resolver.Query().Job(ctx, strconv.Itoa(int(dbid))) + resolver := graph.GetResolverInstance() + job, err := resolver.Query().Job(ctx, strconv.Itoa(int(dbid))) if err != nil { t.Fatal(err) } diff --git a/internal/api/rest.go b/internal/api/rest.go index b447a21c..01eb4296 100644 --- a/internal/api/rest.go +++ b/internal/api/rest.go @@ -53,12 +53,19 @@ import ( type RestApi struct { JobRepository *repository.JobRepository - Resolver *graph.Resolver Authentication *auth.Authentication MachineStateDir string RepositoryMutex sync.Mutex } +func New() *RestApi { + return &RestApi{ + JobRepository: repository.GetJobRepository(), + MachineStateDir: config.Keys.MachineStateDir, + Authentication: auth.GetAuthInstance(), + } +} + func (api *RestApi) MountApiRoutes(r *mux.Router) { r.StrictSlash(true) @@ -893,7 +900,6 @@ func (api *RestApi) stopJobByRequest(rw http.ResponseWriter, r *http.Request) { } job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime) - if err != nil { handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw) return @@ -977,7 +983,6 @@ func (api *RestApi) deleteJobByRequest(rw http.ResponseWriter, r *http.Request) } job, err = api.JobRepository.Find(req.JobId, req.Cluster, req.StartTime) - if err != nil { handleError(fmt.Errorf("finding job failed: %w", err), http.StatusUnprocessableEntity, rw) return @@ -1105,7 +1110,8 @@ func (api *RestApi) getJobMetrics(rw http.ResponseWriter, r *http.Request) { } `json:"error"` } - data, err := api.Resolver.Query().JobMetrics(r.Context(), id, metrics, scopes) + resolver := graph.GetResolverInstance() + data, err := resolver.Query().JobMetrics(r.Context(), id, metrics, scopes) if err != nil { json.NewEncoder(rw).Encode(Respone{ Error: &struct { diff --git a/internal/auth/auth.go b/internal/auth/auth.go index 50f41210..e45fa9df 100644 --- a/internal/auth/auth.go +++ b/internal/auth/auth.go @@ -12,6 +12,7 @@ import ( "errors" "net/http" "os" + "sync" "time" "github.com/ClusterCockpit/cc-backend/internal/config" @@ -26,6 +27,11 @@ type Authenticator interface { Login(user *schema.User, rw http.ResponseWriter, r *http.Request) (*schema.User, error) } +var ( + initOnce sync.Once + authInstance *Authentication +) + type Authentication struct { sessionStore *sessions.CookieStore LdapAuth *LdapAuthenticator @@ -62,71 +68,79 @@ func (auth *Authentication) AuthViaSession( }, nil } -func Init() (*Authentication, error) { - auth := &Authentication{} +func Init() { + initOnce.Do(func() { + authInstance = &Authentication{} - sessKey := os.Getenv("SESSION_KEY") - if sessKey == "" { - log.Warn("environment variable 'SESSION_KEY' not set (will use non-persistent random key)") - bytes := make([]byte, 32) - if _, err := rand.Read(bytes); err != nil { - log.Error("Error while initializing authentication -> failed to generate random bytes for session key") - return nil, err + sessKey := os.Getenv("SESSION_KEY") + if sessKey == "" { + log.Warn("environment variable 'SESSION_KEY' not set (will use non-persistent random key)") + bytes := make([]byte, 32) + if _, err := rand.Read(bytes); err != nil { + log.Fatal("Error while initializing authentication -> failed to generate random bytes for session key") + } + authInstance.sessionStore = sessions.NewCookieStore(bytes) + } else { + bytes, err := base64.StdEncoding.DecodeString(sessKey) + if err != nil { + log.Fatal("Error while initializing authentication -> decoding session key failed") + } + authInstance.sessionStore = sessions.NewCookieStore(bytes) } - auth.sessionStore = sessions.NewCookieStore(bytes) - } else { - bytes, err := base64.StdEncoding.DecodeString(sessKey) - if err != nil { - log.Error("Error while initializing authentication -> decoding session key failed") - return nil, err + + if d, err := time.ParseDuration(config.Keys.SessionMaxAge); err != nil { + authInstance.SessionMaxAge = d } - auth.sessionStore = sessions.NewCookieStore(bytes) - } - if config.Keys.LdapConfig != nil { - ldapAuth := &LdapAuthenticator{} - if err := ldapAuth.Init(); err != nil { - log.Warn("Error while initializing authentication -> ldapAuth init failed") + if config.Keys.LdapConfig != nil { + ldapAuth := &LdapAuthenticator{} + if err := ldapAuth.Init(); err != nil { + log.Warn("Error while initializing authentication -> ldapAuth init failed") + } else { + authInstance.LdapAuth = ldapAuth + authInstance.authenticators = append(authInstance.authenticators, authInstance.LdapAuth) + } } else { - auth.LdapAuth = ldapAuth - auth.authenticators = append(auth.authenticators, auth.LdapAuth) + log.Info("Missing LDAP configuration: No LDAP support!") } - } else { - log.Info("Missing LDAP configuration: No LDAP support!") - } - if config.Keys.JwtConfig != nil { - auth.JwtAuth = &JWTAuthenticator{} - if err := auth.JwtAuth.Init(); err != nil { - log.Error("Error while initializing authentication -> jwtAuth init failed") - return nil, err - } + if config.Keys.JwtConfig != nil { + authInstance.JwtAuth = &JWTAuthenticator{} + if err := authInstance.JwtAuth.Init(); err != nil { + log.Fatal("Error while initializing authentication -> jwtAuth init failed") + } + + jwtSessionAuth := &JWTSessionAuthenticator{} + if err := jwtSessionAuth.Init(); err != nil { + log.Info("jwtSessionAuth init failed: No JWT login support!") + } else { + authInstance.authenticators = append(authInstance.authenticators, jwtSessionAuth) + } - jwtSessionAuth := &JWTSessionAuthenticator{} - if err := jwtSessionAuth.Init(); err != nil { - log.Info("jwtSessionAuth init failed: No JWT login support!") + jwtCookieSessionAuth := &JWTCookieSessionAuthenticator{} + if err := jwtCookieSessionAuth.Init(); err != nil { + log.Info("jwtCookieSessionAuth init failed: No JWT cookie login support!") + } else { + authInstance.authenticators = append(authInstance.authenticators, jwtCookieSessionAuth) + } } else { - auth.authenticators = append(auth.authenticators, jwtSessionAuth) + log.Info("Missing JWT configuration: No JWT token support!") } - jwtCookieSessionAuth := &JWTCookieSessionAuthenticator{} - if err := jwtCookieSessionAuth.Init(); err != nil { - log.Info("jwtCookieSessionAuth init failed: No JWT cookie login support!") - } else { - auth.authenticators = append(auth.authenticators, jwtCookieSessionAuth) + authInstance.LocalAuth = &LocalAuthenticator{} + if err := authInstance.LocalAuth.Init(); err != nil { + log.Fatal("Error while initializing authentication -> localAuth init failed") } - } else { - log.Info("Missing JWT configuration: No JWT token support!") - } + authInstance.authenticators = append(authInstance.authenticators, authInstance.LocalAuth) + }) +} - auth.LocalAuth = &LocalAuthenticator{} - if err := auth.LocalAuth.Init(); err != nil { - log.Error("Error while initializing authentication -> localAuth init failed") - return nil, err +func GetAuthInstance() *Authentication { + if authInstance == nil { + log.Fatal("Authentication module not initialized!") } - auth.authenticators = append(auth.authenticators, auth.LocalAuth) - return auth, nil + return authInstance } func persistUser(user *schema.User) { diff --git a/internal/auth/ldap.go b/internal/auth/ldap.go index 05672c5a..cc7c4f6c 100644 --- a/internal/auth/ldap.go +++ b/internal/auth/ldap.go @@ -10,7 +10,6 @@ import ( "net/http" "os" "strings" - "time" "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/repository" @@ -34,33 +33,6 @@ func (la *LdapAuthenticator) Init() error { lc := config.Keys.LdapConfig - if lc.SyncInterval != "" { - interval, err := time.ParseDuration(lc.SyncInterval) - if err != nil { - log.Warnf("Could not parse duration for sync interval: %v", - lc.SyncInterval) - return err - } - - if interval == 0 { - log.Info("Sync interval is zero") - return nil - } - - go func() { - ticker := time.NewTicker(interval) - for t := range ticker.C { - log.Printf("sync started at %s", t.Format(time.RFC3339)) - if err := la.Sync(); err != nil { - log.Errorf("sync failed: %s", err.Error()) - } - log.Print("sync done") - } - }() - } else { - log.Info("LDAP configuration key sync_interval invalid") - } - if lc.UserAttr != "" { la.UserAttr = lc.UserAttr } else { diff --git a/internal/graph/resolver.go b/internal/graph/resolver.go index dd7bc3b5..0f4dc065 100644 --- a/internal/graph/resolver.go +++ b/internal/graph/resolver.go @@ -1,15 +1,39 @@ package graph import ( + "sync" + "github.com/ClusterCockpit/cc-backend/internal/repository" + "github.com/ClusterCockpit/cc-backend/pkg/log" "github.com/jmoiron/sqlx" ) // This file will not be regenerated automatically. // // It serves as dependency injection for your app, add any dependencies you require here. +var ( + initOnce sync.Once + resolverInstance *Resolver +) type Resolver struct { DB *sqlx.DB Repo *repository.JobRepository } + +func Init() { + initOnce.Do(func() { + db := repository.GetConnection() + resolverInstance = &Resolver{ + DB: db.DB, Repo: repository.GetJobRepository(), + } + }) +} + +func GetResolverInstance() *Resolver { + if resolverInstance == nil { + log.Fatal("Authentication module not initialized!") + } + + return resolverInstance +} diff --git a/internal/taskManager/compressionService.go b/internal/taskManager/compressionService.go new file mode 100644 index 00000000..005a5bb7 --- /dev/null +++ b/internal/taskManager/compressionService.go @@ -0,0 +1,41 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package taskManager + +import ( + "time" + + "github.com/ClusterCockpit/cc-backend/pkg/archive" + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" + "github.com/go-co-op/gocron/v2" +) + +func RegisterCompressionService(compressOlderThan int) { + log.Info("Register compression service") + + s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(05, 0, 0))), + gocron.NewTask( + func() { + var jobs []*schema.Job + var err error + + ar := archive.GetHandle() + startTime := time.Now().Unix() - int64(compressOlderThan*24*3600) + lastTime := ar.CompressLast(startTime) + if startTime == lastTime { + log.Info("Compression Service - Complete archive run") + jobs, err = jobRepo.FindJobsBetween(0, startTime) + + } else { + jobs, err = jobRepo.FindJobsBetween(lastTime, startTime) + } + + if err != nil { + log.Warnf("Error while looking for compression jobs: %v", err) + } + ar.Compress(jobs) + })) +} diff --git a/internal/taskManager/ldapSyncService.go b/internal/taskManager/ldapSyncService.go new file mode 100644 index 00000000..a998aa81 --- /dev/null +++ b/internal/taskManager/ldapSyncService.go @@ -0,0 +1,36 @@ +// Copyright (C) NHR@FAU, University Erlangen-Nuremberg. +// All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. +package taskManager + +import ( + "time" + + "github.com/ClusterCockpit/cc-backend/internal/auth" + "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/go-co-op/gocron/v2" +) + +func RegisterLdapSyncService(ds string) { + interval, err := parseDuration(ds) + if err != nil { + log.Warnf("Could not parse duration for sync interval: %v", + ds) + return + } + + auth := auth.GetAuthInstance() + + log.Info("Register LDAP sync service") + s.NewJob(gocron.DurationJob(interval), + gocron.NewTask( + func() { + t := time.Now() + log.Printf("ldap sync started at %s", t.Format(time.RFC3339)) + if err := auth.LdapAuth.Sync(); err != nil { + log.Errorf("ldap sync failed: %s", err.Error()) + } + log.Print("ldap sync done") + })) +} diff --git a/internal/taskManager/retentionService.go b/internal/taskManager/retentionService.go index ef29b6a2..502f890b 100644 --- a/internal/taskManager/retentionService.go +++ b/internal/taskManager/retentionService.go @@ -9,62 +9,59 @@ import ( "github.com/ClusterCockpit/cc-backend/pkg/archive" "github.com/ClusterCockpit/cc-backend/pkg/log" - "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/go-co-op/gocron/v2" ) -func RegisterRetentionService(cfg schema.Retention) { - switch cfg.Policy { - case "delete": +func RegisterRetentionDeleteService(age int, includeDB bool) { + log.Info("Register retention delete service") - log.Info("Register retention delete service") + s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(04, 0, 0))), + gocron.NewTask( + func() { + startTime := time.Now().Unix() - int64(age*24*3600) + jobs, err := jobRepo.FindJobsBetween(0, startTime) + if err != nil { + log.Warnf("Error while looking for retention jobs: %s", err.Error()) + } + archive.GetHandle().CleanUp(jobs) - s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(04, 0, 0))), - gocron.NewTask( - func() { - startTime := time.Now().Unix() - int64(cfg.Age*24*3600) - jobs, err := jobRepo.FindJobsBetween(0, startTime) + if includeDB { + cnt, err := jobRepo.DeleteJobsBefore(startTime) if err != nil { - log.Warnf("Error while looking for retention jobs: %s", err.Error()) + log.Errorf("Error while deleting retention jobs from db: %s", err.Error()) + } else { + log.Infof("Retention: Removed %d jobs from db", cnt) } - archive.GetHandle().CleanUp(jobs) - - if cfg.IncludeDB { - cnt, err := jobRepo.DeleteJobsBefore(startTime) - if err != nil { - log.Errorf("Error while deleting retention jobs from db: %s", err.Error()) - } else { - log.Infof("Retention: Removed %d jobs from db", cnt) - } - if err = jobRepo.Optimize(); err != nil { - log.Errorf("Error occured in db optimization: %s", err.Error()) - } + if err = jobRepo.Optimize(); err != nil { + log.Errorf("Error occured in db optimization: %s", err.Error()) } - })) - case "move": - log.Info("Register retention move service") + } + })) +} + +func RegisterRetentionMoveService(age int, includeDB bool, location string) { + log.Info("Register retention move service") - s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(04, 0, 0))), - gocron.NewTask( - func() { - startTime := time.Now().Unix() - int64(cfg.Age*24*3600) - jobs, err := jobRepo.FindJobsBetween(0, startTime) + s.NewJob(gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(04, 0, 0))), + gocron.NewTask( + func() { + startTime := time.Now().Unix() - int64(age*24*3600) + jobs, err := jobRepo.FindJobsBetween(0, startTime) + if err != nil { + log.Warnf("Error while looking for retention jobs: %s", err.Error()) + } + archive.GetHandle().Move(jobs, location) + + if includeDB { + cnt, err := jobRepo.DeleteJobsBefore(startTime) if err != nil { - log.Warnf("Error while looking for retention jobs: %s", err.Error()) + log.Errorf("Error while deleting retention jobs from db: %v", err) + } else { + log.Infof("Retention: Removed %d jobs from db", cnt) } - archive.GetHandle().Move(jobs, cfg.Location) - - if cfg.IncludeDB { - cnt, err := jobRepo.DeleteJobsBefore(startTime) - if err != nil { - log.Errorf("Error while deleting retention jobs from db: %v", err) - } else { - log.Infof("Retention: Removed %d jobs from db", cnt) - } - if err = jobRepo.Optimize(); err != nil { - log.Errorf("Error occured in db optimization: %v", err) - } + if err = jobRepo.Optimize(); err != nil { + log.Errorf("Error occured in db optimization: %v", err) } - })) - } + } + })) } diff --git a/internal/taskManager/taskManager.go b/internal/taskManager/taskManager.go index 5ddc1790..bcd2c064 100644 --- a/internal/taskManager/taskManager.go +++ b/internal/taskManager/taskManager.go @@ -5,8 +5,13 @@ package taskManager import ( + "encoding/json" + "time" + + "github.com/ClusterCockpit/cc-backend/internal/config" "github.com/ClusterCockpit/cc-backend/internal/repository" "github.com/ClusterCockpit/cc-backend/pkg/log" + "github.com/ClusterCockpit/cc-backend/pkg/schema" "github.com/go-co-op/gocron/v2" ) @@ -15,13 +20,66 @@ var ( jobRepo *repository.JobRepository ) -func init() { +func parseDuration(s string) (time.Duration, error) { + interval, err := time.ParseDuration(s) + if err != nil { + log.Warnf("Could not parse duration for sync interval: %v", + s) + return 0, err + } + + if interval == 0 { + log.Info("TaskManager: Sync interval is zero") + } + + return interval, nil +} + +func Start() { var err error jobRepo = repository.GetJobRepository() s, err = gocron.NewScheduler() if err != nil { log.Fatalf("Error while creating gocron scheduler: %s", err.Error()) } + + if config.Keys.StopJobsExceedingWalltime > 0 { + RegisterStopJobsExceedTime() + } + + var cfg struct { + Retention schema.Retention `json:"retention"` + Compression int `json:"compression"` + } + cfg.Retention.IncludeDB = true + + if err := json.Unmarshal(config.Keys.Archive, &cfg); err != nil { + log.Warn("Error while unmarshaling raw config json") + } + + switch cfg.Retention.Policy { + case "delete": + RegisterRetentionDeleteService( + cfg.Retention.Age, + cfg.Retention.IncludeDB) + case "move": + RegisterRetentionMoveService( + cfg.Retention.Age, + cfg.Retention.IncludeDB, + cfg.Retention.Location) + } + + if cfg.Compression > 0 { + RegisterCompressionService(cfg.Compression) + } + + lc := config.Keys.LdapConfig + + if lc.SyncInterval != "" { + RegisterLdapSyncService(lc.SyncInterval) + } + + s.Start() } func Shutdown() {