Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
feature: add storage driver
Browse files Browse the repository at this point in the history
Signed-off-by: Starnop <starnop@163.com>
  • Loading branch information
starnop committed Mar 12, 2019
1 parent f64fe80 commit b925271
Show file tree
Hide file tree
Showing 5 changed files with 680 additions and 0 deletions.
237 changes: 237 additions & 0 deletions supernode/store/local_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
/*
* Copyright The Dragonfly Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package store

import (
"fmt"
"io"
"io/ioutil"
"os"
"path"
"sync"

"github.com/dragonflyoss/Dragonfly/common/util"
)

// LocalStorageDriver is a const of local storage driver.
const LocalStorageDriver = "local"

func init() {
Register(LocalStorageDriver, NewLocalStorage)
}

type fileMutex struct {
path string
fileInfo os.FileInfo
sync.RWMutex
}

// LocalStorage is one of the implementions of StorageDriver by locally.
type localStorage struct {
baseDir string
buckets map[string]string
}

// NewLocalStorage performs initialization for LocalStorage and return a StorageDriver.
func NewLocalStorage(config interface{}) (StorageDriver, error) {
// type assertion for config
cfg, ok := config.(*localStorage)
if !ok {
return nil, fmt.Errorf("failed to parse config")
}

// prepare the base dir
if !path.IsAbs(cfg.baseDir) {
return nil, fmt.Errorf("Not absolute path: %s", cfg.baseDir)
}
if err := util.CreateDirectory(cfg.baseDir); err != nil {
return nil, err
}

return &localStorage{
baseDir: cfg.baseDir,
buckets: cfg.buckets,
}, nil
}

// Get the content of key from storage and return in io stream.
func (lb *localStorage) Get(raw *Raw, writer io.Writer) error {
file, err := lb.statPath(raw.bucket, raw.key)
if err != nil {
return err
}

file.RLock()
defer file.RUnlock()

f, err := os.Open(file.path)
if err != nil {
return err
}
defer f.Close()

f.Seek(raw.offset, 0)
if raw.length <= 0 {
_, err = io.Copy(writer, f)
} else {
_, err = io.CopyN(writer, f, raw.length)
}

if err != nil {
return err
}
return nil
}

// GetBytes gets the content of key from storage and return in bytes.
func (lb *localStorage) GetBytes(raw *Raw) (data []byte, err error) {
file, err := lb.statPath(raw.bucket, raw.key)
if err != nil {
return nil, err
}

file.RLock()
defer file.RUnlock()

f, err := os.Open(file.path)
if err != nil {
return nil, err
}
defer f.Close()

f.Seek(raw.offset, 0)
if raw.length <= 0 {
data, err = ioutil.ReadAll(f)
} else {
data = make([]byte, raw.length)
_, err = f.Read(data)
}

if err != nil {
return nil, err
}
return data, nil
}

// Put reads the content from reader and put it into storage.
func (lb *localStorage) Put(raw *Raw, data io.Reader) error {
file, err := lb.preparePath(raw.bucket, raw.key)
if err != nil {
return err
}

file.Lock()
defer file.Unlock()

f, err := os.OpenFile(file.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_SYNC, 0644)
if err != nil {
return err
}
defer f.Close()

f.Seek(raw.offset, 0)
if _, err = io.Copy(f, data); err != nil {
return err
}

return nil
}

// PutBytes puts the content of key from storage with bytes.
func (lb *localStorage) PutBytes(raw *Raw, data []byte) error {
file, err := lb.preparePath(raw.bucket, raw.key)
if err != nil {
return err
}

file.Lock()
defer file.Unlock()

f, err := os.OpenFile(file.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_SYNC, 0644)
if err != nil {
return err
}
defer f.Close()

f.Seek(raw.offset, 0)
if _, err := f.Write(data); err != nil {
return err
}

return nil
}

// Stat determine whether the file exists.
func (lb *localStorage) Stat(raw *Raw) (*StorageInfo, error) {
file, err := lb.statPath(raw.bucket, raw.key)
if err != nil {
return nil, err
}

return &StorageInfo{
Path: file.path,
Size: file.fileInfo.Size(),
ModTime: file.fileInfo.ModTime(),
}, nil
}

// Remove deletes a file or dir.
func (lb *localStorage) Remove(raw *Raw) error {
file, err := lb.statPath(raw.bucket, raw.key)
if err != nil {
return err
}

file.Lock()
defer file.Unlock()

if err := os.RemoveAll(file.path); err != nil {
return err
}
return nil
}

// helper function

// preparePath gets the target path and creates the upper directory if it does not exist.
func (lb *localStorage) preparePath(bucket, key string) (*fileMutex, error) {
dir := path.Join(lb.baseDir, key)

if err := util.CreateDirectory(dir); err != nil {
return nil, err
}

target := path.Join(dir, key+lb.buckets[bucket])
return &fileMutex{path: target}, nil
}

// statPath determines whether the target file exists and returns an fileMutex if so.
func (lb *localStorage) statPath(bucket, key string) (*fileMutex, error) {
filePath := path.Join(lb.baseDir, key, key+lb.buckets[bucket])
f, err := os.Stat(filePath)
if err != nil {
if os.IsNotExist(err) {
return nil, ErrNotFound
}
return nil, err
}

return &fileMutex{
path: filePath,
fileInfo: f,
}, nil
}
Loading

0 comments on commit b925271

Please sign in to comment.