Skip to content

Commit

Permalink
Improve error handling in task queue
Browse files Browse the repository at this point in the history
  • Loading branch information
phwissmann committed Aug 27, 2024
1 parent 6b2d9e1 commit e75e4f6
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
22 changes: 10 additions & 12 deletions openem-ingestor/core/taskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
type TaskQueue struct {
datasetSourceFolders sync.Map // Datastructure to store all the upload requests
inputChannel chan IngestionTask // Requests to upload data are put into this channel
errorChannel chan string // the go routine puts the error of the upload here
resultChannel chan TaskResult // The result of the upload is put into this channel
AppContext context.Context
}
Expand Down Expand Up @@ -46,6 +45,7 @@ type IngestionTask struct {
type TaskResult struct {
Elapsed_seconds int
Dataset_PID string
Error error
}

func (w *TaskQueue) Startup(numWorkers int) {
Expand Down Expand Up @@ -89,10 +89,7 @@ func (w *TaskQueue) startWorker() {

ingestionTask.Cancel = cancel

result, err := w.IngestDataset(task_context, ingestionTask)
if err != nil {
w.errorChannel <- err.Error()
}
result := w.IngestDataset(task_context, ingestionTask)
w.resultChannel <- result
}
}
Expand Down Expand Up @@ -132,12 +129,13 @@ func (w *TaskQueue) ScheduleTask(id uuid.UUID) {

// Go routine to handle result and errors
go func(id uuid.UUID) {
select {
case taskResult := <-w.resultChannel:
taskResult := <-w.resultChannel
if taskResult.Error != nil {
runtime.EventsEmit(w.AppContext, "upload-failed", id, fmt.Sprint(taskResult.Error))
println(fmt.Sprint(taskResult.Error))
} else {
runtime.EventsEmit(w.AppContext, "upload-completed", id, taskResult.Elapsed_seconds)
println(taskResult.Dataset_PID, taskResult.Elapsed_seconds)
case err := <-w.errorChannel:
println(err)
}
}(task.DatasetFolder.Id)

Expand All @@ -152,13 +150,13 @@ func (w *TaskQueue) ScheduleTask(id uuid.UUID) {

}

func (w *TaskQueue) IngestDataset(task_context context.Context, task IngestionTask) (TaskResult, error) {
func (w *TaskQueue) IngestDataset(task_context context.Context, task IngestionTask) TaskResult {
start := time.Now()
// TODO: add ingestion function
// dataset_id, err := IngestDataset(task_context, w.AppContext, task)
time.Sleep(time.Second * 5)
dataset_id := "1"
datasetPID := "1"
end := time.Now()
elapsed := end.Sub(start)
return TaskResult{Dataset_PID: dataset_id, Elapsed_seconds: int(elapsed.Seconds())}, nil
return TaskResult{Dataset_PID: datasetPID, Elapsed_seconds: int(elapsed.Seconds()), Error: nil}
}
5 changes: 5 additions & 0 deletions openem-ingestor/frontend/src/App.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@
items = items;
});
EventsOn("upload-failed", (id, err) => {
items[id].status = "failed " + err;
items = items;
});
EventsOn("upload-canceled", (id) => {
console.log(id);
items[id].status = "Canceled";
Expand Down

0 comments on commit e75e4f6

Please sign in to comment.