Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions apps/flowlord/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ func (tm *taskMaster) htmlTask(w http.ResponseWriter, r *http.Request) {

// Get task summary statistics for the date
summaryStart := time.Now()
taskStats, err := tm.taskCache.GetTaskSummaryByDate(dt)
taskStats, err := tm.taskCache.GetTaskRecapByDate(dt)
summaryTime := time.Since(summaryStart)
if err != nil {
log.Printf("Error getting task summary: %v", err)
Expand Down Expand Up @@ -825,7 +825,7 @@ func (tm *taskMaster) backload(req request) response {
start = at
end = at
}

phase := tm.taskCache.Search(req.Task, req.Job)
if phase.FilePath != "" {
msg = append(msg, "phase found in "+phase.FilePath)
Expand Down
2 changes: 0 additions & 2 deletions apps/flowlord/sqlite/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,5 +180,3 @@ func BuildCompactSummary(alerts []AlertRecord) []SummaryLine {

return result
}


1 change: 1 addition & 0 deletions apps/flowlord/sqlite/dates.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,4 @@ func (s *SQLite) RebuildDateIndex() error {
}



2 changes: 0 additions & 2 deletions apps/flowlord/sqlite/dbstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,5 +188,3 @@ func formatBytes(bytes int64) string {
}
return fmt.Sprintf("%.1f %cB", float64(bytes)/float64(div), "KMGTPE"[exp])
}


1 change: 1 addition & 0 deletions apps/flowlord/sqlite/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,4 @@ func (s *SQLite) GetFileMessagesWithTasks(limit int, offset int) ([]FileMessageW
}



1 change: 1 addition & 0 deletions apps/flowlord/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,4 @@ func (o *SQLite) Sync() error {
}



9 changes: 5 additions & 4 deletions apps/flowlord/sqlite/sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/pcelvng/task"

"github.com/pcelvng/task-tools/file/stat"
)

Expand Down Expand Up @@ -40,15 +41,15 @@ func TestDatesByType(t *testing.T) {
VALUES (?, ?, ?, ?, ?, ?),
(?, ?, ?, ?, ?, ?)
`, "alert1", "2024-01-15T11:00:00Z", "data-validation", "check", "Validation error", "2024-01-15T11:00:00Z",
"alert2", "2024-01-17T11:00:00Z", "data-validation", "check", "Validation error", "2024-01-17T11:00:00Z")
"alert2", "2024-01-17T11:00:00Z", "data-validation", "check", "Validation error", "2024-01-17T11:00:00Z")
if err != nil {
t.Fatalf("Failed to insert alerts: %v", err)
}

// Add sample file messages
fileMsg1 := stat.Stats{
Path: "gs://bucket/file1.json",
Size: 1024,
Path: "gs://bucket/file1.json",
Size: 1024,
}
db.AddFileMessage(fileMsg1, []string{}, []string{})

Expand Down Expand Up @@ -109,4 +110,4 @@ func TestDatesByType(t *testing.T) {
if len(fileDates2) != len(fileDates) {
t.Error("GetDatesWithFiles() should return same results as DatesByType('files')")
}
}
}
14 changes: 7 additions & 7 deletions apps/flowlord/sqlite/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (s *DurationStats) String() string {

func (stats *Stats) Add(tsk task.Task) {
tm := tmpl.TaskTime(tsk)

// Handle different result types
switch tsk.Result {
case task.ErrResult:
Expand Down Expand Up @@ -180,7 +180,7 @@ func (ts TaskStats) UniqueTypes() []string {
typeSet[key] = struct{}{}
}
}

types := make([]string, 0, len(typeSet))
for t := range typeSet {
types = append(types, t)
Expand All @@ -192,7 +192,7 @@ func (ts TaskStats) UniqueTypes() []string {
// JobsByType returns jobs organized by type
func (ts TaskStats) JobsByType() map[string][]string {
jobsByType := make(map[string][]string)

for key := range ts {
// Split key into type and job
parts := strings.SplitN(key, ":", 2)
Expand All @@ -204,19 +204,19 @@ func (ts TaskStats) JobsByType() map[string][]string {
}
}
}

// Sort jobs for each type
for typ := range jobsByType {
sort.Strings(jobsByType[typ])
}

return jobsByType
}

// TotalCounts returns aggregate result counts across all tasks
func (ts TaskStats) TotalCounts() TaskCounts {
var counts TaskCounts

for _, stats := range ts {
counts.Total += stats.CompletedCount + stats.ErrorCount + stats.AlertCount + stats.WarnCount + stats.RunningCount
counts.Completed += stats.CompletedCount
Expand All @@ -225,6 +225,6 @@ func (ts TaskStats) TotalCounts() TaskCounts {
counts.Warn += stats.WarnCount
counts.Running += stats.RunningCount
}

return counts
}
9 changes: 3 additions & 6 deletions apps/flowlord/sqlite/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,7 @@ func (s *SQLite) CheckIncompleteTasks() int {
tr.id = ar.task_id AND
tr.type = ar.task_type AND
tr.job = ar.job AND
ar.msg LIKE 'INCOMPLETE:%' AND
ar.created_at > datetime('now', '-1 day')
ar.msg LIKE 'INCOMPLETE:%'
)
WHERE tr.created < ?
AND tr.result = ''
Expand Down Expand Up @@ -400,8 +399,8 @@ func (s *SQLite) GetTasksByDate(date time.Time, filter *TaskFilter) ([]TaskView,
return tasks, totalCount, nil
}

// GetTaskSummaryByDate creates a summary of tasks for a specific date
func (s *SQLite) GetTaskSummaryByDate(date time.Time) (TaskStats, error) {
// GetTaskRecapByDate creates a recap of tasks for a specific date
func (s *SQLite) GetTaskRecapByDate(date time.Time) (TaskStats, error) {
s.mu.Lock()
defer s.mu.Unlock()

Expand Down Expand Up @@ -463,5 +462,3 @@ func extractJobFromTask(t task.Task) string {
}
return job
}


2 changes: 1 addition & 1 deletion apps/flowlord/sqlite/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (s *SQLite) IsDir() bool {
// Search the all workflows within the cache and return the first
// matching phase with the specific task and job (optional)
func (s *SQLite) Search(taskType, job string) PhaseDB {
return s.Get(task.Task{Type: taskType, Job: job})
return s.Get(task.Task{Type: taskType, Job: job})
}

// Get the Phase associated with the task
Expand Down
3 changes: 1 addition & 2 deletions apps/flowlord/sqlite/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,7 @@ func TestValidatePhase(t *testing.T) {
"cron 6": {
Input: Phase{Rule: "cron=1 2 3 4 5 6"},
},
"cron complex":
{
"cron complex": {
Input: Phase{Rule: "cron=20 */6 * * SUN"},
},
"parse_err": {
Expand Down
Loading