diff --git a/common/util/file_util.go b/common/util/file_util.go index 977012783..28c4d0fe6 100644 --- a/common/util/file_util.go +++ b/common/util/file_util.go @@ -23,6 +23,7 @@ import ( "io" "os" "path/filepath" + "syscall" ) // BufferSize define the buffer size when reading and writing file @@ -206,3 +207,9 @@ func Md5Sum(name string) string { return fmt.Sprintf("%x", h.Sum(nil)) } + +// GetSys returns the underlying data source of the os.FileInfo. +func GetSys(info os.FileInfo) (*syscall.Stat_t, bool) { + sys, ok := info.Sys().(*syscall.Stat_t) + return sys, ok +} diff --git a/dfdaemon/initializer/atime_darwin_amd64.go b/common/util/stat_darwin_amd64.go similarity index 52% rename from dfdaemon/initializer/atime_darwin_amd64.go rename to common/util/stat_darwin_amd64.go index e5a625387..e224c6c0f 100644 --- a/dfdaemon/initializer/atime_darwin_amd64.go +++ b/common/util/stat_darwin_amd64.go @@ -14,11 +14,29 @@ * limitations under the License. */ -package initializer +package util -import "syscall" +import ( + "syscall" + "time" +) -// Atime returns the last access time in seconds -func Atime(stat *syscall.Stat_t) int64 { +// Atime returns the last access time in time.Time. +func Atime(stat *syscall.Stat_t) time.Time { + return time.Unix(stat.Atimespec.Sec, stat.Atimespec.Nsec) +} + +// AtimeSec returns the last access time in seconds. +func AtimeSec(stat *syscall.Stat_t) int64 { return stat.Atimespec.Sec } + +// Ctime returns the create time in time.Time. +func Ctime(stat *syscall.Stat_t) time.Time { + return time.Unix(stat.Ctimespec.Sec, stat.Ctimespec.Nsec) +} + +// CtimeSec returns the create time in seconds. +func CtimeSec(stat *syscall.Stat_t) int64 { + return stat.Ctimespec.Sec +} diff --git a/dfdaemon/initializer/atime_linux_amd64.go b/common/util/stat_linux_amd64.go similarity index 53% rename from dfdaemon/initializer/atime_linux_amd64.go rename to common/util/stat_linux_amd64.go index e62bab9ad..61545c95e 100644 --- a/dfdaemon/initializer/atime_linux_amd64.go +++ b/common/util/stat_linux_amd64.go @@ -14,11 +14,29 @@ * limitations under the License. */ -package initializer +package util -import "syscall" +import ( + "syscall" + "time" +) -// Atime returns the last access time in seconds -func Atime(stat *syscall.Stat_t) int64 { +// Atime returns the last access time in time.Time. +func Atime(stat *syscall.Stat_t) time.Time { + return time.Unix(stat.Atim.Sec, stat.Atim.Nsec) +} + +// AtimeSec returns the last access time in seconds. +func AtimeSec(stat *syscall.Stat_t) int64 { return stat.Atim.Sec } + +// Ctime returns the create time in time.Time. +func Ctime(stat *syscall.Stat_t) time.Time { + return time.Unix(stat.Ctim.Sec, stat.Ctim.Nsec) +} + +// CtimeSec returns the create time in seconds. +func CtimeSec(stat *syscall.Stat_t) int64 { + return stat.Ctim.Sec +} diff --git a/dfdaemon/initializer/initializer.go b/dfdaemon/initializer/initializer.go index d50a5891c..f5f0c3d84 100644 --- a/dfdaemon/initializer/initializer.go +++ b/dfdaemon/initializer/initializer.go @@ -85,13 +85,13 @@ func cleanLocalRepo(options *options.Options) { return nil } // get the last access time - statT, ok := info.Sys().(*syscall.Stat_t) + statT, ok := util.GetSys(info) if !ok { log.Warnf("ignore %s: failed to get last access time", path) return nil } // if the last access time is 1 hour ago - if time.Now().Unix()-Atime(statT) >= 3600 { + if time.Now().Unix()-util.AtimeSec(statT) >= 3600 { if err := os.Remove(path); err == nil { log.Infof("remove file:%s success", path) } else { diff --git a/supernode/store/local_storage.go b/supernode/store/local_storage.go new file mode 100644 index 000000000..f0b479f11 --- /dev/null +++ b/supernode/store/local_storage.go @@ -0,0 +1,285 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package store + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "path" + "sync" + "sync/atomic" + "time" + + "github.com/dragonflyoss/Dragonfly/common/util" +) + +// LocalStorageDriver is a const of local storage driver. +const LocalStorageDriver = "local" + +var fileMutexLocker sync.Map + +func init() { + Register(LocalStorageDriver, NewLocalStorage) +} + +type fileMutex struct { + count int32 + sync.RWMutex +} + +func getLock(key string, ro bool) { + v, _ := fileMutexLocker.LoadOrStore(key, &fileMutex{}) + f := v.(*fileMutex) + + if ro { + f.RLock() + } else { + f.Lock() + } + + atomic.AddInt32(&f.count, 1) +} + +func releaseLock(key string, ro bool) { + v, ok := fileMutexLocker.Load(key) + if !ok { + // return fmt.Errorf("panic error") + } + f := v.(*fileMutex) + + atomic.AddInt32(&f.count, -1) + + if ro { + f.RUnlock() + } else { + f.Unlock() + } + + if f.count < 1 { + fileMutexLocker.Delete(key) + } +} + +// LocalStorage is one of the implementions of StorageDriver by locally. +type localStorage struct { + // baseDir is the dir that local storage driver will store content based on it. + baseDir string +} + +// NewLocalStorage performs initialization for LocalStorage and return a StorageDriver. +func NewLocalStorage(config interface{}) (StorageDriver, error) { + // type assertion for config + cfg, ok := config.(*localStorage) + if !ok { + return nil, fmt.Errorf("failed to parse config") + } + + // prepare the base dir + if !path.IsAbs(cfg.baseDir) { + return nil, fmt.Errorf("not absolute path: %s", cfg.baseDir) + } + if err := util.CreateDirectory(cfg.baseDir); err != nil { + return nil, err + } + + return &localStorage{ + baseDir: cfg.baseDir, + }, nil +} + +// Get the content of key from storage and return in io stream. +func (ls *localStorage) Get(raw *Raw, writer io.Writer) error { + path, _, err := ls.statPath(raw.key) + if err != nil { + return err + } + + getLock(getLockKey(path, raw.offset), true) + defer releaseLock(getLockKey(path, raw.offset), true) + + f, err := os.Open(path) + if err != nil { + return err + } + defer f.Close() + + f.Seek(raw.offset, 0) + if raw.length <= 0 { + _, err = io.Copy(writer, f) + } else { + _, err = io.CopyN(writer, f, raw.length) + } + + if err != nil { + return err + } + return nil +} + +// GetBytes gets the content of key from storage and return in bytes. +func (ls *localStorage) GetBytes(raw *Raw) (data []byte, err error) { + path, _, err := ls.statPath(raw.key) + if err != nil { + return nil, err + } + + getLock(getLockKey(path, raw.offset), true) + defer releaseLock(getLockKey(path, raw.offset), true) + + f, err := os.Open(path) + if err != nil { + return nil, err + } + defer f.Close() + + f.Seek(raw.offset, 0) + if raw.length <= 0 { + data, err = ioutil.ReadAll(f) + } else { + data = make([]byte, raw.length) + _, err = f.Read(data) + } + + if err != nil { + return nil, err + } + return data, nil +} + +// Put reads the content from reader and put it into storage. +func (ls *localStorage) Put(raw *Raw, data io.Reader) error { + path, err := ls.preparePath(raw.key) + if err != nil { + return err + } + + getLock(getLockKey(path, raw.offset), false) + defer releaseLock(getLockKey(path, raw.offset), false) + + f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_SYNC, 0644) + if err != nil { + return err + } + defer f.Close() + + f.Seek(raw.offset, 0) + if _, err = io.Copy(f, data); err != nil { + return err + } + + return nil +} + +// PutBytes puts the content of key from storage with bytes. +func (ls *localStorage) PutBytes(raw *Raw, data []byte) error { + path, err := ls.preparePath(raw.key) + if err != nil { + return err + } + + getLock(getLockKey(path, raw.offset), false) + defer releaseLock(getLockKey(path, raw.offset), false) + + f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_SYNC, 0644) + if err != nil { + return err + } + defer f.Close() + + f.Seek(raw.offset, 0) + if _, err := f.Write(data); err != nil { + return err + } + + return nil +} + +// Stat determine whether the file exists. +func (ls *localStorage) Stat(raw *Raw) (*StorageInfo, error) { + path, fileInfo, err := ls.statPath(raw.key) + if err != nil { + return nil, err + } + + sys, ok := util.GetSys(fileInfo) + if !ok { + return nil, fmt.Errorf("get create time error") + } + return &StorageInfo{ + Path: path, + Size: fileInfo.Size(), + CreateTime: util.Ctime(sys), + ModTime: fileInfo.ModTime(), + }, nil +} + +// Remove deletes a file or dir. +func (ls *localStorage) Remove(raw *Raw) error { + path, _, err := ls.statPath(raw.key) + if err != nil { + return err + } + + getLock(getLockKey(path, raw.offset), false) + defer releaseLock(getLockKey(path, raw.offset), false) + + if err := os.RemoveAll(path); err != nil { + return err + } + return nil +} + +// helper function + +// preparePath gets the target path and creates the upper directory if it does not exist. +func (ls *localStorage) preparePath(key string) (string, error) { + dir := path.Join(ls.baseDir, getPrefix(key)) + + if err := util.CreateDirectory(dir); err != nil { + return "", err + } + + target := path.Join(dir, key) + return target, nil +} + +// statPath determines whether the target file exists and returns an fileMutex if so. +func (ls *localStorage) statPath(key string) (string, os.FileInfo, error) { + filePath := path.Join(ls.baseDir, getPrefix(key), key) + f, err := os.Stat(filePath) + if err != nil { + if os.IsNotExist(err) { + return "", nil, ErrNotFound + } + return "", nil, err + } + + return filePath, f, nil +} + +func getLockKey(path string, offset int64) string { + return fmt.Sprintf("%s%d%d", path, offset, time.Now().Unix()) +} + +func getPrefix(str string) string { + if len(str) > 3 { + return string([]byte(str)[:3]) + } + return str +} diff --git a/supernode/store/local_storage_test.go b/supernode/store/local_storage_test.go new file mode 100644 index 000000000..ed99af682 --- /dev/null +++ b/supernode/store/local_storage_test.go @@ -0,0 +1,220 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package store + +import ( + "bytes" + "fmt" + "io" + "io/ioutil" + "os" + "path" + "strings" + "testing" + + "github.com/dragonflyoss/Dragonfly/common/util" + + "github.com/go-check/check" +) + +func Test(t *testing.T) { + check.TestingT(t) +} + +type LocalStorageSuite struct { + workHome string + storeLocal *Store + configs map[string]*ManagerConfig +} + +func init() { + check.Suite(&LocalStorageSuite{}) +} + +func (s *LocalStorageSuite) SetUpSuite(c *check.C) { + s.workHome, _ = ioutil.TempDir("/tmp", "storedriver-StoreTestSuite-") + + s.configs = make(map[string]*ManagerConfig) + s.configs["driver1"] = &ManagerConfig{ + driverName: LocalStorageDriver, + driverConfig: &localStorage{ + baseDir: path.Join(s.workHome, "download"), + }, + } + + // init StorageManager + sm, err := NewManager(s.configs) + c.Assert(err, check.IsNil) + + // init store with local storage driver1 + s.storeLocal, err = sm.Get("driver1") + c.Assert(err, check.IsNil) +} + +func (s *LocalStorageSuite) TearDownSuite(c *check.C) { + if s.workHome != "" { + if err := os.RemoveAll(s.workHome); err != nil { + fmt.Printf("remove path:%s error", s.workHome) + } + } +} + +func (s *LocalStorageSuite) TestGetPutBytes(c *check.C) { + var cases = []struct { + raw *Raw + data []byte + expected string + }{ + { + raw: &Raw{ + key: "foo1", + }, + data: []byte("hello foo"), + expected: "hello foo", + }, + { + raw: &Raw{ + key: "foo2", + offset: 0, + length: 5, + }, + data: []byte("hello foo"), + expected: "hello", + }, + { + raw: &Raw{ + key: "foo3", + offset: 2, + length: -1, + }, + data: []byte("hello foo"), + expected: "hello foo", + }, + } + + for _, v := range cases { + // put + s.storeLocal.PutBytes(v.raw, v.data) + + // get + result, err := s.storeLocal.GetBytes(v.raw) + c.Assert(err, check.IsNil) + c.Assert(string(result), check.Equals, v.expected) + + // stat + s.checkStat(v.raw, c) + + // remove + s.checkRemove(v.raw, c) + } + +} + +func (s *LocalStorageSuite) TestGetPut(c *check.C) { + var cases = []struct { + raw *Raw + data io.Reader + expected string + }{ + { + raw: &Raw{ + key: "foo1.meta", + }, + data: strings.NewReader("hello meta file"), + expected: "hello meta file", + }, + { + raw: &Raw{ + key: "foo2.meta", + offset: 2, + length: 5, + }, + data: strings.NewReader("hello meta file"), + expected: "hello", + }, + { + raw: &Raw{ + key: "foo3.meta", + offset: 2, + length: -1, + }, + data: strings.NewReader("hello meta file"), + expected: "hello meta file", + }, + } + + for _, v := range cases { + // put + s.storeLocal.Put(v.raw, v.data) + + // get + buf1 := new(bytes.Buffer) + err := s.storeLocal.Get(v.raw, buf1) + c.Assert(err, check.IsNil) + c.Assert(buf1.String(), check.Equals, v.expected) + + // stat + s.checkStat(v.raw, c) + + // remove + s.checkRemove(v.raw, c) + } + +} + +func (s *LocalStorageSuite) TestGetPrefix(c *check.C) { + var cases = []struct { + str string + expected string + }{ + {"foo", "foo"}, + {"footest", "foo"}, + {"fo", "fo"}, + } + + for _, v := range cases { + result := getPrefix(v.str) + c.Check(result, check.Equals, v.expected) + } +} + +// helper function + +func (s *LocalStorageSuite) checkStat(raw *Raw, c *check.C) { + info, err := s.storeLocal.Stat(raw) + c.Assert(err, check.IsNil) + + cfg := s.storeLocal.config.(*localStorage) + pathTemp := path.Join(cfg.baseDir, getPrefix(raw.key), raw.key) + f, _ := os.Stat(pathTemp) + sys, _ := util.GetSys(f) + + c.Assert(info, check.DeepEquals, &StorageInfo{ + Path: pathTemp, + Size: f.Size(), + ModTime: f.ModTime(), + CreateTime: util.Ctime(sys), + }) +} + +func (s *LocalStorageSuite) checkRemove(raw *Raw, c *check.C) { + err := s.storeLocal.Remove(raw) + c.Assert(err, check.IsNil) + + _, err = s.storeLocal.Stat(raw) + c.Assert(err, check.DeepEquals, ErrNotFound) +} diff --git a/supernode/store/storage_driver.go b/supernode/store/storage_driver.go new file mode 100644 index 000000000..df01e9449 --- /dev/null +++ b/supernode/store/storage_driver.go @@ -0,0 +1,87 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package store + +import ( + "fmt" + "io" + "time" +) + +var ( + // ErrNotFound is an error which will be returned + // when the key can not be found. + ErrNotFound = fmt.Errorf("the key not found") + + // ErrEmptyKey is an error when the key is empty. + ErrEmptyKey = fmt.Errorf("the key is empty") +) + +// StorageDriver defines an interface to manage the data stored in the driver. +// +// NOTE: +// It is recommended that the lock granularity of the driver should be in piece. +// That means that the storage driver could read and write +// the different pieces of the same file concurrently. +type StorageDriver interface { + // Get data from the storage based on raw information. + // The data should be written into the writer as io stream. + // If the length<=0, the driver should return all data from the raw.offest. + // Otherwise, just return the data which starts from raw.offset and the length is raw.length. + Get(raw *Raw, writer io.Writer) error + + // Get data from the storage based on raw information. + // The data should be returned in bytes. + // If the length<=0, the storage driver should return all data from the raw.offest. + // Otherwise, just return the data which starts from raw.offset and the length is raw.length. + GetBytes(raw *Raw) ([]byte, error) + + // Put the data into the storage with raw information. + // The storage will get data from io.Reader as io stream. + // If the offset>0, the storage driver should starting at byte raw.offset off. + Put(raw *Raw, data io.Reader) error + + // PutBytes puts the data into the storage with raw information. + // The data is passed in bytes. + // If the offset>0, the storage driver should starting at byte raw.offset off. + PutBytes(raw *Raw, data []byte) error + + // Remove the data from the storage based on raw information. + Remove(raw *Raw) error + + // Stat determine whether the data exists based on raw information. + // If that, and return some info that in the form of struct StorageInfo. + // If not, return the ErrNotFound. + Stat(raw *Raw) (*StorageInfo, error) +} + +// Raw identifies a piece of data uniquely. +// If the length<=0, it represents all data. +type Raw struct { + bucket string + key string + offset int64 + length int64 +} + +// StorageInfo includes partial meta information of the data. +type StorageInfo struct { + Path string + Size int64 + CreateTime time.Time + ModTime time.Time +} diff --git a/supernode/store/store.go b/supernode/store/store.go new file mode 100644 index 000000000..f6760b2dc --- /dev/null +++ b/supernode/store/store.go @@ -0,0 +1,111 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package store + +import ( + "fmt" + "io" + "strings" +) + +// Store is a wrapper of the storage which implements the interface of StorageDriver. +type Store struct { + // name is a unique identifier, you can also name it ID. + driverName string + // config is used to init storage driver. + config interface{} + // driver holds a storage which implements the interface of StorageDriver + driver StorageDriver +} + +// NewStore create a new Store instance. +func NewStore(name string, cfg interface{}) (*Store, error) { + // determine whether the driver has been registered + initer, ok := driverFactory[name] + if !ok { + return nil, fmt.Errorf("unregisterd storage driver : %s", name) + } + + // init driver with specific config + driver, err := initer(cfg) + if err != nil { + return nil, fmt.Errorf("init storage driver failed %s: %v", name, cfg) + } + + return &Store{ + driverName: name, + config: cfg, + driver: driver, + }, nil +} + +// Get the data from the storage driver in io stream. +func (s *Store) Get(raw *Raw, writer io.Writer) error { + if err := isEmptyKey(raw.key); err != nil { + return err + } + return s.driver.Get(raw, writer) +} + +// GetBytes gets the data from the storage driver in bytes. +func (s *Store) GetBytes(raw *Raw) ([]byte, error) { + if err := isEmptyKey(raw.key); err != nil { + return nil, err + } + return s.driver.GetBytes(raw) +} + +// Put puts data into the storage in io stream. +func (s *Store) Put(raw *Raw, data io.Reader) error { + if err := isEmptyKey(raw.key); err != nil { + return err + } + return s.driver.Put(raw, data) +} + +// PutBytes puts data into the storage in bytes. +func (s *Store) PutBytes(raw *Raw, data []byte) error { + if err := isEmptyKey(raw.key); err != nil { + return err + } + return s.driver.PutBytes(raw, data) +} + +// Remove the data from the storage based on raw information. +func (s *Store) Remove(raw *Raw) error { + if err := isEmptyKey(raw.key); err != nil { + return err + } + return s.driver.Remove(raw) +} + +// Stat determine whether the data exists based on raw information. +// If that, and return some info that in the form of struct StorageInfo. +// If not, return the ErrNotFound. +func (s *Store) Stat(raw *Raw) (*StorageInfo, error) { + if err := isEmptyKey(raw.key); err != nil { + return nil, err + } + return s.driver.Stat(raw) +} + +func isEmptyKey(str string) error { + if strings.TrimSpace(str) == "" { + return ErrEmptyKey + } + return nil +} diff --git a/supernode/store/store_mgr.go b/supernode/store/store_mgr.go new file mode 100644 index 000000000..c7d03dbd4 --- /dev/null +++ b/supernode/store/store_mgr.go @@ -0,0 +1,73 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package store + +import ( + "fmt" + "sync" +) + +var ( + storeMap sync.Map + driverFactory = make(map[string]initFunc) +) + +type initFunc func(config interface{}) (StorageDriver, error) + +// Register defines an interface to register a driver with specified name. +// All drivers should call this function to register itself to the driverFactory. +func Register(name string, initializer initFunc) { + driverFactory[name] = initializer +} + +// Manager manage stores. +type Manager struct { +} + +// ManagerConfig wraps the config that defined in the config file. +type ManagerConfig struct { + driverName string + driverConfig interface{} +} + +// NewManager create a store manager. +func NewManager(configs map[string]*ManagerConfig) (*Manager, error) { + if configs == nil { + return nil, fmt.Errorf("empty configs") + } + for name, config := range configs { + // initialize store + store, err := NewStore(config.driverName, config.driverConfig) + if err != nil { + return nil, err + } + storeMap.Store(name, store) + } + return &Manager{}, nil +} + +// Get a store from manager with specified name. +func (sm *Manager) Get(name string) (*Store, error) { + v, ok := storeMap.Load(name) + if !ok { + return nil, fmt.Errorf("not existed storage: %s", name) + } + if store, ok := v.(*Store); ok { + return store, nil + } + return nil, fmt.Errorf("get store error: unknown reason") +}