From 04563c0d0d1f07137e0dc8bf60895fde1867140f Mon Sep 17 00:00:00 2001 From: Jason Kulatunga Date: Sat, 9 Jul 2022 10:05:48 -0700 Subject: [PATCH] ensure we have the ability to keep influxdb tasks up-to-date. --- .../pkg/database/scrutiny_repository_tasks.go | 48 ++++++++++++++++--- .../scrutiny_repository_tasks_test.go | 18 +++---- 2 files changed, 51 insertions(+), 15 deletions(-) diff --git a/webapp/backend/pkg/database/scrutiny_repository_tasks.go b/webapp/backend/pkg/database/scrutiny_repository_tasks.go index 079caff7..92b67f40 100644 --- a/webapp/backend/pkg/database/scrutiny_repository_tasks.go +++ b/webapp/backend/pkg/database/scrutiny_repository_tasks.go @@ -11,30 +11,66 @@ import ( //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// func (sr *scrutinyRepository) EnsureTasks(ctx context.Context, orgID string) error { weeklyTaskName := "tsk-weekly-aggr" + weeklyTaskScript := sr.DownsampleScript("weekly") if found, findErr := sr.influxTaskApi.FindTasks(ctx, &api.TaskFilter{Name: weeklyTaskName}); findErr == nil && len(found) == 0 { //weekly on Sunday at 1:00am - _, err := sr.influxTaskApi.CreateTaskWithCron(ctx, weeklyTaskName, sr.DownsampleScript("weekly"), "0 1 * * 0", orgID) + _, err := sr.influxTaskApi.CreateTaskWithCron(ctx, weeklyTaskName, weeklyTaskScript, "0 1 * * 0", orgID) if err != nil { return err } + } else if len(found) == 1 { + //check if we should update + task := &found[0] + if weeklyTaskScript != task.Flux { + sr.logger.Infoln("updating weekly task script") + task.Flux = weeklyTaskScript + _, err := sr.influxTaskApi.UpdateTask(ctx, task) + if err != nil { + return err + } + } } monthlyTaskName := "tsk-monthly-aggr" + monthlyTaskScript := sr.DownsampleScript("monthly") if found, findErr := sr.influxTaskApi.FindTasks(ctx, &api.TaskFilter{Name: monthlyTaskName}); findErr == nil && len(found) == 0 { //monthly on first day of the month at 1:30am - _, err := sr.influxTaskApi.CreateTaskWithCron(ctx, monthlyTaskName, sr.DownsampleScript("monthly"), "30 1 1 * *", orgID) + _, err := sr.influxTaskApi.CreateTaskWithCron(ctx, monthlyTaskName, monthlyTaskScript, "30 1 1 * *", orgID) if err != nil { return err } + } else if len(found) == 1 { + //check if we should update + task := &found[0] + if monthlyTaskScript != task.Flux { + sr.logger.Infoln("updating monthly task script") + task.Flux = monthlyTaskScript + _, err := sr.influxTaskApi.UpdateTask(ctx, task) + if err != nil { + return err + } + } } yearlyTaskName := "tsk-yearly-aggr" + yearlyTaskScript := sr.DownsampleScript("yearly") if found, findErr := sr.influxTaskApi.FindTasks(ctx, &api.TaskFilter{Name: yearlyTaskName}); findErr == nil && len(found) == 0 { //yearly on the first day of the year at 2:00am - _, err := sr.influxTaskApi.CreateTaskWithCron(ctx, yearlyTaskName, sr.DownsampleScript("yearly"), "0 2 1 1 *", orgID) + _, err := sr.influxTaskApi.CreateTaskWithCron(ctx, yearlyTaskName, yearlyTaskScript, "0 2 1 1 *", orgID) if err != nil { return err } + } else if len(found) == 1 { + //check if we should update + task := &found[0] + if yearlyTaskScript != task.Flux { + sr.logger.Infoln("updating yearly task script") + task.Flux = yearlyTaskScript + _, err := sr.influxTaskApi.UpdateTask(ctx, task) + if err != nil { + return err + } + } } return nil } @@ -102,14 +138,14 @@ func (sr *scrutinyRepository) DownsampleScript(aggregationType string) string { |> aggregateWindow(every: aggWindow, fn: last, createEmpty: false) |> to(bucket: destBucket, org: destOrg) - temp_data = from(bucket: sourceBucket) + from(bucket: sourceBucket) |> range(start: rangeStart, stop: rangeEnd) |> filter(fn: (r) => r["_measurement"] == "temp") |> group(columns: ["device_wwn"]) |> toInt() - - temp_data |> aggregateWindow(fn: mean, every: aggWindow, createEmpty: false) + |> set(key: "_measurement", value: "temp") + |> set(key: "_field", value: "temp") |> to(bucket: destBucket, org: destOrg) `, sourceBucket, diff --git a/webapp/backend/pkg/database/scrutiny_repository_tasks_test.go b/webapp/backend/pkg/database/scrutiny_repository_tasks_test.go index 487a1bc2..4f2dc515 100644 --- a/webapp/backend/pkg/database/scrutiny_repository_tasks_test.go +++ b/webapp/backend/pkg/database/scrutiny_repository_tasks_test.go @@ -42,14 +42,14 @@ func Test_DownsampleScript_Weekly(t *testing.T) { |> aggregateWindow(every: aggWindow, fn: last, createEmpty: false) |> to(bucket: destBucket, org: destOrg) - temp_data = from(bucket: sourceBucket) + from(bucket: sourceBucket) |> range(start: rangeStart, stop: rangeEnd) |> filter(fn: (r) => r["_measurement"] == "temp") |> group(columns: ["device_wwn"]) |> toInt() - - temp_data |> aggregateWindow(fn: mean, every: aggWindow, createEmpty: false) + |> set(key: "_measurement", value: "temp") + |> set(key: "_field", value: "temp") |> to(bucket: destBucket, org: destOrg) `, influxDbScript) } @@ -89,14 +89,14 @@ func Test_DownsampleScript_Monthly(t *testing.T) { |> aggregateWindow(every: aggWindow, fn: last, createEmpty: false) |> to(bucket: destBucket, org: destOrg) - temp_data = from(bucket: sourceBucket) + from(bucket: sourceBucket) |> range(start: rangeStart, stop: rangeEnd) |> filter(fn: (r) => r["_measurement"] == "temp") |> group(columns: ["device_wwn"]) |> toInt() - - temp_data |> aggregateWindow(fn: mean, every: aggWindow, createEmpty: false) + |> set(key: "_measurement", value: "temp") + |> set(key: "_field", value: "temp") |> to(bucket: destBucket, org: destOrg) `, influxDbScript) } @@ -136,14 +136,14 @@ func Test_DownsampleScript_Yearly(t *testing.T) { |> aggregateWindow(every: aggWindow, fn: last, createEmpty: false) |> to(bucket: destBucket, org: destOrg) - temp_data = from(bucket: sourceBucket) + from(bucket: sourceBucket) |> range(start: rangeStart, stop: rangeEnd) |> filter(fn: (r) => r["_measurement"] == "temp") |> group(columns: ["device_wwn"]) |> toInt() - - temp_data |> aggregateWindow(fn: mean, every: aggWindow, createEmpty: false) + |> set(key: "_measurement", value: "temp") + |> set(key: "_field", value: "temp") |> to(bucket: destBucket, org: destOrg) `, influxDbScript) }