Skip to content

Commit

Permalink
Refactor dag package for readability (dagu-org#587)
Browse files Browse the repository at this point in the history
* rename package

* move SyncMap type definition

* remove unused code

* simplify DAG loader

* simplify schedule parsing code

* simplify builder code

* simplify the builder code

* simplify builder code

* simplify builder code

* simplify loader

* add comments

* add comments

* refactor util package

* simplify step builder

* fix github workflow

* format code

* format code

* fix typo
  • Loading branch information
yohamta committed Jun 9, 2024
1 parent 333024c commit 6b68bb3
Show file tree
Hide file tree
Showing 52 changed files with 2,090 additions and 1,811 deletions.
9 changes: 5 additions & 4 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ name: Release
on:
create:
tags:
- '*'
- "v[0-9]+.[0-9]+.[0-9]+"
- "v[0-9]+.[0-9]+.[0-9]+-*"

jobs:
goreleaser:
Expand All @@ -21,13 +22,13 @@ jobs:
uses: actions/setup-node@v3
with:
node-version: 16

- name: Set up yarn
run: npm install --global yarn

- name: Check out code
uses: actions/checkout@v3

- name: Yarn install
run: yarn install
working-directory: ui
Expand All @@ -48,4 +49,4 @@ jobs:
version: latest
args: release --rm-dist
env:
GITHUB_TOKEN: ${{secrets.DAGU_GITHUB_TOKEN}}
GITHUB_TOKEN: ${{secrets.DAGU_GITHUB_TOKEN}}
6 changes: 3 additions & 3 deletions cmd/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import (

"github.com/dagu-dev/dagu/internal/engine"
"github.com/dagu-dev/dagu/internal/scheduler"
"github.com/dagu-dev/dagu/internal/utils"
"github.com/dagu-dev/dagu/internal/util"
"github.com/spf13/cobra"
"github.com/stretchr/testify/require"
)

func setupTest(t *testing.T) (string, engine.Engine, persistence.DataStoreFactory) {
t.Helper()

tmpDir := utils.MustTempDir("dagu_test")
tmpDir := util.MustTempDir("dagu_test")
changeHomeDir(tmpDir)

ds := client.NewDataStoreFactory(&config.Config{
Expand Down Expand Up @@ -97,7 +97,7 @@ func withSpool(t *testing.T, f func()) string {
}

func testDAGFile(name string) string {
d := path.Join(utils.MustGetwd(), "testdata")
d := path.Join(util.MustGetwd(), "testdata")
return path.Join(d, name)
}

Expand Down
3 changes: 1 addition & 2 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ func setDefaultConfigPath() {
}

func loadDAG(dagFile, params string) (d *dag.DAG, err error) {
dagLoader := &dag.Loader{BaseConfig: config.Get().BaseConfig}
return dagLoader.Load(dagFile, params)
return dag.Load(config.Get().BaseConfig, dagFile, params)
}

func getFlagString(cmd *cobra.Command, name, fallback string) string {
Expand Down
30 changes: 15 additions & 15 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/dagu-dev/dagu/internal/reporter"
"github.com/dagu-dev/dagu/internal/scheduler"
"github.com/dagu-dev/dagu/internal/sock"
"github.com/dagu-dev/dagu/internal/utils"
"github.com/dagu-dev/dagu/internal/util"
"github.com/google/uuid"
)

Expand Down Expand Up @@ -183,7 +183,7 @@ func (a *Agent) signal(sig os.Signal, allowOverride bool) {
}

func (a *Agent) init() {
logDir := path.Join(a.DAG.LogDir, utils.ValidFilename(a.DAG.Name, "_"))
logDir := path.Join(a.DAG.LogDir, util.ValidFilename(a.DAG.Name, "_"))
config := &scheduler.Config{
LogDir: logDir,
MaxActiveRuns: a.DAG.MaxActiveRuns,
Expand Down Expand Up @@ -221,9 +221,9 @@ func (a *Agent) init() {
}}
logFilename := filepath.Join(
logDir, fmt.Sprintf("agent_%s.%s.%s.log",
utils.ValidFilename(a.DAG.Name, "_"),
util.ValidFilename(a.DAG.Name, "_"),
time.Now().Format("20060102.15:04:05.000"),
utils.TruncString(a.requestId, 8),
util.TruncString(a.requestId, 8),
))
a.logManager = &logManager{logFilename: logFilename}
}
Expand Down Expand Up @@ -259,7 +259,7 @@ func (a *Agent) setupDatabase() error {
// TODO: do not use the persistence package directly.
a.historyStore = a.dataStoreFactory.NewHistoryStore()
if err := a.historyStore.RemoveOld(a.DAG.Location, a.DAG.HistRetentionDays); err != nil {
utils.LogErr("clean old history data", err)
util.LogErr("clean old history data", err)
}

return a.historyStore.Open(a.DAG.Location, time.Now(), a.requestId)
Expand Down Expand Up @@ -291,7 +291,7 @@ func (a *Agent) run(ctx context.Context) error {
return err
}
defer func() {
utils.LogErr("close log file", a.closeLogFile())
util.LogErr("close log file", a.closeLogFile())
tl.Close()
}()

Expand All @@ -301,7 +301,7 @@ func (a *Agent) run(ctx context.Context) error {
}
}()

utils.LogErr("write status", a.historyStore.Write(a.Status()))
util.LogErr("write status", a.historyStore.Write(a.Status()))

listen := make(chan error)
go func() {
Expand All @@ -312,7 +312,7 @@ func (a *Agent) run(ctx context.Context) error {
}()

defer func() {
utils.LogErr("shutdown socket frontend", a.socketServer.Shutdown())
util.LogErr("shutdown socket frontend", a.socketServer.Shutdown())
}()

if err := <-listen; err != nil {
Expand All @@ -325,8 +325,8 @@ func (a *Agent) run(ctx context.Context) error {
go func() {
for node := range done {
status := a.Status()
utils.LogErr("write status", a.historyStore.Write(status))
utils.LogErr("report step", a.reporter.ReportStep(a.DAG, status, node))
util.LogErr("write status", a.historyStore.Write(status))
util.LogErr("report step", a.reporter.ReportStep(a.DAG, status, node))
}
}()

Expand All @@ -335,7 +335,7 @@ func (a *Agent) run(ctx context.Context) error {
if a.finished.Load() {
return
}
utils.LogErr("write status", a.historyStore.Write(a.Status()))
util.LogErr("write status", a.historyStore.Write(a.Status()))
}()

ctx = dag.NewContext(ctx, a.DAG, a.dataStoreFactory.NewDAGStore())
Expand All @@ -344,13 +344,13 @@ func (a *Agent) run(ctx context.Context) error {
status := a.Status()

log.Println("schedule finished.")
utils.LogErr("write status", a.historyStore.Write(a.Status()))
util.LogErr("write status", a.historyStore.Write(a.Status()))

a.reporter.ReportSummary(status, lastErr)
utils.LogErr("send email", a.reporter.SendMail(a.DAG, status, lastErr))
util.LogErr("send email", a.reporter.SendMail(a.DAG, status, lastErr))

a.finished.Store(true)
utils.LogErr("close data file", a.historyStore.Close())
util.LogErr("close data file", a.historyStore.Close())

return lastErr
}
Expand Down Expand Up @@ -439,7 +439,7 @@ func (l *logManager) setupLogFile() (err error) {
if err := os.MkdirAll(dir, 0755); err != nil {
return err
}
l.logFile, err = utils.OpenOrCreateFile(l.logFilename)
l.logFile, err = util.OpenOrCreateFile(l.logFilename)
return
}

Expand Down
9 changes: 4 additions & 5 deletions internal/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@ import (
"github.com/dagu-dev/dagu/internal/engine"
"github.com/dagu-dev/dagu/internal/persistence/model"
"github.com/dagu-dev/dagu/internal/scheduler"
"github.com/dagu-dev/dagu/internal/utils"
"github.com/dagu-dev/dagu/internal/util"
"github.com/stretchr/testify/require"
)

var testdataDir = path.Join(utils.MustGetwd(), "testdata")
var testdataDir = path.Join(util.MustGetwd(), "testdata")

func setupTest(t *testing.T) (string, engine.Engine, persistence.DataStoreFactory) {
t.Helper()

tmpDir := utils.MustTempDir("dagu_test")
tmpDir := util.MustTempDir("dagu_test")
_ = os.Setenv("HOME", tmpDir)
_ = config.LoadConfig()

Expand Down Expand Up @@ -351,8 +351,7 @@ func (h *mockResponseWriter) WriteHeader(statusCode int) {

func testLoadDAG(t *testing.T, name string) *dag.DAG {
file := path.Join(testdataDir, name)
cl := &dag.Loader{}
d, err := cl.Load(file, "")
d, err := dag.Load("", file, "")
require.NoError(t, err)
return d
}
Loading

0 comments on commit 6b68bb3

Please sign in to comment.