diff --git a/.vscode/launch.json b/.vscode/launch.json index 5683ce47..9fe8ac5c 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -2,39 +2,33 @@ "version": "0.2.0", "configurations": [ { - "name": "runtime (local plugin storage)", + "name": "Hypermode Runtime (local)", "type": "go", "request": "launch", "mode": "auto", "program": "${workspaceFolder}", - "env": { - "ENV": "dev", - "AWS_REGION": "us-west-2", - "AWS_PROFILE": "sandbox", - "AWS_SDK_LOAD_CONFIG": "true" - }, "args": [ "--dgraph=http://localhost:8080", - "--plugins=${userHome}/plugins", "--refresh=1s", ] }, { - "name": "runtime (AWS S3 plugin storage)", + "name": "Hypermode Runtime (AWS)", "type": "go", "request": "launch", "mode": "auto", "program": "${workspaceFolder}", "env": { - "ENV": "dev", "AWS_REGION": "us-west-2", "AWS_PROFILE": "sandbox", "AWS_SDK_LOAD_CONFIG": "true" }, "args": [ "--dgraph=http://localhost:8080", + "--useAwsSecrets", + "--useAwsStorage", "--s3bucket=${input:s3bucket}", - "--plugins=${input:s3folder}", + "--s3path=${input:s3folder}", "--refresh=1s", ] }, @@ -44,13 +38,13 @@ "id": "s3bucket", "type": "promptString", "description": "Enter the S3 bucket name", - "default": "sandbox-runtime-plugins" + "default": "sandbox-runtime-storage" }, { "id": "s3folder", "type": "promptString", - "description": "Enter the S3 plugins folder name", - "default": "shared-plugins" + "description": "Enter the S3 storage folder name", + "default": "shared" }, ] } diff --git a/README.md b/README.md index 48587b95..cf518a68 100644 --- a/README.md +++ b/README.md @@ -1,79 +1,29 @@ # Hypermode Runtime This repository contains the source code for the _Hypermode Runtime_. - -The runtime loads and executes _plugins_ containing _Hypermode Functions_. +The Runtime loads and executes _plugins_ containing _Hypermode Functions_. To get started with Hypermode Functions written in AssemblyScript, visit the -[`hypermode-as`](https://github.com/gohypermode/hypermode-as) repository. 
- -## Docker Setup - -To build a Docker image for the Hypermode Runtime: - -```sh -docker build -t hypermode/runtime . -``` - -Then you can run that image. Port `8686` should be exposed. - -```sh -docker run -p 8686:8686 -v :/plugins hypermode/runtime --dgraph= -``` - -Replace the following: -- `` should be the local path to the folder where you will load plugins from. - - You can use paths such as `./plugins` or `~/plugins` etc. depending on where you want to keep your plugin files. -- `` should be the URL to the Dgraph Alpha endpoint you are connecting the runtime to. - - To connect to Dgraph running in another docker container, use `host.docker.internal`. - -Optionally, you may also wish to give the container a specific name using the `--name` flag. -For example, to start a new Docker container named `hmruntime`, looking for plugins in a local `./plugins` folder, -and connecting to a local Dgraph docker image: - -```sh -docker run --name hmruntime -p 8686:8686 -v ./plugins:/plugins hypermode/runtime --plugins=/plugins --dgraph=http://host.docker.internal:8080 -``` - -_Note, if you have previously created a container with the same name, then delete it first with `docker rm hmruntime`._ - -## Building without Docker - -If needed, you can compile and run the Hypermode Runtime without using Docker. -This is most common for local development. - -Be sure that you have Go installed in your dev environment, at the version specified in the [.go-version](./go-verson) file, or higher. - -You can run the code directly using VSCode's debugger. - -Alternatively you can either run the Runtime code directly from source: - -```sh -go run . --plugin ../hypermode-as/examples/hmplugin1 -``` +[`functions-as`](https://github.com/gohypermode/functions-as) repository. 
-Or, you can build the `hmruntime` executable and then run that: +## Command Line Parameters -```sh -go build -./hmruntime --plugin ../hypermode-as/examples/hmplugin1 -``` - -## Command Line Arguments - -When starting the runtime, you may need to use the following command line arguments: +When starting the Runtime, you can use the following command line parameters: -- `--plugins` (or `--plugin`) - The folder that the runtime will look for plugins in. ***Required.*** -- `--port` - The port that the runtime will listen for HTTP requests on. Defaults to `8686`. -- `--dgraph` - The URL to the Dgraph Alpha endpoint. Defaults to `http://localhost:8080`. -- `--modelHost` - The host portion of the url to the model endpoint. This is used for cloud deployments for kserve hosted models . -- `--noreload` - Disables automatic reloading of plugins. -- `--s3bucket` - The S3 bucket to use, if using AWS for plugin storage. -- `--useAwsSecrets` - Directs the Runtime to use AWS Secret Manager for secrets such as model keys. -- `--refresh` - The refresh interval to check for plugins and schema changes. Defaults to `5s`. -- `--jsonlogs` - Switches log output to JSON format. +- `--port` - The HTTP port to listen on. Defaults to `8686`. +- `--dgraph` - The Dgraph url to connect to. Defaults to `http://localhost:8080`. +- `--modelHost` - The base DNS of the host endpoint to the model server. +- `--storagePath` - The path to a directory used for local storage. + - Linux / OSX default: `$HOME/.hypermode` + - Windows default: `%APPDATA%\Hypermode` +- `--useAwsSecrets` - Use AWS Secrets Manager for API keys and other secrets. +- `--useAwsStorage` - Use AWS S3 for storage instead of the local filesystem. +- `--s3bucket` - The S3 bucket to use, if using AWS storage. +- `--s3path` - The path within the S3 bucket to use, if using AWS storage. +- `--refresh` - The refresh interval to reload any changes. Defaults to `5s`. +- `--jsonlogs` - Use JSON format for logging. 
+_Note: You can use either `-` or `--` prefixes, and you can use either a space or `=` to provide values._ ## Environment Variables @@ -99,33 +49,50 @@ HYP_MODEL_KEY_FOO=abc123 HYP_MODEL_KEY_BAR=xyz456 ``` -## Working locally with plugins +## Building the Runtime + +Ensure that you have Go installed in your dev environment. +The required minimum version is specified in the [.go-version](./.go-version) file. + +Then, you can do any of the following: + +- You can run directly from source code: + ```sh + go run . + ``` -Regardless of whether you use Docker or not, it is often useful to be developing both the runtime -and a plugin at the same time. This is especially true if you are developing a new host function -for the runtime, and need to expose it via the `hypermode-as` library. +- You can compile the source code and run the output: + ```sh + go build + ./hmruntime + ``` -To facilitate this, you can point the runtime's plugins path to the root folder of any plugin's -source code. The runtime will use the `build/debug.wasm` file, and will pick up changes -automatically when rebuilding the plugin. +- You can run and debug the source code in VS Code, using the VS Code debugger. + +## Docker Setup -For example, you may have the `runtime` and `hypermode-as` repos in the same parent directory, -and are working on a plugin in the `examples` folder, such as `hmplugin1`. You can start the -runtime like so: +To build a Docker image for the Hypermode Runtime: ```sh -go run . --plugin ../hypermode-as/examples/hmplugin1 +docker build -t hypermode/runtime . ``` -Or, if you are working on more than one plugin simultaneously you can use their parent directory: +When running the image via Docker Desktop, keep in mind: +- You may wish to give the container a specific name using `--name`. +- Port `8686` should be exposed.
+- If using local storage, you'll need to map the container's `/root/.hypermode` folder to your own `~/.hypermode` folder. +- You may need to pass command line parameters and/or set environment variables. + +For example: ```sh -go run . --plugins ../hypermode-as/examples +docker run --name hmruntime \ + -p 8686:8686 \ + -v ~/.hypermode:/root/.hypermode hypermode/runtime \ + --dgraph=http://host.docker.internal:8080 ``` -However, be aware that if there are conflicts between function names in the plugins, -the last one loaded byt the runtime will take precedence. Thus, it's usually better to work -on one plugin at a time. +_Note, if you have previously created a container with the same name, then delete it first with `docker rm hmruntime`._ ## Dgraph Setup @@ -153,7 +120,8 @@ docker run --name \ ``` ## AWS Setup -Runtime may access AWS resources, so we need to use an AWS profile. +If configured to do so, the Hypermode Runtime may access AWS resources. +If you are debugging locally, set up an AWS profile. ```sh export AWS_PROFILE=sandbox @@ -172,16 +140,23 @@ region = us-west-2 Then run `aws sso login --profile sandbox` to login to the profile. -After SSO login you can start the runtime, either from the VS Code "Run and Debug" panel, -or from the command line as follows: +After SSO login you can start the Runtime, either from the VS Code debugger +using the `Hypermode Runtime (AWS)` launch profile, or from the command line as follows: ```sh export AWS_SDK_LOAD_CONFIG=true export AWS_PROFILE=sandbox -./hmruntime --plugins +./hmruntime \ + --useAwsSecrets \ + --useAwsStorage \ + --s3bucket=sandbox-runtime-storage \ + --s3path=shared ``` -_You can omit the exports if the environment variables are already set._ +You can omit the exports if the environment variables are already set. +You can also use any S3 bucket or path you like. If a path is not specified, the Runtime will look for files in the root of the bucket. + +_The shared sandbox is intended for temporary use.
In production, each customer's backend gets a separate path within a single bucket._ #### Troubleshooting @@ -193,35 +168,6 @@ or your AWS region is wrong, or you do not have an AWS secret set for the ModelS `aws secretsmanager create-secret --name '' --secret-string '' ` -### Using S3 for plugin storage - -You can optionally use S3 for plugin storage. This configuration is usually for staging or production, -but you can use it locally as well. - -First, configure the AWS setup as described above. Then start the runtime with the following command line arguments: - -- `--s3bucket ` - An standard S3 bucket within the pre-configured AWS account. -- `--plugins ` - A folder within that bucket where `.wasm` files are contained. - -Note that the `--plugins` argument is re-purposed. It now refers to a folder inside the S3 bucket, -rather than a local directory on disk. - -For example: - -```sh -export AWS_PROFILE=sandbox; ./hmruntime --s3bucket sandbox-runtime-plugins --plugins shared-plugins -``` -You can omit the export if the environment variable is already set. - -_In staging and production, a single S3 bucket is shared, but each backend has its own plugins folder._ - -#### Using S3 from VS Code Debugging Session - -From the VS Code "Run and Debug" pane, you can choose the launch profile called `runtime (AWS S3 plugin storage)`. - -When running using this profile, you will be prompted for the S3 bucket and folder name. -You can use the default values provided, or override them to load plugins from another location. - ### Unit Testing Unit tests are created using Go's [built-in unit test support](https://go.dev/doc/tutorial/add-a-test). diff --git a/appdata/appdata.go b/appdata/appdata.go new file mode 100644 index 00000000..f0082ddb --- /dev/null +++ b/appdata/appdata.go @@ -0,0 +1,59 @@ +/* + * Copyright 2024 Hypermode, Inc. 
+ */ + +package appdata + +var appDataFiles = map[string]AppData{ + "hypermode.json": &HypermodeData, + "models.json": &ModelData, +} + +var HypermodeData HypermodeAppData = HypermodeAppData{} +var ModelData ModelsAppData = ModelsAppData{} + +type AppData any + +type HypermodeAppData struct { + Models []Model `json:"models"` + EmbeddingSpecs []EmbeddingSpec `json:"embeddingSpecs"` + TrainingInstructions []TrainingInstruction `json:"trainingInstructions"` + AppData +} + +type ModelsAppData struct { + AppData +} + +type ModelTask string + +const ( + ClassificationTask ModelTask = "classification" + EmbeddingTask ModelTask = "embedding" + GeneratorTask ModelTask = "generator" +) + +type Model struct { + Name string `json:"name"` + Task ModelTask `json:"task"` + SourceModel string `json:"sourceModel"` + Provider string `json:"provider"` + Host string `json:"host"` + Endpoint string `json:"endpoint"` + AuthHeader string `json:"authHeader"` +} + +type EmbeddingSpec struct { + EntityType string `json:"entityType"` + Attribute string `json:"attribute"` + ModelName string `json:"modelName"` + Config struct { + Query string `json:"query"` + Template string `json:"template"` + } `json:"config"` +} + +type TrainingInstruction struct { + ModelName string `json:"modelName"` + Labels []string `json:"labels"` +} diff --git a/appdata/management.go b/appdata/management.go new file mode 100644 index 00000000..dbac7528 --- /dev/null +++ b/appdata/management.go @@ -0,0 +1,54 @@ +/* + * Copyright 2024 Hypermode, Inc. + */ + +package appdata + +import ( + "context" + "encoding/json" + + "hmruntime/logger" + "hmruntime/storage" +) + +func MonitorAppDataFiles(ctx context.Context) { + + loadFile := func(file storage.FileInfo) { + err := loadAppData(ctx, file.Name) + if err == nil { + logger.Info(ctx). + Str("filename", file.Name). + Msg("Loaded application data file.") + + } else { + logger.Err(ctx, err). + Str("filename", file.Name). 
+ Msg("Failed to load application data file.") + } + } + + // NOTEs: Removing a file entirely is not currently supported. + + sm := storage.NewStorageMonitor(".json") + sm.Added = loadFile + sm.Modified = loadFile + sm.Start(ctx) +} + +func loadAppData(ctx context.Context, filename string) error { + bytes, err := storage.GetFileContents(ctx, filename) + if err != nil { + return err + } + + _, ok := appDataFiles[filename] + if ok { + err = json.Unmarshal(bytes, appDataFiles[filename]) + if err != nil { + return err + } + } + + return nil +} diff --git a/aws/config.go b/aws/config.go index f310f72d..db2304fb 100644 --- a/aws/config.go +++ b/aws/config.go @@ -6,8 +6,8 @@ package aws import ( "context" + "fmt" - hmConfig "hmruntime/config" "hmruntime/logger" "github.com/aws/aws-sdk-go-v2/aws" @@ -16,44 +16,24 @@ import ( ) var awsConfig aws.Config -var awsEnabled bool -var useS3PluginStorage bool -func UseAwsForPluginStorage() bool { - return useS3PluginStorage +func GetAwsConfig() aws.Config { + return awsConfig } func Initialize(ctx context.Context) error { - useS3PluginStorage = hmConfig.S3Bucket != "" - defer func() { - if !useS3PluginStorage { - logger.Info(ctx).Msg("S3 bucket name is not set. Using local storage for plugins.") - } else if !awsEnabled { - logger.Fatal(ctx).Msg("S3 bucket name is set, but AWS configuration failed to load. Exiting.") - } else { - logger.Info(ctx). - Str("bucket", hmConfig.S3Bucket). - Msg("Using S3 for plugin storage.") - } - }() - cfg, err := config.LoadDefaultConfig(ctx) if err != nil { - logger.Warn(ctx).Err(err). - Msg("Error loading AWS configuration.") - return nil + return fmt.Errorf("error loading AWS configuration: %w", err) } client := sts.NewFromConfig(cfg) identity, err := client.GetCallerIdentity(ctx, &sts.GetCallerIdentityInput{}) if err != nil { - logger.Warn(ctx).Err(err). 
- Msg("Error getting AWS caller identity.") - return nil + return fmt.Errorf("error getting AWS caller identity: %w", err) } awsConfig = cfg - awsEnabled = true logger.Info(ctx). Str("region", awsConfig.Region). diff --git a/aws/secrets.go b/aws/secrets.go index fefb730f..ff5cc149 100644 --- a/aws/secrets.go +++ b/aws/secrets.go @@ -22,10 +22,6 @@ func GetSecretString(ctx context.Context, secretId string) (string, error) { return secret, nil } - if !awsEnabled { - return "", fmt.Errorf("unable to retrieve secret because AWS functionality is disabled") - } - svc := secretsmanager.NewFromConfig(awsConfig) secretValue, err := svc.GetSecretValue(ctx, &secretsmanager.GetSecretValueInput{ SecretId: &secretId, diff --git a/aws/storage.go b/aws/storage.go deleted file mode 100644 index 60ec6744..00000000 --- a/aws/storage.go +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Copyright 2024 Hypermode, Inc. - */ - -package aws - -import ( - "context" - "fmt" - "io" - "strings" - - "hmruntime/config" - "hmruntime/logger" - - "github.com/aws/aws-sdk-go-v2/service/s3" -) - -func S3RetrievalHelper(ctx context.Context) (*s3.ListObjectsV2Output, error) { - if !useS3PluginStorage { - return nil, fmt.Errorf("unable too retrieve from S3, S3 plugin storage is disabled") - } - - path := getPathPrefix() - input := &s3.ListObjectsV2Input{ - Bucket: &config.S3Bucket, - Prefix: &path, - } - - svc := s3.NewFromConfig(awsConfig) - result, err := svc.ListObjectsV2(ctx, input) - if err != nil { - return nil, fmt.Errorf("error getting object from S3: %w", err) - } - return result, nil -} - -func ListJsons(ctx context.Context) (map[string]string, error) { - result, err := S3RetrievalHelper(ctx) - if err != nil { - return nil, err - } - - var jsons = make(map[string]string, *result.KeyCount) - for _, obj := range result.Contents { - if !strings.HasSuffix(*obj.Key, ".json") { - continue - } - - name := strings.TrimSuffix(strings.TrimPrefix(*obj.Key, getPathPrefix()), ".json") - jsons[name] = *obj.ETag - } - 
- return jsons, nil -} - -func ListPlugins(ctx context.Context) (map[string]string, error) { - - result, err := S3RetrievalHelper(ctx) - if err != nil { - return nil, fmt.Errorf("error listing objects from S3 bucket: %w", err) - } - - var plugins = make(map[string]string, *result.KeyCount) - for _, obj := range result.Contents { - if !strings.HasSuffix(*obj.Key, ".wasm") { - continue - } - - name := strings.TrimSuffix(strings.TrimPrefix(*obj.Key, getPathPrefix()), ".wasm") - plugins[name] = *obj.ETag - } - - return plugins, nil -} - -func GetJsonBytes(ctx context.Context, name string) ([]byte, error) { - if !useS3PluginStorage { - return nil, fmt.Errorf("unable to retrieve JSON because S3 plugin storage is disabled") - } - - key := getPathPrefix() + name + ".json" - input := &s3.GetObjectInput{ - Bucket: &config.S3Bucket, - Key: &key, - } - - svc := s3.NewFromConfig(awsConfig) - obj, err := svc.GetObject(ctx, input) - if err != nil { - return nil, fmt.Errorf("error getting object for JSON '%s' from S3: %w", name, err) - } - - defer obj.Body.Close() - bytes, err := io.ReadAll(obj.Body) - if err != nil { - return nil, fmt.Errorf("error reading content stream of JSON '%s' from S3: %w", name, err) - } - - logger.Info(ctx). - Str("key", key). 
- Msg(fmt.Sprintf("Retrieved JSON '%s' from S3.", name)) - - return bytes, nil -} - -func GetPluginBytes(ctx context.Context, name string) ([]byte, error) { - - if !useS3PluginStorage { - return nil, fmt.Errorf("unable to retrieve plugin because S3 plugin storage is disabled") - } - - key := getPathPrefix() + name + ".wasm" - input := &s3.GetObjectInput{ - Bucket: &config.S3Bucket, - Key: &key, - } - - svc := s3.NewFromConfig(awsConfig) - obj, err := svc.GetObject(ctx, input) - if err != nil { - return nil, fmt.Errorf("error getting object for plugin '%s' from S3: %w", name, err) - } - - defer obj.Body.Close() - bytes, err := io.ReadAll(obj.Body) - if err != nil { - return nil, fmt.Errorf("error reading content stream of plugin '%s' from S3: %w", name, err) - } - - logger.Info(ctx). - Str("key", key). - Msg("Retrieved plugin from S3.") - - return bytes, nil -} - -func getPathPrefix() string { - path := strings.TrimRight(config.PluginsPath, "/") + "/" - if path == "/" { - path = "" - } - return path -} diff --git a/config/config.go b/config/config.go index 3a19dda2..bf70df49 100644 --- a/config/config.go +++ b/config/config.go @@ -6,92 +6,50 @@ package config import ( "flag" - "sync" + "os" + "path/filepath" + "runtime" "time" ) var Port int var DgraphUrl string var ModelHost string -var PluginsPath string -var NoReload bool -var S3Bucket string +var StoragePath string var UseAwsSecrets bool +var UseAwsStorage bool +var S3Bucket string +var S3Path string var RefreshInterval time.Duration var UseJsonLogging bool -var Mu = &sync.Mutex{} - -var SupportedJsons = map[string]AppData{ - "hypermode.json": &HypermodeData, - "models.json": &ModelData, -} - -// HypermodeJson holds the hypermode.json data -var HypermodeData HypermodeAppData = HypermodeAppData{} - -// ModelJson holds the models.json data -var ModelData ModelsAppData = ModelsAppData{} - -// AppData interface -type AppData any - -// struct that holds the hypermode.json data -type HypermodeAppData struct { - Models 
[]Model `json:"models"` - EmbeddingSpecs []EmbeddingSpec `json:"embeddingSpecs"` - TrainingInstructions []TrainingInstruction `json:"trainingInstructions"` - AppData -} - -type ModelsAppData struct { - AppData -} - -type ModelTask string - -const ( - ClassificationTask ModelTask = "classification" - EmbeddingTask ModelTask = "embedding" - GeneratorTask ModelTask = "generator" -) - -type Model struct { - Name string `json:"name"` - Task ModelTask `json:"task"` - SourceModel string `json:"sourceModel"` - Provider string `json:"provider"` - Host string `json:"host"` - Endpoint string `json:"endpoint"` - AuthHeader string `json:"authHeader"` -} - -type EmbeddingSpec struct { - EntityType string `json:"entityType"` - Attribute string `json:"attribute"` - ModelName string `json:"modelName"` - Config struct { - Query string `json:"query"` - Template string `json:"template"` - } `json:"config"` -} - -type TrainingInstruction struct { - ModelName string `json:"modelName"` - Labels []string `json:"labels"` -} - func ParseCommandLineFlags() { flag.IntVar(&Port, "port", 8686, "The HTTP port to listen on.") flag.StringVar(&DgraphUrl, "dgraph", "http://localhost:8080", "The Dgraph url to connect to.") flag.StringVar(&ModelHost, "modelHost", "", "The base DNS of the host endpoint to the model server.") - flag.StringVar(&PluginsPath, "plugins", "", "The path to the plugins directory.") - flag.StringVar(&PluginsPath, "plugin", "", "alias for -plugins") - flag.BoolVar(&NoReload, "noreload", false, "Disable automatic plugin reloading.") - flag.StringVar(&S3Bucket, "s3bucket", "", "The S3 bucket to use, if using AWS for plugin storage.") + flag.StringVar(&StoragePath, "storagePath", getDefaultStoragePath(), "The path to a directory used for local storage.") flag.BoolVar(&UseAwsSecrets, "useAwsSecrets", false, "Use AWS Secrets Manager for API keys and other secrets.") - flag.DurationVar(&RefreshInterval, "refresh", time.Second*5, "The refresh interval to check for plugins and schema 
changes.") + flag.BoolVar(&UseAwsStorage, "useAwsStorage", false, "Use AWS S3 for storage instead of the local filesystem.") + flag.StringVar(&S3Bucket, "s3bucket", "", "The S3 bucket to use, if using AWS storage.") + flag.StringVar(&S3Path, "s3path", "", "The path within the S3 bucket to use, if using AWS storage.") + flag.DurationVar(&RefreshInterval, "refresh", time.Second*5, "The refresh interval to reload any changes.") flag.BoolVar(&UseJsonLogging, "jsonlogs", false, "Use JSON format for logging.") flag.Parse() } + +func getDefaultStoragePath() string { + + // On Windows, the default is %APPDATA%\Hypermode + if runtime.GOOS == "windows" { + appData := os.Getenv("APPDATA") + return filepath.Join(appData, "Hypermode") + } + + // On Unix and macOS, the default is $HOME/.hypermode + homedir, err := os.UserHomeDir() + if err != nil { + return "" + } + return filepath.Join(homedir, ".hypermode") +} diff --git a/functions/hostfns.go b/functions/hostfns.go index 15f72ac5..80d0431c 100644 --- a/functions/hostfns.go +++ b/functions/hostfns.go @@ -9,7 +9,7 @@ import ( "encoding/json" "fmt" - "hmruntime/config" + "hmruntime/appdata" "hmruntime/dgraph" "hmruntime/logger" "hmruntime/models" @@ -111,7 +111,7 @@ type ClassifierLabel struct { func hostInvokeClassifier(ctx context.Context, mod wasm.Module, pModelName uint32, pSentenceMap uint32) uint32 { mem := mod.Memory() - model, err := getModel(mem, pModelName, config.ClassificationTask) + model, err := getModel(mem, pModelName, appdata.ClassificationTask) if err != nil { logger.Err(ctx, err).Msg("Error getting model.") return 0 @@ -146,7 +146,7 @@ func hostInvokeClassifier(ctx context.Context, mod wasm.Module, pModelName uint3 func hostComputeEmbedding(ctx context.Context, mod wasm.Module, pModelName uint32, pSentenceMap uint32) uint32 { mem := mod.Memory() - model, err := getModel(mem, pModelName, config.EmbeddingTask) + model, err := getModel(mem, pModelName, appdata.EmbeddingTask) if err != nil { logger.Err(ctx, 
err).Msg("Error getting model.") return 0 @@ -181,7 +181,7 @@ func hostComputeEmbedding(ctx context.Context, mod wasm.Module, pModelName uint3 func hostInvokeTextGenerator(ctx context.Context, mod wasm.Module, pModelName uint32, pInstruction uint32, pSentence uint32) uint32 { mem := mod.Memory() - model, err := getModel(mem, pModelName, config.GeneratorTask) + model, err := getModel(mem, pModelName, appdata.GeneratorTask) if err != nil { logger.Err(ctx, err).Msg("Error getting model.") return 0 @@ -219,11 +219,11 @@ func hostInvokeTextGenerator(ctx context.Context, mod wasm.Module, pModelName ui return writeString(ctx, mod, string(res)) } -func getModel(mem wasm.Memory, pModelName uint32, task config.ModelTask) (config.Model, error) { +func getModel(mem wasm.Memory, pModelName uint32, task appdata.ModelTask) (appdata.Model, error) { modelName, err := readString(mem, pModelName) if err != nil { err = fmt.Errorf("error reading model name from wasm memory: %w", err) - return config.Model{}, err + return appdata.Model{}, err } return models.GetModel(modelName, task) diff --git a/functions/registration.go b/functions/registration.go index 6092547c..264de422 100644 --- a/functions/registration.go +++ b/functions/registration.go @@ -17,15 +17,11 @@ import ( // map that holds the function info for each resolver var FunctionsMap = make(map[string]schema.FunctionInfo) -// channel used to signal when registration is completed -var RegistrationCompleted chan bool = make(chan bool, 1) - func MonitorRegistration(ctx context.Context) { go func() { for { select { case <-host.RegistrationRequest: - logger.Info(ctx).Msg("Registering functions.") err := registerFunctions(ctx, gqlSchema) if err != nil { logger.Err(ctx, err).Msg("Failed to register functions.") @@ -47,13 +43,14 @@ func registerFunctions(ctx context.Context, gqlSchema string) error { // Build a map of resolvers to function info, including the plugin name. 
// If there are function name conflicts between plugins, the last plugin loaded wins. - for pluginName, plugin := range host.Plugins { + var plugins = host.Plugins.GetAll() + for _, plugin := range plugins { for _, scma := range funcSchemas { module := *plugin.Module for _, fn := range module.ExportedFunctions() { fnName := fn.ExportNames()[0] if strings.EqualFold(fnName, scma.FunctionName()) { - info := schema.FunctionInfo{PluginName: pluginName, Schema: scma} + info := schema.FunctionInfo{PluginName: plugin.Name(), Schema: scma} resolver := scma.Resolver() oldInfo, existed := FunctionsMap[resolver] if existed && reflect.DeepEqual(oldInfo, info) { @@ -64,7 +61,7 @@ func registerFunctions(ctx context.Context, gqlSchema string) error { logger.Info(ctx). Str("resolver", resolver). Str("function", fnName). - Str("plugin", pluginName). + Str("plugin", plugin.Name()). Msg("Registered function.") } } @@ -80,7 +77,7 @@ func registerFunctions(ctx context.Context, gqlSchema string) error { break } } - _, foundPlugin := host.Plugins[info.PluginName] + _, foundPlugin := host.Plugins.GetByName(info.PluginName) if !foundSchema || !foundPlugin { delete(FunctionsMap, resolver) logger.Info(ctx). @@ -91,13 +88,5 @@ func registerFunctions(ctx context.Context, gqlSchema string) error { } } - // Signal that registration is complete. This is a non-blocking send to - // avoid a deadlock if no one is waiting, but the channel has a buffer size - // of 1, so it will not lose the message if the receiver is slow to start. 
- select { - case RegistrationCompleted <- true: - default: - } - return nil } diff --git a/go.mod b/go.mod index 6368bb73..19c811bc 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,6 @@ require ( github.com/dgraph-io/gqlparser v1.2.1 github.com/google/uuid v1.6.0 github.com/joho/godotenv v1.5.1 - github.com/radovskyb/watcher v1.0.7 github.com/rs/xid v1.5.0 github.com/rs/zerolog v1.32.0 github.com/tetratelabs/wazero v1.7.0 diff --git a/go.sum b/go.sum index f242c300..0b2b2be1 100644 --- a/go.sum +++ b/go.sum @@ -68,8 +68,6 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/radovskyb/watcher v1.0.7 h1:AYePLih6dpmS32vlHfhCeli8127LzkIgwJGcwwe8tUE= -github.com/radovskyb/watcher v1.0.7/go.mod h1:78okwvY5wPdzcb1UYnip1pvrZNIVEIh/Cm+ZuvsUYIg= github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0= diff --git a/host/host.go b/host/host.go index 41b13339..bf5269e9 100644 --- a/host/host.go +++ b/host/host.go @@ -9,8 +9,5 @@ import "github.com/tetratelabs/wazero" // runtime instance for the WASM modules var WasmRuntime wazero.Runtime -// map that holds all of the loaded plugins, indexed by their name -var Plugins = make(map[string]Plugin) - // Channel used to signal that registration is needed var RegistrationRequest chan bool = make(chan bool) diff --git a/host/loader.go b/host/loader.go deleted file mode 100644 index 6a55d589..00000000 --- a/host/loader.go +++ /dev/null @@ -1,483 +0,0 @@ -/* - * Copyright 2023 Hypermode, Inc. 
- */ - -package host - -import ( - "context" - "fmt" - "os" - "path" - "regexp" - "strings" - "time" - - "hmruntime/aws" - "hmruntime/config" - "hmruntime/logger" - - "github.com/radovskyb/watcher" -) - -// Map of plugin names and etags as last retrieved from S3. -var awsPlugins map[string]string - -// Map of json files and etags as last retrieved from S3. -var awsJsons map[string]string - -func LoadJsons(ctx context.Context) error { - _, err := loadJsons(ctx) - return err -} - -func LoadPlugins(ctx context.Context) error { - _, err := loadPlugins(ctx) - return err -} - -func ReloadPlugins(ctx context.Context) error { - - // Reload existing plugins - loaded, err := loadPlugins(ctx) - if err != nil { - return err - } - - // Unload any plugins that are no longer present - for name := range Plugins { - if !loaded[name] { - err := unloadPlugin(ctx, name) - if err != nil { - return fmt.Errorf("failed to unload plugin '%s': %w", name, err) - } - } - } - - return nil -} - -func loadJsons(ctx context.Context) (map[string]bool, error) { - var loaded = make(map[string]bool) - - if aws.UseAwsForPluginStorage() { - jsons, err := aws.ListJsons(ctx) - if err != nil { - return nil, fmt.Errorf("failed to list jsons from S3: %w", err) - } - - for json := range jsons { - err := loadJson(ctx, json) - if err != nil { - logger.Err(ctx, err). - Str("json", json). - Msg("Failed to load json.") - } else { - loaded[json] = true - } - } - - // Store the list of jsons and their etags for later comparison. - awsJsons = jsons - - return loaded, nil - } - - // Otherwise, load all jsons in the plugins directory. - entries, err := os.ReadDir(config.PluginsPath) - if err != nil { - return nil, fmt.Errorf("failed to read plugins directory: %w", err) - } - - for _, entry := range entries { - - // Determine if the entry represents a json. 
- var jsonName string - entryName := entry.Name() - if strings.HasSuffix(entryName, ".json") { - jsonName = strings.TrimSuffix(entryName, ".json") - } else { - continue - } - - // Load the json - err := loadJson(ctx, jsonName) - if err != nil { - logger.Err(ctx, err). - Str("json", jsonName). - Msg("Failed to load json.") - } else { - loaded[jsonName] = true - } - } - - return loaded, nil -} - -func loadPlugins(ctx context.Context) (map[string]bool, error) { - var loaded = make(map[string]bool) - - if aws.UseAwsForPluginStorage() { - plugins, err := aws.ListPlugins(ctx) - if err != nil { - return nil, fmt.Errorf("failed to list plugins from S3: %w", err) - } - - for plugin := range plugins { - err := loadPlugin(ctx, plugin) - if err != nil { - logger.Err(ctx, err). - Str("plugin", plugin). - Msg("Failed to load plugin.") - } else { - loaded[plugin] = true - } - } - - // Store the list of plugins and their etags for later comparison. - awsPlugins = plugins - - return loaded, nil - } - - // If the plugins path is a single plugin's base directory, load the single plugin. - if _, err := os.Stat(config.PluginsPath + "/build/debug.wasm"); err == nil { - pluginName := path.Base(config.PluginsPath) - err := loadPlugin(ctx, pluginName) - if err != nil { - logger.Err(ctx, err). - Str("plugin", pluginName). - Msg("Failed to load plugin.") - } else { - loaded[pluginName] = true - } - } - - // Otherwise, load all plugins in the plugins directory. - entries, err := os.ReadDir(config.PluginsPath) - if err != nil { - return nil, fmt.Errorf("failed to read plugins directory: %w", err) - } - - for _, entry := range entries { - - // Determine if the entry represents a plugin. 
- var pluginName string - entryName := entry.Name() - if entry.IsDir() { - pluginName = entryName - path := fmt.Sprintf("%s/%s/build/debug.wasm", config.PluginsPath, pluginName) - if _, err := os.Stat(path); err != nil { - continue - } - } else if strings.HasSuffix(entryName, ".wasm") { - pluginName = strings.TrimSuffix(entryName, ".wasm") - } else { - continue - } - - // Load the plugin - err := loadPlugin(ctx, pluginName) - if err != nil { - logger.Err(ctx, err). - Str("plugin", pluginName). - Msg("Failed to load plugin.") - } else { - loaded[pluginName] = true - } - } - - return loaded, nil -} - -func WatchForJsonChanges(ctx context.Context) error { - if aws.UseAwsForPluginStorage() { - return watchStorageForJsonChanges(ctx) - } else { - return watchDirectoryForHypermodeJsonChanges(ctx) - } -} - -func WatchForPluginChanges(ctx context.Context) error { - - if config.NoReload { - logger.Warn(ctx).Msg("Automatic plugin reloading is disabled. Restart the server to load new or modified host.") - return nil - } - - if aws.UseAwsForPluginStorage() { - return watchStorageForPluginChanges(ctx) - } else { - return watchDirectoryForPluginChanges(ctx) - } -} - -func watchDirectoryForHypermodeJsonChanges(ctx context.Context) error { - w := watcher.New() - w.AddFilterHook(watcher.RegexFilterHook(regexp.MustCompile(`^.+\.json$`), false)) - - go func() { - for { - select { - case evt := <-w.Event: - - jsonName, err := getJsonNameFromPath(evt.Path) - if err != nil { - logger.Err(ctx, err).Msg("Failed to get json name.") - } - if jsonName == "" { - continue - } - - switch evt.Op { - case watcher.Create, watcher.Write: - err := loadJson(ctx, jsonName) - if err != nil { - logger.Err(ctx, err). - Str("json", jsonName). 
- Msg("Failed to load json.") - } - case watcher.Remove: - config.HypermodeData = config.HypermodeAppData{} - logger.Info(ctx).Msg("hypermode.json removed.") - } - case err := <-w.Error: - logger.Err(ctx, err).Msg("Failure while watching directory for hypermode.json") - case <-w.Closed: - return - case <-ctx.Done(): - w.Close() - return - } - } - }() - - return nil -} - -func watchDirectoryForPluginChanges(ctx context.Context) error { - - w := watcher.New() - w.AddFilterHook(watcher.RegexFilterHook(regexp.MustCompile(`^.+\.wasm$`), false)) - - go func() { - for { - select { - case evt := <-w.Event: - - pluginName, err := getPluginNameFromPath(evt.Path) - if err != nil { - logger.Err(ctx, err).Msg("Failed to get plugin name.") - } - if pluginName == "" { - continue - } - - switch evt.Op { - case watcher.Create, watcher.Write: - err = loadPlugin(ctx, pluginName) - if err != nil { - logger.Err(ctx, err). - Str("plugin", pluginName). - Msg("Failed to load plugin.") - } - case watcher.Remove: - err = unloadPlugin(ctx, pluginName) - if err != nil { - logger.Err(ctx, err). - Str("plugin", pluginName). - Msg("Failed to unload plugin.") - } - } - - // Signal that we need to register functions - RegistrationRequest <- true - - case err := <-w.Error: - logger.Err(ctx, err).Msg("Failure while watching plugin directory.") - case <-w.Closed: - return - case <-ctx.Done(): - w.Close() - return - } - } - }() - - // Test if symlinks are supported - _, err := os.Lstat(config.PluginsPath) - if err == nil { - // They are, so we can watch recursively (local dev workflow). - err = w.AddRecursive(config.PluginsPath) - } else { - // They are not. Just watch the single directory (production workflow). - err = w.Add(config.PluginsPath) - } - - if err != nil { - return fmt.Errorf("failed to watch plugins directory: %w", err) - } - - go func() { - err = w.Start(config.RefreshInterval) - if err != nil { - logger.Fatal(ctx).Err(err).Msg("Failed to start the file watcher. 
Exiting.") - } - }() - - return nil -} - -func getPathForJson(name string) (string, error) { - p := path.Join(config.PluginsPath, name+".json") - if _, err := os.Stat(p); err == nil { - return p, nil - } - - return "", fmt.Errorf("json file not found for plugin '%s'", name) -} - -func getPathForPlugin(name string) (string, error) { - - // Normally the plugin will be directly in the plugins directory, by filename. - p := path.Join(config.PluginsPath, name+".wasm") - if _, err := os.Stat(p); err == nil { - return p, nil - } - - // For local development, the plugin will be in a subdirectory and we'll use the debug.wasm file. - p = path.Join(config.PluginsPath, name, "build", "debug.wasm") - if _, err := os.Stat(p); err == nil { - return p, nil - } - - // Or, the plugins path might pointing to a single plugin's base directory. - p = path.Join(config.PluginsPath, "build", "debug.wasm") - if _, err := os.Stat(p); err == nil { - return p, nil - } - - return "", fmt.Errorf("compiled wasm file not found for plugin '%s'", name) -} - -func getJsonNameFromPath(p string) (string, error) { - if !strings.HasSuffix(p, ".json") { - return "", fmt.Errorf("path does not point to a json file: %s", p) - } - - return strings.TrimSuffix(path.Base(p), ".json"), nil -} - -func getPluginNameFromPath(p string) (string, error) { - if !strings.HasSuffix(p, ".wasm") { - return "", fmt.Errorf("path does not point to a wasm file: %s", p) - } - - parts := strings.Split(p, "/") - - // For local development - if strings.HasSuffix(p, "/build/debug.wasm") { - return parts[len(parts)-3], nil - } else if strings.HasSuffix(p, "/build/release.wasm") { - return "", nil - } - - return strings.TrimSuffix(parts[len(parts)-1], ".wasm"), nil -} - -func watchStorageForJsonChanges(ctx context.Context) error { - go func() { - ticker := time.NewTicker(config.RefreshInterval) - defer ticker.Stop() - - for { - jsons, err := aws.ListJsons(ctx) - if err != nil { - // Don't stop watching. 
We'll just try again on the next cycle. - logger.Err(ctx, err).Msg("Failed to list jsons from S3.") - continue - } - // Load/reload any new or modified jsons - for name, etag := range jsons { - if awsJsons[name] != etag { - err := loadJson(ctx, name) - if err != nil { - logger.Err(ctx, err). - Str("json", name). - Msg("Failed to load hypermode.json.") - } - awsJsons[name] = etag - } - } - select { - case <-ticker.C: - continue - case <-ctx.Done(): - return - } - } - }() - - return nil - -} - -func watchStorageForPluginChanges(ctx context.Context) error { - go func() { - ticker := time.NewTicker(config.RefreshInterval) - defer ticker.Stop() - - for { - plugins, err := aws.ListPlugins(ctx) - if err != nil { - // Don't stop watching. We'll just try again on the next cycle. - logger.Err(ctx, err).Msg("Failed to list plugins from S3.") - continue - } - - var changed = false - - // Load/reload any new or modified plugins - for name, etag := range plugins { - if awsPlugins[name] != etag { - err := loadPlugin(ctx, name) - if err != nil { - logger.Err(ctx, err). - Str("plugin", name). - Msg("Failed to load plugin.") - } - awsPlugins[name] = etag - changed = true - } - } - - // Unload any plugins that are no longer present - for name := range awsPlugins { - if _, found := plugins[name]; !found { - err := unloadPlugin(ctx, name) - if err != nil { - logger.Err(ctx, err). - Str("plugin", name). 
- Msg("Failed to unload plugin.") - } - delete(awsPlugins, name) - changed = true - } - } - - // If anything changed, signal that we need to register functions - if changed { - RegistrationRequest <- true - } - - select { - case <-ticker.C: - continue - case <-ctx.Done(): - return - } - } - }() - - return nil -} diff --git a/host/management.go b/host/management.go index 2aebe307..ffa64b05 100644 --- a/host/management.go +++ b/host/management.go @@ -6,16 +6,13 @@ package host import ( "context" - "encoding/json" "fmt" "io" - "os" - "reflect" "strings" + "time" - "hmruntime/aws" - "hmruntime/config" "hmruntime/logger" + "hmruntime/storage" "github.com/google/uuid" "github.com/rs/zerolog" @@ -29,97 +26,49 @@ type buffers struct { Stderr *strings.Builder } -func InitWasmRuntime(ctx context.Context) (wazero.Runtime, error) { - - // Create the runtime +func InitWasmRuntime(ctx context.Context) error { cfg := wazero.NewRuntimeConfig() cfg = cfg.WithCloseOnContextDone(true) - runtime := wazero.NewRuntimeWithConfig(ctx, cfg) - - // Connect WASI host functions - err := instantiateWasiFunctions(ctx, runtime) - if err != nil { - return nil, err - } - - return runtime, nil -} - -func loadJson(ctx context.Context, name string) error { - config.Mu.Lock() // Lock the mutex - defer config.Mu.Unlock() // Unlock the mutex when the function returns - logger.Info(ctx).Str("Loading %s.json.", name) - - // Get the JSON bytes - bytes, err := getJsonBytes(ctx, name) - if err != nil { - return err - } - - _, exists := config.SupportedJsons[name+".json"] - if !exists { - return fmt.Errorf("JSON %s does not exist.", name) - } - - err = json.Unmarshal(bytes, config.SupportedJsons[name+".json"]) - if err != nil { - return fmt.Errorf("failed to unmarshal %s.json: %w", name, err) - } - - return nil + WasmRuntime = wazero.NewRuntimeWithConfig(ctx, cfg) + return instantiateWasiFunctions(ctx) } -func unloadJson(ctx context.Context, name string) { - config.Mu.Lock() // Lock the mutex - defer 
config.Mu.Unlock() // Unlock the mutex when the function returns - - value, exists := config.SupportedJsons[name+".json"] - if !exists { - logger.Error(ctx).Msg(fmt.Sprintf("JSON %s does not exist.", name)) - return - } - - t := reflect.TypeOf(value).Elem() - v := reflect.New(t).Interface() - config.SupportedJsons[name+".json"] = v - - logger.Info(ctx).Msg(fmt.Sprintf("Unloaded %s.json.", name)) -} +func MonitorPlugins(ctx context.Context) { -func getJsonBytes(ctx context.Context, name string) ([]byte, error) { - if aws.UseAwsForPluginStorage() { - return aws.GetJsonBytes(ctx, name) + loadPluginFile := func(fi storage.FileInfo) { + err := loadPlugin(ctx, fi.Name) + if err != nil { + logger.Err(ctx, err). + Str("filename", fi.Name). + Msg("Failed to load plugin.") + } } - path, err := getPathForJson(name) - if err != nil { - return nil, fmt.Errorf("failed to get path for %s.json: %w", name, err) + sm := storage.NewStorageMonitor(".wasm") + sm.Added = loadPluginFile + sm.Modified = loadPluginFile + sm.Removed = func(fi storage.FileInfo) { + p, ok := Plugins.GetByFile(fi.Name) + if ok { + err := unloadPlugin(ctx, p) + if err != nil { + logger.Err(ctx, err). + Str("filename", fi.Name). + Msg("Failed to unload plugin.") + } + } } - bytes, err := os.ReadFile(path) - if err != nil { - return nil, fmt.Errorf("failed to load %s.json: %w", name, err) + sm.Changed = func() { + // Signal that we need to register functions + RegistrationRequest <- true } - - logger.Info(ctx). - Str("path", path). - Msg(fmt.Sprintf("Retrieved %s.json from local storage.", name)) - - return bytes, nil + sm.Start(ctx) } -func loadPlugin(ctx context.Context, name string) error { - _, reloading := Plugins[name] - logger.Info(ctx). - Str("filename", name+".wasm"). - Bool("reloading", reloading). - Msg("Loading plugin.") - - // TODO: Separate plugin name from file name throughout the codebase. - // The plugin name should always come from the metadata, not the file name. 
- // This requires significant changes, so it's not done yet. +func loadPlugin(ctx context.Context, filename string) error { // Load the binary content of the plugin. - bytes, err := getPluginBytes(ctx, name) + bytes, err := storage.GetFileContents(ctx, filename) if err != nil { return err } @@ -131,83 +80,82 @@ func loadPlugin(ctx context.Context, name string) error { } // Get the metadata for the plugin. - metadata := getPluginMetadata(&cm) - if metadata == (PluginMetadata{}) { + metadata, foundMetadata := getPluginMetadata(&cm) + + // Use the filename as the plugin name if no metadata is found. + if !foundMetadata { + metadata.Name = strings.TrimSuffix(filename, ".wasm") + } + + // Create and store the plugin. + plugin := Plugin{&cm, metadata, filename} + Plugins.Add(plugin) + + // Log the details of the loaded plugin. + logPluginLoaded(ctx, plugin) + if !foundMetadata { logger.Warn(ctx). - Str("filename", name+".wasm"). + Str("filename", filename). + Str("plugin", plugin.Name()). Msg("No metadata found in plugin. Please recompile your plugin using the latest version of the Hypermode Functions library.") } - // Finally, store the plugin to complete the loading process. 
- Plugins[name] = Plugin{&cm, metadata} - logPluginLoaded(ctx, metadata) - return nil } -func logPluginLoaded(ctx context.Context, metadata PluginMetadata) { +func logPluginLoaded(ctx context.Context, plugin Plugin) { evt := logger.Info(ctx) + evt.Str("filename", plugin.FileName) + + metadata := plugin.Metadata - if metadata != (PluginMetadata{}) { + if metadata.Name != "" { evt.Str("plugin", metadata.Name) - evt.Str("version", metadata.Version) - evt.Str("language", metadata.Language.String()) - evt.Str("build_id", metadata.BuildId) - evt.Time("build_time", metadata.BuildTime) - evt.Str("hypermode_library", metadata.LibraryName) - evt.Str("hypermode_library_version", metadata.LibraryVersion) + } - if metadata.GitRepo != "" { - evt.Str("git_repo", metadata.GitRepo) - } + if metadata.Version != "" { + evt.Str("version", metadata.Version) + } - if metadata.GitCommit != "" { - evt.Str("git_commit", metadata.GitCommit) - } + lang := plugin.Language() + if lang != UnknownLanguage { + evt.Str("language", lang.String()) } - evt.Msg("Loaded plugin.") -} + if metadata.BuildId != "" { + evt.Str("build_id", metadata.BuildId) + } -func getPluginBytes(ctx context.Context, name string) ([]byte, error) { + if metadata.BuildTime != (time.Time{}) { + evt.Time("build_id", metadata.BuildTime) + } - if aws.UseAwsForPluginStorage() { - return aws.GetPluginBytes(ctx, name) + if metadata.LibraryName != "" { + evt.Str("hypermode_library", metadata.LibraryName) } - path, err := getPathForPlugin(name) - if err != nil { - return nil, fmt.Errorf("failed to get path for plugin: %w", err) + if metadata.LibraryVersion != "" { + evt.Str("hypermode_library_version", metadata.LibraryVersion) } - bytes, err := os.ReadFile(path) - if err != nil { - return nil, fmt.Errorf("failed to load the plugin: %w", err) + if metadata.GitRepo != "" { + evt.Str("git_repo", metadata.GitRepo) } - logger.Info(ctx). - Str("plugin", name). - Str("path", path). 
- Msg("Retrieved plugin from local storage.") + if metadata.GitCommit != "" { + evt.Str("git_commit", metadata.GitCommit) + } - return bytes, nil + evt.Msg("Loaded plugin.") } -func unloadPlugin(ctx context.Context, name string) error { - plugin, found := Plugins[name] - if !found { - return fmt.Errorf("plugin not found '%s'", name) - } - +func unloadPlugin(ctx context.Context, plugin Plugin) error { logger.Info(ctx). - Str("plugin", name). + Str("plugin", plugin.Name()). Msg("Unloading plugin.") - mod := *plugin.Module - delete(Plugins, name) - mod.Close(ctx) - - return nil + Plugins.Remove(plugin) + return (*plugin.Module).Close(ctx) } func GetModuleInstance(ctx context.Context, pluginName string) (wasm.Module, buffers, error) { @@ -224,7 +172,7 @@ func GetModuleInstance(ctx context.Context, pluginName string) (wasm.Module, buf wErr := io.MultiWriter(buf.Stderr, wErrorLog) // Get the plugin. - plugin, ok := Plugins[pluginName] + plugin, ok := Plugins.GetByName(pluginName) if !ok { return nil, buf, fmt.Errorf("plugin not found with name '%s'", pluginName) } @@ -246,8 +194,8 @@ func GetModuleInstance(ctx context.Context, pluginName string) (wasm.Module, buf return mod, buf, nil } -func instantiateWasiFunctions(ctx context.Context, runtime wazero.Runtime) error { - b := runtime.NewHostModuleBuilder(wasi.ModuleName) +func instantiateWasiFunctions(ctx context.Context) error { + b := WasmRuntime.NewHostModuleBuilder(wasi.ModuleName) wasi.NewFunctionExporter().ExportFunctions(b) // If we ever need to override any of the WASI functions, we can do so here. diff --git a/host/pluginregistry.go b/host/pluginregistry.go new file mode 100644 index 00000000..0a25c1c6 --- /dev/null +++ b/host/pluginregistry.go @@ -0,0 +1,98 @@ +/* + * Copyright 2024 Hypermode, Inc. 
+ */ + +package host + +import ( + "cmp" + "fmt" + "slices" + "sync" +) + +// Global, thread-safe registry of all plugins loaded by the host +var Plugins = newPluginRegistry() + +type pluginRegistry struct { + plugins []Plugin + nameIndex map[string]*Plugin + fileIndex map[string]*Plugin + mutex sync.RWMutex +} + +func newPluginRegistry() pluginRegistry { + return pluginRegistry{ + // plugins: make([]Plugin, 0), + nameIndex: make(map[string]*Plugin), + fileIndex: make(map[string]*Plugin), + } +} + +func (pr *pluginRegistry) Add(plugin Plugin) error { + pr.mutex.Lock() + defer pr.mutex.Unlock() + + if _, ok := pr.nameIndex[plugin.Name()]; ok { + return fmt.Errorf("plugin already exists with name %s", plugin.Name()) + } + + if _, ok := pr.fileIndex[plugin.FileName]; ok { + return fmt.Errorf("plugin already exists with filename %s", plugin.FileName) + } + + pr.plugins = append(pr.plugins, plugin) + pr.nameIndex[plugin.Name()] = &plugin + pr.fileIndex[plugin.FileName] = &plugin + return nil +} + +func (pr *pluginRegistry) Remove(plugin Plugin) { + pr.mutex.Lock() + defer pr.mutex.Unlock() + + for i, p := range pr.plugins { + if p == plugin { + pr.plugins[i] = pr.plugins[len(pr.plugins)-1] + pr.plugins = pr.plugins[:len(pr.plugins)-1] + break + } + } + delete(pr.nameIndex, plugin.Name()) + delete(pr.fileIndex, plugin.FileName) +} + +func (pr *pluginRegistry) GetAll() []Plugin { + pr.mutex.RLock() + defer pr.mutex.RUnlock() + + plugins := pr.plugins + slices.SortFunc(plugins, func(a, b Plugin) int { + return cmp.Compare(a.Name(), b.Name()) + }) + return plugins +} + +func (pr *pluginRegistry) GetByName(name string) (Plugin, bool) { + pr.mutex.RLock() + defer pr.mutex.RUnlock() + + plugin, ok := pr.nameIndex[name] + if ok { + return *plugin, true + } else { + return Plugin{}, false + } +} + +func (pr *pluginRegistry) GetByFile(filename string) (Plugin, bool) { + pr.mutex.RLock() + defer pr.mutex.RUnlock() + + plugin, ok := pr.fileIndex[filename] + if ok { + return 
*plugin, true + } else { + return Plugin{}, false + } +} diff --git a/host/plugins.go b/host/plugins.go index 35c65665..ccfc653f 100644 --- a/host/plugins.go +++ b/host/plugins.go @@ -14,12 +14,12 @@ import ( type Plugin struct { Module *wazero.CompiledModule Metadata PluginMetadata + FileName string } type PluginMetadata struct { Name string Version string - Language PluginLanguage LibraryName string LibraryVersion string BuildId string @@ -47,8 +47,12 @@ func (lang PluginLanguage) String() string { } } -func getPluginLanguage(libraryName string) PluginLanguage { - switch libraryName { +func (p *Plugin) Name() string { + return p.Metadata.Name +} + +func (p *Plugin) Language() PluginLanguage { + switch p.Metadata.LibraryName { case "@hypermode/functions-as": return AssemblyScript case "gohypermode/functions-go": @@ -66,9 +70,7 @@ func parseNameAndVersion(s string) (name string, version string) { return s[:i], s[i+1:] } -func getPluginMetadata(cm *wazero.CompiledModule) PluginMetadata { - var metadata = PluginMetadata{} - +func getPluginMetadata(cm *wazero.CompiledModule) (metadata PluginMetadata, found bool) { for _, sec := range (*cm).CustomSections() { name := sec.Name() data := sec.Data() @@ -76,19 +78,24 @@ func getPluginMetadata(cm *wazero.CompiledModule) PluginMetadata { switch name { case "build_id": metadata.BuildId = string(data) + found = true case "build_ts": metadata.BuildTime, _ = time.Parse(time.RFC3339, string(data)) + found = true case "hypermode_library": metadata.LibraryName, metadata.LibraryVersion = parseNameAndVersion(string(data)) - metadata.Language = getPluginLanguage(metadata.LibraryName) + found = true case "hypermode_plugin": metadata.Name, metadata.Version = parseNameAndVersion(string(data)) + found = true case "git_repo": metadata.GitRepo = string(data) + found = true case "git_commit": metadata.GitCommit = string(data) + found = true } } - return metadata + return metadata, found } diff --git a/main.go b/main.go index 
9eb97afd..0f33c911 100644 --- a/main.go +++ b/main.go @@ -7,13 +7,15 @@ package main import ( "context" "os" - "path" + "path/filepath" + "hmruntime/appdata" "hmruntime/aws" "hmruntime/config" "hmruntime/functions" "hmruntime/host" "hmruntime/logger" + "hmruntime/storage" "github.com/joho/godotenv" ) @@ -23,36 +25,25 @@ func main() { config.ParseCommandLineFlags() log := logger.Initialize() - // Validate configuration - if config.PluginsPath == "" && config.S3Bucket == "" { - log.Fatal().Msg("A plugins path and/or S3 bucket are required. Exiting.") - } + log.Info().Msg("Starting Hypermode Runtime.") - if config.S3Bucket == "" { - if _, err := os.Stat(config.PluginsPath); os.IsNotExist(err) { - log.Info(). - Str("path", config.PluginsPath). - Msg("Creating plugins directory.") - err := os.MkdirAll(config.PluginsPath, 0755) - if err != nil { - log.Fatal().Err(err). - Msg("Failed to create plugins directory. Exiting.") - } - } else { - log.Info(). - Str("path", config.PluginsPath). - Msg("Found plugins directory.") - } + // Load environment variables from plugins path + // note: .env file is optional, so don't log if it's not found + err := godotenv.Load(filepath.Join(config.StoragePath, ".env")) + if err != nil && !os.IsNotExist(err) { + log.Warn().Err(err).Msg("Error reading .env file. Ignoring.") } - // Initialize the AWS configuration - err := aws.Initialize(ctx) - if err != nil { - log.Info().Err(err).Msg("AWS functionality will be disabled.") + // Initialize the AWS configuration if we're using any AWS functionality + if config.UseAwsStorage || config.UseAwsSecrets { + err = aws.Initialize(ctx) + if err != nil { + log.Fatal().Err(err).Msg("Failed to initialize AWS. Exiting.") + } } // Initialize the WebAssembly runtime - host.WasmRuntime, err = host.InitWasmRuntime(ctx) + err = host.InitWasmRuntime(ctx) if err != nil { log.Fatal().Err(err).Msg("Failed to initialize the WebAssembly runtime. 
Exiting.") } @@ -64,42 +55,20 @@ func main() { log.Fatal().Err(err).Msg("Failed to instantiate host functions. Exiting.") } - // Load environment variables from plugins path - // note: .env file is optional, so don't log if it's not found - err = godotenv.Load(path.Join(config.PluginsPath, ".env")) - if err != nil && !os.IsNotExist(err) { - log.Warn().Err(err).Msg("Error reading .env file. Ignoring.") - } + // Initialize the storage system + storage.Initialize() - // Load json - err = host.LoadJsons(ctx) - if err != nil { - log.Fatal().Err(err).Msg("Failed to load hypermode.json. Exiting.") - } - - // Load plugins - err = host.LoadPlugins(ctx) - if err != nil { - log.Fatal().Err(err).Msg("Failed to load plugins. Exiting.") - } - - // Watch for registration requests + // Watch for function registration requests functions.MonitorRegistration(ctx) - // Watch for schema changes - functions.MonitorGqlSchema(ctx) + // Load app data and monitor for changes + appdata.MonitorAppDataFiles(ctx) - // Watch for plugin changes - err = host.WatchForPluginChanges(ctx) - if err != nil { - log.Fatal().Err(err).Msg("Failed to watch for plugin changes. Exiting.") - } + // Load plugins and monitor for changes + host.MonitorPlugins(ctx) - // Watch for hypermode.json changes - err = host.WatchForJsonChanges(ctx) - if err != nil { - log.Fatal().Err(err).Msg("Failed to watch for hypermode.json changes. 
Exiting.") - } + // Load the GraphQL schema and monitor for changes + functions.MonitorGqlSchema(ctx) // Start the web server err = startServer(ctx) diff --git a/models/models.go b/models/models.go index be7c4c6f..bdb303d2 100644 --- a/models/models.go +++ b/models/models.go @@ -10,6 +10,7 @@ import ( "os" "strings" + "hmruntime/appdata" "hmruntime/aws" "hmruntime/config" "hmruntime/utils" @@ -19,17 +20,17 @@ const modelKeyPrefix = "HYP_MODEL_KEY_" const HypermodeHost string = "hypermode" const OpenAIHost string = "openai" -func GetModel(modelName string, task config.ModelTask) (config.Model, error) { - for _, model := range config.HypermodeData.Models { +func GetModel(modelName string, task appdata.ModelTask) (appdata.Model, error) { + for _, model := range appdata.HypermodeData.Models { if model.Name == modelName && model.Task == task { return model, nil } } - return config.Model{}, fmt.Errorf("a model '%s' for task '%s' was not found", modelName, task) + return appdata.Model{}, fmt.Errorf("a model '%s' for task '%s' was not found", modelName, task) } -func GetModelKey(ctx context.Context, model config.Model) (string, error) { +func GetModelKey(ctx context.Context, model appdata.Model) (string, error) { var key string var err error @@ -58,7 +59,7 @@ func GetModelKey(ctx context.Context, model config.Model) (string, error) { return "", fmt.Errorf("error getting key for model '%s': %w", model.Name, err) } -func getWellKnownEnvironmentVariable(model config.Model) string { +func getWellKnownEnvironmentVariable(model appdata.Model) string { // Some model hosts have well-known environment variables that are used to store the model key. // We should support these to make it easier for users to set up their environment. 
@@ -75,7 +76,7 @@ type PredictionResult[T any] struct { Predictions []T `json:"predictions"` } -func PostToModelEndpoint[TResult any](ctx context.Context, sentenceMap map[string]string, model config.Model) (map[string]TResult, error) { +func PostToModelEndpoint[TResult any](ctx context.Context, sentenceMap map[string]string, model appdata.Model) (map[string]TResult, error) { // self hosted models takes in array, can optimize for parallelizing later keys, sentences := []string{}, []string{} diff --git a/models/openai/openai.go b/models/openai/openai.go index 99fde320..332c5f30 100644 --- a/models/openai/openai.go +++ b/models/openai/openai.go @@ -8,7 +8,7 @@ import ( "context" "fmt" - "hmruntime/config" + "hmruntime/appdata" "hmruntime/models" "hmruntime/utils" ) @@ -39,7 +39,7 @@ type InvokeError struct { Code string `json:"code"` } -func GenerateText(ctx context.Context, model config.Model, instruction string, sentence string) (ChatResponse, error) { +func GenerateText(ctx context.Context, model appdata.Model, instruction string, sentence string) (ChatResponse, error) { // Get the OpenAI API key to use for this model key, err := models.GetModelKey(ctx, model) diff --git a/server.go b/server.go index d8768302..f940d11f 100644 --- a/server.go +++ b/server.go @@ -239,8 +239,7 @@ func handleAdminRequest(w http.ResponseWriter, r *http.Request) { // Perform the requested action switch req.Action { - case "reload": - err = host.ReloadPlugins(r.Context()) + // TODO: Add admin actions here default: err = fmt.Errorf("unknown action: %s", req.Action) } @@ -255,11 +254,6 @@ func handleAdminRequest(w http.ResponseWriter, r *http.Request) { } func startServer(ctx context.Context) error { - - // Block until the initial registration process is complete - <-functions.RegistrationCompleted - - // Start the HTTP server logger.Info(ctx). Int("port", config.Port). 
Msg("Listening for incoming requests.") diff --git a/storage/awsstorage.go b/storage/awsstorage.go new file mode 100644 index 00000000..63a15689 --- /dev/null +++ b/storage/awsstorage.go @@ -0,0 +1,82 @@ +/* + * Copyright 2024 Hypermode, Inc. + */ + +package storage + +import ( + "context" + "fmt" + "hmruntime/aws" + "hmruntime/config" + "io" + "path" + "strings" + + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/rs/zerolog/log" +) + +type awsStorage struct { +} + +func (s *awsStorage) initialize() { + if config.S3Bucket == "" { + log.Fatal().Msg("An S3 bucket is required when using AWS storage. Exiting.") + } +} + +func (s *awsStorage) listFiles(ctx context.Context, extension string) ([]FileInfo, error) { + cfg := aws.GetAwsConfig() + svc := s3.NewFromConfig(cfg) + + input := &s3.ListObjectsV2Input{ + Bucket: &config.S3Bucket, + Prefix: &config.S3Path, + } + + result, err := svc.ListObjectsV2(ctx, input) + if err != nil { + return nil, fmt.Errorf("failed to list files in S3 bucket: %w", err) + } + + var files = make([]FileInfo, 0, *result.KeyCount) + for _, obj := range result.Contents { + if !strings.HasSuffix(*obj.Key, extension) { + continue + } + + _, filename := path.Split(*obj.Key) + files = append(files, FileInfo{ + Name: filename, + Hash: *obj.ETag, + LastModified: *obj.LastModified, + }) + } + + return files, nil +} + +func (s *awsStorage) getFileContents(ctx context.Context, name string) ([]byte, error) { + cfg := aws.GetAwsConfig() + svc := s3.NewFromConfig(cfg) + + key := path.Join(config.S3Path, name) + input := &s3.GetObjectInput{ + Bucket: &config.S3Bucket, + Key: &key, + } + + obj, err := svc.GetObject(ctx, input) + if err != nil { + return nil, fmt.Errorf("failed to get file %s from S3: %w", name, err) + } + + defer obj.Body.Close() + content, err := io.ReadAll(obj.Body) + if err != nil { + return nil, fmt.Errorf("failed to read contents of file %s from S3: %w", name, err) + } + + return content, nil +} diff --git 
a/storage/localstorage.go b/storage/localstorage.go new file mode 100644 index 00000000..96155c86 --- /dev/null +++ b/storage/localstorage.go @@ -0,0 +1,75 @@ +/* + * Copyright 2024 Hypermode, Inc. + */ + +package storage + +import ( + "context" + "fmt" + "hmruntime/config" + "os" + "path/filepath" + "strings" + + "github.com/rs/zerolog/log" +) + +type localStorage struct { +} + +func (s *localStorage) initialize() { + if config.StoragePath == "" { + log.Fatal().Msg("A storage path is required when using local storage. Exiting.") + } + + if _, err := os.Stat(config.StoragePath); os.IsNotExist(err) { + log.Info(). + Str("path", config.StoragePath). + Msg("Creating local storage directory.") + err := os.MkdirAll(config.StoragePath, 0755) + if err != nil { + log.Fatal().Err(err). + Msg("Failed to create local storage directory. Exiting.") + } + } else { + log.Info(). + Str("path", config.StoragePath). + Msg("Found local storage directory.") + } +} + +func (s *localStorage) listFiles(ctx context.Context, extension string) ([]FileInfo, error) { + entries, err := os.ReadDir(config.StoragePath) + if err != nil { + return nil, fmt.Errorf("failed to list files in storage directory: %w", err) + } + + var files = make([]FileInfo, 0, len(entries)) + for _, entry := range entries { + + if entry.IsDir() || !strings.HasSuffix(entry.Name(), extension) { + continue + } + + info, err := entry.Info() + if err == nil { + files = append(files, FileInfo{ + Name: entry.Name(), + LastModified: info.ModTime(), + }) + } + } + + return files, nil +} + +func (s *localStorage) getFileContents(ctx context.Context, name string) ([]byte, error) { + path := filepath.Join(config.StoragePath, name) + content, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("failed to read contents of file %s from local storage: %w", name, err) + } + + return content, nil +} diff --git a/storage/storage.go b/storage/storage.go new file mode 100644 index 00000000..c0008886 --- /dev/null +++ 
b/storage/storage.go @@ -0,0 +1,43 @@ +/* + * Copyright 2024 Hypermode, Inc. + */ + +package storage + +import ( + "context" + "hmruntime/config" + "time" +) + +var impl storageImplementation + +type storageImplementation interface { + initialize() + listFiles(ctx context.Context, extension string) ([]FileInfo, error) + getFileContents(ctx context.Context, name string) ([]byte, error) +} + +type FileInfo struct { + Name string + Hash string + LastModified time.Time +} + +func Initialize() { + if config.UseAwsStorage { + impl = &awsStorage{} + } else { + impl = &localStorage{} + } + + impl.initialize() +} + +func ListFiles(ctx context.Context, extension string) ([]FileInfo, error) { + return impl.listFiles(ctx, extension) +} + +func GetFileContents(ctx context.Context, name string) ([]byte, error) { + return impl.getFileContents(ctx, name) +} diff --git a/storage/storagemonitor.go b/storage/storagemonitor.go new file mode 100644 index 00000000..31465963 --- /dev/null +++ b/storage/storagemonitor.go @@ -0,0 +1,104 @@ +/* + * Copyright 2024 Hypermode, Inc. + */ + +package storage + +import ( + "context" + "hmruntime/config" + "hmruntime/logger" + "time" +) + +type StorageMonitor struct { + extension string + files map[string]*monitoredFile + Added func(FileInfo) + Modified func(FileInfo) + Removed func(FileInfo) + Changed func() +} + +type monitoredFile struct { + file FileInfo + lastSeen time.Time +} + +func NewStorageMonitor(extension string) *StorageMonitor { + return &StorageMonitor{ + extension: extension, + files: make(map[string]*monitoredFile), + Added: func(FileInfo) {}, + Modified: func(FileInfo) {}, + Removed: func(FileInfo) {}, + Changed: func() {}, + } +} + +func (sm *StorageMonitor) Start(ctx context.Context) { + go func() { + ticker := time.NewTicker(config.RefreshInterval) + defer ticker.Stop() + + var loggedError = false + + for { + files, err := impl.listFiles(ctx, sm.extension) + if err != nil { + // Don't stop watching. 
We'll just try again on the next cycle. + if !loggedError { + logger.Err(ctx, err).Msgf("Failed to list %s files.", sm.extension) + loggedError = true + } + continue + } else { + loggedError = false + } + + // Compare list of files retrieved to existing files + var changed = false + var thisTime = time.Now() + for _, file := range files { + existing, found := sm.files[file.Name] + if !found { + // New file + changed = true + sm.files[file.Name] = &monitoredFile{file, thisTime} + sm.Added(file) + } else if file.Hash != existing.file.Hash || + (file.Hash == "" && file.LastModified.After(existing.file.LastModified)) { + // Modified file + changed = true + sm.files[file.Name] = &monitoredFile{file, thisTime} + sm.Modified(file) + } else { + // No change + existing.lastSeen = thisTime + } + } + + // Check for removed files + for name, file := range sm.files { + if file.lastSeen.Before(thisTime) { + changed = true + delete(sm.files, name) + sm.Removed(file.file) + } + } + + // Notify if anything changed + if changed { + sm.Changed() + } + + // Wait for next cycle + select { + case <-ticker.C: + continue + case <-ctx.Done(): + return + } + } + }() +}