diff --git a/gopm-schema.json b/gopm-schema.json new file mode 100644 index 0000000..ca61d6c --- /dev/null +++ b/gopm-schema.json @@ -0,0 +1,179 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "gopm config schema", + "type": "object", + "properties": { + "grpc_server": { + "type": "object", + "required": ["address"], + "properties": { + "address": { + "type": "string" + } + } + }, + "http_server": { + "type": "object", + "required": ["port"], + "properties": { + "port": { + "type": "string" + } + } + }, + "programs": { + "type": "array", + "items": { + "$ref": "#/definitions/Program" + } + }, + "file_system": { + "type": "object", + "required": ["root"], + "properties": { + "root": { + "type": "string" + }, + "files": { + "type": "array", + "items": { + "$ref": "#/definitions/File" + } + } + } + } + }, + "definitions": { + "Program": { + "type": "object", + "required": ["name", "command"], + "properties": { + "name": { + "type": "string", + "pattern": "^\\w+$" + }, + "directory": { + "type": "string" + }, + "command": { + "type": "string" + }, + "environment": { + "type": "object", + "additionalProperties": false, + "patternProperties": { + "^\\w+$": {"type": "string"} + } + }, + "user": { + "type": "string" + }, + "exit_codes": { + "type": "array", + "items": { + "type": "integer", + "minimum": 0, + "maximum": 255 + } + }, + "priority": { + "type": "integer" + }, + "restart_pause": { + "type": "integer" + }, + "start_retries": { + "type": "integer", + "minimum": 0 + }, + "start_seconds": { + "type": "integer" + }, + "cron": { + "type": "string" + }, + "auto_start": { + "type": "boolean" + }, + "auto_restart": { + "type": "boolean" + }, + "restart_directory_monitor": { + "description": "Path to a directory to monitor and automatically restart the process if changes are detected", + "type": "string" + }, + "restart_file_pattern": { + "description": "A pattern used to monitor a path and automatically restart the process if changes are detected", + "type": "string" + }, + "restart_when_binary_changed": { + "description": "Automatically restart the process if changes to the binary are detected", + "type": "boolean" + }, + "stop_signals": { + "type": "array", + "items": { + "type": "string", + "uniqueItems": true, + "enum": ["HUP", "INT", "QUIT", "KILL", "TERM", "USR1", "USR2"] + } + }, + "stop_wait_seconds": { + "description": "The number of seconds to wait for the process to stop before killing", + "type": "integer" + }, + "stop_as_group": { + "type": "boolean" + }, + "kill_as_group": { + "type": "boolean" + }, + "stdout_logfile": { + "type": "string" + }, + "stdout_logfile_backups": { + "type": "integer" + }, + "stdout_logfile_max_bytes": { + "type": "integer" + }, + "redirect_stderr": { + "description": "Redirect STDERR to STDOUT", + "type": "boolean" + }, + "stderr_logfile": { + "type": "string" + }, + "stderr_logfile_backups": { + "type": "integer" + }, + "stderr_logfile_max_bytes": { + "type": "integer" + }, + "depends_on": { + "description": "A list of process names that must be started before starting this process", + "type": "array", + "items": { + "type": "string" + } + } + } + }, + "File": { + "type": "object", + "required": ["name", "path", "content"], + "properties": { + "name": { + "type": "string", + "pattern": "^\\w+$" + }, + "path": { + "type": "string" + }, + "content": { + "type": "string" + } + } + } + } +} \ No newline at end of file diff --git a/model/program.go b/model/program.go index 96fe710..34d28d3 100644 --- a/model/program.go +++ b/model/program.go @@ -3,8 +3,6 @@ package model import "github.com/creasty/defaults" type Program struct { - Group string `yaml:"-"` - Name string `yaml:"name"` Directory string `yaml:"directory"` Command string `yaml:"command"` diff --git a/model/visitor_validator.go b/model/visitor_validator.go index 5fac3c0..1892047 100644 --- a/model/visitor_validator.go +++ b/model/visitor_validator.go @@ -19,6 +19,13 @@ func (v *validator) Err() error { func (v *validator) Visit(node Node) Visitor { switch n := node.(type) { case *HTTPServer: + if len(n.Port) == 0 { + multierr.AppendInto(&v.err, errors.New("http_server port missing")) + } + case *GrpcServer: + if len(n.Address) == 0 { + multierr.AppendInto(&v.err, errors.New("grpc_server address missing")) + } case *Program: if len(n.Name) == 0 { diff --git a/process/process.go b/process/process.go index 315f409..86af45c 100644 --- a/process/process.go +++ b/process/process.go @@ -116,7 +116,7 @@ func NewProcess(supervisorID string, cfg *config.Process) *Process { proc := &Process{ supervisorID: supervisorID, config: cfg, - log: zap.L().With(zap.String("program", cfg.Name)), + log: zap.L().With(zap.String("name", cfg.Name)), state: Stopped, retryTimes: new(int32), } @@ -146,9 +146,9 @@ func (p *Process) addToCron() { return } - p.log.Info("Scheduling program with cron", zap.String("cron", cfg.Cron)) + p.log.Info("Scheduling process with cron", zap.String("cron", cfg.Cron)) id := scheduler.Schedule(schedule, cron.FuncJob(func() { - p.log.Debug("Running scheduled program") + p.log.Debug("Running scheduled process") if !p.isRunning() { p.Start(false) } @@ -163,7 +163,7 @@ func (p *Process) removeFromCron() { return } - p.log.Info("Removing program from cron schedule") + p.log.Info("Removing process from cron schedule") scheduler.Remove(p.cronID) p.cronID = 0 } @@ -172,10 +172,10 @@ func (p *Process) removeFromCron() { // Args: // wait - true, wait the program started or failed func (p *Process) Start(wait bool) { - p.log.Info("Starting program") + p.log.Info("Starting process") p.mu.Lock() if p.inStart { - p.log.Info("Program already starting") + p.log.Info("Process already starting") p.mu.Unlock() return } @@ -402,11 +402,12 @@ func (p *Process) createProgramCommand() error { p.log.Error("Failed to run as user", zap.String("user", cfg.User)) return fmt.Errorf("failed to set user") } - p.setProgramRestartChangeMonitor(args[0]) setDeathsig(p.cmd.SysProcAttr) p.setEnv() p.setDir() p.setLog() + bin := filepath.Join(p.cmd.Dir, args[0]) + p.setProgramRestartChangeMonitor(bin) p.stdin, _ = p.cmd.StdinPipe() return nil @@ -421,7 +422,7 @@ func (p *Process) setProgramRestartChangeMonitor(programPath string) { absPath = programPath } AddProgramChangeMonitor(absPath, func(path string, mode filechangemonitor.FileChangeMode) { - p.log.Info("Program binary changed") + p.log.Info("Process binary changed") p.Stop(true) p.Start(true) }) @@ -436,7 +437,7 @@ func (p *Process) setProgramRestartChangeMonitor(programPath string) { AddConfigChangeMonitor(absDir, filePattern, func(path string, mode filechangemonitor.FileChangeMode) { // fmt.Printf( "filePattern=%s, base=%s\n", filePattern, filepath.Base( path ) ) // if matched, err := filepath.Match( filePattern, filepath.Base( path ) ); matched && err == nil { - p.log.Info("Watched file for program is changed") + p.log.Info("Watched file for process has changed") p.Stop(true) p.Start(true) //} @@ -448,9 +449,9 @@ func (p *Process) setProgramRestartChangeMonitor(programPath string) { func (p *Process) waitForExit() { p.cmd.Wait() if p.cmd.ProcessState != nil { - p.log.Info("Program stopped", zap.Stringer("status", p.cmd.ProcessState)) + p.log.Info("Process stopped", zap.Stringer("status", p.cmd.ProcessState)) } else { - p.log.Info("Program stopped") + p.log.Info("Process stopped") } p.mu.Lock() defer p.mu.Unlock() @@ -479,7 +480,7 @@ func (p *Process) monitorProgramIsRunning(endTime time.Time, monitorExited, prog defer p.mu.Unlock() // if the program does not exit if atomic.LoadInt32(programExited) == 0 && p.state == Starting { - p.log.Info("Successfully started program") + p.log.Info("Successfully started process") p.changeStateTo(Running) } } @@ -492,7 +493,7 @@ func (p *Process) run(finishedFn func()) { // check if the program is in running state if p.isRunning() { - p.log.Info("Program already running") + p.log.Info("Process already running") finishedFn() return @@ -512,7 +513,7 @@ func (p *Process) run(finishedFn func()) { if restartPause > 0 && atomic.LoadInt32(p.retryTimes) != 0 { // pause p.mu.Unlock() - p.log.Info("Delay program restart", zap.Duration("restart_pause_seconds", time.Duration(restartPause))) + p.log.Info("Delay process restart", zap.Duration("restart_pause_seconds", time.Duration(restartPause))) time.Sleep(time.Duration(restartPause)) p.mu.Lock() } @@ -522,7 +523,7 @@ func (p *Process) run(finishedFn func()) { err := p.createProgramCommand() if err != nil { - p.failToStartProgram("Failed to create program", finishedOnceFn) + p.failToStartProgram("Failed to create process", finishedOnceFn) break } @@ -530,27 +531,21 @@ func (p *Process) run(finishedFn func()) { if err != nil { if atomic.LoadInt32(p.retryTimes) >= int32(cfg.StartRetries) { - p.failToStartProgram(fmt.Sprintf("fail to start program with error:%v", err), finishedOnceFn) + p.failToStartProgram(fmt.Sprintf("fail to start process with error:%v", err), finishedOnceFn) break } else { - p.log.Error("Failed to start program", zap.Error(err)) + p.log.Error("Failed to start process", zap.Error(err)) p.changeStateTo(Backoff) continue } } - if p.StdoutLog != nil { - p.StdoutLog.SetPid(p.cmd.Process.Pid) - } - if p.StderrLog != nil { - p.StderrLog.SetPid(p.cmd.Process.Pid) - } monitorExited := int32(0) programExited := int32(0) // Set startsec to 0 to indicate that the program needn't stay // running for any particular amount of time. if startSecs <= 0 { - p.log.Info("Program started") + p.log.Info("Process started") p.changeStateTo(Running) go finishedOnceFn() } else { @@ -559,7 +554,7 @@ func (p *Process) run(finishedFn func()) { finishedOnceFn() }() } - p.log.Debug("Waiting for program to exit") + p.log.Debug("Waiting for process to exit") p.mu.Unlock() p.waitForExit() @@ -574,7 +569,7 @@ func (p *Process) run(finishedFn func()) { // if the program still in running after startSecs if p.state == Running { p.changeStateTo(Exited) - p.log.Info("Program exited") + p.log.Info("Process exited") break } else { p.changeStateTo(Backoff) @@ -584,7 +579,7 @@ func (p *Process) run(finishedFn func()) { // start the program before giving up and putting the process into an Fatal state // first start time is not the retry time if atomic.LoadInt32(p.retryTimes) >= int32(cfg.StartRetries) { - p.failToStartProgram(fmt.Sprintf("Unable to run program; exceeded retry count: %d", cfg.StartRetries), finishedOnceFn) + p.failToStartProgram(fmt.Sprintf("Unable to run process; exceeded retry count: %d", cfg.StartRetries), finishedOnceFn) break } } @@ -657,7 +652,7 @@ func (p *Process) setLog() { } func (p *Process) createLogger(logFile string, maxBytes int64, backups int) logger.Logger { - return logger.NewLogger(p.Name(), logFile, logger.NewNullLocker(), maxBytes, backups) + return logger.NewLogger(p.Name(), logFile, maxBytes, backups) } func (p *Process) setUser() error { @@ -708,7 +703,7 @@ func (p *Process) Stop(wait bool) { if !isRunning { return } - p.log.Info("Stopping program") + p.log.Info("Stopping process") cfg := p.Config() @@ -737,21 +732,21 @@ func (p *Process) Stop(wait bool) { continue } - p.log.Info("Send stop signal to program", zap.String("signal", sigs[i])) + p.log.Info("Send stop signal to process", zap.String("signal", sigs[i])) _ = p.Signal(sig, stopAsGroup) endTime := time.Now().Add(waitDur) for endTime.After(time.Now()) { // if it already exits if p.state != Starting && p.state != Running && p.state != Stopping { - p.log.Info("Program exited") + p.log.Info("Process exited") return } time.Sleep(10 * time.Millisecond) } } - p.log.Info("Program did not stop in time, sending KILL") + p.log.Info("Process did not stop in time, sending KILL") p.Signal(syscall.SIGKILL, killAsGroup) }() diff --git a/process/process_manager.go b/process/process_manager.go index 75a1c04..c6b0644 100644 --- a/process/process_manager.go +++ b/process/process_manager.go @@ -31,10 +31,10 @@ func (pm *Manager) CreateOrUpdateProcess(supervisorID string, after *config.Proc if !ok { proc = NewProcess(supervisorID, after) pm.procs[after.Name] = proc - zap.L().Info("Created program", zap.String("program", after.Name)) + zap.L().Info("Created process", zap.String("name", after.Name)) } else { proc.UpdateConfig(after) - zap.L().Info("Updated program", zap.String("program", after.Name)) + zap.L().Info("Updated process", zap.String("name", after.Name)) } return proc @@ -71,7 +71,7 @@ func (pm *Manager) createProcess(supervisorID string, process *config.Process) * proc = NewProcess(supervisorID, process) pm.procs[process.Name] = proc } - zap.L().Info("Created program", zap.String("program", process.Name)) + zap.L().Info("Created process", zap.String("name", process.Name)) return proc } diff --git a/supervisor.go b/supervisor.go index 6893a2c..cdde25c 100644 --- a/supervisor.go +++ b/supervisor.go @@ -211,11 +211,11 @@ func (s *Supervisor) StopProcessGroup(r *http.Request, args *StartProcessArgs, r func (s *Supervisor) SendProcessStdin(r *http.Request, args *ProcessStdin, reply *struct{ Success bool }) error { proc := s.procMgr.Find(args.Name) if proc == nil { - zap.L().Error("program does not exist", zap.String("program", args.Name)) + zap.L().Error("Process does not exist", zap.String("name", args.Name)) return fmt.Errorf("NOT_RUNNING") } if proc.State() != process.Running { - zap.L().Error("program does not run", zap.String("program", args.Name)) + zap.L().Error("Process does not run", zap.String("name", args.Name)) return fmt.Errorf("NOT_RUNNING") } err := proc.SendProcessStdin(args.Chars)