diff --git a/base/task/monitor.go b/base/task/monitor.go deleted file mode 100644 index 6c70a89e5..000000000 --- a/base/task/monitor.go +++ /dev/null @@ -1,256 +0,0 @@ -// Copyright 2021 gorse Project Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package task - -import ( - "sort" - "strings" - "sync" - "time" - - mapset "github.com/deckarep/golang-set/v2" -) - -type Status string - -const ( - StatusPending Status = "Pending" - StatusComplete Status = "Complete" - StatusRunning Status = "Running" - StatusSuspended Status = "Suspended" - StatusFailed Status = "Failed" -) - -// Task progress information. -type Task struct { - Name string - Status Status - Done int - Total int - StartTime time.Time - FinishTime time.Time - Error string -} - -func NewTask(name string, total int) *Task { - return &Task{ - Name: name, - Status: StatusRunning, - Done: 0, - Total: total, - StartTime: time.Now(), - FinishTime: time.Time{}, - } -} - -func (t *Task) Update(done int) { - t.Done = done - t.Status = StatusRunning -} - -func (t *Task) Add(done int) { - if t != nil { - t.Status = StatusRunning - t.Done += done - } -} - -func (t *Task) Finish() { - if t != nil { - t.Status = StatusComplete - t.Done = t.Total - t.FinishTime = time.Now() - } -} - -func (t *Task) Suspend(flag bool) { - if t != nil { - if flag { - t.Status = StatusSuspended - } else { - t.Status = StatusRunning - } - } -} - -func (t *Task) Fail(err string) { - if t != nil { - t.Error = err - t.Status = StatusFailed - } -} - -func (t *Task) SubTask(done int) *SubTask { - if t == nil { - return nil - } - return &SubTask{ - Parent: t, - Start: t.Done, - End: t.Done + done, - } -} - -type SubTask struct { - Start int - End int - Parent *Task -} - -func (t *SubTask) Add(done int) { - if t != nil { - t.Parent.Add(done) - } -} - -func (t *SubTask) Finish() { - if t != nil { - t.Parent.Update(t.End) - } -} - -// Monitor monitors the progress of all tasks. -type Monitor struct { - TaskLock sync.Mutex - Tasks map[string]*Task -} - -// NewTaskMonitor creates a Monitor and add pending tasks. -func NewTaskMonitor() *Monitor { - return &Monitor{ - Tasks: make(map[string]*Task), - } -} - -func (tm *Monitor) GetTask(name string) *Task { - tm.TaskLock.Lock() - defer tm.TaskLock.Unlock() - return tm.Tasks[name] -} - -// Pending a task. -func (tm *Monitor) Pending(name string) { - tm.TaskLock.Lock() - defer tm.TaskLock.Unlock() - tm.Tasks[name] = &Task{ - Name: name, - Status: StatusPending, - } -} - -// Start a task. -func (tm *Monitor) Start(name string, total int) *Task { - tm.TaskLock.Lock() - defer tm.TaskLock.Unlock() - t := NewTask(name, total) - tm.Tasks[name] = t - return t -} - -// Finish a task. -func (tm *Monitor) Finish(name string) { - tm.TaskLock.Lock() - defer tm.TaskLock.Unlock() - task, exist := tm.Tasks[name] - if exist { - task.Finish() - } -} - -// Update the progress of a task. -func (tm *Monitor) Update(name string, done int) { - tm.TaskLock.Lock() - defer tm.TaskLock.Unlock() - tm.Tasks[name].Update(done) -} - -// Add the progress of a task. -func (tm *Monitor) Add(name string, done int) { - tm.TaskLock.Lock() - defer tm.TaskLock.Unlock() - tm.Tasks[name].Add(done) -} - -// Suspend a task. -func (tm *Monitor) Suspend(name string, flag bool) { - tm.TaskLock.Lock() - defer tm.TaskLock.Unlock() - tm.Tasks[name].Suspend(flag) -} - -func (tm *Monitor) Fail(name, err string) { - tm.TaskLock.Lock() - defer tm.TaskLock.Unlock() - tm.Tasks[name].Fail(err) -} - -// List all tasks and remove tasks from disconnected workers. -func (tm *Monitor) List(workers ...string) []Task { - tm.TaskLock.Lock() - defer tm.TaskLock.Unlock() - workerSet := mapset.NewSet(workers...) - var task []Task - for name, t := range tm.Tasks { - // remove tasks from disconnected workers - if strings.Contains(name, "[") && strings.Contains(name, "]") { - begin := strings.Index(name, "[") + 1 - end := strings.Index(name, "]") - if !workerSet.Contains(name[begin:end]) { - delete(tm.Tasks, name) - continue - } - } - task = append(task, *t) - } - sort.Sort(Tasks(task)) - return task -} - -// Get the progress of a task. -func (tm *Monitor) Get(name string) int { - tm.TaskLock.Lock() - defer tm.TaskLock.Unlock() - task, exist := tm.Tasks[name] - if exist { - return task.Done - } - return 0 -} - -// Tasks is used to sort []Task. -type Tasks []Task - -// Len is used to sort []Task. -func (t Tasks) Len() int { - return len(t) -} - -// Swap is used to sort []Task. -func (t Tasks) Swap(i, j int) { - t[i], t[j] = t[j], t[i] -} - -// Less is used to sort []Task. -func (t Tasks) Less(i, j int) bool { - if t[i].Status != StatusPending && t[j].Status == StatusPending { - return true - } else if t[i].Status == StatusPending && t[j].Status != StatusPending { - return false - } else if t[i].Status == StatusPending && t[j].Status == StatusPending { - return t[i].Name < t[j].Name - } else { - return t[i].StartTime.Before(t[j].StartTime) - } -} diff --git a/base/task/monitor_test.go b/base/task/monitor_test.go deleted file mode 100644 index d0f07d8b5..000000000 --- a/base/task/monitor_test.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2021 gorse Project Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package task - -import ( - "github.com/samber/lo" - "github.com/stretchr/testify/assert" - "testing" -) - -func TestTaskMonitor(t *testing.T) { - taskMonitor := NewTaskMonitor() - taskMonitor.Start("a", 100) - assert.Equal(t, 0, taskMonitor.Get("a")) - assert.Equal(t, "a", taskMonitor.Tasks["a"].Name) - assert.Equal(t, 100, taskMonitor.Tasks["a"].Total) - assert.Equal(t, 0, taskMonitor.Tasks["a"].Done) - assert.Equal(t, StatusRunning, taskMonitor.Tasks["a"].Status) - - taskMonitor.Update("a", 50) - assert.Equal(t, 50, taskMonitor.Get("a")) - assert.Equal(t, "a", taskMonitor.Tasks["a"].Name) - assert.Equal(t, 100, taskMonitor.Tasks["a"].Total) - assert.Equal(t, 50, taskMonitor.Tasks["a"].Done) - assert.Equal(t, StatusRunning, taskMonitor.Tasks["a"].Status) - taskMonitor.Suspend("a", true) - assert.Equal(t, StatusSuspended, taskMonitor.Tasks["a"].Status) - taskMonitor.Suspend("a", false) - assert.Equal(t, StatusRunning, taskMonitor.Tasks["a"].Status) - - taskMonitor.Add("a", 30) - assert.Equal(t, 80, taskMonitor.Get("a")) - assert.Equal(t, "a", taskMonitor.Tasks["a"].Name) - assert.Equal(t, 100, taskMonitor.Tasks["a"].Total) - assert.Equal(t, 80, taskMonitor.Tasks["a"].Done) - assert.Equal(t, StatusRunning, taskMonitor.Tasks["a"].Status) - - taskMonitor.Finish("a") - assert.Equal(t, 100, taskMonitor.Get("a")) - assert.Equal(t, "a", taskMonitor.Tasks["a"].Name) - assert.Equal(t, 100, taskMonitor.Tasks["a"].Total) - assert.Equal(t, 100, taskMonitor.Tasks["a"].Done) - assert.Equal(t, StatusComplete, taskMonitor.Tasks["a"].Status) - - taskMonitor.Start("b", 100) - taskMonitor.Fail("b", "error") - assert.Equal(t, "error", taskMonitor.Tasks["b"].Error) - assert.Equal(t, StatusFailed, taskMonitor.Tasks["b"].Status) - - taskMonitor.Pending("c") - assert.Equal(t, StatusPending, taskMonitor.Tasks["c"].Status) - - taskMonitor.Start("d [a]", 100) - taskMonitor.Start("d [b]", 100) - tasks := taskMonitor.List("a") - assert.ElementsMatch(t, lo.ToSlicePtr(tasks), []*Task{ - taskMonitor.GetTask("a"), - taskMonitor.GetTask("b"), - taskMonitor.GetTask("c"), - taskMonitor.GetTask("d [a]"), - }) -} - -func TestSubTask(t *testing.T) { - task := NewTask("a", 100) - task.Add(10) - assert.Equal(t, 10, task.Done) - s := task.SubTask(80) - assert.Equal(t, 10, s.Start) - assert.Equal(t, 90, s.End) - s.Add(20) - assert.Equal(t, 30, task.Done) - s.Finish() - assert.Equal(t, 90, task.Done) -}