Skip to content

Commit

Permalink
Add taskqueue for ingestion
Browse files Browse the repository at this point in the history
Add ingestion tasks, a worker queue based on goroutines, and task results. Interactions with the UI are implemented as well, via buttons and emitted events.
  • Loading branch information
phwissmann committed Aug 27, 2024
1 parent 8ad405d commit 6b2d9e1
Show file tree
Hide file tree
Showing 9 changed files with 323 additions and 12 deletions.
15 changes: 15 additions & 0 deletions openem-ingestor/.vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Wails: Debug openem-ingestor",
"type": "go",
"request": "launch",
"mode": "exec",
"program": "${workspaceFolder}/build/bin/openem-ingestor",
"preLaunchTask": "build",
"cwd": "${workspaceFolder}",
"env": {}
}
]
}
22 changes: 22 additions & 0 deletions openem-ingestor/.vscode/tasks.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"version": "2.0.0",
"tasks": [
{
"label": "build",
"type": "shell",
"options": {
"cwd": "${workspaceFolder}"
},
"command": "go",
"args": [
"build",
"-tags",
"dev",
"-gcflags",
"all=-N -l",
"-o",
"build/bin/openem-ingestor"
]
}
]
}
28 changes: 24 additions & 4 deletions openem-ingestor/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package main

import (
"context"
"openem-ingestor/backend"
"openem-ingestor/core"

"github.com/google/uuid"
"github.com/wailsapp/wails/v2/pkg/runtime"
)

// App struct
type App struct {
ctx context.Context
ctx context.Context
taskqueue core.TaskQueue
}

