Skip to content

Commit

Permalink
fix(backend): Allow empty namespace and remove default namespace. Fixes
Browse files Browse the repository at this point in the history
#8945. Fixes #8897. Fixes #8854. (#9286)

* Add pipeline_id to run and recurring run protos

* Add pipeline_id to run and recurring_run

* Set status when creating a new job. Closes #9125.

* stage progress

* Fix timestamps in tasks and add unit test

* Add upsert to sql dialects. Closes #8851.

* Remove dead code

* Fix unit test

* Switch to upsert

* Add unit tests

* Allow empty namespace

* Cleanup

* Fix tests

* Fix inconsistencies

* Fix old tests

* Differentiate list requests in multi-user mode

* Refactor create run and create job paths

* Revert default experiment change
  • Loading branch information
gkcalat authored May 4, 2023
1 parent 3c9dbee commit fbb8b39
Show file tree
Hide file tree
Showing 23 changed files with 665 additions and 670 deletions.
3 changes: 2 additions & 1 deletion backend/src/apiserver/list/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strings"

sq "github.com/Masterminds/squirrel"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
"github.com/kubeflow/pipelines/backend/src/apiserver/filter"
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"github.com/kubeflow/pipelines/backend/src/common/util"
Expand Down Expand Up @@ -240,7 +241,7 @@ func FilterOnResourceReference(tableName string, columns []string, resourceType
selectBuilder = sq.Select("count(*)")
}
selectBuilder = selectBuilder.From(tableName)
if filterContext.ReferenceKey != nil && filterContext.ReferenceKey.ID != "" {
if filterContext.ReferenceKey != nil && (filterContext.ReferenceKey.ID != "" || common.IsMultiUserMode()) {
resourceReferenceFilter, args, err := sq.Select("ResourceUUID").
From("resource_references as rf").
Where(sq.And{
Expand Down
20 changes: 9 additions & 11 deletions backend/src/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,11 @@ import (
)

var (
rpcPortFlag = flag.String("rpcPortFlag", ":8887", "RPC Port")
httpPortFlag = flag.String("httpPortFlag", ":8888", "Http Proxy Port")
configPath = flag.String("config", "", "Path to JSON file containing config")
sampleConfigPath = flag.String("sampleconfig", "", "Path to samples")

rpcPortFlag = flag.String("rpcPortFlag", ":8887", "RPC Port")
httpPortFlag = flag.String("httpPortFlag", ":8888", "Http Proxy Port")
configPath = flag.String("config", "", "Path to JSON file containing config")
sampleConfigPath = flag.String("sampleconfig", "", "Path to samples")
collectMetricsFlag = flag.Bool("collectMetricsFlag", true, "Whether to collect Prometheus metrics in API server.")
defaultNamespace = flag.String("defaultNamespace", "", "Default namespace used in ApiServer.")
)

type RegisterHttpHandlerFromEndpoint func(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) error
Expand All @@ -64,16 +62,17 @@ func main() {
clientManager := newClientManager()
resourceManager := resource.NewResourceManager(
&clientManager,
*defaultNamespace,
)
err := loadSamples(resourceManager)
if err != nil {
glog.Fatalf("Failed to load samples. Err: %v", err)
}

_, err = resourceManager.CreateDefaultExperiment()
if err != nil {
glog.Fatalf("Failed to create default experiment. Err: %v", err)
if !common.IsMultiUserMode() {
_, err = resourceManager.CreateDefaultExperiment("")
if err != nil {
glog.Fatalf("Failed to create default experiment. Err: %v", err)
}
}

go startRpcServer(resourceManager)
Expand Down Expand Up @@ -246,7 +245,6 @@ func loadSamples(resourceManager *resource.ResourceManager) error {
&model.Pipeline{
Name: config.Name,
Description: config.Description,
Namespace: *defaultNamespace,
},
)
if configErr != nil {
Expand Down
155 changes: 57 additions & 98 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,9 @@ type ResourceManager struct {
time util.TimeInterface
uuid util.UUIDGeneratorInterface
authenticators []kfpauth.Authenticator
defaultNamespace string
}

func NewResourceManager(clientManager ClientManagerInterface, defaultNamespace string) *ResourceManager {
func NewResourceManager(clientManager ClientManagerInterface) *ResourceManager {
return &ResourceManager{
experimentStore: clientManager.ExperimentStore(),
pipelineStore: clientManager.PipelineStore(),
Expand All @@ -119,7 +118,6 @@ func NewResourceManager(clientManager ClientManagerInterface, defaultNamespace s
time: clientManager.Time(),
uuid: clientManager.UUID(),
authenticators: clientManager.Authenticators(),
defaultNamespace: defaultNamespace,
}
}

Expand Down Expand Up @@ -153,17 +151,16 @@ func (r *ResourceManager) ListExperiments(filterContext *model.FilterContext, op

// Deletes the experiment with the given id.
func (r *ResourceManager) DeleteExperiment(experimentId string) error {
_, err := r.experimentStore.GetExperiment(experimentId)
if err != nil {
return util.Wrapf(err, "Failed to delete experiment %v due to error fetching it", experimentId)
}
defaultExperimentId, err := r.GetDefaultExperimentId()
if err != nil {
return util.Wrapf(err, "Failed to delete experiment %v due to error fetching the default experiment id", experimentId)
}
if defaultExperimentId != "" && experimentId == defaultExperimentId {
return util.NewBadRequestError(util.NewInvalidInputError("Experiment id cannot be equal to the default id %v", defaultExperimentId), "Failed to delete experiment %v. The default experiment cannot be deleted", experimentId)
}
if _, err := r.experimentStore.GetExperiment(experimentId); err != nil {
return util.Wrapf(err, "Failed to delete experiment %v due to error fetching it", experimentId)
}
return r.experimentStore.DeleteExperiment(experimentId)
}

Expand Down Expand Up @@ -311,11 +308,6 @@ func (r *ResourceManager) UpdatePipelineDefaultVersion(pipelineId string, versio
// Creates a pipeline, but does not create a pipeline version.
// Call CreatePipelineVersion to create a pipeline version.
func (r *ResourceManager) CreatePipeline(p *model.Pipeline) (*model.Pipeline, error) {
// Assign the default namespace if it is empty
if p.Namespace == "" {
p.Namespace = r.GetDefaultNamespace()
}

// Create a record in KFP DB (only pipelines table)
newPipeline, err := r.pipelineStore.CreatePipeline(p)
if err != nil {
Expand All @@ -336,11 +328,6 @@ func (r *ResourceManager) CreatePipeline(p *model.Pipeline) (*model.Pipeline, er
// Creates a pipeline and a pipeline version.
// This is used when two resources need to be created in a single DB transaction.
func (r *ResourceManager) CreatePipelineAndPipelineVersion(p *model.Pipeline, pv *model.PipelineVersion) (*model.Pipeline, *model.PipelineVersion, error) {
// Assign the default namespace if it is empty
if p.Namespace == "" {
p.Namespace = r.GetDefaultNamespace()
}

// Fetch pipeline spec, verify it, and parse parameters
pipelineSpecBytes, pipelineSpecURI, err := r.fetchTemplateFromPipelineVersion(pv)
if err != nil {
Expand Down Expand Up @@ -436,20 +423,9 @@ func (r *ResourceManager) GetPipelineLatestTemplate(pipelineId string) ([]byte,
}

// Creates a run and schedule a workflow CR.
// Note: when creating a run from a manifest, this triggers creation of
// a new pipeline and pipeline version that share the name, description, and namespace.
// If run.ExperimentId is not specified, it is set to the default experiment.
// If run.Namespace is no specified, it gets inferred from the parent experiment.
// Manifest's namespace gets overwritten with the run.Namespace.
// Creating a run from recurring run prioritizes recurring run's pipeline spec over the run's one.
func (r *ResourceManager) CreateRun(ctx context.Context, run *model.Run) (*model.Run, error) {
expNs, expId, err := r.validateExperimentNamespace(run.Namespace, run.ExperimentId)
if err != nil {
return nil, util.Wrapf(err, "Failed to create a run. Specify a valid experiment id and namespace combination")
}
run.ExperimentId = expId
run.Namespace = expNs

// Create a template based on the manifest of an existing pipeline version or used-provided manifest.
// Update the run.PipelineSpec if an existing pipeline version is used.
tmpl, manifest, err := r.fetchTemplateFromPipelineSpec(&run.PipelineSpec)
Expand Down Expand Up @@ -641,7 +617,7 @@ func (r *ResourceManager) CreateTask(t *model.Task) (*model.Task, error) {
return nil, util.NewInternalServerError(util.NewInvalidInputError("Task cannot have an empty namespace in multi-user mode"), "Failed to create a task in run %v", t.RunId)
}
}
if err := r.ValidateExperimentNamespace(run.ExperimentId, t.Namespace); err != nil {
if err := r.CheckExperimentBelongsToNamespace(run.ExperimentId, t.Namespace); err != nil {
return nil, util.Wrapf(err, "Failed to create a task in run %v", t.RunId)
}

Expand Down Expand Up @@ -914,58 +890,10 @@ func (r *ResourceManager) fetchPipelineVersionFromPipelineSpec(pipelineSpec mode
return nil, nil
}

// Checks if experiment exists and whether it belongs to the specified namespace.
// Returns a valid namespace/experiment combination.
// If experiment id is missing, a default experiment id is assumed.
// If the default experiment does not exist, creates it.
// If namespace is empty, experiment's namespace is used.
func (r ResourceManager) validateExperimentNamespace(namespace string, experimentId string) (string, string, error) {
if experimentId == "" {
defExpId, err := r.GetDefaultExperimentId()
if err != nil {
return "", "", util.Wrapf(err, "Failed to create a resource with empty experiment id. Specify experiment id for the run or check if the default experiment table exists")
}
// Create the default experiment if it is missing
if defExpId == "" {
defExpId, err = r.CreateDefaultExperiment()
if err != nil {
return "", "", util.Wrapf(err, "Failed to create a resource with empty experiment id due to error creating the default experiment")
}
}
experimentId = defExpId
}
// Validate namespace
if namespace == "" {
ns, err := r.GetNamespaceFromExperimentId(experimentId)
if err != nil {
return "", "", util.Wrapf(err, "Failed to create a resource due to error fetching namespace for experiment %v", experimentId)
}
namespace = ns
}
if common.IsMultiUserMode() {
if namespace == "" {
return "", "", util.NewInternalServerError(util.NewInvalidInputError("Resource cannot have an empty namespace in multi-user mode"), "Failed to create a resource")
}
}
if err := r.ValidateExperimentNamespace(experimentId, namespace); err != nil {
return "", "", util.Wrapf(err, "Failed to create a resource due to invalid namespace %v and experiment %v combination", namespace, experimentId)
}
return namespace, experimentId, nil
}

// Creates a recurring run.
// Note: when creating a recurring run from a manifest, this triggers creation of
// a new pipeline and pipeline version that share the name, description, and namespace.
// Manifest's namespace gets overwritten with the job.Namespace if the later is non-empty.
// Otherwise, job.Namespace gets overwritten by the manifest.
func (r *ResourceManager) CreateJob(ctx context.Context, job *model.Job) (*model.Job, error) {
expNs, expId, err := r.validateExperimentNamespace(job.Namespace, job.ExperimentId)
if err != nil {
return nil, util.Wrapf(err, "Failed to create a recurring run. Specify a valid experiment id and namespace combination")
}
job.ExperimentId = expId
job.Namespace = expNs

// Create a template based on the manifest of an existing pipeline version or used-provided manifest.
// Update the job.PipelineSpec if an existing pipeline version is used.
tmpl, manifest, err := r.fetchTemplateFromPipelineSpec(&job.PipelineSpec)
Expand Down Expand Up @@ -1368,7 +1296,7 @@ func (r *ResourceManager) fetchTemplateFromPipelineVersion(pipelineVersion *mode
}

// Creates the default experiment entry.
func (r *ResourceManager) CreateDefaultExperiment() (string, error) {
func (r *ResourceManager) CreateDefaultExperiment(namespace string) (string, error) {
// First check that we don't already have a default experiment ID in the DB.
defaultExperimentId, err := r.GetDefaultExperimentId()
if err != nil {
Expand All @@ -1380,15 +1308,14 @@ func (r *ResourceManager) CreateDefaultExperiment() (string, error) {
return defaultExperimentId, nil
}

// TODO(gkcalat): consider moving the default namespace and experiment to server config.
// Check if an experiment named Default exists
defaultExperiment, err := r.experimentStore.GetExperimentByName("Default")
// Check if an experiment named Default already exists
defaultExperiment, err := r.experimentStore.GetExperimentByNameNamespace("Default", namespace)
if err != nil || defaultExperiment == nil {
// Create default experiment
// Create the default experiment
defaultExperiment = &model.Experiment{
Name: "Default",
Description: "All runs created without specifying an experiment will be grouped here.",
Namespace: r.GetDefaultNamespace(),
Namespace: namespace,
StorageState: model.StorageStateAvailable,
}
defaultExperiment, err = r.CreateExperiment(defaultExperiment)
Expand Down Expand Up @@ -1731,7 +1658,7 @@ func (r *ResourceManager) IsAuthorized(ctx context.Context, resourceAttributes *
// Fetches namespace that an experiment belongs to.
func (r *ResourceManager) GetNamespaceFromExperimentId(experimentId string) (string, error) {
if experimentId == "" {
return r.GetDefaultNamespace(), nil
return "", nil
}
experiment, err := r.GetExperiment(experimentId)
if err != nil {
Expand All @@ -1748,7 +1675,7 @@ func (r *ResourceManager) GetNamespaceFromExperimentId(experimentId string) (str
}
experiment.Namespace = namespaceRef.ReferenceUUID
} else {
experiment.Namespace = r.GetDefaultNamespace()
experiment.Namespace = ""
}
}
return experiment.Namespace, nil
Expand Down Expand Up @@ -1785,19 +1712,11 @@ func (r *ResourceManager) FetchNamespaceFromPipelineVersionId(versionId string)
return r.FetchNamespaceFromPipelineId(pipelineVersion.PipelineId)
}

// Fetches the default namespace for resources.
func (r *ResourceManager) GetDefaultNamespace() string {
return r.defaultNamespace
}

// Checks if the namespace is empty or equal to one of {`-`, `POD_NAMESPACE`, or the default value}.
func (r *ResourceManager) IsDefaultNamespace(namespace string) bool {
// Checks if the namespace is empty or equal to `-`.
func (r *ResourceManager) IsEmptyNamespace(namespace string) bool {
if namespace == "" || namespace == model.NoNamespace {
return true
}
if namespace == r.GetDefaultNamespace() {
return true
}
return false
}

Expand All @@ -1806,13 +1725,17 @@ func (r *ResourceManager) ReplaceNamespace(namespace string) string {
if common.IsMultiUserMode() {
return namespace
} else {
return r.GetDefaultNamespace()
return ""
}
}

// Validates that the provided experiment belongs to the namespace. Returns error otherwise.
func (r *ResourceManager) ValidateExperimentNamespace(experimentId string, namespace string) error {
if experimentId == "" || r.IsDefaultNamespace(namespace) {
// Returns an error in multi-user mode when experimentId and namespace are both empty.
func (r *ResourceManager) CheckExperimentBelongsToNamespace(experimentId string, namespace string) error {
if experimentId == "" || r.IsEmptyNamespace(namespace) {
if common.IsMultiUserMode() {
return util.NewInvalidInputError("Resource cannot have an empty namespace and experiment id in multi-user mode")
}
return nil
}
experimentNamespace, err := r.GetNamespaceFromExperimentId(experimentId)
Expand All @@ -1825,6 +1748,42 @@ func (r *ResourceManager) ValidateExperimentNamespace(experimentId string, names
return nil
}

// Validates the provided experimentId and namespace. Returns valid values if the provided ones are empty.
// For multi-user more at least one of the input must be non-empty, otherwise, returns an error.
// 1. Validates that given experimentId belongs to namespace if both are not empty
// 2. If experimentId is empty, replaces it with the default experimentId from the given namespace.
// Creates the default experiment in the given namespace (could be empty in single-user mode) if it is missing.
// 3. Replaces empty namespace with the parent namespace of the given experimentId.
func (r *ResourceManager) GetValidExperimentNamespacePair(experimentId string, namespace string) (string, string, error) {
if common.IsMultiUserMode() && experimentId == "" {
return "", "", util.NewInvalidInputError("Experiment id can not be empty in multi-user mode")
}
if experimentId != "" {
ns, err := r.GetNamespaceFromExperimentId(experimentId)
if err != nil {
return "", "", util.Wrapf(err, "Failed to fetch namespace for experiment %v", experimentId)
}
if namespace != "" && namespace != ns {
return "", "", util.NewInvalidInputError("Experiment %v belongs to namespace '%v' instead of '%v'", experimentId, ns, namespace)
}
namespace = ns
} else {
defExpId, err := r.GetDefaultExperimentId()
if err != nil {
return "", "", util.Wrapf(err, "Specify experiment id or check if the default experiment exists in namespace %v", namespace)
}
// Create the default experiment if it is missing
if defExpId == "" {
defExpId, err = r.CreateDefaultExperiment(namespace)
if err != nil {
return "", "", util.Wrapf(err, "Experiment id is empty. Failed to create a new default experiment in namespace %v", namespace)
}
}
experimentId = defExpId
}
return experimentId, namespace, nil
}

// Fetches a task entry.
func (r *ResourceManager) GetTask(taskId string) (*model.Task, error) {
task, err := r.taskStore.GetTask(taskId)
Expand Down
Loading

0 comments on commit fbb8b39

Please sign in to comment.