From 93655eb22bd24a03e9a90f6ee05a195c562a2b98 Mon Sep 17 00:00:00 2001
From: Clinton Knight
Date: Wed, 6 Sep 2017 10:11:53 -0400
Subject: [PATCH] Convergence part 1: Docker frontend on Trident

---
 core/orchestrator_core.go      |  73 +++++++
 core/orchestrator_core_test.go |  42 +++-
 core/orchestrator_mock.go      |  10 +-
 core/types.go                  |   2 +
 extras/external-etcd/README.md |   2 +-
 frontend/docker/config.go      |  11 ++
 frontend/docker/plugin.go      | 289 ++++++++++++++++++++++++++++
 frontend/docker/volumes.go     | 107 +++++++++++
 glide.lock                     |  28 ++-
 glide.yaml                     |   9 +-
 logging/config.go              |  11 ++
 logging/logging.go             | 340 +++++++++++++++++++++++++++++++++
 main.go                        |  58 ++++--
 storage_class/storage_class.go |  46 ++++-
 14 files changed, 993 insertions(+), 35 deletions(-)
 create mode 100644 frontend/docker/config.go
 create mode 100644 frontend/docker/plugin.go
 create mode 100644 frontend/docker/volumes.go
 create mode 100644 logging/config.go
 create mode 100644 logging/logging.go

diff --git a/core/orchestrator_core.go b/core/orchestrator_core.go
index 548bc222d..cb3d5f67f 100644
--- a/core/orchestrator_core.go
+++ b/core/orchestrator_core.go
@@ -5,12 +5,14 @@ package core
 import (
 	"fmt"
 	"math/rand"
+	"os"
 	"strings"
 	"sync"
 	"time"
 
 	log "github.com/Sirupsen/logrus"
 	dvp "github.com/netapp/netappdvp/storage_drivers"
+	dvp_utils "github.com/netapp/netappdvp/utils"
 
 	"github.com/netapp/trident/config"
 	"github.com/netapp/trident/frontend"
@@ -844,6 +846,77 @@ func (o *tridentOrchestrator) ListVolumesByPlugin(pluginName string) []*storage.
 	return volumes
 }
 
+// AttachVolume mounts a volume to the local host. It ensures the mount point exists,
+// and it calls the underlying storage driver to perform the attach operation as appropriate
+// for the protocol and storage controller type.
+func (o *tridentOrchestrator) AttachVolume(volumeName, mountpoint string, options map[string]string) error {
+
+	volume, ok := o.volumes[volumeName]
+	if !ok {
+		return fmt.Errorf("Volume %s not found.", volumeName)
+	}
+
+	log.WithFields(log.Fields{"volume": volumeName, "mountpoint": mountpoint}).Debug("Mounting volume.")
+
+	// Ensure mount point exists and is a directory
+	fileInfo, err := os.Lstat(mountpoint)
+	if os.IsNotExist(err) {
+		// Make mount point if it doesn't exist
+		if err := os.MkdirAll(mountpoint, 0755); err != nil {
+			return err
+		}
+	} else if err != nil {
+		return err
+	} else if !fileInfo.IsDir() {
+		return fmt.Errorf("%v already exists and it's not a directory", mountpoint)
+	}
+
+	// Check if volume is already mounted
+	dfOutput, dfOutputErr := dvp_utils.GetDFOutput()
+	if dfOutputErr != nil {
+		err = fmt.Errorf("Error checking if %v is already mounted: %v", mountpoint, dfOutputErr)
+		return err
+	}
+	for _, e := range dfOutput {
+		if e.Target == mountpoint {
+			log.Debugf("%v is already mounted", mountpoint)
+			return nil
+		}
+	}
+
+	return volume.Backend.Driver.Attach(volume.Config.InternalName, mountpoint, options)
+}
+
+// DetachVolume unmounts a volume from the local host. It ensures the volume is already
+// mounted, and it calls the underlying storage driver to perform the detach operation as
+// appropriate for the protocol and storage controller type.
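+// The operation is idempotent: if the mount point no longer exists, the volume is assumed
+// to be detached already and no error is returned, and removal of the mount point directory
+// after a successful unmount is performed on a best-effort basis.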
+func (o *tridentOrchestrator) DetachVolume(volumeName, mountpoint string) error { + + volume, ok := o.volumes[volumeName] + if !ok { + return fmt.Errorf("Volume %s not found.", volumeName) + } + + log.WithFields(log.Fields{"volume": volumeName, "mountpoint": mountpoint}).Debug("Unmounting volume.") + + // Check if the mount point exists, so we know that it's attached and must be cleaned up + _, err := os.Stat(mountpoint) + if err != nil { + // Not attached, so nothing to do + return nil + } + + // Unmount the volume + err = volume.Backend.Driver.Detach(volume.Config.InternalName, mountpoint) + if err != nil { + return err + } + + // Best effort removal of the mount point + os.Remove(mountpoint) + return nil +} + // getProtocol returns the appropriate protocol name based on volume access mode //or an empty string if all protocols are applicable. // ReadWriteOnce -> Any (File + Block) diff --git a/core/orchestrator_core_test.go b/core/orchestrator_core_test.go index 6d0e504b6..eb04d37a6 100644 --- a/core/orchestrator_core_test.go +++ b/core/orchestrator_core_test.go @@ -414,10 +414,9 @@ func TestAddStorageClassVolumes(t *testing.T) { poolNames: []string{tu.SlowNoSnapshots, tu.SlowSnapshots}, }, { - name: "slow-block", - protocol: config.Block, - poolNames: []string{tu.SlowNoSnapshots, tu.SlowSnapshots, - "medium-overlap"}, + name: "slow-block", + protocol: config.Block, + poolNames: []string{tu.SlowNoSnapshots, tu.SlowSnapshots, tu.MediumOverlap}, }, } { pools := make(map[string]*fake.FakeStoragePool, len(c.poolNames)) @@ -504,8 +503,8 @@ func TestAddStorageClassVolumes(t *testing.T) { config: &storage_class.Config{ Name: "specific", BackendStoragePools: map[string][]string{ - "fast-a": []string{tu.FastThinOnly}, - "slow-block": []string{tu.SlowNoSnapshots, tu.MediumOverlap}, + "fast-a": {tu.FastThinOnly}, + "slow-block": {tu.SlowNoSnapshots, tu.MediumOverlap}, }, }, expected: []*tu.PoolMatch{ @@ -514,12 +513,21 @@ func TestAddStorageClassVolumes(t *testing.T) { {Backend: "slow-block", Pool: tu.MediumOverlap}, }, }, + { + config: &storage_class.Config{ + Name: "specificNoMatch", + BackendStoragePools: map[string][]string{ + "unknown": {tu.FastThinOnly}, + }, + }, + expected: []*tu.PoolMatch{}, + }, { config: &storage_class.Config{ Name: "mixed", BackendStoragePools: map[string][]string{ - "slow-file": []string{tu.SlowNoSnapshots}, - "fast-b": []string{tu.FastThinOnly, tu.FastUniqueAttr}, + "slow-file": {tu.SlowNoSnapshots}, + "fast-b": {tu.FastThinOnly, tu.FastUniqueAttr}, }, Attributes: map[string]sa.Request{ sa.IOPS: sa.NewIntRequest(2000), @@ -535,6 +543,22 @@ func TestAddStorageClassVolumes(t *testing.T) { {Backend: "slow-file", Pool: tu.SlowNoSnapshots}, }, }, + { + config: &storage_class.Config{ + Name: "emptyStorageClass", + }, + expected: []*tu.PoolMatch{ + {Backend: "fast-a", Pool: tu.FastSmall}, + {Backend: "fast-a", Pool: tu.FastThinOnly}, + {Backend: "fast-b", Pool: tu.FastThinOnly}, + {Backend: "fast-b", Pool: tu.FastUniqueAttr}, + {Backend: "slow-file", Pool: tu.SlowNoSnapshots}, + {Backend: "slow-file", Pool: tu.SlowSnapshots}, + {Backend: "slow-block", Pool: tu.SlowNoSnapshots}, + {Backend: "slow-block", Pool: tu.SlowSnapshots}, + {Backend: "slow-block", Pool: tu.MediumOverlap}, + }, + }, } for _, s := range scTests { _, err := orchestrator.AddStorageClass(s.config) @@ -1523,7 +1547,7 @@ func TestBootstrapEtcdV2ToEtcdV3Migration(t *testing.T) { t.Fatalf("Couldn't unmarshall the orchestrator persistent state version: %v", err) } - if config.OrchestratorAPIVersion != 
version.OrchestratorVersion ||
+	if config.OrchestratorAPIVersion != version.OrchestratorAPIVersion ||
 		string(orchestratorV3.storeClient.GetType()) != version.PersistentStoreVersion {
 		t.Fatalf("Failed to set the orchestrator persistent state version after bootstrapping: %v", err)
diff --git a/core/orchestrator_mock.go b/core/orchestrator_mock.go
index 0dcc59600..67504b3b2 100644
--- a/core/orchestrator_mock.go
+++ b/core/orchestrator_mock.go
@@ -61,7 +61,7 @@ func (m *MockOrchestrator) AddFrontend(f frontend.FrontendPlugin) {
 	// NOP for the time being, since users of MockOrchestrator don't need this
 }
 
-func (o *MockOrchestrator) GetVersion() string {
+func (m *MockOrchestrator) GetVersion() string {
 	return config.OrchestratorVersion.String()
 }
 
@@ -299,6 +299,14 @@ func (m *MockOrchestrator) ListVolumesByPlugin(pluginName string) []*storage.Vol
 	return nil
 }
 
+func (m *MockOrchestrator) AttachVolume(volumeName, mountpoint string, options map[string]string) error {
+	return nil
+}
+
+func (m *MockOrchestrator) DetachVolume(volumeName, mountpoint string) error {
+	return nil
+}
+
 func NewMockOrchestrator() *MockOrchestrator {
 	return &MockOrchestrator{
 		backends: make(map[string]*storage.StorageBackend),
diff --git a/core/types.go b/core/types.go
index 35b1c09e5..b9a02553b 100644
--- a/core/types.go
+++ b/core/types.go
@@ -26,6 +26,8 @@ type Orchestrator interface {
 	ListVolumes() []*storage.VolumeExternal
 	DeleteVolume(volume string) (found bool, err error)
 	ListVolumesByPlugin(pluginName string) []*storage.VolumeExternal
+	AttachVolume(volumeName, mountpoint string, options map[string]string) error
+	DetachVolume(volumeName, mountpoint string) error
 
 	AddStorageClass(scConfig *storage_class.Config) (*storage_class.StorageClassExternal, error)
 	GetStorageClass(scName string) *storage_class.StorageClassExternal
diff --git a/extras/external-etcd/README.md b/extras/external-etcd/README.md
index 201a88b2e..958fc9023 100644
--- a/extras/external-etcd/README.md
+++ b/extras/external-etcd/README.md
@@ -104,7 +104,7 @@ For more information about the Secrets used by the operator, please see
 and [Generating Self-signed Certificates](https://coreos.com/os/docs/latest/generate-self-signed-certificates.html).
 
-### Testing etcd Cluster
+### Test etcd Cluster
 
 To verify the cluster we brought up in the previous step is working properly,
 we can run the following commands:
diff --git a/frontend/docker/config.go b/frontend/docker/config.go
new file mode 100644
index 000000000..99047ce5d
--- /dev/null
+++ b/frontend/docker/config.go
@@ -0,0 +1,11 @@
+// Copyright 2017 NetApp, Inc. All Rights Reserved.
+
+package docker
+
+const (
+	pluginName    = "docker"
+	pluginVersion = "0.1"
+
+	auto_storage_class_prefix = "auto_sc_%d"
+	default_volume_size       = "1g"
+)
diff --git a/frontend/docker/plugin.go b/frontend/docker/plugin.go
new file mode 100644
index 000000000..ecc600ec3
--- /dev/null
+++ b/frontend/docker/plugin.go
@@ -0,0 +1,289 @@
+// Copyright 2017 NetApp, Inc. All Rights Reserved.
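+//
+// Package docker implements the Docker Volume Plugin frontend for Trident, translating
+// volume requests from the Docker daemon into calls on the orchestrator core.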
+ +package docker + +import ( + "crypto/tls" + "fmt" + "os" + "path/filepath" + "sync" + + log "github.com/Sirupsen/logrus" + "github.com/docker/go-plugins-helpers/volume" + dvp "github.com/netapp/netappdvp/storage_drivers" + + "github.com/netapp/trident/core" + "github.com/netapp/trident/storage" +) + +type DockerPlugin struct { + orchestrator core.Orchestrator + driverName string + driverPort string + volumePath string + mutex *sync.Mutex +} + +func NewPlugin(driverName, driverPort string, orchestrator core.Orchestrator) (*DockerPlugin, error) { + + // Create the plugin object + plugin := &DockerPlugin{ + orchestrator: orchestrator, + driverName: driverName, + driverPort: driverPort, + volumePath: filepath.Join(volume.DefaultDockerRootDirectory, driverName), + mutex: &sync.Mutex{}, + } + + // Register the plugin with Docker + err := registerDockerVolumePlugin(plugin.volumePath) + if err != nil { + return nil, err + } + + log.WithFields(log.Fields{ + "version": dvp.DriverVersion, + "mode": dvp.ExtendedDriverVersion, + "volumePath": plugin.volumePath, + "volumeDriver": driverName, + }).Info("Initializing Trident plugin for Docker.") + + return plugin, nil +} + +func registerDockerVolumePlugin(root string) error { + + // If root (volumeDir) doesn't exist, make it. + dir, err := os.Lstat(root) + if os.IsNotExist(err) { + if err := os.MkdirAll(root, 0755); err != nil { + return err + } + } + // If root (volumeDir) isn't a directory, error + if dir != nil && !dir.IsDir() { + return fmt.Errorf("Volume directory '%v' exists and it's not a directory", root) + } + + return nil +} + +func (p *DockerPlugin) Activate() error { + handler := volume.NewHandler(p) + if p.driverPort != "" { + go handler.ServeTCP(p.driverName, ":"+p.driverPort, "", &tls.Config{InsecureSkipVerify: true}) + } else { + go handler.ServeUnix(p.driverName, 0) // 0 is the unix group to start as (root gid) + } + return nil +} + +func (p *DockerPlugin) Deactivate() error { + + return nil +} + +func (p *DockerPlugin) GetName() string { + return pluginName +} + +func (p *DockerPlugin) Version() string { + return pluginVersion +} + +func (p *DockerPlugin) Create(request *volume.CreateRequest) error { + + log.WithFields(log.Fields{ + "Method": "Create", + "Type": "DockerPlugin", + "name": request.Name, + "options": request.Options, + }).Debug("Create") + + // Find a matching storage class, or register a new one + scConfig, err := getStorageClass(request.Options, p.orchestrator) + if err != nil { + return err + } + + // Convert volume creation options into a Trident volume config + volConfig, err := getVolumeConfig(request.Name, scConfig.Name, request.Options) + if err != nil { + return err + } + + // Invoke the orchestrator to create the new volume + _, err = p.orchestrator.AddVolume(volConfig) + return err +} + +func (p *DockerPlugin) List() (*volume.ListResponse, error) { + + log.WithFields(log.Fields{ + "Method": "List", + "Type": "DockerPlugin", + }).Debug("List") + + tridentVols := p.orchestrator.ListVolumes() + var dockerVols []*volume.Volume + + for _, tridentVol := range tridentVols { + dockerVol := &volume.Volume{Name: tridentVol.Config.Name} + dockerVols = append(dockerVols, dockerVol) + } + + return &volume.ListResponse{Volumes: dockerVols}, nil +} + +func (p *DockerPlugin) Get(request *volume.GetRequest) (*volume.GetResponse, error) { + + log.WithFields(log.Fields{ + "Method": "Get", + "Type": "DockerPlugin", + "name": request.Name, + }).Debug("Get") + + tridentVol := p.orchestrator.GetVolume(request.Name) + if tridentVol 
== nil { + return &volume.GetResponse{}, fmt.Errorf("Volume %s not found.", request.Name) + } + + // Get the mountpoint, if this volume is mounted + mountpoint, _ := p.getPath(tridentVol) + status := map[string]interface{}{ + "Snapshots": make([]dvp.CommonSnapshot, 0), + } + + vol := &volume.Volume{ + Name: tridentVol.Config.Name, + Mountpoint: mountpoint, + Status: status, + } + + return &volume.GetResponse{Volume: vol}, nil +} + +func (p *DockerPlugin) Remove(request *volume.RemoveRequest) error { + + log.WithFields(log.Fields{ + "Method": "Remove", + "Type": "DockerPlugin", + "name": request.Name, + }).Debug("Remove") + + found, err := p.orchestrator.DeleteVolume(request.Name) + if !found { + log.WithField("volume", request.Name).Warn("Volume not found.") + } + return err +} + +func (p *DockerPlugin) Path(request *volume.PathRequest) (*volume.PathResponse, error) { + + log.WithFields(log.Fields{ + "Method": "Path", + "Type": "DockerPlugin", + "name": request.Name, + }).Debug("Path") + + tridentVol := p.orchestrator.GetVolume(request.Name) + if tridentVol == nil { + return &volume.PathResponse{}, fmt.Errorf("Volume %s not found.", request.Name) + } + + mountpoint, err := p.getPath(tridentVol) + if err != nil { + return &volume.PathResponse{}, err + } + + return &volume.PathResponse{Mountpoint: mountpoint}, nil +} + +func (p *DockerPlugin) Mount(request *volume.MountRequest) (*volume.MountResponse, error) { + + log.WithFields(log.Fields{ + "Method": "Mount", + "Type": "DockerPlugin", + "name": request.Name, + "id": request.ID, + }).Debug("Mount") + + tridentVol := p.orchestrator.GetVolume(request.Name) + if tridentVol == nil { + return &volume.MountResponse{}, fmt.Errorf("Volume %s not found.", request.Name) + } + + mountpoint := p.mountpoint(tridentVol.Config.InternalName) + options := make(map[string]string) + + err := p.orchestrator.AttachVolume(request.Name, mountpoint, options) + if err != nil { + log.Error(err) + err = fmt.Errorf("Error attaching volume %v, mountpoint %v, error: %v", request.Name, mountpoint, err) + return &volume.MountResponse{}, err + } + + return &volume.MountResponse{Mountpoint: mountpoint}, nil +} + +func (p *DockerPlugin) Unmount(request *volume.UnmountRequest) error { + + log.WithFields(log.Fields{ + "Method": "Unmount", + "Type": "DockerPlugin", + "name": request.Name, + "id": request.ID, + }).Debug("Unmount") + + tridentVol := p.orchestrator.GetVolume(request.Name) + if tridentVol == nil { + return fmt.Errorf("Volume %s not found.", request.Name) + } + + mountpoint := p.mountpoint(tridentVol.Config.InternalName) + + err := p.orchestrator.DetachVolume(request.Name, mountpoint) + if err != nil { + log.Error(err) + return fmt.Errorf("Error detaching volume %v, mountpoint %v, error: %v", request.Name, mountpoint, err) + } + + return nil +} + +func (p *DockerPlugin) Capabilities() *volume.CapabilitiesResponse { + + log.WithFields(log.Fields{ + "Method": "Capabilities", + "Type": "DockerPlugin", + }).Debug("Capabilities") + + return &volume.CapabilitiesResponse{Capabilities: volume.Capability{Scope: "global"}} +} + +// getPath returns the mount point if the path exists. 
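+// It returns an error if the volume's mount point has not been created yet.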
+func (p *DockerPlugin) getPath(vol *storage.VolumeExternal) (string, error) {
+
+	mountpoint := p.mountpoint(vol.Config.InternalName)
+
+	log.WithFields(log.Fields{
+		"name":         vol.Config.Name,
+		"internalName": vol.Config.InternalName,
+		"mountpoint":   mountpoint,
+	}).Debug("Getting path for volume.")
+
+	fi, err := os.Lstat(mountpoint)
+	if os.IsNotExist(err) {
+		return "", err
+	}
+	if fi == nil {
+		return "", fmt.Errorf("Could not stat %v", mountpoint)
+	}
+
+	return mountpoint, nil
+}
+
+func (p *DockerPlugin) mountpoint(name string) string {
+	return filepath.Join(p.volumePath, name)
+}
diff --git a/frontend/docker/volumes.go b/frontend/docker/volumes.go
new file mode 100644
index 000000000..634a3fe36
--- /dev/null
+++ b/frontend/docker/volumes.go
@@ -0,0 +1,107 @@
+// Copyright 2017 NetApp, Inc. All Rights Reserved.
+
+package docker
+
+import (
+	"fmt"
+
+	log "github.com/Sirupsen/logrus"
+	hash "github.com/mitchellh/hashstructure"
+	dvp_utils "github.com/netapp/netappdvp/utils"
+
+	"github.com/netapp/trident/config"
+	"github.com/netapp/trident/core"
+	"github.com/netapp/trident/storage"
+	"github.com/netapp/trident/storage_attribute"
+	"github.com/netapp/trident/storage_class"
+)
+
+// getStorageClass accepts a list of volume creation options and returns a
+// matching storage class. If the orchestrator already has a matching
+// storage class, that is returned; otherwise a new one is created and
+// registered with the orchestrator.
+func getStorageClass(options map[string]string, o core.Orchestrator) (*storage_class.Config, error) {
+
+	// Create a storage class based on available options
+	newScConfig, err := makeStorageClass(options)
+	if err != nil {
+		return nil, err
+	}
+
+	// Check existing storage classes for a match based on the name
+	sc := o.GetStorageClass(newScConfig.Name)
+	if sc != nil {
+		log.WithField("storageClass", sc.Config.Name).Debug("Matched existing storage class.")
+		return sc.Config, nil
+	}
+
+	// No match found, so register the new storage class
+	addedSc, err := o.AddStorageClass(newScConfig)
+	if err != nil {
+		// Log the name from the new config; addedSc may be nil if registration failed
+		log.WithFields(log.Fields{
+			"storageClass": newScConfig.Name,
+		}).Error("Docker frontend couldn't add the storage class: ", err)
+		return nil, err
+	}
+
+	return addedSc.Config, nil
+}
+
+// makeStorageClass accepts a list of volume creation options and creates a
+// matching storage class. The name of the new storage class contains a hash
+// of the attributes it contains, thereby enabling comparison of storage
+// classes generated by this method by simply comparing their names.
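+// For example, two create requests with identical option sets hash to the same value,
+// yield the same "auto_sc_<hash>" name, and therefore share a single storage class.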
+func makeStorageClass(options map[string]string) (*storage_class.Config, error) { + + scConfig := new(storage_class.Config) + + // Populate storage class config attributes + scConfig.Attributes = make(map[string]storage_attribute.Request) + for k, v := range options { + // format: attribute: "type:value" + req, err := storage_attribute.CreateAttributeRequestFromTypedValue(k, v) + if err != nil { + log.WithFields(log.Fields{ + "storageClass": scConfig.Name, + "storageClass_parameters": options, + }).Debug("Docker frontend ignoring storage class attribute: ", err) + continue + } + scConfig.Attributes[k] = req + } + + // Set name based on hash value + scHash, err := hash.Hash(scConfig.Attributes, nil) + if err != nil { + log.WithFields(log.Fields{ + "storageClass": scConfig.Name, + "storageClass_parameters": options, + }).Error("Docker frontend couldn't hash the storage class attributes: ", err) + return nil, err + } + scConfig.Name = fmt.Sprintf(auto_storage_class_prefix, scHash) + + return scConfig, nil +} + +// getVolumeConfig accepts a set of parameters describing a volume creation request +// and returns a volume config structure suitable for passing to the orchestrator core. +func getVolumeConfig(name, storageClass string, opts map[string]string) (*storage.VolumeConfig, error) { + + sizeBytes, err := dvp_utils.GetVolumeSizeBytes(opts, default_volume_size) + if err != nil { + return nil, fmt.Errorf("Error creating volume: %v", err) + } + + return &storage.VolumeConfig{ + Name: name, + Size: fmt.Sprintf("%d", sizeBytes), + Protocol: config.ProtocolAny, + SnapshotPolicy: dvp_utils.GetV(opts, "snapshotPolicy", ""), + ExportPolicy: dvp_utils.GetV(opts, "exportPolicy", ""), + SnapshotDir: dvp_utils.GetV(opts, "snapshotDir", ""), + UnixPermissions: dvp_utils.GetV(opts, "unixPermissions", ""), + StorageClass: storageClass, + BlockSize: dvp_utils.GetV(opts, "blocksize", ""), + FileSystem: "", + AccessMode: config.ModeAny, + }, nil +} diff --git a/glide.lock b/glide.lock index 2d083f92a..4a386d9d7 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 5fe603f312560cb422ebbc058ff8d38eff441549b1894ada2221739e9d4a41d0 -updated: 2017-10-31T11:44:35.484406719-04:00 +hash: b3ccf56738b1f933c6e6306a37625b77d1093c9e150be11091da806523db1e97 +updated: 2017-11-03T09:51:07.213565884-04:00 imports: - name: github.com/beorn7/perks version: 3ac7bf7a47d159a033b107610db8a1b6575507a4 @@ -20,6 +20,13 @@ imports: - pkg/pathutil - pkg/tlsutil - pkg/types +- name: github.com/coreos/go-systemd + version: 48702e0da86bd25e76cfef347e2adeb434a0d0a6 + subpackages: + - activation + - daemon + - journal + - util - name: github.com/davecgh/go-spew version: 5215b55f46b2b919f50a1df0eaa5886afe4e3b3d subpackages: @@ -29,6 +36,15 @@ imports: subpackages: - digest - reference +- name: github.com/docker/go-connections + version: 3ede32e2033de7505e6500d6c868c2b9ed9f169d + subpackages: + - sockets +- name: github.com/docker/go-plugins-helpers + version: bd8c600f0cdd76c7a57ff6aa86bd2b423868c688 + subpackages: + - sdk + - volume - name: github.com/dustin/go-humanize version: 8929fe90cee4b2cb9deb468b51fb34eba64d1bf0 - name: github.com/emicklei/go-restful @@ -105,6 +121,10 @@ imports: version: fc2b8d3a73c4867e51861bbdd5ae3c1f0869dd6a subpackages: - pbutil +- name: github.com/Microsoft/go-winio + version: 78439966b38d69bf38227fbf57ac8a6fee70f69a +- name: github.com/mitchellh/hashstructure + version: 2bca23e0e452137f789efbc8610126fd8b94f73b - name: github.com/netapp/netappdvp version: 5721dac47725b60e7e2a2a706ae5615493d1ab95 
  subpackages:
@@ -141,7 +161,7 @@ imports:
- name: github.com/PuerkitoBio/urlesc
  version: 5bd2802263f21d8788851d5305584c82a5c75d7e
- name: github.com/Sirupsen/logrus
-  version: 10f801ebc38b33738c9d17d50860f484a0988ff5
+  version: 89742aefa4b206dcf400792f3bd35b542998eb3b
- name: github.com/spf13/cobra
  version: f62e98d28ab7ad31d707ba837a966378465c7b57
- name: github.com/spf13/pflag
@@ -163,11 +183,13 @@ imports:
  - idna
  - internal/timeseries
  - lex/httplex
+  - proxy
  - trace
- name: golang.org/x/sys
  version: 8f0908ab3b2457e2e15403d3697c9ef5cb4b57a9
  subpackages:
  - unix
+  - windows
- name: golang.org/x/text
  version: 2910a502d2bf9e43193af9d68ca516529614eed3
  subpackages:
diff --git a/glide.yaml b/glide.yaml
index fcba348f9..a23d13e28 100644
--- a/glide.yaml
+++ b/glide.yaml
@@ -1,7 +1,7 @@
 package: github.com/netapp/trident
 import:
 - package: github.com/Sirupsen/logrus
-  version: 10f801ebc38b33738c9d17d50860f484a0988ff5
+  version: 89742aefa4b206dcf400792f3bd35b542998eb3b
 - package: github.com/coreos/etcd
   version: 0520cb9304cb2385f7e72b8bc02d6e4d3257158a
   subpackages:
@@ -72,4 +72,9 @@ import:
   - pkg/util/yaml
   - pkg/version
   - pkg/watch
-
+- package: github.com/docker/go-plugins-helpers
+  version: bd8c600f0cdd76c7a57ff6aa86bd2b423868c688
+  subpackages:
+  - volume
+- package: github.com/mitchellh/hashstructure
+  version: 2bca23e0e452137f789efbc8610126fd8b94f73b
diff --git a/logging/config.go b/logging/config.go
new file mode 100644
index 000000000..b1eb4346c
--- /dev/null
+++ b/logging/config.go
@@ -0,0 +1,11 @@
+// Copyright 2017 NetApp, Inc. All Rights Reserved.
+
+package logging
+
+import "github.com/netapp/trident/config"
+
+const (
+	LogRoot              = "/var/log/" + config.OrchestratorName
+	LogRotationThreshold = 10485760 // 10 MB
+	MaxLogEntryLength    = 64000
+)
diff --git a/logging/logging.go b/logging/logging.go
new file mode 100644
index 000000000..123f643e6
--- /dev/null
+++ b/logging/logging.go
@@ -0,0 +1,340 @@
+// Copyright 2016 NetApp, Inc. All Rights Reserved.
+
+package logging
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"runtime"
+	"sort"
+	"strings"
+	"sync"
+	"time"
+
+	log "github.com/Sirupsen/logrus"
+	"github.com/netapp/netappdvp/storage_drivers"
+	"github.com/netapp/netappdvp/utils"
+	"golang.org/x/crypto/ssh/terminal"
+)
+
+// InitLogging configures logging for Trident. Logs are written both to a log file as well as stdout/stderr.
+// Since logrus doesn't support multiple writers, each log stream is implemented as a hook.
+func InitLogging(logName string) error {
+
+	// No output except for the hooks
+	log.SetOutput(ioutil.Discard)
+
+	// Write to the log file
+	logFileHook, err := NewFileHook(logName)
+	if err != nil {
+		return fmt.Errorf("Could not initialize file logging for %s. %v", logName, err)
+	}
+	log.AddHook(logFileHook)
+
+	// Write to stdout/stderr
+	log.AddHook(NewConsoleHook())
+
+	// Remind users where the log file lives
+	log.WithFields(log.Fields{
+		"logLevel":        log.GetLevel().String(),
+		"logFileLocation": logFileHook.GetLocation(),
+		"driverVersion":   storage_drivers.FullDriverVersion,
+		"driverBuild":     storage_drivers.BuildVersion,
+		"buildTime":       storage_drivers.BuildTime,
+	}).Info("Initialized logging.")
+
+	return nil
+}
+
+// InitLogLevel configures the logging level. The debug flag takes precedence if set,
+// otherwise the logLevel flag (debug, info, warn, error, fatal) is used.
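+// For example, InitLogLevel(false, "warn") suppresses info- and debug-level messages.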
+func InitLogLevel(debug bool, logLevel string) error {
+	if debug {
+		log.SetLevel(log.DebugLevel)
+	} else {
+		level, err := log.ParseLevel(logLevel)
+		if err != nil {
+			return err
+		}
+		log.SetLevel(level)
+	}
+	return nil
+}
+
+// ConsoleHook sends log entries to stdout.
+type ConsoleHook struct {
+	formatter log.Formatter
+}
+
+// NewConsoleHook creates a new log hook for writing to stdout/stderr.
+func NewConsoleHook() *ConsoleHook {
+
+	formatter := &log.TextFormatter{FullTimestamp: true}
+	return &ConsoleHook{formatter}
+}
+
+func (hook *ConsoleHook) Levels() []log.Level {
+	return log.AllLevels
+}
+
+func (hook *ConsoleHook) checkIfTerminal(w io.Writer) bool {
+	switch v := w.(type) {
+	case *os.File:
+		return terminal.IsTerminal(int(v.Fd()))
+	default:
+		return false
+	}
+}
+
+func (hook *ConsoleHook) Fire(entry *log.Entry) error {
+
+	// Determine output stream
+	var logWriter io.Writer
+	switch entry.Level {
+	case log.DebugLevel, log.InfoLevel, log.WarnLevel:
+		logWriter = os.Stdout
+	case log.ErrorLevel, log.FatalLevel, log.PanicLevel:
+		logWriter = os.Stderr
+	}
+
+	// Write log entry to output stream
+	hook.formatter.(*log.TextFormatter).ForceColors = hook.checkIfTerminal(logWriter)
+	lineBytes, err := hook.formatter.Format(entry)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "Unable to format log entry: %v", err)
+		return err
+	}
+	if len(lineBytes) > MaxLogEntryLength {
+		logWriter.Write(lineBytes[:MaxLogEntryLength])
+		logWriter.Write([]byte("\n"))
+	} else {
+		logWriter.Write(lineBytes)
+	}
+
+	return nil
+}
+
+// FileHook sends log entries to a file.
+type FileHook struct {
+	logFileLocation string
+	formatter       log.Formatter
+	mutex           *sync.Mutex
+}
+
+// NewFileHook creates a new log hook for writing to a file.
+func NewFileHook(logName string) (*FileHook, error) {
+
+	formatter := &PlainTextFormatter{}
+
+	// If LogRoot doesn't exist, make it
+	dir, err := os.Lstat(LogRoot)
+	if os.IsNotExist(err) {
+		if err := os.MkdirAll(LogRoot, 0755); err != nil {
+			return nil, fmt.Errorf("Could not create log directory %v. %v", LogRoot, err)
+		}
+	}
+	// If LogRoot isn't a directory, return an error
+	if dir != nil && !dir.IsDir() {
+		return nil, fmt.Errorf("Log path %v exists and is not a directory, please remove.", LogRoot)
+	}
+
+	// Build log file path
+	logFileLocation := ""
+	switch runtime.GOOS {
+	case utils.Linux, utils.Darwin:
+		logFileLocation = LogRoot + "/" + logName + ".log"
+	case utils.Windows:
+		logFileLocation = logName + ".log"
+	}
+
+	return &FileHook{logFileLocation, formatter, &sync.Mutex{}}, nil
+}
+
+func (hook *FileHook) Levels() []log.Level {
+	return log.AllLevels
+}
+
+func (hook *FileHook) Fire(entry *log.Entry) error {
+
+	// Get formatted entry
+	lineBytes, err := hook.formatter.Format(entry)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "Could not format log entry: 
%v", err) + return err + } + + // Write log entry to file + logFile, err := hook.openFile() + if err != nil { + return err + } + logFile.WriteString(string(lineBytes)) + logFile.Close() + + // Rotate the file as needed + logEntry, _ := hook.doLogfileRotation() + if logEntry != nil { + logEntry.Info("Rotated log file.") + } + + return nil +} + +func (hook *FileHook) GetLocation() string { + return hook.logFileLocation +} + +func (hook *FileHook) openFile() (*os.File, error) { + + logFile, err := os.OpenFile(hook.logFileLocation, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0666) + if err != nil { + fmt.Fprintf(os.Stderr, "Could not open log file %v. %v", hook.logFileLocation, err) + return nil, err + } + return logFile, nil +} + +func (hook *FileHook) doLogfileRotation() (*log.Entry, error) { + + // Protect rotation from concurrent loggers + hook.mutex.Lock() + defer hook.mutex.Unlock() + + logFile, err := hook.openFile() + if err != nil { + return nil, err + } + + fileInfo, err := logFile.Stat() + if err != nil { + logFile.Close() + return nil, err + } + + size := fileInfo.Size() + logFile.Close() + + if size < LogRotationThreshold { + return nil, nil + } + + // Do the rotation. The Rename call will overwrite any previous .old file. + oldLogFileLocation := hook.logFileLocation + ".old" + os.Rename(hook.logFileLocation, oldLogFileLocation) + + // Don't log here, lest the mutex deadlock + rotationLogger := log.WithFields(log.Fields{ + "oldLogFileLocation": oldLogFileLocation, + "logFileLocation": hook.GetLocation(), + "logFileSize": size, + }) + + return rotationLogger, nil +} + +// PlainTextFormatter is a formatter than does no coloring *and* does not insist on writing logs as key/value pairs. +type PlainTextFormatter struct { + + // TimestampFormat to use for display when a full timestamp is printed + TimestampFormat string + + // The fields are sorted by default for a consistent output. For applications + // that log extremely frequently and don't use the JSON formatter this may not + // be desired. 
+ DisableSorting bool +} + +func (f *PlainTextFormatter) Format(entry *log.Entry) ([]byte, error) { + + var b *bytes.Buffer + var keys []string = make([]string, 0, len(entry.Data)) + for k := range entry.Data { + keys = append(keys, k) + } + + if !f.DisableSorting { + sort.Strings(keys) + } + if entry.Buffer != nil { + b = entry.Buffer + } else { + b = &bytes.Buffer{} + } + + f.prefixFieldClashes(entry.Data) + + timestampFormat := f.TimestampFormat + if timestampFormat == "" { + timestampFormat = time.RFC3339 + } + f.printUncolored(b, entry, keys, timestampFormat) + b.WriteByte('\n') + + return b.Bytes(), nil +} + +func (f *PlainTextFormatter) prefixFieldClashes(data log.Fields) { + if t, ok := data["time"]; ok { + data["fields.time"] = t + } + + if m, ok := data["msg"]; ok { + data["fields.msg"] = m + } + + if l, ok := data["level"]; ok { + data["fields.level"] = l + } +} + +func (f *PlainTextFormatter) printUncolored(b *bytes.Buffer, entry *log.Entry, keys []string, timestampFormat string) { + + levelText := strings.ToUpper(entry.Level.String())[0:4] + + fmt.Fprintf(b, "%s[%s] %-44s ", levelText, entry.Time.Format(timestampFormat), entry.Message) + for _, k := range keys { + v := entry.Data[k] + fmt.Fprintf(b, " %s=", k) + f.appendValue(b, v) + } +} + +func (f *PlainTextFormatter) needsQuoting(text string) bool { + for _, ch := range text { + if !((ch >= 'a' && ch <= 'z') || + (ch >= 'A' && ch <= 'Z') || + (ch >= '0' && ch <= '9') || + ch == '-' || ch == '.') { + return true + } + } + return false +} + +func (f *PlainTextFormatter) appendValue(b *bytes.Buffer, value interface{}) { + switch value := value.(type) { + case string: + if !f.needsQuoting(value) { + b.WriteString(value) + } else { + fmt.Fprintf(b, "%q", value) + } + case error: + errmsg := value.Error() + if !f.needsQuoting(errmsg) { + b.WriteString(errmsg) + } else { + fmt.Fprintf(b, "%q", errmsg) + } + default: + fmt.Fprint(b, value) + } +} diff --git a/main.go b/main.go index 3f3c35d28..bdf414adb 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ package main import ( "flag" + "fmt" "os" "os/signal" "runtime" @@ -15,13 +16,17 @@ import ( "github.com/netapp/trident/config" "github.com/netapp/trident/core" "github.com/netapp/trident/frontend" + "github.com/netapp/trident/frontend/docker" "github.com/netapp/trident/frontend/kubernetes" "github.com/netapp/trident/frontend/rest" + "github.com/netapp/trident/logging" "github.com/netapp/trident/persistent_store" ) var ( - debug = flag.Bool("debug", false, "Enable debugging output") + debug = flag.Bool("debug", false, "Enable debugging output") + logLevel = flag.String("log_level", "info", "Logging level (debug, info, warn, error, fatal)") + k8sAPIServer = flag.String("k8s_api_server", "", "Kubernetes API server "+ "address to enable dynamic storage provisioning for Kubernetes.") k8sConfigPath = flag.String("k8s_config_path", "", "Path to KubeConfig file.") @@ -41,9 +46,15 @@ var ( port = flag.String("port", "8000", "Storage orchestrator API port") useInMemory = flag.Bool("no_persistence", false, "Does not persist "+ "any metadata. 
WILL LOSE TRACK OF VOLUMES ON REBOOT/CRASH.") - kubernetesVersion = "unknown" - storeClient persistent_store.Client - enableKubernetes bool + + enableDocker = flag.Bool("docker", false, "Enable Docker Volume Plugin frontend") + driverName = flag.String("volume_driver", "netapp", "Register as a Docker "+ + "volume plugin with this driver name") + driverPort = flag.String("driver_port", "", "Listen on this port instead of using a "+ + "Unix domain socket") + + storeClient persistent_store.Client + enableKubernetes bool ) func shouldEnableTLS() bool { @@ -62,9 +73,7 @@ func shouldEnableTLS() bool { func processCmdLineArgs() { var err error - if *debug { - log.SetLevel(log.DebugLevel) - } + // Don't bother validating the Kubernetes API server address; we'll know if // it's invalid during start-up. Given that users can specify DNS names, // validation would be more trouble than it's worth. @@ -101,17 +110,28 @@ func processCmdLineArgs() { } func main() { + + runtime.GOMAXPROCS(runtime.NumCPU()) + flag.Parse() + frontends := make([]frontend.FrontendPlugin, 0) + + // Set log level + err := logging.InitLogLevel(*debug, *logLevel) + if err != nil { + log.Fatal(err) + } + log.WithFields(log.Fields{ "version": config.OrchestratorVersion.String(), "build_time": config.BuildTime, }).Info("Running Trident storage orchestrator.") - frontends := make([]frontend.FrontendPlugin, 0) - runtime.GOMAXPROCS(runtime.NumCPU()) - flag.Parse() - processCmdLineArgs() + if enableKubernetes && *enableDocker { + log.Fatal("Trident cannot serve both Docker and Kubernetes at the same time.") + } + orchestrator := core.NewTridentOrchestrator(storeClient) if enableKubernetes { @@ -119,6 +139,7 @@ func main() { kubernetesFrontend frontend.FrontendPlugin err error ) + if *k8sAPIServer != "" { kubernetesFrontend, err = kubernetes.NewPlugin(orchestrator, *k8sAPIServer, *k8sConfigPath) } else { @@ -130,6 +151,21 @@ func main() { orchestrator.AddFrontend(kubernetesFrontend) frontends = append(frontends, kubernetesFrontend) } + if *enableDocker { + + // Set up multi-output logging + err := logging.InitLogging(*driverName) + if err != nil { + fmt.Fprint(os.Stderr, err) + os.Exit(1) + } + + dockerPlugin, err := docker.NewPlugin(*driverName, *driverPort, orchestrator) + if err != nil { + log.Fatalf("Unable to start the Docker frontend. %v", err) + } + frontends = append(frontends, dockerPlugin) + } restServer := rest.NewAPIServer(orchestrator, *address, *port) frontends = append(frontends, restServer) // Bootstrapping the orchestrator diff --git a/storage_class/storage_class.go b/storage_class/storage_class.go index 4d9bc31d5..90b2b27e5 100644 --- a/storage_class/storage_class.go +++ b/storage_class/storage_class.go @@ -38,24 +38,42 @@ func NewFromPersistent(persistent *StorageClassPersistent) *StorageClass { } func (s *StorageClass) Matches(storagePool *storage.StoragePool) bool { + + log.WithFields(log.Fields{ + "pool": storagePool.Name, + "storageClass": s.GetName(), + }).Debug("Checking pool for storage class") + + // First consider requiredStorage, since a match of the pool name will always return true + // regardless of the storage pool attributes. 
if len(s.config.BackendStoragePools) > 0 { if storagePoolList, ok := s.config.BackendStoragePools[storagePool.Backend.Name]; ok { for _, storagePoolName := range storagePoolList { if storagePoolName == storagePool.Name { + log.WithField("pool", storagePoolName).Debug("Matched by pool name.") return true } } } - } - matches := len(s.config.Attributes) > 0 - for name, request := range s.config.Attributes { - if storagePool.Attributes == nil { + + // Handle the sub-case where requiredStorage is specified (but didn't match) and + // there are no attributes specified in the storage class. This should always return + // false. + if len(s.config.Attributes) == 0 { log.WithFields(log.Fields{ "storageClass": s.GetName(), "pool": storagePool.Name, - "attribute": name, - }).Panic("Storage pool attributes are nil") + }).Debug("Pool failed to match storage class requiredStorage attribute.") + return false } + } + + // Now handle the case where requiredStorage produced no match, so it's up to the storage + // class attributes to determine whether a pool matches. Because storage class attributes + // are used to restrict the set of matching pools, it follows that a storage class without + // any attributes (or requiredStorage!) should match all pools. + matches := true + for name, request := range s.config.Attributes { if offer, ok := storagePool.Attributes[name]; !ok || !offer.Matches(request) { log.WithFields(log.Fields{ "offer": offer, @@ -63,8 +81,8 @@ func (s *StorageClass) Matches(storagePool *storage.StoragePool) bool { "storageClass": s.GetName(), "pool": storagePool.Name, "attribute": name, - "found": ok}).Debug("Attribute for storage " + - "pool failed to match storage class.") + "found": ok, + }).Debug("Attribute for storage pool failed to match storage class.") matches = false break } @@ -76,15 +94,27 @@ func (s *StorageClass) Matches(storagePool *storage.StoragePool) bool { // for a given backend. If the pool satisfies the storage class, it // adds that pool. Returns the number of storage pools added. func (s *StorageClass) CheckAndAddBackend(b *storage.StorageBackend) int { + + log.WithFields(log.Fields{ + "backend": b.Name, + "storageClass": s.GetName(), + }).Debug("Checking backend for storage class") + if !b.Online { + log.WithField("backend", b.Name).Warn("Backend not online.") return 0 } + added := 0 for _, storagePool := range b.Storage { if s.Matches(storagePool) { s.pools = append(s.pools, storagePool) storagePool.AddStorageClass(s.GetName()) added++ + log.WithFields(log.Fields{ + "pool": storagePool.Name, + "storageClass": s.GetName(), + }).Debug("Storage class added to the storage pool.") } } return added