Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
Merge pull request #446 from Starnop/storage-backend
Browse files Browse the repository at this point in the history
feature: add storage driver for supernode
  • Loading branch information
lowzj authored Mar 22, 2019
2 parents 961b500 + 980e1e1 commit c3fc4f1
Show file tree
Hide file tree
Showing 9 changed files with 829 additions and 10 deletions.
7 changes: 7 additions & 0 deletions common/util/file_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"io"
"os"
"path/filepath"
"syscall"
)

// BufferSize define the buffer size when reading and writing file
Expand Down Expand Up @@ -206,3 +207,9 @@ func Md5Sum(name string) string {

return fmt.Sprintf("%x", h.Sum(nil))
}

// GetSys returns the underlying data source of the os.FileInfo.
func GetSys(info os.FileInfo) (*syscall.Stat_t, bool) {
sys, ok := info.Sys().(*syscall.Stat_t)
return sys, ok
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,29 @@
* limitations under the License.
*/

package initializer
package util

import "syscall"
import (
"syscall"
"time"
)

// Atime returns the last access time in seconds
func Atime(stat *syscall.Stat_t) int64 {
// Atime returns the last access time in time.Time.
func Atime(stat *syscall.Stat_t) time.Time {
return time.Unix(stat.Atimespec.Sec, stat.Atimespec.Nsec)
}

// AtimeSec returns the last access time in seconds.
func AtimeSec(stat *syscall.Stat_t) int64 {
return stat.Atimespec.Sec
}

// Ctime returns the create time in time.Time.
func Ctime(stat *syscall.Stat_t) time.Time {
return time.Unix(stat.Ctimespec.Sec, stat.Ctimespec.Nsec)
}

// CtimeSec returns the create time in seconds.
func CtimeSec(stat *syscall.Stat_t) int64 {
return stat.Ctimespec.Sec
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,29 @@
* limitations under the License.
*/

package initializer
package util

import "syscall"
import (
"syscall"
"time"
)

// Atime returns the last access time in seconds
func Atime(stat *syscall.Stat_t) int64 {
// Atime returns the last access time in time.Time.
func Atime(stat *syscall.Stat_t) time.Time {
return time.Unix(stat.Atim.Sec, stat.Atim.Nsec)
}

// AtimeSec returns the last access time in seconds.
func AtimeSec(stat *syscall.Stat_t) int64 {
return stat.Atim.Sec
}

// Ctime returns the create time in time.Time.
func Ctime(stat *syscall.Stat_t) time.Time {
return time.Unix(stat.Ctim.Sec, stat.Ctim.Nsec)
}

// CtimeSec returns the create time in seconds.
func CtimeSec(stat *syscall.Stat_t) int64 {
return stat.Ctim.Sec
}
4 changes: 2 additions & 2 deletions dfdaemon/initializer/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,13 @@ func cleanLocalRepo(options *options.Options) {
return nil
}
// get the last access time
statT, ok := info.Sys().(*syscall.Stat_t)
statT, ok := util.GetSys(info)
if !ok {
log.Warnf("ignore %s: failed to get last access time", path)
return nil
}
// if the last access time is 1 hour ago
if time.Now().Unix()-Atime(statT) >= 3600 {
if time.Now().Unix()-util.AtimeSec(statT) >= 3600 {
if err := os.Remove(path); err == nil {
log.Infof("remove file:%s success", path)
} else {
Expand Down
285 changes: 285 additions & 0 deletions supernode/store/local_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,285 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package store

import (
"fmt"
"io"
"io/ioutil"
"os"
"path"
"sync"
"sync/atomic"
"time"

"github.com/dragonflyoss/Dragonfly/common/util"
)

// LocalStorageDriver is a const of local storage driver.
const LocalStorageDriver = "local"

var fileMutexLocker sync.Map

func init() {
Register(LocalStorageDriver, NewLocalStorage)
}

type fileMutex struct {
count int32
sync.RWMutex
}

func getLock(key string, ro bool) {
v, _ := fileMutexLocker.LoadOrStore(key, &fileMutex{})
f := v.(*fileMutex)

if ro {
f.RLock()
} else {
f.Lock()
}

atomic.AddInt32(&f.count, 1)
}

func releaseLock(key string, ro bool) {
v, ok := fileMutexLocker.Load(key)
if !ok {
// return fmt.Errorf("panic error")
}
f := v.(*fileMutex)

atomic.AddInt32(&f.count, -1)

if ro {
f.RUnlock()
} else {
f.Unlock()
}

if f.count < 1 {
fileMutexLocker.Delete(key)
}
}

// LocalStorage is one of the implementions of StorageDriver by locally.
type localStorage struct {
// baseDir is the dir that local storage driver will store content based on it.
baseDir string
}

// NewLocalStorage performs initialization for LocalStorage and return a StorageDriver.
func NewLocalStorage(config interface{}) (StorageDriver, error) {
// type assertion for config
cfg, ok := config.(*localStorage)
if !ok {
return nil, fmt.Errorf("failed to parse config")
}

// prepare the base dir
if !path.IsAbs(cfg.baseDir) {
return nil, fmt.Errorf("not absolute path: %s", cfg.baseDir)
}
if err := util.CreateDirectory(cfg.baseDir); err != nil {
return nil, err
}

return &localStorage{
baseDir: cfg.baseDir,
}, nil
}

// Get the content of key from storage and return in io stream.
func (ls *localStorage) Get(raw *Raw, writer io.Writer) error {
path, _, err := ls.statPath(raw.key)
if err != nil {
return err
}

getLock(getLockKey(path, raw.offset), true)
defer releaseLock(getLockKey(path, raw.offset), true)

f, err := os.Open(path)
if err != nil {
return err
}
defer f.Close()

f.Seek(raw.offset, 0)
if raw.length <= 0 {
_, err = io.Copy(writer, f)
} else {
_, err = io.CopyN(writer, f, raw.length)
}

if err != nil {
return err
}
return nil
}

// GetBytes gets the content of key from storage and return in bytes.
func (ls *localStorage) GetBytes(raw *Raw) (data []byte, err error) {
path, _, err := ls.statPath(raw.key)
if err != nil {
return nil, err
}

getLock(getLockKey(path, raw.offset), true)
defer releaseLock(getLockKey(path, raw.offset), true)

f, err := os.Open(path)
if err != nil {
return nil, err
}
defer f.Close()

f.Seek(raw.offset, 0)
if raw.length <= 0 {
data, err = ioutil.ReadAll(f)
} else {
data = make([]byte, raw.length)
_, err = f.Read(data)
}

if err != nil {
return nil, err
}
return data, nil
}

// Put reads the content from reader and put it into storage.
func (ls *localStorage) Put(raw *Raw, data io.Reader) error {
path, err := ls.preparePath(raw.key)
if err != nil {
return err
}

getLock(getLockKey(path, raw.offset), false)
defer releaseLock(getLockKey(path, raw.offset), false)

f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_SYNC, 0644)
if err != nil {
return err
}
defer f.Close()

f.Seek(raw.offset, 0)
if _, err = io.Copy(f, data); err != nil {
return err
}

return nil
}

// PutBytes puts the content of key from storage with bytes.
func (ls *localStorage) PutBytes(raw *Raw, data []byte) error {
path, err := ls.preparePath(raw.key)
if err != nil {
return err
}

getLock(getLockKey(path, raw.offset), false)
defer releaseLock(getLockKey(path, raw.offset), false)

f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_SYNC, 0644)
if err != nil {
return err
}
defer f.Close()

f.Seek(raw.offset, 0)
if _, err := f.Write(data); err != nil {
return err
}

return nil
}

// Stat determine whether the file exists.
func (ls *localStorage) Stat(raw *Raw) (*StorageInfo, error) {
path, fileInfo, err := ls.statPath(raw.key)
if err != nil {
return nil, err
}

sys, ok := util.GetSys(fileInfo)
if !ok {
return nil, fmt.Errorf("get create time error")
}
return &StorageInfo{
Path: path,
Size: fileInfo.Size(),
CreateTime: util.Ctime(sys),
ModTime: fileInfo.ModTime(),
}, nil
}

// Remove deletes a file or dir.
func (ls *localStorage) Remove(raw *Raw) error {
path, _, err := ls.statPath(raw.key)
if err != nil {
return err
}

getLock(getLockKey(path, raw.offset), false)
defer releaseLock(getLockKey(path, raw.offset), false)

if err := os.RemoveAll(path); err != nil {
return err
}
return nil
}

// helper function

// preparePath gets the target path and creates the upper directory if it does not exist.
func (ls *localStorage) preparePath(key string) (string, error) {
dir := path.Join(ls.baseDir, getPrefix(key))

if err := util.CreateDirectory(dir); err != nil {
return "", err
}

target := path.Join(dir, key)
return target, nil
}

// statPath determines whether the target file exists and returns an fileMutex if so.
func (ls *localStorage) statPath(key string) (string, os.FileInfo, error) {
filePath := path.Join(ls.baseDir, getPrefix(key), key)
f, err := os.Stat(filePath)
if err != nil {
if os.IsNotExist(err) {
return "", nil, ErrNotFound
}
return "", nil, err
}

return filePath, f, nil
}

func getLockKey(path string, offset int64) string {
return fmt.Sprintf("%s%d%d", path, offset, time.Now().Unix())
}

func getPrefix(str string) string {
if len(str) > 3 {
return string([]byte(str)[:3])
}
return str
}
Loading

0 comments on commit c3fc4f1

Please sign in to comment.