diff --git a/.gitignore b/.gitignore index 2644e015..e5b2cc3f 100644 --- a/.gitignore +++ b/.gitignore @@ -20,4 +20,4 @@ tmp/* .idea # Directory for local development -local/ +.local/ diff --git a/Makefile b/Makefile index 7de68d33..ed746c50 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,7 @@ VERSION= SCRIPT_DIR=$(abspath $(dir $(lastword $(MAKEFILE_LIST)))) # Directories for miscellaneous files for the local environment -LOCAL_DIR=$(SCRIPT_DIR)/local +LOCAL_DIR=$(SCRIPT_DIR)/.local LOCAL_BIN_DIR=$(LOCAL_DIR)/bin # Configuration directory @@ -111,6 +111,7 @@ run-server-https: ${SERVER_CERT_FILE} ${SERVER_KEY_FILE} test: @echo "${COLOR_GREEN}Running tests...${COLOR_RESET}" @GOBIN=${LOCAL_BIN_DIR} go install ${PKG_gotestsum} + @go clean -testcache @${LOCAL_BIN_DIR}/gotestsum ${GOTESTSUM_ARGS} -- ${GO_TEST_FLAGS} ./... # test-coverage runs all tests with coverage. @@ -119,13 +120,6 @@ test-coverage: @GOBIN=${LOCAL_BIN_DIR} go install ${PKG_gotestsum} @${LOCAL_BIN_DIR}/gotestsum ${GOTESTSUM_ARGS} -- ${GO_TEST_FLAGS} -coverprofile="coverage.txt" -covermode=atomic ./... -# test-clean cleans the test cache and run all tests. -test-clean: build-bin - @echo "${COLOR_GREEN}Running tests...${COLOR_RESET}" - @GOBIN=${LOCAL_BIN_DIR} go install ${PKG_gotestsum} - @go clean -testcache - @${LOCAL_BIN_DIR}/gotestsum ${GOTESTSUM_ARGS} -- ${GO_TEST_FLAGS} ./... - # lint runs the linter. lint: golangci-lint diff --git a/cmd/dry.go b/cmd/dry.go index 7bbdc5a5..7b71d24c 100644 --- a/cmd/dry.go +++ b/cmd/dry.go @@ -17,7 +17,6 @@ package cmd import ( "log" - "os" "path/filepath" "github.com/daguflow/dagu/internal/agent" @@ -45,32 +44,35 @@ func dryCmd() *cobra.Command { params, err := cmd.Flags().GetString("params") if err != nil { - initLogger.Error("Parameter retrieval failed", "error", err) - os.Exit(1) + initLogger.Fatal("Parameter retrieval failed", "error", err) } workflow, err := dag.Load(cfg.BaseConfig, args[0], removeQuotes(params)) if err != nil { - initLogger.Error("Workflow load failed", "error", err, "file", args[0]) - os.Exit(1) + initLogger.Fatal("Workflow load failed", "error", err, "file", args[0]) } requestID, err := generateRequestID() if err != nil { - initLogger.Error("Request ID generation failed", "error", err) - os.Exit(1) + initLogger.Fatal("Request ID generation failed", "error", err) } - logFile, err := openLogFile("dry_", cfg.LogDir, workflow, requestID) + logFile, err := logger.OpenLogFile(logger.LogFileConfig{ + Prefix: "dry_", + LogDir: cfg.LogDir, + DAGLogDir: workflow.LogDir, + DAGName: workflow.Name, + RequestID: requestID, + }) + if err != nil { - initLogger.Error( + initLogger.Fatal( "Log file creation failed", "error", err, "workflow", workflow.Name, ) - os.Exit(1) } defer logFile.Close() @@ -98,11 +100,10 @@ func dryCmd() *cobra.Command { listenSignals(ctx, agt) if err := agt.Run(ctx); err != nil { - agentLogger.Error("Workflow execution failed", + agentLogger.Fatal("Workflow execution failed", "error", err, "workflow", workflow.Name, "requestID", requestID) - os.Exit(1) } }, } diff --git a/cmd/logging.go b/cmd/logging.go deleted file mode 100644 index 517e0303..00000000 --- a/cmd/logging.go +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright (C) 2024 The Daguflow/Dagu Authors -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -package cmd - -import ( - "fmt" - "os" - "path/filepath" - "time" - - "github.com/daguflow/dagu/internal/dag" - "github.com/daguflow/dagu/internal/util" -) - -// openLogFile opens a log file for the workflow. -func openLogFile( - prefix string, - logDir string, - workflow *dag.DAG, - requestID string, -) (*os.File, error) { - name := util.ValidFilename(workflow.Name) - if workflow.LogDir != "" { - logDir = filepath.Join(workflow.LogDir, name) - } - // Check if the log directory exists - if _, err := os.Stat(logDir); os.IsNotExist(err) { - // Create the log directory - if err := os.MkdirAll(logDir, 0755); err != nil { - return nil, err - } - } - file := filepath.Join(logDir, fmt.Sprintf("%s%s.%s.%s.log", - prefix, - name, - time.Now().Format("20060102.15:04:05.000"), - util.TruncString(requestID, 8), - )) - // Open or create the log file - return os.OpenFile( - file, os.O_CREATE|os.O_WRONLY|os.O_APPEND|os.O_SYNC, 0644, - ) -} diff --git a/cmd/restart.go b/cmd/restart.go index 640f215b..ad06a391 100644 --- a/cmd/restart.go +++ b/cmd/restart.go @@ -17,7 +17,6 @@ package cmd import ( "log" - "os" "path/filepath" "time" @@ -57,18 +56,16 @@ func restartCmd() *cobra.Command { specFilePath := args[0] workflow, err := dag.Load(cfg.BaseConfig, specFilePath, "") if err != nil { - initLogger.Error("Workflow load failed", "error", err, "file", args[0]) - os.Exit(1) + initLogger.Fatal("Workflow load failed", "error", err, "file", args[0]) } dataStore := newDataStores(cfg) cli := newClient(cfg, dataStore, initLogger) if err := stopDAGIfRunning(cli, workflow, initLogger); err != nil { - initLogger.Error("Workflow stop operation failed", + initLogger.Fatal("Workflow stop operation failed", "error", err, "workflow", workflow.Name) - os.Exit(1) } // Wait for the specified amount of time before restarting. @@ -77,35 +74,37 @@ func restartCmd() *cobra.Command { // Retrieve the parameter of the previous execution. params, err := getPreviousExecutionParams(cli, workflow) if err != nil { - initLogger.Error("Previous execution parameter retrieval failed", + initLogger.Fatal("Previous execution parameter retrieval failed", "error", err, "workflow", workflow.Name) - os.Exit(1) } // Start the DAG with the same parameter. // Need to reload the DAG file with the parameter. workflow, err = dag.Load(cfg.BaseConfig, specFilePath, params) if err != nil { - initLogger.Error("Workflow reload failed", + initLogger.Fatal("Workflow reload failed", "error", err, "file", specFilePath, "params", params) - os.Exit(1) } requestID, err := generateRequestID() if err != nil { - initLogger.Error("Request ID generation failed", "error", err) - os.Exit(1) + initLogger.Fatal("Request ID generation failed", "error", err) } - logFile, err := openLogFile("restart_", cfg.LogDir, workflow, requestID) + logFile, err := logger.OpenLogFile(logger.LogFileConfig{ + Prefix: "restart_", + LogDir: cfg.LogDir, + DAGLogDir: workflow.LogDir, + DAGName: workflow.Name, + RequestID: requestID, + }) if err != nil { - initLogger.Error("Log file creation failed", + initLogger.Fatal("Log file creation failed", "error", err, "workflow", workflow.Name) - os.Exit(1) } defer logFile.Close() @@ -133,11 +132,10 @@ func restartCmd() *cobra.Command { listenSignals(cmd.Context(), agt) if err := agt.Run(cmd.Context()); err != nil { - agentLogger.Error("Workflow restart failed", + agentLogger.Fatal("Workflow restart failed", "error", err, "workflow", workflow.Name, "requestID", requestID) - os.Exit(1) } }, } diff --git a/cmd/retry.go b/cmd/retry.go index 99646c96..583cc00f 100644 --- a/cmd/retry.go +++ b/cmd/retry.go @@ -56,44 +56,45 @@ func retryCmd() *cobra.Command { specFilePath := args[0] absoluteFilePath, err := filepath.Abs(specFilePath) if err != nil { - initLogger.Error("Absolute path resolution failed", + initLogger.Fatal("Absolute path resolution failed", "error", err, "file", specFilePath) - os.Exit(1) } status, err := historyStore.FindByRequestID(absoluteFilePath, requestID) if err != nil { - initLogger.Error("Historical execution retrieval failed", + initLogger.Fatal("Historical execution retrieval failed", "error", err, "requestID", requestID, "file", absoluteFilePath) - os.Exit(1) } // Start the DAG with the same parameters with the execution that // is being retried. workflow, err := dag.Load(cfg.BaseConfig, absoluteFilePath, status.Status.Params) if err != nil { - initLogger.Error("Workflow specification load failed", + initLogger.Fatal("Workflow specification load failed", "error", err, "file", specFilePath, "params", status.Status.Params) - os.Exit(1) } newRequestID, err := generateRequestID() if err != nil { - initLogger.Error("Request ID generation failed", "error", err) - os.Exit(1) + initLogger.Fatal("Request ID generation failed", "error", err) } - logFile, err := openLogFile("dry_", cfg.LogDir, workflow, newRequestID) + logFile, err := logger.OpenLogFile(logger.LogFileConfig{ + Prefix: "retry_", + LogDir: cfg.LogDir, + DAGLogDir: workflow.LogDir, + DAGName: workflow.Name, + RequestID: newRequestID, + }) if err != nil { - initLogger.Error("Log file creation failed", + initLogger.Fatal("Log file creation failed", "error", err, "workflow", workflow.Name) - os.Exit(1) } defer logFile.Close() @@ -126,8 +127,7 @@ func retryCmd() *cobra.Command { listenSignals(ctx, agt) if err := agt.Run(ctx); err != nil { - agentLogger.Error("Failed to start workflow", "error", err) - os.Exit(1) + agentLogger.Fatal("Failed to start workflow", "error", err) } }, } diff --git a/cmd/scheduler.go b/cmd/scheduler.go index e5ef4504..b25bbb10 100644 --- a/cmd/scheduler.go +++ b/cmd/scheduler.go @@ -17,7 +17,6 @@ package cmd import ( "log" - "os" "github.com/daguflow/dagu/internal/config" "github.com/daguflow/dagu/internal/logger" @@ -54,14 +53,13 @@ func schedulerCmd() *cobra.Command { cli := newClient(cfg, dataStore, logger) sc := scheduler.New(cfg, logger, cli) if err := sc.Start(ctx); err != nil { - logger.Error( + logger.Fatal( "Scheduler initialization failed", "error", err, "specsDirectory", cfg.DAGs, ) - os.Exit(1) } }, } diff --git a/cmd/server.go b/cmd/server.go index 2889bee3..231f89ab 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -17,7 +17,6 @@ package cmd import ( "log" - "os" "github.com/daguflow/dagu/internal/config" "github.com/daguflow/dagu/internal/frontend" @@ -52,8 +51,7 @@ func serverCmd() *cobra.Command { cli := newClient(cfg, dataStore, logger) server := frontend.New(cfg, logger, cli) if err := server.Serve(cmd.Context()); err != nil { - logger.Error("Server initialization failed", "error", err) - os.Exit(1) + logger.Fatal("Server initialization failed", "error", err) } }, } diff --git a/cmd/start.go b/cmd/start.go index f38002ab..069f384f 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -17,7 +17,6 @@ package cmd import ( "log" - "os" "path/filepath" "github.com/daguflow/dagu/internal/agent" @@ -52,32 +51,34 @@ func startCmd() *cobra.Command { params, err := cmd.Flags().GetString("params") if err != nil { - initLogger.Error("Parameter retrieval failed", "error", err) - os.Exit(1) + initLogger.Fatal("Parameter retrieval failed", "error", err) } workflow, err := dag.Load(cfg.BaseConfig, args[0], removeQuotes(params)) if err != nil { - initLogger.Error("Workflow load failed", "error", err, "file", args[0]) - os.Exit(1) + initLogger.Fatal("Workflow load failed", "error", err, "file", args[0]) } requestID, err := generateRequestID() if err != nil { - initLogger.Error("Request ID generation failed", "error", err) - os.Exit(1) + initLogger.Fatal("Request ID generation failed", "error", err) } - logFile, err := openLogFile("start_", cfg.LogDir, workflow, requestID) + logFile, err := logger.OpenLogFile(logger.LogFileConfig{ + Prefix: "start_", + LogDir: cfg.LogDir, + DAGLogDir: workflow.LogDir, + DAGName: workflow.Name, + RequestID: requestID, + }) if err != nil { - initLogger.Error( + initLogger.Fatal( "Log file creation failed", "error", err, "workflow", workflow.Name, ) - os.Exit(1) } defer logFile.Close() @@ -111,11 +112,10 @@ func startCmd() *cobra.Command { listenSignals(ctx, agt) if err := agt.Run(ctx); err != nil { - agentLogger.Error("Workflow execution failed", + agentLogger.Fatal("Workflow execution failed", "error", err, "workflow", workflow.Name, "requestID", requestID) - os.Exit(1) } }, } diff --git a/cmd/start_all.go b/cmd/start_all.go index a49c3f0f..ad8119d0 100644 --- a/cmd/start_all.go +++ b/cmd/start_all.go @@ -17,7 +17,6 @@ package cmd import ( "log" - "os" "github.com/daguflow/dagu/internal/config" "github.com/daguflow/dagu/internal/frontend" @@ -60,8 +59,7 @@ func startAllCmd() *cobra.Command { sc := scheduler.New(cfg, logger, cli) if err := sc.Start(ctx); err != nil { - logger.Error("Scheduler initialization failed", "error", err, "dags", cfg.DAGs) - os.Exit(1) + logger.Fatal("Scheduler initialization failed", "error", err, "dags", cfg.DAGs) } }() @@ -69,8 +67,7 @@ func startAllCmd() *cobra.Command { server := frontend.New(cfg, logger, cli) if err := server.Serve(ctx); err != nil { - logger.Error("Server initialization failed", "error", err) - os.Exit(1) + logger.Fatal("Server initialization failed", "error", err) } }, } diff --git a/cmd/status.go b/cmd/status.go index 9c823f00..6266670f 100644 --- a/cmd/status.go +++ b/cmd/status.go @@ -17,7 +17,6 @@ package cmd import ( "log" - "os" "github.com/daguflow/dagu/internal/config" "github.com/daguflow/dagu/internal/dag" @@ -44,8 +43,7 @@ func statusCmd() *cobra.Command { // Load the DAG file and get the current running status. workflow, err := dag.Load(cfg.BaseConfig, args[0], "") if err != nil { - logger.Error("Workflow load failed", "error", err, "file", args[0]) - os.Exit(1) + logger.Fatal("Workflow load failed", "error", err, "file", args[0]) } dataStore := newDataStores(cfg) @@ -54,8 +52,7 @@ func statusCmd() *cobra.Command { curStatus, err := cli.GetCurrentStatus(workflow) if err != nil { - logger.Error("Current status retrieval failed", "error", err) - os.Exit(1) + logger.Fatal("Current status retrieval failed", "error", err) } logger.Info("Current status", "pid", curStatus.PID, "status", curStatus.Status) diff --git a/cmd/stop.go b/cmd/stop.go index 4ea594fa..a3ddea73 100644 --- a/cmd/stop.go +++ b/cmd/stop.go @@ -17,7 +17,6 @@ package cmd import ( "log" - "os" "github.com/daguflow/dagu/internal/config" "github.com/daguflow/dagu/internal/dag" @@ -50,8 +49,7 @@ func stopCmd() *cobra.Command { workflow, err := dag.Load(cfg.BaseConfig, args[0], "") if err != nil { - logger.Error("Workflow load failed", "error", err, "file", args[0]) - os.Exit(1) + logger.Fatal("Workflow load failed", "error", err, "file", args[0]) } logger.Info("Workflow stop initiated", "workflow", workflow.Name) @@ -60,14 +58,13 @@ func stopCmd() *cobra.Command { cli := newClient(cfg, dataStore, logger) if err := cli.Stop(workflow); err != nil { - logger.Error( + logger.Fatal( "Workflow stop operation failed", "error", err, "workflow", workflow.Name, ) - os.Exit(1) } }, } diff --git a/docs/source/yaml_format.rst b/docs/source/yaml_format.rst index 7237eb83..44590964 100644 --- a/docs/source/yaml_format.rst +++ b/docs/source/yaml_format.rst @@ -317,7 +317,7 @@ This section provides a comprehensive list of available fields that can be used - ``logDir``: The directory where the standard output is written. The default value is ``${HOME}/.local/share/logs``. - ``restartWaitSec``: The number of seconds to wait after the DAG process stops before restarting it. - ``histRetentionDays``: The number of days to retain execution history (not for log files). -- ``timeout``: The timeout of the DAG, which is optional. Unit is seconds. +- ``timeoutSec``: The timeout of the DAG, which is optional. Unit is seconds. - ``delaySec``: The interval time in seconds between steps. - ``maxActiveRuns``: The maximum number of parallel running steps. - ``params``: The default parameters that can be referred to by ``$1``, ``$2``, and so on. @@ -344,7 +344,7 @@ Example: logDir: ${LOG_DIR} restartWaitSec: 60 histRetentionDays: 3 - timeout: 3600 + timeoutSec: 3600 delaySec: 1 maxActiveRuns: 1 params: param1 param2 diff --git a/internal/agent/agent_test.go b/internal/agent/agent_test.go index 37b5b587..af864c58 100644 --- a/internal/agent/agent_test.go +++ b/internal/agent/agent_test.go @@ -248,7 +248,7 @@ func TestAgent_Retry(t *testing.T) { // Modify the DAG to make it successful for _, node := range status.Nodes { - node.CmdWithArgs = "true" + node.Step.CmdWithArgs = "true" } // Retry the DAG and check if it is successful diff --git a/internal/agent/reporter.go b/internal/agent/reporter.go index d8d2425f..59e20f18 100644 --- a/internal/agent/reporter.go +++ b/internal/agent/reporter.go @@ -55,7 +55,7 @@ func (r *reporter) reportStep( nodeStatus := node.State().Status if nodeStatus != scheduler.NodeStatusNone { r.logger.Info("Step execution finished", - "step", node.Data().Name, + "step", node.Data().Step.Name, "status", nodeStatus, ) } @@ -160,15 +160,15 @@ func renderTable(nodes []*model.Node) string { }, ) for i, n := range nodes { - var command = n.Command - if n.Args != nil { + var command = n.Step.Command + if n.Step.Args != nil { command = strings.Join( - []string{n.Command, strings.Join(n.Args, " ")}, " ", + []string{n.Step.Command, strings.Join(n.Step.Args, " ")}, " ", ) } t.AppendRow(table.Row{ fmt.Sprintf("%d", i+1), - n.Name, + n.Step.Name, n.StartedAt, n.FinishedAt, n.StatusText, @@ -215,7 +215,7 @@ func renderHTML(nodes []*model.Node) string { } for _, n := range nodes { _, _ = buffer.WriteString("") - addValFunc(n.Name) + addValFunc(n.Step.Name) addValFunc(n.StartedAt) addValFunc(n.FinishedAt) addStatusFunc(n.Status) diff --git a/internal/agent/reporter_test.go b/internal/agent/reporter_test.go index 79499306..202cba8c 100644 --- a/internal/agent/reporter_test.go +++ b/internal/agent/reporter_test.go @@ -181,8 +181,8 @@ func testRenderSummary(t *testing.T, _ *reporter, workflow *dag.DAG, nodes []*mo func testRenderTable(t *testing.T, _ *reporter, _ *dag.DAG, nodes []*model.Node) { summary := renderTable(nodes) - require.Contains(t, summary, nodes[0].Name) - require.Contains(t, summary, nodes[0].Args[0]) + require.Contains(t, summary, nodes[0].Step.Name) + require.Contains(t, summary, nodes[0].Step.Args[0]) } type mockSender struct { diff --git a/internal/agent/testdata/timeout.yaml b/internal/agent/testdata/timeout.yaml index 13ff24ed..d5e01f2c 100644 --- a/internal/agent/testdata/timeout.yaml +++ b/internal/agent/testdata/timeout.yaml @@ -1,4 +1,4 @@ -timeout: 2 +timeoutSec: 2 steps: - name: "1" command: "sleep 1" diff --git a/internal/client/client.go b/internal/client/client.go index fa2176a0..531e5fc0 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -302,7 +302,12 @@ func (e *client) GetAllStatusPagination(params dags.ListDagsParams) ([]*DAGStatu currentStatus *DAGStatus ) - if dagListPaginationResult, err = dagStore.ListPagination(params); err != nil { + if dagListPaginationResult, err = dagStore.ListPagination(persistence.DAGListPaginationArgs{ + Page: int(params.Page), + Limit: int(params.Limit), + Name: params.SearchName, + Tag: params.SearchTag, + }); err != nil { return dagStatusList, &DagListPaginationSummaryResult{PageCount: 1}, err } @@ -314,7 +319,7 @@ func (e *client) GetAllStatusPagination(params dags.ListDagsParams) ([]*DAGStatu } return dagStatusList, &DagListPaginationSummaryResult{ - PageCount: e.getPageCount(dagListPaginationResult.Count, params.Limit), + PageCount: e.getPageCount(int64(dagListPaginationResult.Count), params.Limit), ErrorList: dagListPaginationResult.ErrorList, }, nil } diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 1ce70385..fb59ca4a 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -855,7 +855,7 @@ func testNewStatus(workflow *dag.DAG, requestID string, status scheduler.Status, workflow, []scheduler.NodeData{ { - NodeState: scheduler.NodeState{Status: nodeStatus}, + State: scheduler.NodeState{Status: nodeStatus}, }, }, status, diff --git a/internal/dag/builder.go b/internal/dag/builder.go index 75d11cc4..4103d509 100644 --- a/internal/dag/builder.go +++ b/internal/dag/builder.go @@ -126,7 +126,7 @@ func (b *builder) build(def *definition, envs []string) (*DAG, error) { Name: def.Name, Group: def.Group, Description: def.Description, - Timeout: time.Second * time.Duration(def.Timeout), + Timeout: time.Second * time.Duration(def.TimeoutSec), Delay: time.Second * time.Duration(def.DelaySec), RestartWait: time.Second * time.Duration(def.RestartWaitSec), Tags: parseTags(def.Tags), diff --git a/internal/dag/definition.go b/internal/dag/definition.go index b43020fc..9921a0be 100644 --- a/internal/dag/definition.go +++ b/internal/dag/definition.go @@ -32,7 +32,7 @@ type definition struct { MailOn *mailOnDef ErrorMail mailConfigDef InfoMail mailConfigDef - Timeout int + TimeoutSec int DelaySec int RestartWaitSec int HistRetentionDays *int diff --git a/internal/dag/scheduler/graph.go b/internal/dag/scheduler/graph.go index 8a775424..2591aa1e 100644 --- a/internal/dag/scheduler/graph.go +++ b/internal/dag/scheduler/graph.go @@ -194,7 +194,7 @@ func (g *ExecutionGraph) setupRetry() error { dict := map[int]NodeStatus{} retry := map[int]bool{} for _, node := range g.nodes { - dict[node.id] = node.data.Status + dict[node.id] = node.data.State.Status retry[node.id] = false } var frontier []int diff --git a/internal/dag/scheduler/graph_test.go b/internal/dag/scheduler/graph_test.go index bf91ddca..c84173fb 100644 --- a/internal/dag/scheduler/graph_test.go +++ b/internal/dag/scheduler/graph_test.go @@ -44,7 +44,7 @@ func TestRetryExecution(t *testing.T) { { data: NodeData{ Step: dag.Step{Name: "1", Command: "true"}, - NodeState: NodeState{ + State: NodeState{ Status: NodeStatusSuccess, }, }, @@ -52,7 +52,7 @@ func TestRetryExecution(t *testing.T) { { data: NodeData{ Step: dag.Step{Name: "2", Command: "true", Depends: []string{"1"}}, - NodeState: NodeState{ + State: NodeState{ Status: NodeStatusError, }, }, @@ -60,7 +60,7 @@ func TestRetryExecution(t *testing.T) { { data: NodeData{ Step: dag.Step{Name: "3", Command: "true", Depends: []string{"2"}}, - NodeState: NodeState{ + State: NodeState{ Status: NodeStatusCancel, }, }, @@ -68,7 +68,7 @@ func TestRetryExecution(t *testing.T) { { data: NodeData{ Step: dag.Step{Name: "4", Command: "true", Depends: []string{}}, - NodeState: NodeState{ + State: NodeState{ Status: NodeStatusSkipped, }, }, @@ -76,7 +76,7 @@ func TestRetryExecution(t *testing.T) { { data: NodeData{ Step: dag.Step{Name: "5", Command: "true", Depends: []string{"4"}}, - NodeState: NodeState{ + State: NodeState{ Status: NodeStatusError, }, }, @@ -84,7 +84,7 @@ func TestRetryExecution(t *testing.T) { { data: NodeData{ Step: dag.Step{Name: "6", Command: "true", Depends: []string{"5"}}, - NodeState: NodeState{ + State: NodeState{ Status: NodeStatusSuccess, }, }, @@ -92,7 +92,7 @@ func TestRetryExecution(t *testing.T) { { data: NodeData{ Step: dag.Step{Name: "7", Command: "true", Depends: []string{"6"}}, - NodeState: NodeState{ + State: NodeState{ Status: NodeStatusSkipped, }, }, @@ -100,7 +100,7 @@ func TestRetryExecution(t *testing.T) { { data: NodeData{ Step: dag.Step{Name: "8", Command: "true", Depends: []string{}}, - NodeState: NodeState{ + State: NodeState{ Status: NodeStatusSkipped, }, }, diff --git a/internal/dag/scheduler/node.go b/internal/dag/scheduler/node.go index ef2af54e..7436d6b4 100644 --- a/internal/dag/scheduler/node.go +++ b/internal/dag/scheduler/node.go @@ -34,6 +34,45 @@ import ( "golang.org/x/sys/unix" ) +// Node is a node in a DAG. It executes a command. +type Node struct { + data NodeData + + id int + mu sync.RWMutex + logLock sync.Mutex + cmd executor.Executor + cancelFunc func() + logFile *os.File + logWriter *bufio.Writer + stdoutFile *os.File + stdoutWriter *bufio.Writer + stderrFile *os.File + stderrWriter *bufio.Writer + outputWriter *os.File + outputReader *os.File + scriptFile *os.File + done bool +} + +type NodeData struct { + Step dag.Step + State NodeState +} + +// NodeState contains the state of a node. +type NodeState struct { + Status NodeStatus + Log string + StartedAt time.Time + FinishedAt time.Time + RetryCount int + RetriedAt time.Time + DoneCount int + Error error +} + +// NodeStatus represents the status of a node. type NodeStatus int const ( @@ -66,64 +105,26 @@ func (s NodeStatus) String() string { func NewNode(step dag.Step, state NodeState) *Node { return &Node{ - data: NodeData{Step: step, NodeState: state}, + data: NodeData{Step: step, State: state}, } } -type NodeData struct { - dag.Step - NodeState -} - -// Node is a node in a DAG. It executes a command. -type Node struct { - data NodeData - - id int - mu sync.RWMutex - logLock sync.Mutex - cmd executor.Executor - cancelFunc func() - logFile *os.File - logWriter *bufio.Writer - stdoutFile *os.File - stdoutWriter *bufio.Writer - stderrFile *os.File - stderrWriter *bufio.Writer - outputWriter *os.File - outputReader *os.File - scriptFile *os.File - done bool -} - -// NodeState is the state of a node. -type NodeState struct { - Status NodeStatus - Log string - StartedAt time.Time - FinishedAt time.Time - RetryCount int - RetriedAt time.Time - DoneCount int - Error error -} - -func (n *Node) finish() { - n.mu.Lock() - defer n.mu.Unlock() - n.data.FinishedAt = time.Now() +func (n *Node) Data() NodeData { + n.mu.RLock() + defer n.mu.RUnlock() + return n.data } func (n *Node) SetError(err error) { n.mu.Lock() defer n.mu.Unlock() - n.data.Error = err + n.data.State.Error = err } func (n *Node) State() NodeState { n.mu.RLock() defer n.mu.RUnlock() - return n.data.NodeState + return n.data.State } // Execute runs the command synchronously and returns error if any. @@ -146,7 +147,13 @@ func (n *Node) Execute(ctx context.Context) error { ) } - return n.data.Error + return n.data.State.Error +} + +func (n *Node) finish() { + n.mu.Lock() + defer n.mu.Unlock() + n.data.State.FinishedAt = time.Now() } func (n *Node) setupExec(ctx context.Context) (executor.Executor, error) { @@ -203,51 +210,45 @@ func (n *Node) setupExec(ctx context.Context) (executor.Executor, error) { return cmd, nil } -func (n *Node) Data() NodeData { - n.mu.RLock() - defer n.mu.RUnlock() - return n.data -} - func (n *Node) getRetryCount() int { n.mu.RLock() defer n.mu.RUnlock() - return n.data.RetryCount + return n.data.State.RetryCount } func (n *Node) setRetriedAt(retriedAt time.Time) { n.mu.Lock() defer n.mu.Unlock() - n.data.RetriedAt = retriedAt + n.data.State.RetriedAt = retriedAt } func (n *Node) getDoneCount() int { n.mu.RLock() defer n.mu.RUnlock() - return n.data.DoneCount + return n.data.State.DoneCount } func (n *Node) clearState() { - n.data.NodeState = NodeState{} + n.data.State = NodeState{} } func (n *Node) setStatus(status NodeStatus) { n.mu.Lock() defer n.mu.Unlock() - n.data.Status = status + n.data.State.Status = status } func (n *Node) setErr(err error) { n.mu.Lock() defer n.mu.Unlock() - n.data.Error = err - n.data.Status = NodeStatusError + n.data.State.Error = err + n.data.State.Status = NodeStatusError } func (n *Node) signal(sig os.Signal, allowOverride bool) { n.mu.Lock() defer n.mu.Unlock() - status := n.data.Status + status := n.data.State.Status if status == NodeStatusRunning && n.cmd != nil { sigsig := sig if allowOverride && n.data.Step.SignalOnStop != "" { @@ -257,16 +258,16 @@ func (n *Node) signal(sig os.Signal, allowOverride bool) { util.LogErr("sending signal", n.cmd.Kill(sigsig)) } if status == NodeStatusRunning { - n.data.Status = NodeStatusCancel + n.data.State.Status = NodeStatusCancel } } func (n *Node) cancel() { n.mu.Lock() defer n.mu.Unlock() - status := n.data.Status + status := n.data.State.Status if status == NodeStatusRunning { - n.data.Status = NodeStatusCancel + n.data.State.Status = NodeStatusCancel } if n.cancelFunc != nil { log.Printf("canceling node: %s", n.data.Step.Name) @@ -278,24 +279,22 @@ func (n *Node) setup(logDir string, requestID string) error { n.mu.Lock() defer n.mu.Unlock() - n.data.StartedAt = time.Now() - n.data.Log = filepath.Join(logDir, fmt.Sprintf("%s.%s.%s.log", + n.data.State.StartedAt = time.Now() + n.data.State.Log = filepath.Join(logDir, fmt.Sprintf("%s.%s.%s.log", util.ValidFilename(n.data.Step.Name), - n.data.StartedAt.Format("20060102.15:04:05.000"), + n.data.State.StartedAt.Format("20060102.15:04:05.000"), util.TruncString(requestID, 8), )) - for _, fn := range []func() error{ - n.setupLog, - n.setupStdout, - n.setupStderr, - n.setupScript, - } { - if err := fn(); err != nil { - n.data.Error = err - return err - } + if err := n.setupLog(); err != nil { + return err } - return nil + if err := n.setupStdout(); err != nil { + return err + } + if err := n.setupStderr(); err != nil { + return err + } + return n.setupScript() } var ( @@ -328,7 +327,7 @@ func (n *Node) setupStdout() error { var err error n.stdoutFile, err = util.OpenOrCreateFile(f) if err != nil { - n.data.Error = err + n.data.State.Error = err return err } n.stdoutWriter = bufio.NewWriter(n.stdoutFile) @@ -345,7 +344,7 @@ func (n *Node) setupStderr() error { var err error n.stderrFile, err = util.OpenOrCreateFile(f) if err != nil { - n.data.Error = err + n.data.State.Error = err return err } n.stderrWriter = bufio.NewWriter(n.stderrFile) @@ -354,15 +353,15 @@ func (n *Node) setupStderr() error { } func (n *Node) setupLog() error { - if n.data.Log == "" { + if n.data.State.Log == "" { return nil } n.logLock.Lock() defer n.logLock.Unlock() var err error - n.logFile, err = util.OpenOrCreateFile(n.data.Log) + n.logFile, err = util.OpenOrCreateFile(n.data.State.Log) if err != nil { - n.data.Error = err + n.data.State.Error = err return err } n.logWriter = bufio.NewWriter(n.logFile) @@ -397,7 +396,7 @@ func (n *Node) teardown() error { _ = os.Remove(n.scriptFile.Name()) } if lastErr != nil { - n.data.Error = lastErr + n.data.State.Error = lastErr } return lastErr } @@ -405,13 +404,13 @@ func (n *Node) teardown() error { func (n *Node) incRetryCount() { n.mu.Lock() defer n.mu.Unlock() - n.data.RetryCount++ + n.data.State.RetryCount++ } func (n *Node) incDoneCount() { n.mu.Lock() defer n.mu.Unlock() - n.data.DoneCount++ + n.data.State.DoneCount++ } var ( diff --git a/internal/dag/scheduler/node_test.go b/internal/dag/scheduler/node_test.go index 5fd6acd1..981badc2 100644 --- a/internal/dag/scheduler/node_test.go +++ b/internal/dag/scheduler/node_test.go @@ -36,7 +36,7 @@ func TestExecute(t *testing.T) { OutputVariables: &dag.SyncMap{}, }}} require.NoError(t, n.Execute(context.Background())) - require.Nil(t, n.data.Error) + require.Nil(t, n.data.State.Error) } func TestError(t *testing.T) { @@ -47,7 +47,7 @@ func TestError(t *testing.T) { }}} err := n.Execute(context.Background()) require.True(t, err != nil) - require.Equal(t, n.data.Error, err) + require.Equal(t, n.data.State.Error, err) } func TestSignal(t *testing.T) { @@ -369,13 +369,13 @@ func TestTeardown(t *testing.T) { // no error since done flag is true err := n.teardown() require.NoError(t, err) - require.NoError(t, n.data.Error) + require.NoError(t, n.data.State.Error) // error n.done = false err = n.teardown() require.Error(t, err) - require.Error(t, n.data.Error) + require.Error(t, n.data.State.Error) } func runTestNode(t *testing.T, n *Node) { diff --git a/internal/dag/scheduler/scheduler.go b/internal/dag/scheduler/scheduler.go index d12b24c1..6fd144d2 100644 --- a/internal/dag/scheduler/scheduler.go +++ b/internal/dag/scheduler/scheduler.go @@ -404,7 +404,7 @@ func isReady(g *ExecutionGraph, node *Node) bool { func (sc *Scheduler) runHandlerNode(ctx context.Context, node *Node) error { defer func() { - node.data.FinishedAt = time.Now() + node.data.State.FinishedAt = time.Now() }() node.setStatus(NodeStatusRunning) diff --git a/internal/dag/scheduler/scheduler_test.go b/internal/dag/scheduler/scheduler_test.go index 1ffacf9e..b50d9559 100644 --- a/internal/dag/scheduler/scheduler_test.go +++ b/internal/dag/scheduler/scheduler_test.go @@ -544,7 +544,7 @@ func TestRepeat(t *testing.T) { require.Equal(t, sc.Status(g), StatusCancel) require.Equal(t, NodeStatusCancel, nodes[0].State().Status) - require.Equal(t, nodes[0].data.DoneCount, 2) + require.Equal(t, nodes[0].data.State.DoneCount, 2) } func TestRepeatFail(t *testing.T) { @@ -566,7 +566,7 @@ func TestRepeatFail(t *testing.T) { nodes := g.Nodes() require.Equal(t, sc.Status(g), StatusError) require.Equal(t, NodeStatusError, nodes[0].State().Status) - require.Equal(t, nodes[0].data.DoneCount, 1) + require.Equal(t, nodes[0].data.State.DoneCount, 1) } func TestStopRepetitiveTaskGracefully(t *testing.T) { @@ -600,7 +600,7 @@ func TestStopRepetitiveTaskGracefully(t *testing.T) { require.Equal(t, sc.Status(g), StatusSuccess) require.Equal(t, NodeStatusSuccess, nodes[0].State().Status) - require.Equal(t, nodes[0].data.DoneCount, 1) + require.Equal(t, nodes[0].data.State.DoneCount, 1) } func TestSchedulerStatusText(t *testing.T) { @@ -643,7 +643,7 @@ func TestNodeSetupFailure(t *testing.T) { nodes := g.Nodes() require.Equal(t, NodeStatusError, nodes[0].State().Status) - require.Equal(t, nodes[0].data.DoneCount, 0) + require.Equal(t, nodes[0].data.State.DoneCount, 0) } func TestNodeTeardownFailure(t *testing.T) { diff --git a/internal/frontend/dag/handler.go b/internal/frontend/dag/handler.go index 260b4644..f1c9fcf2 100644 --- a/internal/frontend/dag/handler.go +++ b/internal/frontend/dag/handler.go @@ -426,16 +426,16 @@ func (h *Handler) processStepLogRequest( } if node == nil { - if status.OnSuccess != nil && status.OnSuccess.Name == *params.Step { + if status.OnSuccess != nil && status.OnSuccess.Step.Name == *params.Step { node = status.OnSuccess } - if status.OnFailure != nil && status.OnFailure.Name == *params.Step { + if status.OnFailure != nil && status.OnFailure.Step.Name == *params.Step { node = status.OnFailure } - if status.OnCancel != nil && status.OnCancel.Name == *params.Step { + if status.OnCancel != nil && status.OnCancel.Step.Name == *params.Step { node = status.OnCancel } - if status.OnExit != nil && status.OnExit.Name == *params.Step { + if status.OnExit != nil && status.OnExit.Step.Name == *params.Step { node = status.OnExit } } @@ -489,7 +489,7 @@ func (h *Handler) processLogRequest( nodeNameToStatusList := map[string][]scheduler.NodeStatus{} for idx, log := range logs { for _, node := range log.Status.Nodes { - addNodeStatus(nodeNameToStatusList, len(logs), idx, node.Name, node.Status) + addNodeStatus(nodeNameToStatusList, len(logs), idx, node.Step.Name, node.Status) } } @@ -515,23 +515,23 @@ func (h *Handler) processLogRequest( for idx, log := range logs { if n := log.Status.OnSuccess; n != nil { addNodeStatus( - handlerToStatusList, len(logs), idx, n.Name, n.Status, + handlerToStatusList, len(logs), idx, n.Step.Name, n.Status, ) } if n := log.Status.OnFailure; n != nil { addNodeStatus( - handlerToStatusList, len(logs), idx, n.Name, n.Status, + handlerToStatusList, len(logs), idx, n.Step.Name, n.Status, ) } if n := log.Status.OnCancel; n != nil { n := log.Status.OnCancel addNodeStatus( - handlerToStatusList, len(logs), idx, n.Name, n.Status, + handlerToStatusList, len(logs), idx, n.Step.Name, n.Status, ) } if n := log.Status.OnExit; n != nil { addNodeStatus( - handlerToStatusList, len(logs), idx, n.Name, n.Status, + handlerToStatusList, len(logs), idx, n.Step.Name, n.Status, ) } } diff --git a/internal/logger/file.go b/internal/logger/file.go new file mode 100644 index 00000000..a718ec2a --- /dev/null +++ b/internal/logger/file.go @@ -0,0 +1,75 @@ +// Copyright (C) 2024 The Daguflow/Dagu Authors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package logger + +import ( + "fmt" + "os" + "path/filepath" + "time" + + "github.com/daguflow/dagu/internal/util" +) + +// LogFileConfig holds the configuration for opening a log file +type LogFileConfig struct { + Prefix string + LogDir string + DAGLogDir string + DAGName string + RequestID string +} + +// OpenLogFile opens a log file for the workflow. +func OpenLogFile(config LogFileConfig) (*os.File, error) { + logDir, err := prepareLogDirectory(config) + if err != nil { + return nil, fmt.Errorf("failed to prepare log directory: %w", err) + } + + filename := generateLogFilename(config) + return openFile(filepath.Join(logDir, filename)) +} + +func prepareLogDirectory(config LogFileConfig) (string, error) { + validName := util.ValidFilename(config.DAGName) + logDir := filepath.Join(config.LogDir, validName) + if config.DAGLogDir != "" { + logDir = filepath.Join(config.DAGLogDir, validName) + } + if err := os.MkdirAll(logDir, 0755); err != nil { + return "", fmt.Errorf("failed to create log directory: %w", err) + } + + return logDir, nil +} + +func generateLogFilename(config LogFileConfig) string { + return fmt.Sprintf("%s%s.%s.%s.log", + config.Prefix, + util.ValidFilename(config.DAGName), + time.Now().Format("20060102.15:04:05.000"), + util.TruncString(config.RequestID, 8), + ) +} + +func openFile(filepath string) (*os.File, error) { + return os.OpenFile( + filepath, + os.O_CREATE|os.O_WRONLY|os.O_APPEND|os.O_SYNC, + 0644, + ) +} diff --git a/internal/logger/file_test.go b/internal/logger/file_test.go new file mode 100644 index 00000000..e44f008c --- /dev/null +++ b/internal/logger/file_test.go @@ -0,0 +1,123 @@ +// Copyright (C) 2024 The Daguflow/Dagu Authors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package logger + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestOpenLogFile(t *testing.T) { + tempDir, err := os.MkdirTemp("", "test_log_dir") + require.NoError(t, err) + defer os.RemoveAll(tempDir) + + config := LogFileConfig{ + Prefix: "test_", + LogDir: tempDir, + DAGName: "test_dag", + RequestID: "12345678", + } + + file, err := OpenLogFile(config) + require.NoError(t, err) + defer file.Close() + + assert.NotNil(t, file) + assert.True(t, filepath.IsAbs(file.Name())) + assert.Contains(t, file.Name(), "test_dag") + assert.Contains(t, file.Name(), "test_") + assert.Contains(t, file.Name(), "12345678") +} + +func TestPrepareLogDirectory(t *testing.T) { + tempDir, err := os.MkdirTemp("", "test_log_dir") + require.NoError(t, err) + defer os.RemoveAll(tempDir) + + tests := []struct { + name string + config LogFileConfig + expected string + }{ + { + name: "Default LogDir", + config: LogFileConfig{ + LogDir: tempDir, + DAGName: "test_dag", + }, + expected: filepath.Join(tempDir, "test_dag"), + }, + { + name: "Custom DAGLogDir", + config: LogFileConfig{ + LogDir: tempDir, + DAGLogDir: filepath.Join(tempDir, "custom"), + DAGName: "test_dag", + }, + expected: filepath.Join(tempDir, "custom", "test_dag"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result, err := prepareLogDirectory(tt.config) + require.NoError(t, err) + assert.Equal(t, tt.expected, result) + assert.DirExists(t, result) + }) + } +} + +func TestGenerateLogFilename(t *testing.T) { + config := LogFileConfig{ + Prefix: "test_", + DAGName: "test dag", + RequestID: "12345678", + } + + filename := generateLogFilename(config) + + assert.Contains(t, filename, "test_") + assert.Contains(t, filename, "test_dag") + assert.Contains(t, filename, time.Now().Format("20060102")) + assert.Contains(t, filename, "12345678") + assert.Contains(t, filename, ".log") +} + +func TestOpenFile(t *testing.T) { + tempDir, err := os.MkdirTemp("", "test_log_dir") + require.NoError(t, err) + defer os.RemoveAll(tempDir) + + filePath := filepath.Join(tempDir, "test.log") + + file, err := openFile(filePath) + require.NoError(t, err) + defer file.Close() + + assert.NotNil(t, file) + assert.Equal(t, filePath, file.Name()) + + info, err := file.Stat() + require.NoError(t, err) + assert.Equal(t, os.FileMode(0644), info.Mode().Perm()) +} diff --git a/internal/logger/logger.go b/internal/logger/logger.go index 73fa3702..7c7ffcf2 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -31,11 +31,13 @@ type ( Info(msg string, tags ...any) Warn(msg string, tags ...any) Error(msg string, tags ...any) + Fatal(msg string, tags ...any) Debugf(format string, v ...any) Infof(format string, v ...any) Warnf(format string, v ...any) Errorf(format string, v ...any) + Fatalf(format string, v ...any) With(attrs ...any) Logger WithGroup(name string) Logger @@ -52,7 +54,6 @@ var _ Logger = (*appLogger)(nil) type appLogger struct { logger *slog.Logger guardedHandler *guardedHandler - prefix string quiet bool } @@ -174,42 +175,54 @@ func newHandler(f *os.File, format string, opts *slog.HandlerOptions) slog.Handl // Debugf implements logger.Logger. func (a *appLogger) Debugf(format string, v ...any) { - a.logger.Debug(fmt.Sprintf(a.prefix+format, v...)) + a.logger.Debug(fmt.Sprintf(format, v...)) } // Errorf implements logger.Logger. func (a *appLogger) Errorf(format string, v ...any) { - a.logger.Error(fmt.Sprintf(a.prefix+format, v...)) + a.logger.Error(fmt.Sprintf(format, v...)) +} + +// Fatalf implements logger.Logger. +func (a *appLogger) Fatalf(format string, v ...any) { + a.logger.Error(fmt.Sprintf(format, v...)) + os.Exit(1) } // Infof implements logger.Logger. func (a *appLogger) Infof(format string, v ...any) { - a.logger.Info(fmt.Sprintf(a.prefix+format, v...)) + a.logger.Info(fmt.Sprintf(format, v...)) } // Warnf implements logger.Logger. func (a *appLogger) Warnf(format string, v ...any) { - a.logger.Warn(fmt.Sprintf(a.prefix+format, v...)) + a.logger.Warn(fmt.Sprintf(format, v...)) } // Debug implements logger.Logger. func (a *appLogger) Debug(msg string, tags ...any) { - a.logger.Debug(a.prefix+msg, tags...) + a.logger.Debug(msg, tags...) } // Error implements logger.Logger. func (a *appLogger) Error(msg string, tags ...any) { - a.logger.Error(a.prefix+msg, tags...) + a.logger.Error(msg, tags...) +} + +// Fatal implements logger.Logger. +func (a *appLogger) Fatal(msg string, tags ...any) { + a.logger.Error(msg, tags...) + os.Exit(1) } // Info implements logger.Logger. func (a *appLogger) Info(msg string, tags ...any) { - a.logger.Info(a.prefix+msg, tags...) + a.logger.Info(msg, tags...) } // Warn implements logger.Logger. func (a *appLogger) Warn(msg string, tags ...any) { - a.logger.Warn(a.prefix+msg, tags...) + a.logger.Warn(msg, tags...) } // With implements logger.Logger. diff --git a/internal/persistence/interface.go b/internal/persistence/interface.go index b52b49b2..2ba5debb 100644 --- a/internal/persistence/interface.go +++ b/internal/persistence/interface.go @@ -20,7 +20,6 @@ import ( "time" "github.com/daguflow/dagu/internal/dag" - "github.com/daguflow/dagu/internal/frontend/gen/restapi/operations/dags" "github.com/daguflow/dagu/internal/persistence/grep" "github.com/daguflow/dagu/internal/persistence/model" ) @@ -54,11 +53,10 @@ type DAGStore interface { Create(name string, spec []byte) (string, error) Delete(name string) error List() (ret []*dag.DAG, errs []string, err error) - ListPagination(params dags.ListDagsParams) (*DagListPaginationResult, error) + ListPagination(params DAGListPaginationArgs) (*DagListPaginationResult, error) GetMetadata(name string) (*dag.DAG, error) GetDetails(name string) (*dag.DAG, error) Grep(pattern string) (ret []*GrepResult, errs []string, err error) - Load(name string) (*dag.DAG, error) Rename(oldID, newID string) error GetSpec(name string) (string, error) UpdateSpec(name string, spec []byte) error @@ -66,18 +64,25 @@ type DAGStore interface { TagList() ([]string, []string, error) } -type GrepResult struct { - Name string - DAG *dag.DAG - Matches []*grep.Match +type DAGListPaginationArgs struct { + Page int + Limit int + Name *string + Tag *string } type DagListPaginationResult struct { DagList []*dag.DAG - Count int64 + Count int ErrorList []string } +type GrepResult struct { + Name string + DAG *dag.DAG + Matches []*grep.Match +} + type FlagStore interface { ToggleSuspend(id string, suspend bool) error IsSuspended(id string) bool diff --git a/internal/persistence/local/dag_store.go b/internal/persistence/local/dag_store.go index e74a72a2..4aa1e5aa 100644 --- a/internal/persistence/local/dag_store.go +++ b/internal/persistence/local/dag_store.go @@ -26,7 +26,6 @@ import ( "time" "github.com/daguflow/dagu/internal/dag" - "github.com/daguflow/dagu/internal/frontend/gen/restapi/operations/dags" "github.com/daguflow/dagu/internal/persistence" "github.com/daguflow/dagu/internal/persistence/filecache" "github.com/daguflow/dagu/internal/persistence/grep" @@ -163,10 +162,6 @@ func (d *dagStoreImpl) ensureDirExist() error { return nil } -func (d *dagStoreImpl) getFileNameDagMeta() { - -} - func (d *dagStoreImpl) searchName(fileName string, searchText *string) bool { if searchText == nil { return true @@ -199,15 +194,15 @@ func (d *dagStoreImpl) getTagList(tagSet map[string]struct{}) []string { return tagList } -func (d *dagStoreImpl) ListPagination(params dags.ListDagsParams) (*persistence.DagListPaginationResult, error) { +func (d *dagStoreImpl) ListPagination(params persistence.DAGListPaginationArgs) (*persistence.DagListPaginationResult, error) { var ( dagList = make([]*dag.DAG, 0) errList = make([]string, 0) - count int64 + count int currentDag *dag.DAG ) - if err := filepath.WalkDir(d.dir, func(path string, dir fs.DirEntry, err error) error { + if err := filepath.WalkDir(d.dir, func(_ string, dir fs.DirEntry, err error) error { if err != nil { return err } @@ -220,12 +215,12 @@ func (d *dagStoreImpl) ListPagination(params dags.ListDagsParams) (*persistence. errList = append(errList, fmt.Sprintf("reading %s failed: %s", dir.Name(), err)) } - if !d.searchName(dir.Name(), params.SearchName) || currentDag == nil || !d.searchTags(currentDag.Tags, params.SearchTag) { + if !d.searchName(dir.Name(), params.Name) || currentDag == nil || !d.searchTags(currentDag.Tags, params.Tag) { return nil } count++ - if count > (params.Page-1)*params.Limit && int64(len(dagList)) < params.Limit { + if count > (params.Page-1)*params.Limit && len(dagList) < params.Limit { dagList = append(dagList, currentDag) } @@ -332,11 +327,6 @@ func (d *dagStoreImpl) Grep( return ret, errs, nil } -func (d *dagStoreImpl) Load(name string) (*dag.DAG, error) { - // TODO implement me - panic("implement me") -} - func (d *dagStoreImpl) Rename(oldID, newID string) error { oldLoc, err := d.fileLocation(oldID) if err != nil { @@ -413,7 +403,7 @@ func (d *dagStoreImpl) TagList() ([]string, []string, error) { err error ) - if err = filepath.WalkDir(d.dir, func(path string, dir fs.DirEntry, err error) error { + if err = filepath.WalkDir(d.dir, func(_ string, dir fs.DirEntry, err error) error { if err != nil { return err } diff --git a/internal/persistence/model/node.go b/internal/persistence/model/node.go index a6be152f..f884bcf0 100644 --- a/internal/persistence/model/node.go +++ b/internal/persistence/model/node.go @@ -43,19 +43,19 @@ func FromNodes(nodes []scheduler.NodeData) []*Node { func FromNode(node scheduler.NodeData) *Node { return &Node{ Step: node.Step, - Log: node.Log, - StartedAt: util.FormatTime(node.StartedAt), - FinishedAt: util.FormatTime(node.FinishedAt), - Status: node.Status, - StatusText: node.Status.String(), - RetryCount: node.RetryCount, - DoneCount: node.DoneCount, - Error: errText(node.Error), + Log: node.State.Log, + StartedAt: util.FormatTime(node.State.StartedAt), + FinishedAt: util.FormatTime(node.State.FinishedAt), + Status: node.State.Status, + StatusText: node.State.Status.String(), + RetryCount: node.State.RetryCount, + DoneCount: node.State.DoneCount, + Error: errText(node.State.Error), } } type Node struct { - dag.Step `json:"Step"` + Step dag.Step `json:"Step"` Log string `json:"Log"` StartedAt string `json:"StartedAt"` FinishedAt string `json:"FinishedAt"` diff --git a/internal/persistence/model/status_test.go b/internal/persistence/model/status_test.go index 09baae2d..e05efc40 100644 --- a/internal/persistence/model/status_test.go +++ b/internal/persistence/model/status_test.go @@ -65,7 +65,7 @@ func TestStatusSerialization(t *testing.T) { require.Equal(t, status.Name, unmarshalled.Name) require.Equal(t, 1, len(unmarshalled.Nodes)) - require.Equal(t, workflow.Steps[0].Name, unmarshalled.Nodes[0].Name) + require.Equal(t, workflow.Steps[0].Name, unmarshalled.Nodes[0].Step.Name) } func TestCorrectRunningStatus(t *testing.T) {