Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modules support relative path 2 #1726

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion internal/alloycli/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (fr *alloyRun) Run(configPath string) error {
if err != nil {
return nil, fmt.Errorf("reading config path %q: %w", configPath, err)
}
if err := f.LoadSource(alloySource, nil); err != nil {
if err := f.LoadSource(alloySource, nil, configPath); err != nil {
return alloySource, fmt.Errorf("error during the initial load: %w", err)
}

Expand Down
30 changes: 23 additions & 7 deletions internal/runtime/alloy.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ import (
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/internal/runtime/tracing"
"github.com/grafana/alloy/internal/service"
"github.com/grafana/alloy/internal/util"
"github.com/grafana/alloy/syntax/vm"
)

// Options holds static options for an Alloy controller.
Expand Down Expand Up @@ -296,22 +298,36 @@ func (f *Runtime) Run(ctx context.Context) {
// The controller will only start running components after Load is called once
// without any configuration errors.
// LoadSource uses default loader configuration.
func (f *Runtime) LoadSource(source *Source, args map[string]any) error {
return f.loadSource(source, args, nil)
func (f *Runtime) LoadSource(source *Source, args map[string]any, configPath string) error {
return f.applyLoaderConfig(controller.ApplyOptions{
Args: args,
ComponentBlocks: source.components,
ConfigBlocks: source.configBlocks,
DeclareBlocks: source.declareBlocks,
ArgScope: &vm.Scope{
Parent: nil,
Variables: map[string]interface{}{
"module_path": util.ExtractDirPath(configPath),
wildum marked this conversation as resolved.
Show resolved Hide resolved
},
},
})
}

// Same as above but with a customComponentRegistry that provides custom component definitions.
func (f *Runtime) loadSource(source *Source, args map[string]any, customComponentRegistry *controller.CustomComponentRegistry) error {
f.loadMut.Lock()
defer f.loadMut.Unlock()

applyOptions := controller.ApplyOptions{
return f.applyLoaderConfig(controller.ApplyOptions{
Args: args,
ComponentBlocks: source.components,
ConfigBlocks: source.configBlocks,
DeclareBlocks: source.declareBlocks,
CustomComponentRegistry: customComponentRegistry,
}
ArgScope: customComponentRegistry.Scope(),
})
}

func (f *Runtime) applyLoaderConfig(applyOptions controller.ApplyOptions) error {
f.loadMut.Lock()
defer f.loadMut.Unlock()

diags := f.loader.Apply(applyOptions)
if !f.loadedOnce.Load() && diags.HasErrors() {
Expand Down
17 changes: 6 additions & 11 deletions internal/runtime/internal/controller/component_references.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Reference struct {

// ComponentReferences returns the list of references a component is making to
// other components.
func ComponentReferences(cn dag.Node, g *dag.Graph, l log.Logger) ([]Reference, diag.Diagnostics) {
func ComponentReferences(cn dag.Node, g *dag.Graph, l log.Logger, scope *vm.Scope) ([]Reference, diag.Diagnostics) {
var (
traversals []Traversal

Expand All @@ -48,25 +48,20 @@ func ComponentReferences(cn dag.Node, g *dag.Graph, l log.Logger) ([]Reference,
ref, resolveDiags := resolveTraversal(t, g)
componentRefMatch := !resolveDiags.HasErrors()

// We use an empty scope to determine if a reference refers to something in
// the stdlib, since vm.Scope.Lookup will search the scope tree + the
// stdlib.
//
// Any call to an stdlib function is ignored.
var emptyScope vm.Scope
_, stdlibMatch := emptyScope.Lookup(t[0].Name)
// we look for a match in the provided scope and the stdlib
_, scopeMatch := scope.Lookup(t[0].Name)

if !componentRefMatch && !stdlibMatch {
if !componentRefMatch && !scopeMatch {
diags = append(diags, resolveDiags...)
continue
}

if componentRefMatch {
if stdlibMatch {
if scope.IsStdlibIdentifiers(t[0].Name) {
level.Warn(l).Log("msg", "a component is shadowing an existing stdlib name", "component", strings.Join(ref.Target.Block().Name, "."), "stdlib name", t[0].Name)
}
refs = append(refs, ref)
} else if stdlibMatch && emptyScope.IsDeprecated(t[0].Name) {
} else if scope.IsStdlibDeprecated(t[0].Name) {
level.Warn(l).Log("msg", "this stdlib function is deprecated; please refer to the documentation for updated usage and alternatives", "function", t[0].Name)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"

"github.com/grafana/alloy/syntax/ast"
"github.com/grafana/alloy/syntax/vm"
)

// CustomComponentRegistry holds custom component definitions that are available in the context.
Expand All @@ -14,6 +15,7 @@ type CustomComponentRegistry struct {
parent *CustomComponentRegistry // nil if root config

mut sync.RWMutex
scope *vm.Scope
imports map[string]*CustomComponentRegistry // importNamespace: importScope
declares map[string]ast.Body // customComponentName: template
}
Expand Down Expand Up @@ -42,6 +44,12 @@ func (s *CustomComponentRegistry) getImport(name string) (*CustomComponentRegist
return im, ok
}

func (s *CustomComponentRegistry) Scope() *vm.Scope {
s.mut.RLock()
defer s.mut.RUnlock()
return s.scope
}

// registerDeclare stores a local declare block.
func (s *CustomComponentRegistry) registerDeclare(declare *ast.BlockStmt) {
s.mut.Lock()
Expand Down Expand Up @@ -71,6 +79,7 @@ func (s *CustomComponentRegistry) updateImportContent(importNode *ImportConfigNo
}
importScope := NewCustomComponentRegistry(nil)
importScope.declares = importNode.ImportedDeclares()
importScope.scope = importNode.Scope()
importScope.updateImportContentChildren(importNode)
s.imports[importNode.label] = importScope
}
Expand All @@ -81,6 +90,7 @@ func (s *CustomComponentRegistry) updateImportContentChildren(importNode *Import
for _, child := range importNode.ImportConfigNodesChildren() {
childScope := NewCustomComponentRegistry(nil)
childScope.declares = child.ImportedDeclares()
childScope.scope = child.Scope()
childScope.updateImportContentChildren(child)
s.imports[child.label] = childScope
}
Expand Down
8 changes: 7 additions & 1 deletion internal/runtime/internal/controller/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/grafana/alloy/internal/service"
"github.com/grafana/alloy/syntax/ast"
"github.com/grafana/alloy/syntax/diag"
"github.com/grafana/alloy/syntax/vm"
"github.com/grafana/dskit/backoff"
"github.com/hashicorp/go-multierror"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -124,6 +125,9 @@ type ApplyOptions struct {
// The definition of a custom component instantiated inside of the loaded config
// should be passed via this field if it's not declared or imported in the config.
CustomComponentRegistry *CustomComponentRegistry

// ArgScope contains additional variables that can be used in the current module.
ArgScope *vm.Scope
}

// Apply loads a new set of components into the Loader. Apply will drop any
Expand All @@ -145,6 +149,8 @@ func (l *Loader) Apply(options ApplyOptions) diag.Diagnostics {
l.cm.controllerEvaluation.Set(1)
defer l.cm.controllerEvaluation.Set(0)

l.cache.SetScope(options.ArgScope)

for key, value := range options.Args {
l.cache.CacheModuleArgument(key, value)
}
Expand Down Expand Up @@ -608,7 +614,7 @@ func (l *Loader) wireGraphEdges(g *dag.Graph) diag.Diagnostics {
}

// Finally, wire component references.
refs, nodeDiags := ComponentReferences(n, g, l.log)
refs, nodeDiags := ComponentReferences(n, g, l.log, l.cache.scope)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: We're reading l.cache.scope without the mutex. It may be stale and come up in race detector, but I don't think it's dangerous.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add mutex rlock and runlock to protect it: acf8ef6

for _, ref := range refs {
g.AddEdge(dag.Edge{From: n, To: ref.Target})
}
Expand Down
21 changes: 20 additions & 1 deletion internal/runtime/internal/controller/node_config_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,11 @@ func (cn *ImportConfigNode) processImportBlock(stmt *ast.BlockStmt, fullName str
childGlobals.OnBlockNodeUpdate = cn.onChildrenContentUpdate
// Children data paths are nested inside their parents to avoid collisions.
childGlobals.DataPath = filepath.Join(childGlobals.DataPath, cn.globalID)

if importsource.GetSourceType(cn.block.GetBlockName()) == importsource.HTTP && sourceType == importsource.File {
return fmt.Errorf("importing a module via import.http (nodeID: %s) that contains an import.file block is not supported", cn.nodeID)
}

cn.importConfigNodesChildren[stmt.Label] = NewImportConfigNode(stmt, childGlobals, sourceType)
return nil
}
Expand All @@ -297,7 +302,12 @@ func (cn *ImportConfigNode) processImportBlock(stmt *ast.BlockStmt, fullName str
func (cn *ImportConfigNode) evaluateChildren() error {
for _, child := range cn.importConfigNodesChildren {
err := child.Evaluate(&vm.Scope{
Parent: nil,
Parent: &vm.Scope{
Parent: nil,
Variables: map[string]interface{}{
"module_path": cn.source.ModulePath(),
},
},
Variables: make(map[string]interface{}),
})
if err != nil {
Expand Down Expand Up @@ -424,6 +434,15 @@ func (cn *ImportConfigNode) ImportedDeclares() map[string]ast.Body {
return cn.importedDeclares
}

// Scope returns the scope associated with the import source.
func (cn *ImportConfigNode) Scope() *vm.Scope {
return &vm.Scope{
Variables: map[string]interface{}{
"module_path": cn.source.ModulePath(),
},
}
}

// ImportConfigNodesChildren returns the ImportConfigNodesChildren of this ImportConfigNode.
func (cn *ImportConfigNode) ImportConfigNodesChildren() map[string]*ImportConfigNode {
cn.mut.Lock()
Expand Down
9 changes: 8 additions & 1 deletion internal/runtime/internal/controller/value_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type valueCache struct {
moduleArguments map[string]any // key -> module arguments value
moduleExports map[string]any // name -> value for the value of module exports
moduleChangedIndex int // Everytime a change occurs this is incremented
scope *vm.Scope // scope provides additional context for the nodes in the module
}

// newValueCache creates a new ValueCache.
Expand All @@ -34,6 +35,12 @@ func newValueCache() *valueCache {
}
}

func (vc *valueCache) SetScope(scope *vm.Scope) {
vc.mut.Lock()
defer vc.mut.Unlock()
vc.scope = scope
}

// CacheArguments will cache the provided arguments by the given id. args may
// be nil to store an empty object.
func (vc *valueCache) CacheArguments(id ComponentID, args component.Arguments) {
Expand Down Expand Up @@ -165,7 +172,7 @@ func (vc *valueCache) BuildContext() *vm.Scope {
defer vc.mut.RUnlock()

scope := &vm.Scope{
Parent: nil,
Parent: vc.scope,
Variables: make(map[string]interface{}),
}

Expand Down
5 changes: 5 additions & 0 deletions internal/runtime/internal/importsource/import_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/grafana/alloy/internal/component"
filedetector "github.com/grafana/alloy/internal/filedetector"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/grafana/alloy/internal/util"
"github.com/grafana/alloy/syntax/vm"
)

Expand Down Expand Up @@ -254,3 +255,7 @@ func collectFilesFromDir(path string) ([]string, error) {
func (im *ImportFile) SetEval(eval *vm.Evaluator) {
im.eval = eval
}

func (im *ImportFile) ModulePath() string {
return util.ExtractDirPath(im.args.Filename)
}
9 changes: 7 additions & 2 deletions internal/runtime/internal/importsource/import_git.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type ImportGit struct {
repo *vcs.GitRepo
repoOpts vcs.GitRepoOptions
args GitArguments
repoPath string
onContentChange func(map[string]string)

argsChanged chan struct{}
Expand Down Expand Up @@ -181,7 +182,7 @@ func (im *ImportGit) Update(args component.Arguments) (err error) {
// TODO(rfratto): store in a repo-specific directory so changing repositories
// doesn't risk break the module loader if there's a SHA collision between
// the two different repositories.
repoPath := filepath.Join(im.opts.DataPath, "repo")
im.repoPath = filepath.Join(im.opts.DataPath, "repo")

repoOpts := vcs.GitRepoOptions{
Repository: newArgs.Repository,
Expand All @@ -192,7 +193,7 @@ func (im *ImportGit) Update(args component.Arguments) (err error) {
// Create or update the repo field.
// Failure to update repository makes the module loader temporarily use cached contents on disk
if im.repo == nil || !reflect.DeepEqual(repoOpts, im.repoOpts) {
r, err := vcs.NewGitRepo(context.Background(), repoPath, repoOpts)
r, err := vcs.NewGitRepo(context.Background(), im.repoPath, repoOpts)
if err != nil {
if errors.As(err, &vcs.UpdateFailedError{}) {
level.Error(im.log).Log("msg", "failed to update repository", "err", err)
Expand Down Expand Up @@ -287,3 +288,7 @@ func (im *ImportGit) CurrentHealth() component.Health {
func (im *ImportGit) SetEval(eval *vm.Evaluator) {
im.eval = eval
}

func (im *ImportGit) ModulePath() string {
return im.repoPath
}
Comment on lines +292 to +294
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we now document that the entire repository will be cloned and files in the git repo will be available to reference them using module_path?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added documentation: 133aeb0

8 changes: 7 additions & 1 deletion internal/runtime/internal/importsource/import_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"path"
"reflect"
"time"

Expand All @@ -16,7 +17,7 @@ import (
// ImportHTTP imports a module from a HTTP server via the remote.http component.
type ImportHTTP struct {
managedRemoteHTTP *remote_http.Component
arguments component.Arguments
arguments HTTPArguments
managedOpts component.Options
eval *vm.Evaluator
}
Expand Down Expand Up @@ -106,3 +107,8 @@ func (im *ImportHTTP) CurrentHealth() component.Health {
func (im *ImportHTTP) SetEval(eval *vm.Evaluator) {
im.eval = eval
}

func (im *ImportHTTP) ModulePath() string {
dir, _ := path.Split(im.arguments.URL)
return dir
}
thampiotr marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions internal/runtime/internal/importsource/import_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type ImportSource interface {
CurrentHealth() component.Health
// Update evaluator
SetEval(eval *vm.Evaluator)
// ModulePath is the path where the module is stored locally.
ModulePath() string
}

// NewImportSource creates a new ImportSource depending on the type.
Expand Down
7 changes: 7 additions & 0 deletions internal/runtime/internal/importsource/import_string.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type ImportString struct {
arguments component.Arguments
eval *vm.Evaluator
onContentChange func(map[string]string)
modulePath string
}

var _ ImportSource = (*ImportString)(nil)
Expand All @@ -41,6 +42,8 @@ func (im *ImportString) Evaluate(scope *vm.Scope) error {
}
im.arguments = arguments

im.modulePath, _ = scope.Variables["module_path"].(string)

// notifies that the content has changed
im.onContentChange(map[string]string{"import_string": arguments.Content.Value})

Expand All @@ -63,3 +66,7 @@ func (im *ImportString) CurrentHealth() component.Health {
func (im *ImportString) SetEval(eval *vm.Evaluator) {
im.eval = eval
}

func (im *ImportString) ModulePath() string {
return im.modulePath
}
13 changes: 13 additions & 0 deletions internal/util/filepath.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package util

import "path/filepath"

// ExtractDirPath removes the file part of a path if it exists.
func ExtractDirPath(p string) string {
// If the base of the path has an extension, it's likely a file.
if filepath.Ext(filepath.Base(p)) != "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's likely that people will use a file without extension, e.g. /etc/alloy_config and this will not work for them.

I think instead we should Stat the path and check if it's directory.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did the change: 3f08a9e. The error should not happen because if the path is wrong then it will fail earlier but we use this func everywhere for testing so I only logged it with warn

return filepath.Dir(p)
}
// Otherwise, return the path as is.
return p
}
10 changes: 8 additions & 2 deletions syntax/vm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,14 @@ func (s *Scope) Lookup(name string) (interface{}, bool) {
return nil, false
}

// IsDeprecated returns true if the identifier exists and is deprecated.
func (s *Scope) IsDeprecated(name string) bool {
// IsStdlibIdentifiers returns true if the identifier exists.
func (s *Scope) IsStdlibIdentifiers(name string) bool {
_, exist := stdlib.Identifiers[name]
return exist
}

// IsStdlibDeprecated returns true if the identifier exists and is deprecated.
func (s *Scope) IsStdlibDeprecated(name string) bool {
_, exist := stdlib.DeprecatedIdentifiers[name]
return exist
}