// NewApp creates a new App application struct
Expand All @@ -35,12 +37,30 @@ func (b *App) beforeClose(ctx context.Context) (prevent bool) {
// so we can call the runtime methods
func (a *App) startup(ctx context.Context) {
	// Keep the context on the app and hand the very same context to the
	// task queue before its workers are started.
	a.ctx = ctx
	a.taskqueue.AppContext = ctx

	// Start the queue with two workers.
	a.taskqueue.Startup(2)
}

func (a *App) SelectFolder() {
folder, err := backend.SelectFolder(a.ctx)
folder, err := core.SelectFolder(a.ctx)
if err != nil {
return
}
println(folder.Folder)

err = a.taskqueue.CreateTask(folder)
if err != nil {
return
}
}

// CancelTask cancels the task identified by id.
//
// id is the textual form of the task's UUID. It is supplied by the frontend,
// so a malformed value is reported and ignored instead of crashing the
// application (uuid.MustParse panics on invalid input).
func (a *App) CancelTask(id string) {
	taskID, err := uuid.Parse(id)
	if err != nil {
		println("CancelTask: invalid task id:", id)
		return
	}
	a.taskqueue.CancelTask(taskID)
}
// RemoveTask removes the task identified by id from the task queue.
//
// id is the textual form of the task's UUID. It is supplied by the frontend,
// so a malformed value is reported and ignored instead of crashing the
// application (uuid.MustParse panics on invalid input).
func (a *App) RemoveTask(id string) {
	taskID, err := uuid.Parse(id)
	if err != nil {
		println("RemoveTask: invalid task id:", id)
		return
	}
	a.taskqueue.RemoveTask(taskID)
}

// ScheduleTask forwards the request to schedule the task with the given id
// to the task queue.
func (a *App) ScheduleTask(id uuid.UUID) {
	a.taskqueue.ScheduleTask(id)
}
12 changes: 6 additions & 6 deletions openem-ingestor/backend/api.go → openem-ingestor/core/api.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
package backend
package core

import (
"context"

"github.com/google/uuid"
"github.com/google/uuid"
"github.com/wailsapp/wails/v2/pkg/runtime"
)

type DatasetFolder struct {
Id uuid.UUID
Folder string
Cancel context.CancelFunc
Id uuid.UUID
FolderPath string
}

// Select a folder using a native menu
func SelectFolder(context context.Context) (DatasetFolder, error) {
dialogOptions := runtime.OpenDialogOptions{
DefaultDirectory: "",
}

folder, err := runtime.OpenDirectoryDialog(context, dialogOptions)
Expand All @@ -27,6 +27,6 @@ func SelectFolder(context context.Context) (DatasetFolder, error) {

runtime.EventsEmit(context, "folder-added", id, folder)

selected_folder := DatasetFolder{Folder: folder, Id: id}
selected_folder := DatasetFolder{FolderPath: folder, Id: id}
return selected_folder, nil
}
164 changes: 164 additions & 0 deletions openem-ingestor/core/taskqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package core

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/google/uuid"
"github.com/wailsapp/wails/v2/pkg/runtime"
)

// TaskQueue keeps the registered ingestion tasks together with the channels
// that connect the scheduling side to the worker goroutines.
type TaskQueue struct {
	// AppContext is handed to the Wails runtime when events are emitted and
	// is the parent of the per-task contexts created by the workers.
	AppContext context.Context

	// datasetSourceFolders holds the IngestionTask values, keyed by the id
	// of their dataset folder.
	datasetSourceFolders sync.Map

	// inputChannel carries scheduled tasks to the workers.
	inputChannel chan IngestionTask
	// resultChannel carries the result of a finished ingestion.
	resultChannel chan TaskResult
	// errorChannel carries the error text of a failed ingestion.
	errorChannel chan string
}

// TransferMethods identifies the transfer method selected for an ingestion
// task.
type TransferMethods int

// The declared methods start at 1, so the zero value of TransferMethods does
// not correspond to any of them.
const (
	TransferS3     TransferMethods = 1
	TransferGlobus TransferMethods = 2
)

// TransferOptions bundles the settings attached to an ingestion task's
// transfer.
type TransferOptions struct {
	// S3_endpoint is the S3 endpoint, given as host:port in the defaults
	// used by CreateTask.
	S3_endpoint string
	// S3_Bucket is the name of the target bucket.
	S3_Bucket string
	// Md5checksum presumably enables MD5 checksum handling for the
	// transfer; confirm against the ingestion implementation.
	Md5checksum bool
}

// IngestionTask describes one dataset folder to ingest together with the
// connection and transfer settings to use for it.
type IngestionTask struct {
	// DatasetFolder is the folder to ingest; its Id is the key under which
	// the task is stored in the queue.
	DatasetFolder DatasetFolder

	// ScicatUrl and ScicatAccessToken are the SciCat connection settings.
	ScicatUrl         string
	ScicatAccessToken string

	// TransferMethod and TransferOptions select and configure the transfer.
	TransferMethod  TransferMethods
	TransferOptions TransferOptions

	// RequestContext is not assigned anywhere in this file.
	RequestContext context.Context
	// Cancel, when non-nil, is invoked to cancel the task.
	Cancel context.CancelFunc
}

// TaskResult is what a worker reports for a finished ingestion.
type TaskResult struct {
	// Elapsed_seconds is the duration of the ingestion in whole seconds.
	Elapsed_seconds int
	// Dataset_PID is the identifier of the ingested dataset.
	Dataset_PID string
}

// Startup allocates the queue's channels and launches numWorkers worker
// goroutines that read from the input channel.
//
// AppContext must be set before Startup is called, because the workers derive
// their per-task contexts from it.
func (w *TaskQueue) Startup(numWorkers int) {
	w.inputChannel = make(chan IngestionTask)
	w.resultChannel = make(chan TaskResult)
	// The error channel has to be allocated too: sends and receives on a nil
	// channel block forever, which would stall a worker on the first error.
	w.errorChannel = make(chan string)

	// Start the workers that listen on the input channel.
	for worker := 0; worker < numWorkers; worker++ {
		go w.startWorker()
	}
}

// CreateTask registers a new ingestion task for folder, keyed by the folder's
// Id. It returns an error when a task with the same Id is already registered;
// in that case the existing task is left untouched.
func (w *TaskQueue) CreateTask(folder DatasetFolder) error {
	// NOTE(review): the SciCat URL, the access token and the S3 settings are
	// hard-coded here. A credential committed to source control should be
	// treated as leaked; these values should come from configuration.
	task := IngestionTask{
		DatasetFolder:     folder,
		ScicatUrl:         "http://scopem-openem.ethz.ch:89/api/v3",
		ScicatAccessToken: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJfaWQiOiI2Njk3N2UxMWFhZTUwOWI4YzRiMjQwZTciLCJ1c2VybmFtZSI6ImluZ2VzdG9yIiwiZW1haWwiOiJzY2ljYXRpbmdlc3RvckB5b3VyLnNpdGUiLCJhdXRoU3RyYXRlZ3kiOiJsb2NhbCIsIl9fdiI6MCwiaWQiOiI2Njk3N2UxMWFhZTUwOWI4YzRiMjQwZTciLCJpYXQiOjE3MjM3MDY3MjYsImV4cCI6MTcyMzc0MjcyNn0.p0nlcM_hXoSJMsom36oPXZbknwKDsydWCyQytFLkLT4",
		TransferMethod:    TransferS3,
		TransferOptions: TransferOptions{
			S3_endpoint: "scopem-openem.ethz.ch:9000",
			S3_Bucket:   "landingzone",
			Md5checksum: true,
		},
	}

	// LoadOrStore performs the existence check and the insertion as a single
	// atomic step, so two concurrent calls with the same Id cannot both
	// succeed and overwrite each other.
	if _, loaded := w.datasetSourceFolders.LoadOrStore(task.DatasetFolder.Id, task); loaded {
		return errors.New("Key exists")
	}
	return nil
}

// startWorker is the body of a worker goroutine: it continuously reads
// ingestion tasks from the input channel and executes them one at a time. It
// returns when the input channel is closed.
func (w *TaskQueue) startWorker() {
	for ingestionTask := range w.inputChannel {
		w.runTask(ingestionTask)
	}
}

// runTask executes a single ingestion task and reports its outcome: the error
// text on the error channel on failure, otherwise the result on the result
// channel. Exactly one of the two is sent, because the receiving side reads
// only one message per scheduled task.
func (w *TaskQueue) runTask(ingestionTask IngestionTask) {
	taskContext, cancel := context.WithCancel(w.AppContext)
	// Running each task in its own function releases the context as soon as
	// the task is done; a defer inside the worker loop would only run when
	// the worker itself exits.
	defer cancel()

	ingestionTask.Cancel = cancel

	// ingestionTask is a copy of the stored value, so the cancel function has
	// to be written back for CancelTask and RemoveTask to find it. A task
	// that was removed in the meantime is not re-inserted.
	id := ingestionTask.DatasetFolder.Id
	if _, found := w.datasetSourceFolders.Load(id); found {
		w.datasetSourceFolders.Store(id, ingestionTask)
	}

	result, err := w.IngestDataset(taskContext, ingestionTask)
	if err != nil {
		w.errorChannel <- err.Error()
		return
	}
	w.resultChannel <- result
}

// CancelTask invokes the cancel function of the task stored under id, if the
// task has one, and emits an "upload-canceled" event for it. An unknown id is
// ignored.
func (w *TaskQueue) CancelTask(id uuid.UUID) {
	entry, ok := w.datasetSourceFolders.Load(id)
	if !ok {
		return
	}

	if cancel := entry.(IngestionTask).Cancel; cancel != nil {
		cancel()
	}
	runtime.EventsEmit(w.AppContext, "upload-canceled", id)
}

// RemoveTask invokes the cancel function of the task stored under id, if the
// task has one, deletes the task from the queue and emits a "folder-removed"
// event for it. An unknown id is ignored.
func (w *TaskQueue) RemoveTask(id uuid.UUID) {
	entry, ok := w.datasetSourceFolders.Load(id)
	if !ok {
		return
	}

	if cancel := entry.(IngestionTask).Cancel; cancel != nil {
		cancel()
	}
	w.datasetSourceFolders.Delete(id)
	runtime.EventsEmit(w.AppContext, "folder-removed", id)
}

// ScheduleTask hands the task stored under id to the workers without blocking
// the caller. An unknown id is logged and otherwise ignored.
//
// NOTE(review): the result and error channels are shared by all tasks, so with
// more than one task in flight the listener started here may pick up the
// outcome of a different task — confirm whether outcomes need to carry an id.
func (w *TaskQueue) ScheduleTask(id uuid.UUID) {
	entry, ok := w.datasetSourceFolders.Load(id)
	if !ok {
		fmt.Println("Scheduling upload failed for: ", id)
		return
	}
	task := entry.(IngestionTask)

	// Listener: waits for a single message, either a result or an error.
	go func(taskID uuid.UUID) {
		select {
		case failure := <-w.errorChannel:
			println(failure)
		case outcome := <-w.resultChannel:
			runtime.EventsEmit(w.AppContext, "upload-completed", taskID, outcome.Elapsed_seconds)
			println(outcome.Dataset_PID, outcome.Elapsed_seconds)
		}
	}(task.DatasetFolder.Id)

	// Scheduler: announces the task and then blocks until a worker reads it
	// from the input channel.
	go func(queued IngestionTask) {
		fmt.Println("Scheduled upload for: ", queued.DatasetFolder)
		runtime.EventsEmit(w.AppContext, "upload-scheduled", queued.DatasetFolder.Id)

		w.inputChannel <- queued
	}(task)
}

// IngestDataset is currently a placeholder for the real ingestion: it sleeps
// for five seconds and reports the fixed dataset id "1" together with the
// elapsed time in whole seconds. Neither task_context nor task is used yet,
// and the returned error is always nil.
func (w *TaskQueue) IngestDataset(task_context context.Context, task IngestionTask) (TaskResult, error) {
	started := time.Now()

	// TODO: add ingestion function
	// dataset_id, err := IngestDataset(task_context, w.AppContext, task)
	time.Sleep(5 * time.Second)

	result := TaskResult{
		Dataset_PID:     "1",
		Elapsed_seconds: int(time.Since(started).Seconds()),
	}
	return result, nil
}
46 changes: 45 additions & 1 deletion openem-ingestor/frontend/src/App.svelte
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
<script lang="ts">
import logo from "./assets/images/logo-wide-1024x317.png";
import { SelectFolder } from "../wailsjs/go/main/App.js";
import {
SelectFolder,
CancelTask,
RemoveTask,
ScheduleTask,
} from "../wailsjs/go/main/App.js";
import { EventsOn } from "../wailsjs/runtime/runtime";
import List from "./List.svelte";
import ListElement from "./ListElement.svelte";
Expand All @@ -9,6 +14,21 @@
SelectFolder();
}
// Ask the backend to cancel the task with the given id.
function cancelTask(id: string): void {
    CancelTask(id);
}
// Ask the backend to remove the task with the given id; the returned promise
// is deliberately not awaited.
function removeTask(id: string): void {
    void RemoveTask(id);
}
// Format a duration given in seconds as "HH:MM:SS".
// Fractions of a second are dropped. Durations of 24 hours or more keep
// counting hours instead of wrapping around, and negative or non-finite
// input is rendered as "00:00:00" instead of throwing.
function secondsToStr(elapsed_seconds: number): string {
    const total = Number.isFinite(elapsed_seconds)
        ? Math.max(0, Math.floor(elapsed_seconds))
        : 0;
    const pad = (value: number): string => String(value).padStart(2, "0");

    const hours = Math.floor(total / 3600);
    const minutes = Math.floor((total % 3600) / 60);
    const seconds = total % 60;
    return `${pad(hours)}:${pad(minutes)}:${pad(seconds)}`;
}
// Ask the backend to schedule the task with the given id; the returned
// promise is deliberately not awaited.
function scheduleTask(id: string): void {
    void ScheduleTask(id);
}
let items = {};
function newItem(id: string, folder: string): string {
Expand All @@ -18,13 +38,37 @@
status: "Selected",
progress: 0,
component: ListElement,
cancelTask: cancelTask,
scheduleTask: scheduleTask,
removeTask: removeTask,
};
return id;
}
EventsOn("folder-added", (id, folder) => {
newItem(id, folder);
});
// Update the status text of a list item. Events for ids that are no longer
// in the list (for example a result arriving after the folder was removed)
// are ignored instead of throwing a TypeError.
function setStatus(id: string, status: string): void {
    if (!(id in items)) {
        return;
    }
    items[id].status = status;
    // Reassign so that Svelte notices the change.
    items = items;
}

EventsOn("folder-removed", (id) => {
    delete items[id];
    // Reassign so that Svelte notices the change.
    items = items;
});
EventsOn("upload-scheduled", (id) => {
    setStatus(id, "Scheduled");
});
EventsOn("upload-completed", (id, elapsed_seconds) => {
    setStatus(id, "Completed in " + secondsToStr(elapsed_seconds));
});
EventsOn("upload-canceled", (id) => {
    console.log(id);
    setStatus(id, "Canceled");
});
</script>

<main>
Expand Down
Loading

0 comments on commit 6b2d9e1

Please sign in to comment.