From b925271db0354863a3828d9381a79ce6a9458339 Mon Sep 17 00:00:00 2001
From: Starnop
Date: Fri, 8 Mar 2019 15:59:02 +0800
Subject: [PATCH] feature: add storage driver

Signed-off-by: Starnop
---
 supernode/store/local_storage.go      | 237 ++++++++++++++++++++++++++
 supernode/store/local_storage_test.go | 206 ++++++++++++++++++++++
 supernode/store/storage_driver.go     |  79 +++++++++
 supernode/store/store.go              |  85 +++++++++
 supernode/store/store_mgr.go          |  73 ++++++++
 5 files changed, 680 insertions(+)
 create mode 100644 supernode/store/local_storage.go
 create mode 100644 supernode/store/local_storage_test.go
 create mode 100644 supernode/store/storage_driver.go
 create mode 100644 supernode/store/store.go
 create mode 100644 supernode/store/store_mgr.go

diff --git a/supernode/store/local_storage.go b/supernode/store/local_storage.go
new file mode 100644
index 000000000..a21d46f83
--- /dev/null
+++ b/supernode/store/local_storage.go
@@ -0,0 +1,237 @@
+/*
+ * Copyright The Dragonfly Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package store
+
+import (
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"path"
+	"sync"
+
+	"github.com/dragonflyoss/Dragonfly/common/util"
+)
+
+// LocalStorageDriver is the name of the local storage driver.
+const LocalStorageDriver = "local"
+
+func init() {
+	Register(LocalStorageDriver, NewLocalStorage)
+}
+
+type fileMutex struct {
+	path     string
+	fileInfo os.FileInfo
+	sync.RWMutex
+}
+
+// localStorage is an implementation of StorageDriver backed by the local file system.
+type localStorage struct {
+	baseDir string
+	buckets map[string]string
+}
+
+// NewLocalStorage performs initialization for localStorage and returns a StorageDriver.
+func NewLocalStorage(config interface{}) (StorageDriver, error) {
+	// type assertion for config
+	cfg, ok := config.(*localStorage)
+	if !ok {
+		return nil, fmt.Errorf("failed to parse config")
+	}
+
+	// prepare the base dir
+	if !path.IsAbs(cfg.baseDir) {
+		return nil, fmt.Errorf("not an absolute path: %s", cfg.baseDir)
+	}
+	if err := util.CreateDirectory(cfg.baseDir); err != nil {
+		return nil, err
+	}
+
+	return &localStorage{
+		baseDir: cfg.baseDir,
+		buckets: cfg.buckets,
+	}, nil
+}
+
+// Get reads the content of key from storage and writes it into the writer as an io stream.
+func (lb *localStorage) Get(raw *Raw, writer io.Writer) error {
+	file, err := lb.statPath(raw.bucket, raw.key)
+	if err != nil {
+		return err
+	}
+
+	file.RLock()
+	defer file.RUnlock()
+
+	f, err := os.Open(file.path)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	f.Seek(raw.offset, 0)
+	if raw.length <= 0 {
+		_, err = io.Copy(writer, f)
+	} else {
+		_, err = io.CopyN(writer, f, raw.length)
+	}
+
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// GetBytes gets the content of key from storage and returns it in bytes.
+func (lb *localStorage) GetBytes(raw *Raw) (data []byte, err error) {
+	file, err := lb.statPath(raw.bucket, raw.key)
+	if err != nil {
+		return nil, err
+	}
+
+	file.RLock()
+	defer file.RUnlock()
+
+	f, err := os.Open(file.path)
+	if err != nil {
+		return nil, err
+	}
+	defer f.Close()
+
+	f.Seek(raw.offset, 0)
+	if raw.length <= 0 {
+		data, err = ioutil.ReadAll(f)
+	} else {
+		data = make([]byte, raw.length)
+		_, err = io.ReadFull(f, data)
+	}
+
+	if err != nil {
+		return nil, err
+	}
+	return data, nil
+}
+
+// Put reads the content from the reader and puts it into storage.
+func (lb *localStorage) Put(raw *Raw, data io.Reader) error {
+	file, err := lb.preparePath(raw.bucket, raw.key)
+	if err != nil {
+		return err
+	}
+
+	file.Lock()
+	defer file.Unlock()
+
+	f, err := os.OpenFile(file.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_SYNC, 0644)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	f.Seek(raw.offset, 0)
+	if _, err = io.Copy(f, data); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// PutBytes puts the content of key into storage in bytes.
+func (lb *localStorage) PutBytes(raw *Raw, data []byte) error {
+	file, err := lb.preparePath(raw.bucket, raw.key)
+	if err != nil {
+		return err
+	}
+
+	file.Lock()
+	defer file.Unlock()
+
+	f, err := os.OpenFile(file.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_SYNC, 0644)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	f.Seek(raw.offset, 0)
+	if _, err := f.Write(data); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// Stat determines whether the file exists.
+func (lb *localStorage) Stat(raw *Raw) (*StorageInfo, error) {
+	file, err := lb.statPath(raw.bucket, raw.key)
+	if err != nil {
+		return nil, err
+	}
+
+	return &StorageInfo{
+		Path:    file.path,
+		Size:    file.fileInfo.Size(),
+		ModTime: file.fileInfo.ModTime(),
+	}, nil
+}
+
+// Remove deletes a file or dir.
+func (lb *localStorage) Remove(raw *Raw) error {
+	file, err := lb.statPath(raw.bucket, raw.key)
+	if err != nil {
+		return err
+	}
+
+	file.Lock()
+	defer file.Unlock()
+
+	if err := os.RemoveAll(file.path); err != nil {
+		return err
+	}
+	return nil
+}
+
+// helper functions
+
+// preparePath gets the target path and creates the upper directory if it does not exist.
+func (lb *localStorage) preparePath(bucket, key string) (*fileMutex, error) {
+	dir := path.Join(lb.baseDir, key)
+
+	if err := util.CreateDirectory(dir); err != nil {
+		return nil, err
+	}
+
+	target := path.Join(dir, key+lb.buckets[bucket])
+	return &fileMutex{path: target}, nil
+}
+
+// statPath determines whether the target file exists and returns a fileMutex if so.
+func (lb *localStorage) statPath(bucket, key string) (*fileMutex, error) {
+	filePath := path.Join(lb.baseDir, key, key+lb.buckets[bucket])
+	f, err := os.Stat(filePath)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return nil, ErrNotFound
+		}
+		return nil, err
+	}
+
+	return &fileMutex{
+		path:     filePath,
+		fileInfo: f,
+	}, nil
+}
diff --git a/supernode/store/local_storage_test.go b/supernode/store/local_storage_test.go
new file mode 100644
index 000000000..b6eb62216
--- /dev/null
+++ b/supernode/store/local_storage_test.go
@@ -0,0 +1,206 @@
+/*
+ * Copyright The Dragonfly Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package store
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"path"
+	"strings"
+	"testing"
+
+	"github.com/go-check/check"
+)
+
+func Test(t *testing.T) {
+	check.TestingT(t)
+}
+
+type LocalStorageSuite struct {
+	workHome   string
+	storeLocal *Store
+	configs    map[string]*ManagerConfig
+}
+
+func init() {
+	check.Suite(&LocalStorageSuite{})
+}
+
+func (s *LocalStorageSuite) SetUpSuite(c *check.C) {
+	s.workHome, _ = ioutil.TempDir("/tmp", "storedriver-StoreTestSuite-")
+
+	s.configs = make(map[string]*ManagerConfig)
+	s.configs["driver1"] = &ManagerConfig{
+		driverName: LocalStorageDriver,
+		driverConfig: &localStorage{
+			baseDir: path.Join(s.workHome, "download"),
+			buckets: map[string]string{"meta": ".meta"},
+		},
+	}
+
+	// init StorageManager
+	sm, err := NewManager(s.configs)
+	c.Assert(err, check.IsNil)
+
+	// init store with local storage driver1
+	s.storeLocal, err = sm.Get("driver1")
+	c.Assert(err, check.IsNil)
+}
+
+func (s *LocalStorageSuite) TearDownSuite(c *check.C) {
+	if s.workHome != "" {
+		if err := os.RemoveAll(s.workHome); err != nil {
+			fmt.Printf("remove path:%s error: %v\n", s.workHome, err)
+		}
+	}
+}
+
+func (s *LocalStorageSuite) TestGetPutBytes(c *check.C) {
+	var cases = []struct {
+		raw      *Raw
+		data     []byte
+		expected string
+	}{
+		{
+			raw: &Raw{
+				bucket: "meta",
+				key:    "bytes",
+			},
+			data:     []byte("hello meta file"),
+			expected: "hello meta file",
+		},
+		{
+			raw: &Raw{
+				bucket: "meta",
+				key:    "bytes",
+				offset: 0,
+				length: 5,
+			},
+			data:     []byte("hello meta file"),
+			expected: "hello",
+		},
+		{
+			raw: &Raw{
+				bucket: "meta",
+				key:    "bytes",
+				offset: 2,
+				length: -1,
+			},
+			data:     []byte("hello meta file"),
+			expected: "hello meta file",
+		},
+	}
+
+	for _, v := range cases {
+		// put
+		s.storeLocal.PutBytes(v.raw, v.data)
+
+		// get
+		result, err := s.storeLocal.GetBytes(v.raw)
+		c.Assert(err, check.IsNil)
+		c.Assert(string(result), check.Equals, v.expected)
+
+		// stat
+		s.checkStat(v.raw, c)
+
+		// remove
+		s.checkRemove(v.raw, c)
+	}
+
+}
+
+func (s *LocalStorageSuite) TestGetPut(c *check.C) {
+	var cases = []struct {
+		raw      *Raw
+		data     io.Reader
+		expected string
+	}{
+		{
+			raw: &Raw{
+				bucket: "meta",
+				key:    "iorw",
+			},
+			data:     strings.NewReader("hello meta file"),
+			expected: "hello meta file",
+		},
+		{
+			raw: &Raw{
+				bucket: "meta",
+				key:    "iorw",
+				offset: 2,
+				length: 5,
+			},
+			data:     strings.NewReader("hello meta file"),
+			expected: "hello",
+		},
+		{
+			raw: &Raw{
+				bucket: "meta",
+				key:    "iorw",
+				offset: 2,
+				length: -1,
+			},
+			data:     strings.NewReader("hello meta file"),
+			expected: "hello meta file",
+		},
+	}
+
+	for _, v := range cases {
+		// put
+		s.storeLocal.Put(v.raw, v.data)
+
+		// get
+		buf1 := new(bytes.Buffer)
+		err := s.storeLocal.Get(v.raw, buf1)
+		c.Assert(err, check.IsNil)
+		c.Assert(buf1.String(), check.Equals, v.expected)
+
+		// stat
+		s.checkStat(v.raw, c)
+
+		// remove
+		s.checkRemove(v.raw, c)
+	}
+
+}
+
+// helper functions
+
+func (s *LocalStorageSuite) checkStat(raw *Raw, c *check.C) {
+	info, err := s.storeLocal.Stat(raw)
+	c.Assert(err, check.IsNil)
+
+	cfg := s.storeLocal.config.(*localStorage)
+	pathTemp := path.Join(cfg.baseDir, raw.key, raw.key+cfg.buckets[raw.bucket])
+	f, _ := os.Stat(pathTemp)
+	c.Assert(info, check.DeepEquals, &StorageInfo{
+		Path:    pathTemp,
+		Size:    f.Size(),
+		ModTime: f.ModTime(),
+	})
+}
+
+func (s *LocalStorageSuite) checkRemove(raw *Raw, c *check.C) {
+	err := s.storeLocal.Remove(raw)
+	c.Assert(err, check.IsNil)
+
+	_, err = s.storeLocal.Stat(raw)
+	c.Assert(err, check.DeepEquals, ErrNotFound)
+}
diff --git a/supernode/store/storage_driver.go b/supernode/store/storage_driver.go
new file mode 100644
index 000000000..bfc18406e
--- /dev/null
+++ b/supernode/store/storage_driver.go
@@ -0,0 +1,79 @@
+/*
+ * Copyright The Dragonfly Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package store
+
+import (
+	"fmt"
+	"io"
+	"time"
+)
+
+var (
+	// ErrNotFound is an error which will be returned
+	// when the key cannot be found.
+	ErrNotFound = fmt.Errorf("key not found")
+)
+
+// StorageDriver defines an interface to manage the data stored in the driver.
+type StorageDriver interface {
+	// Get data from the storage based on raw information.
+	// The data should be written into the writer as an io stream.
+	// If the length<=0, the driver should return all data starting from raw.offset.
+	// Otherwise, return raw.length bytes starting from raw.offset.
+	Get(raw *Raw, writer io.Writer) error
+
+	// GetBytes gets data from the storage based on raw information.
+	// The data should be returned in bytes.
+	// If the length<=0, the storage driver should return all data starting from raw.offset.
+	// Otherwise, return raw.length bytes starting from raw.offset.
+	GetBytes(raw *Raw) ([]byte, error)
+
+	// Put the data into the storage with raw information.
+	// The storage will read the data from io.Reader as an io stream.
+	// If the offset>0, the storage driver should start writing at byte raw.offset.
+	Put(raw *Raw, data io.Reader) error
+
+	// PutBytes puts the data into the storage with raw information.
+	// The data is passed in bytes.
+	// If the offset>0, the storage driver should start writing at byte raw.offset.
+	PutBytes(raw *Raw, data []byte) error
+
+	// Remove the data from the storage based on raw information.
+	Remove(raw *Raw) error
+
+	// Stat determines whether the data exists based on raw information.
+	// If it exists, return its info in the form of struct StorageInfo.
+	// If not, return ErrNotFound.
+	Stat(raw *Raw) (*StorageInfo, error)
+}
+
+// Raw identifies a piece of data uniquely.
+// If the length<=0, it represents all data.
+type Raw struct {
+	bucket string
+	key    string
+	offset int64
+	length int64
+}
+
+// StorageInfo includes partial meta information of the data.
+type StorageInfo struct {
+	Path       string
+	Size       int64
+	CreateTime time.Time
+	ModTime    time.Time
+}
diff --git a/supernode/store/store.go b/supernode/store/store.go
new file mode 100644
index 000000000..e36f21020
--- /dev/null
+++ b/supernode/store/store.go
@@ -0,0 +1,85 @@
+/*
+ * Copyright The Dragonfly Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package store
+
+import (
+	"fmt"
+	"io"
+)
+
+// Store is a wrapper of a storage which implements the StorageDriver interface.
+type Store struct {
+	// driverName is a unique identifier of the driver; you can also think of it as an ID.
+	driverName string
+	// config is used to init the storage driver.
+	config interface{}
+	// driver holds a storage which implements the StorageDriver interface.
+	driver StorageDriver
+}
+
+// NewStore creates a new Store instance.
+func NewStore(name string, cfg interface{}) (*Store, error) {
+	// determine whether the driver has been registered
+	initer, ok := driverFactory[name]
+	if !ok {
+		return nil, fmt.Errorf("unregistered storage driver: %s", name)
+	}
+
+	// init driver with specific config
+	driver, err := initer(cfg)
+	if err != nil {
+		return nil, fmt.Errorf("failed to init storage driver %s: %v", name, err)
+	}
+
+	return &Store{
+		driverName: name,
+		config:     cfg,
+		driver:     driver,
+	}, nil
+}
+
+// Get the data from the storage driver as an io stream.
+func (s *Store) Get(raw *Raw, writer io.Writer) error {
+	return s.driver.Get(raw, writer)
+}
+
+// GetBytes gets the data from the storage driver in bytes.
+func (s *Store) GetBytes(raw *Raw) ([]byte, error) {
+	return s.driver.GetBytes(raw)
+}
+
+// Put puts data into the storage as an io stream.
+func (s *Store) Put(raw *Raw, data io.Reader) error {
+	return s.driver.Put(raw, data)
+}
+
+// PutBytes puts data into the storage in bytes.
+func (s *Store) PutBytes(raw *Raw, data []byte) error {
+	return s.driver.PutBytes(raw, data)
+}
+
+// Remove the data from the storage based on raw information.
+func (s *Store) Remove(raw *Raw) error {
+	return s.driver.Remove(raw)
+}
+
+// Stat determines whether the data exists based on raw information.
+// If it exists, return its info in the form of struct StorageInfo.
+// If not, return ErrNotFound.
+func (s *Store) Stat(raw *Raw) (*StorageInfo, error) {
+	return s.driver.Stat(raw)
+}
diff --git a/supernode/store/store_mgr.go b/supernode/store/store_mgr.go
new file mode 100644
index 000000000..c7d03dbd4
--- /dev/null
+++ b/supernode/store/store_mgr.go
@@ -0,0 +1,73 @@
+/*
+ * Copyright The Dragonfly Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package store
+
+import (
+	"fmt"
+	"sync"
+)
+
+var (
+	storeMap      sync.Map
+	driverFactory = make(map[string]initFunc)
+)
+
+type initFunc func(config interface{}) (StorageDriver, error)
+
+// Register registers a storage driver with the specified name.
+// All drivers should call this function to register themselves to the driverFactory.
+func Register(name string, initializer initFunc) {
+	driverFactory[name] = initializer
+}
+
+// Manager manages stores.
+type Manager struct {
+}
+
+// ManagerConfig wraps the config defined in the config file.
+type ManagerConfig struct {
+	driverName   string
+	driverConfig interface{}
+}
+
+// NewManager creates a store manager.
+func NewManager(configs map[string]*ManagerConfig) (*Manager, error) {
+	if configs == nil {
+		return nil, fmt.Errorf("empty configs")
+	}
+	for name, config := range configs {
+		// initialize store
+		store, err := NewStore(config.driverName, config.driverConfig)
+		if err != nil {
+			return nil, err
+		}
+		storeMap.Store(name, store)
+	}
+	return &Manager{}, nil
+}
+
+// Get returns a store from the manager by the specified name.
+func (sm *Manager) Get(name string) (*Store, error) {
+	v, ok := storeMap.Load(name)
+	if !ok {
+		return nil, fmt.Errorf("no such storage: %s", name)
+	}
+	if store, ok := v.(*Store); ok {
+		return store, nil
+	}
+	return nil, fmt.Errorf("get store error: invalid store type")
+}
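
For reviewers, below is a minimal usage sketch of the API this patch adds, mirroring the flow in local_storage_test.go. It is written inside the store package because the ManagerConfig and localStorage fields are unexported in this patch; the function name exampleUsage, the workHome argument, and the "example" key are illustrative only, not part of the change.

package store

import (
	"bytes"
	"fmt"
	"path"
)

// exampleUsage builds a Manager from named configs, fetches a Store by name,
// then exercises the StorageDriver-style methods. workHome must be an
// absolute path, since NewLocalStorage rejects relative base directories.
func exampleUsage(workHome string) error {
	configs := map[string]*ManagerConfig{
		"driver1": {
			driverName: LocalStorageDriver,
			driverConfig: &localStorage{
				baseDir: path.Join(workHome, "download"),
				buckets: map[string]string{"meta": ".meta"},
			},
		},
	}

	// Initialize the manager and look up the configured store.
	sm, err := NewManager(configs)
	if err != nil {
		return err
	}
	s, err := sm.Get("driver1")
	if err != nil {
		return err
	}

	// Write, read back, and clean up one piece of data.
	raw := &Raw{bucket: "meta", key: "example"}
	if err := s.PutBytes(raw, []byte("hello meta file")); err != nil {
		return err
	}

	buf := new(bytes.Buffer)
	if err := s.Get(raw, buf); err != nil {
		return err
	}
	fmt.Println(buf.String()) // prints "hello meta file"

	return s.Remove(raw)
}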