From 6e4fd85ff0333e7e55627634dcd3d35feedf057d Mon Sep 17 00:00:00 2001 From: AstaFrode Date: Fri, 10 Nov 2023 10:23:00 +0800 Subject: [PATCH] Base bwlist (#117) * add access and account to cfg * add access control * add bwlist --- configs/configs.go | 16 +- configs/system.go | 2 +- go.mod | 2 +- go.sum | 4 +- node/authHandle.go | 7 + node/common.go | 32 +++ node/delHandle.go | 7 + node/getHandle.go | 8 +- node/getRestore.go | 6 + node/node.go | 2 +- node/postRestore.go | 30 +-- node/putHandle.go | 6 + node/tracker.go | 397 +++++++++++++------------------------ node/types.go | 3 +- pkg/confile/conf_test.yaml | 8 + pkg/confile/confile.go | 40 ++++ 16 files changed, 282 insertions(+), 288 deletions(-) diff --git a/configs/configs.go b/configs/configs.go index ad72b5d..a8b06fd 100644 --- a/configs/configs.go +++ b/configs/configs.go @@ -27,14 +27,14 @@ const ( TokenDated = 60 * 60 * 24 * 30 ) -const ( - // - FileCacheExpirationTime = 720 -) +const FileCacheExpirationTime = 720 + +// Time out waiting for transaction completion +const TimeOut_WaitBlock = time.Duration(time.Second * 15) + +const DefaultConfig = "conf.yaml" const ( - // Time out waiting for transaction completion - TimeOut_WaitBlock = time.Duration(time.Second * 15) - // - DefaultConfig = "conf.yaml" + Access_Public = "public" + Access_Private = "private" ) diff --git a/configs/system.go b/configs/system.go index 4cc5bb8..8dac3e5 100644 --- a/configs/system.go +++ b/configs/system.go @@ -14,7 +14,7 @@ const ( // Name space NameSpace = Name // version - Version = Name + " " + "v0.3.2" + Version = Name + " " + "v0.3.3 dev" // description Description = "Object storage service based on CESS network" ) diff --git a/go.mod b/go.mod index 13e135f..4ba3fc8 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/CESSProject/DeOSS go 1.20 require ( - github.com/CESSProject/cess-go-sdk v0.3.19 + github.com/CESSProject/cess-go-sdk v0.3.21-0.20231107093552-741d1c28e744 github.com/CESSProject/go-keyring v0.0.0-20220614131247-ee3a8da30fde github.com/CESSProject/p2p-go v0.2.4 github.com/btcsuite/btcutil v1.0.3-0.20201208143702-a53e38424cce diff --git a/go.sum b/go.sum index 26565c2..05b53c9 100644 --- a/go.sum +++ b/go.sum @@ -46,8 +46,8 @@ git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGy github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= -github.com/CESSProject/cess-go-sdk v0.3.19 h1:13nO9Ox4okPRqQgv9us2L6/04RAKDo4eP2IwuLzjrvI= -github.com/CESSProject/cess-go-sdk v0.3.19/go.mod h1:x37J5WrzBYwFK3c2BkU9oLvhTmN0hrGwFV6NczTHrkM= +github.com/CESSProject/cess-go-sdk v0.3.21-0.20231107093552-741d1c28e744 h1:y/ZqscljT2jbl05rsq+v6hzuGWzEhlPYLxAQhtRUqmc= +github.com/CESSProject/cess-go-sdk v0.3.21-0.20231107093552-741d1c28e744/go.mod h1:x37J5WrzBYwFK3c2BkU9oLvhTmN0hrGwFV6NczTHrkM= github.com/CESSProject/go-keyring v0.0.0-20220614131247-ee3a8da30fde h1:5MDRjjtg6PEhqyVjupwaapN96cOZiddOGAYwKQeaTu0= github.com/CESSProject/go-keyring v0.0.0-20220614131247-ee3a8da30fde/go.mod h1:RUXBd3ROP98MYepEEa0Y0l/T0vQlIKqFJxI/ocdnRLM= github.com/CESSProject/p2p-go v0.2.4 h1:E/tJfeBGeLZ07jBec5dVUobT33pyA1aRDpEDgLqxvoM= diff --git a/node/authHandle.go b/node/authHandle.go index 6adbf41..38a467e 100644 --- a/node/authHandle.go +++ b/node/authHandle.go @@ -9,6 +9,7 @@ package node import ( "errors" + "fmt" "net/http" "time" @@ -42,6 +43,12 @@ func (n *Node) authHandle(c *gin.Context) { return } + if !n.AccessControl(req.Account) { + n.Log("info", fmt.Sprintf("[%v] %v", c.ClientIP(), ERR_Forbidden)) + c.JSON(http.StatusForbidden, ERR_Forbidden) + return + } + // Check publickey pubkey, err := sutils.ParsingPublickey(req.Account) if err != nil { diff --git a/node/common.go b/node/common.go index da93ab1..385b34f 100644 --- a/node/common.go +++ b/node/common.go @@ -11,6 +11,7 @@ import ( "net/http" "strings" + "github.com/CESSProject/DeOSS/configs" sutils "github.com/CESSProject/cess-go-sdk/core/utils" "github.com/CESSProject/go-keyring" jwt "github.com/dgrijalva/jwt-go" @@ -106,3 +107,34 @@ func (n *Node) verifySignature(account, message, signature string) ([]byte, erro } return nil, errors.New("signature verification failed") } + +func (n *Node) AccessControl(account string) bool { + if account == "" { + return false + } + err := sutils.VerityAddress(account, sutils.CessPrefix) + if err != nil { + return false + } + + bwlist := n.GetAccounts() + + if n.GetAccess() == configs.Access_Public { + for _, v := range bwlist { + if v == account { + return false + } + } + return true + } + + if n.GetAccess() == configs.Access_Private { + for _, v := range bwlist { + if v == account { + return true + } + } + } + + return false +} diff --git a/node/delHandle.go b/node/delHandle.go index 770b663..998aefa 100644 --- a/node/delHandle.go +++ b/node/delHandle.go @@ -103,6 +103,13 @@ func (n *Node) delFilesHandle(c *gin.Context) { c.JSON(respMsg.Code, err.Error()) return } + + if !n.AccessControl(account) { + n.Del("info", fmt.Sprintf("[%v] %v", c.ClientIP(), ERR_Forbidden)) + c.JSON(http.StatusForbidden, ERR_Forbidden) + return + } + n.Del("info", fmt.Sprintf("[%v] %v", clientIp, account)) var delList DelList diff --git a/node/getHandle.go b/node/getHandle.go index 9a17647..509d542 100644 --- a/node/getHandle.go +++ b/node/getHandle.go @@ -73,6 +73,13 @@ func (n *Node) getHandle(c *gin.Context) { n.Query("info", fmt.Sprintf("[%s] %s", clientIp, INFO_GetRequest)) cipher := c.Request.Header.Get(HTTPHeader_Cipher) + account := c.Request.Header.Get(HTTPHeader_Account) + + if !n.AccessControl(account) { + n.Query("info", fmt.Sprintf("[%v] %v", c.ClientIP(), ERR_Forbidden)) + c.JSON(http.StatusForbidden, ERR_Forbidden) + return + } queryName := c.Param(HTTP_ParameterName) if queryName == "version" { @@ -111,7 +118,6 @@ func (n *Node) getHandle(c *gin.Context) { } if len(queryName) != len(pattern.FileHash{}) { - account := c.Request.Header.Get(HTTPHeader_Account) if account == "" { n.Query("err", fmt.Sprintf("[%s] %s", clientIp, ERR_MissAccount)) c.JSON(http.StatusBadRequest, ERR_MissAccount) diff --git a/node/getRestore.go b/node/getRestore.go index b711601..eb6e609 100644 --- a/node/getRestore.go +++ b/node/getRestore.go @@ -44,6 +44,12 @@ func (n *Node) getRestoreHandle(c *gin.Context) { account = userAccount } + if !n.AccessControl(account) { + n.Upfile("info", fmt.Sprintf("[%v] %v", c.ClientIP(), ERR_Forbidden)) + c.JSON(http.StatusForbidden, ERR_Forbidden) + return + } + var userfils_cache userFiles data, err := n.Get([]byte(Cache_UserFiles + account)) if err == nil { diff --git a/node/node.go b/node/node.go index 7c56aff..8830ef0 100644 --- a/node/node.go +++ b/node/node.go @@ -246,7 +246,7 @@ func (n *Node) WriteTrackFile(filehash string, data []byte) error { return err } -func (n *Node) ParseTrackFromFile(filehash string) (RecordInfo, error) { +func (n *Node) ParseTrackFile(filehash string) (RecordInfo, error) { var result RecordInfo n.trackLock.RLock() defer n.trackLock.RUnlock() diff --git a/node/postRestore.go b/node/postRestore.go index e6e67be..bfa75ad 100644 --- a/node/postRestore.go +++ b/node/postRestore.go @@ -37,12 +37,12 @@ func (n *Node) postRestoreHandle(c *gin.Context) { if account != "" && signature != "" { pkey, err = n.verifySignature(account, message, signature) if err != nil { - n.Upfile("info", fmt.Sprintf("[%v] %v", clientIp, err)) + n.Log("info", fmt.Sprintf("[%v] %v", clientIp, err)) c.JSON(respMsg.Code, err.Error()) return } } else { - n.Upfile("info", fmt.Sprintf("[%v] %v", clientIp, err)) + n.Log("info", fmt.Sprintf("[%v] %v", clientIp, err)) c.JSON(respMsg.Code, err.Error()) return } @@ -50,6 +50,12 @@ func (n *Node) postRestoreHandle(c *gin.Context) { account = userAccount } + if !n.AccessControl(account) { + n.Log("info", fmt.Sprintf("[%v] %v", clientIp, ERR_Forbidden)) + c.JSON(http.StatusForbidden, ERR_Forbidden) + return + } + var restoreList RestoreList err = c.ShouldBind(&restoreList) if err != nil { @@ -69,7 +75,7 @@ func (n *Node) postRestoreHandle(c *gin.Context) { // verify the bucket name bucketName := c.Request.Header.Get(HTTPHeader_BucketName) if !sutils.CheckBucketName(bucketName) { - n.Upfile("info", fmt.Sprintf("[%v] %v", clientIp, ERR_HeaderFieldBucketName)) + n.Log("info", fmt.Sprintf("[%v] %v", clientIp, ERR_HeaderFieldBucketName)) c.JSON(http.StatusBadRequest, ERR_HeaderFieldBucketName) return } @@ -84,7 +90,7 @@ func (n *Node) postRestoreHandle(c *gin.Context) { } } if !flag { - n.Upfile("info", fmt.Sprintf("[%v] %v", clientIp, ERR_SpaceNotAuth)) + n.Log("info", fmt.Sprintf("[%v] %v", clientIp, ERR_SpaceNotAuth)) c.JSON(http.StatusForbidden, ERR_SpaceNotAuth) return } @@ -93,24 +99,24 @@ func (n *Node) postRestoreHandle(c *gin.Context) { userInfo, err := n.QueryUserSpaceSt(pkey) if err != nil { if err.Error() == pattern.ERR_Empty { - n.Upfile("info", fmt.Sprintf("[%v] %v", clientIp, ERR_AccountNotExist)) + n.Log("info", fmt.Sprintf("[%v] %v", clientIp, ERR_AccountNotExist)) c.JSON(http.StatusForbidden, ERR_AccountNotExist) return } - n.Upfile("err", fmt.Sprintf("[%v] %v", clientIp, err)) + n.Log("err", fmt.Sprintf("[%v] %v", clientIp, err)) c.JSON(http.StatusForbidden, ERR_RpcFailed) return } blockheight, err := n.QueryBlockHeight("") if err != nil { - n.Upfile("info", fmt.Sprintf("[%v] %v", clientIp, err)) + n.Log("info", fmt.Sprintf("[%v] %v", clientIp, err)) c.JSON(http.StatusForbidden, ERR_RpcFailed) return } if userInfo.Deadline < (blockheight + 100) { - n.Upfile("info", fmt.Sprintf("[%v] %v [%d] [%d]", clientIp, ERR_SpaceExpiresSoon, userInfo.Deadline, blockheight)) + n.Log("info", fmt.Sprintf("[%v] %v [%d] [%d]", clientIp, ERR_SpaceExpiresSoon, userInfo.Deadline, blockheight)) c.JSON(http.StatusForbidden, ERR_SpaceExpiresSoon) return } @@ -133,13 +139,13 @@ func (n *Node) postRestoreHandle(c *gin.Context) { usedSpace := allUsedSpace * 15 / 10 remainingSpace, err := strconv.ParseUint(userInfo.RemainingSpace, 10, 64) if err != nil { - n.Upfile("err", fmt.Sprintf("[%v] %v", clientIp, err)) + n.Log("err", fmt.Sprintf("[%v] %v", clientIp, err)) c.JSON(http.StatusInternalServerError, ERR_InternalServer) return } if usedSpace > int64(remainingSpace) { - n.Upfile("info", fmt.Sprintf("[%v] %v", clientIp, ERR_NotEnoughSpace)) + n.Log("info", fmt.Sprintf("[%v] %v", clientIp, ERR_NotEnoughSpace)) c.JSON(http.StatusForbidden, ERR_NotEnoughSpace) return } @@ -163,14 +169,14 @@ func (n *Node) postRestoreHandle(c *gin.Context) { b, err := json.Marshal(recordInfo) if err != nil { - n.Upfile("err", fmt.Sprintf("[%v] %v", clientIp, err)) + n.Log("err", fmt.Sprintf("[%v] %v", clientIp, err)) c.JSON(http.StatusInternalServerError, ERR_InternalServer) continue } err = n.WriteTrackFile(restoreList.Files[i], b) if err != nil { - n.Upfile("err", fmt.Sprintf("[%v] %v", clientIp, err)) + n.Log("err", fmt.Sprintf("[%v] %v", clientIp, err)) c.JSON(http.StatusInternalServerError, ERR_InternalServer) continue } diff --git a/node/putHandle.go b/node/putHandle.go index 8fe086a..ee71f64 100644 --- a/node/putHandle.go +++ b/node/putHandle.go @@ -74,6 +74,12 @@ func (n *Node) putHandle(c *gin.Context) { account = userAccount } + if !n.AccessControl(account) { + n.Upfile("info", fmt.Sprintf("[%v] %v", c.ClientIP(), ERR_Forbidden)) + c.JSON(http.StatusForbidden, ERR_Forbidden) + return + } + // verify the bucket name bucketName := c.Request.Header.Get(HTTPHeader_BucketName) if !sutils.CheckBucketName(bucketName) { diff --git a/node/tracker.go b/node/tracker.go index 71eade0..de78cd3 100644 --- a/node/tracker.go +++ b/node/tracker.go @@ -18,7 +18,6 @@ import ( "github.com/CESSProject/cess-go-sdk/core/pattern" sutils "github.com/CESSProject/cess-go-sdk/core/utils" "github.com/centrifuge/go-substrate-rpc-client/v4/types" - "github.com/mr-tron/base58" "github.com/pkg/errors" ) @@ -118,135 +117,42 @@ func (n *Node) trackFileThread(trackFile string) { func (n *Node) trackFile(trackfile string) error { var ( - err error - roothash string - b []byte - recordFile RecordInfo + err error + roothash string + recordFile RecordInfo + storageOrder pattern.StorageOrder ) roothash = filepath.Base(trackfile) - _, err = n.QueryFileMetadata(roothash) - if err != nil { - if err.Error() != pattern.ERR_Empty { - return errors.Wrapf(err, "[%s] [QueryFileMetadata]", roothash) - } - } else { - n.Track("info", fmt.Sprintf("[%s] File storage success", roothash)) - recordFile, err = n.ParseTrackFromFile(roothash) - if err == nil { - if len(recordFile.SegmentInfo) > 0 { - baseDir := filepath.Dir(recordFile.SegmentInfo[0].SegmentHash) - os.Rename(filepath.Join(baseDir, roothash), filepath.Join(n.GetDirs().FileDir, roothash)) - os.RemoveAll(baseDir) - } - } - n.DeleteTrackFile(roothash) - return nil - } - - _, err = n.QueryStorageOrder(roothash) - if err != nil { - if err.Error() != pattern.ERR_Empty { - return errors.Wrapf(err, "[%s] [QueryStorageOrder]", roothash) - } - - recordFile, err = n.ParseTrackFromFile(roothash) + for { + recordFile, err = n.ParseTrackFile(roothash) if err != nil { - n.DeleteTrackFile(roothash) - file := utils.FindFile(n.GetDirs().FileDir, roothash) - if file != "" { - os.RemoveAll(filepath.Dir(file)) - } return errors.Wrapf(err, "[ParseTrackFromFile]") } - recordFile.Putflag = false - recordFile.Count = 0 - b, err = json.Marshal(&recordFile) - if err != nil { - return errors.Wrapf(err, "[%s] [json.Marshal]", roothash) - } - err = n.WriteTrackFile(roothash, b) - if err != nil { - return errors.Wrapf(err, "[%s] [WriteTrackFile]", roothash) - } - // verify the space is authorized - authAccs, err := n.QuaryAuthorizedAccounts(recordFile.Owner) + _, err = n.QueryFileMetadata(roothash) if err != nil { if err.Error() != pattern.ERR_Empty { - return errors.Wrapf(err, "[%s] [QuaryAuthorizedAccount]", roothash) - } - } - var flag bool - for _, v := range authAccs { - if n.GetSignatureAcc() == v { - flag = true - break - } - } - if !flag { - baseDir := filepath.Dir(recordFile.SegmentInfo[0].SegmentHash) - os.RemoveAll(baseDir) - n.DeleteTrackFile(roothash) - user, _ := sutils.EncodePublicKeyAsCessAccount(recordFile.Owner) - return errors.Errorf("[%s] user [%s] deauthorization", roothash, user) - } - - _, err = n.GenerateStorageOrder( - roothash, - recordFile.SegmentInfo, - recordFile.Owner, - recordFile.Filename, - recordFile.Buckname, - recordFile.Filesize, - ) - if err != nil { - return errors.Wrapf(err, "[%s] [GenerateStorageOrder]", roothash) - } - } - - recordFile, err = n.ParseTrackFromFile(roothash) - if err != nil { - return errors.Wrapf(err, "[ParseTrackFromFile]") - } - - if roothash != recordFile.Roothash { - n.DeleteTrackFile(roothash) - return errors.Errorf("[%s] Recorded filehash [%s] error", roothash, recordFile.Roothash) - } - - // if recordFile.Putflag { - // if storageorder.AssignedMiner != nil { - // if uint8(storageorder.Count) == recordFile.Count { - // return nil - // } - // } - // } - - if recordFile.Duplicate { - _, err = n.QueryFileMetadata(roothash) - if err == nil { - _, err = n.GenerateStorageOrder(recordFile.Roothash, nil, recordFile.Owner, recordFile.Filename, recordFile.Buckname, recordFile.Filesize) - if err != nil { - return errors.Wrapf(err, " [%s] [GenerateStorageOrder]", roothash) + return errors.Wrapf(err, "[%s] [QueryFileMetadata]", roothash) } + } else { + n.Track("info", fmt.Sprintf("[%s] storage successful", roothash)) if len(recordFile.SegmentInfo) > 0 { baseDir := filepath.Dir(recordFile.SegmentInfo[0].SegmentHash) os.Rename(filepath.Join(baseDir, roothash), filepath.Join(n.GetDirs().FileDir, roothash)) os.RemoveAll(baseDir) } n.DeleteTrackFile(roothash) - n.Track("info", fmt.Sprintf("[%s] Duplicate file declaration suc", roothash)) return nil } - _, err = n.QueryStorageOrder(recordFile.Roothash) + + storageOrder, err = n.QueryStorageOrder(roothash) if err != nil { if err.Error() != pattern.ERR_Empty { return errors.Wrapf(err, "[%s] [QueryStorageOrder]", roothash) } - n.Track("info", fmt.Sprintf("[%s] Duplicate file become primary file", roothash)) - recordFile.Duplicate = false recordFile.Putflag = false - b, err = json.Marshal(&recordFile) + recordFile.Count = 0 + b, err := json.Marshal(&recordFile) if err != nil { return errors.Wrapf(err, "[%s] [json.Marshal]", roothash) } @@ -254,195 +160,164 @@ func (n *Node) trackFile(trackfile string) error { if err != nil { return errors.Wrapf(err, "[%s] [WriteTrackFile]", roothash) } - } - return nil - } - for { - _, err = n.backupFiles(recordFile.Owner, recordFile.SegmentInfo, roothash, recordFile.Filename, recordFile.Buckname, recordFile.Filesize) - if err != nil { - n.Track("err", fmt.Sprintf("[%s] [backupFiles] %v", roothash, err)) - time.Sleep(time.Second * 20) - } - break - } - - n.Track("info", fmt.Sprintf("[%s] File successfully transferred to all allocated storage nodes", roothash)) - - // recordFile.Putflag = true - // recordFile.Count = count - // b, err = json.Marshal(&recordFile) - // if err != nil { - // return errors.Wrapf(err, "[%s] [json.Marshal]", roothash) - // } - // err = n.WriteTrackFile(roothash, b) - // if err != nil { - // return errors.Wrapf(err, "[%s] [WriteTrackFile]", roothash) - // } - // n.Cache.Put([]byte("transfer:"+roothash), []byte(fmt.Sprintf("%v", count))) - return nil -} - -func (n *Node) backupFiles(owner []byte, segmentInfo []pattern.SegmentDataInfo, roothash, filename, bucketname string, filesize uint64) (uint8, error) { - var err error - var storageOrder pattern.StorageOrder - - _, err = n.QueryFileMetadata(roothash) - if err == nil { - return 0, nil - } + // verify the space is authorized + authAccs, err := n.QuaryAuthorizedAccounts(recordFile.Owner) + if err != nil { + if err.Error() != pattern.ERR_Empty { + return errors.Wrapf(err, "[%s] [QuaryAuthorizedAccount]", roothash) + } + } + var flag bool + for _, v := range authAccs { + if n.GetSignatureAcc() == v { + flag = true + break + } + } + if !flag { + if len(recordFile.SegmentInfo) > 0 { + baseDir := filepath.Dir(recordFile.SegmentInfo[0].SegmentHash) + os.RemoveAll(baseDir) + } + n.DeleteTrackFile(roothash) + user, _ := sutils.EncodePublicKeyAsCessAccount(recordFile.Owner) + return errors.Errorf("[%s] user [%s] deauthorization", roothash, user) + } - if segmentInfo == nil { - resegmentInfo, reHash, err := n.ShardedEncryptionProcessing(filepath.Join(n.GetDirs().FileDir, roothash), "") - if err != nil { - return 0, errors.Wrapf(err, "[ShardedEncryptionProcessing]") + _, err = n.GenerateStorageOrder( + roothash, + recordFile.SegmentInfo, + recordFile.Owner, + recordFile.Filename, + recordFile.Buckname, + recordFile.Filesize, + ) + if err != nil { + return errors.Wrapf(err, "[%s] [GenerateStorageOrder]", roothash) + } } - if reHash != reHash { - return 0, errors.Wrapf(err, "The re-stored file hash is not consistent, please store it separately and specify the original encryption key.") + + if roothash != recordFile.Roothash { + n.DeleteTrackFile(roothash) + return errors.Errorf("[%s] Recorded filehash [%s] error", roothash, recordFile.Roothash) } - segmentInfo = resegmentInfo - } - storageOrder, err = n.QueryStorageOrder(roothash) - if err != nil { - if err.Error() == pattern.ERR_Empty { - _, err = n.GenerateStorageOrder(roothash, segmentInfo, owner, filename, bucketname, filesize) - if err != nil { - // verify the space is authorized - authAccs, err := n.QuaryAuthorizedAccounts(owner) + // if recordFile.Putflag { + // if storageorder.AssignedMiner != nil { + // if uint8(storageorder.Count) == recordFile.Count { + // return nil + // } + // } + // } + + if recordFile.Duplicate { + _, err = n.QueryFileMetadata(roothash) + if err == nil { + _, err = n.GenerateStorageOrder(recordFile.Roothash, nil, recordFile.Owner, recordFile.Filename, recordFile.Buckname, recordFile.Filesize) if err != nil { - if err.Error() != pattern.ERR_Empty { - return 0, errors.Wrapf(err, "[QuaryAuthorizedAccount]") - } + return errors.Wrapf(err, " [%s] [GenerateStorageOrder]", roothash) } - var flag bool - for _, v := range authAccs { - if n.GetSignatureAcc() == v { - flag = true - break - } - } - if !flag { - baseDir := filepath.Dir(segmentInfo[0].SegmentHash) + if len(recordFile.SegmentInfo) > 0 { + baseDir := filepath.Dir(recordFile.SegmentInfo[0].SegmentHash) + os.Rename(filepath.Join(baseDir, roothash), filepath.Join(n.GetDirs().FileDir, roothash)) os.RemoveAll(baseDir) - n.DeleteTrackFile(roothash) - n.Delete([]byte("transfer:" + roothash)) - return 0, errors.New("user deauthorization") } - return 0, errors.Wrapf(err, "[GenerateStorageOrder]") + n.DeleteTrackFile(roothash) + n.Track("info", fmt.Sprintf("[%s] Duplicate file declaration suc", roothash)) + return nil + } + storageOrder, err = n.QueryStorageOrder(recordFile.Roothash) + if err != nil { + if err.Error() != pattern.ERR_Empty { + return errors.Wrapf(err, "[%s] [QueryStorageOrder]", roothash) + } + n.Track("info", fmt.Sprintf("[%s] Duplicate file become primary file", roothash)) + recordFile.Duplicate = false + recordFile.Putflag = false + b, err := json.Marshal(&recordFile) + if err != nil { + return errors.Wrapf(err, "[%s] [json.Marshal]", roothash) + } + err = n.WriteTrackFile(roothash, b) + if err != nil { + return errors.Wrapf(err, "[%s] [WriteTrackFile]", roothash) + } } + return nil } - return 0, errors.Wrapf(err, "[QueryStorageOrder]") - } - // store fragment to storage - err = n.storageData(roothash, segmentInfo, storageOrder.AssignedMiner, storageOrder.CompleteList) - if err != nil { - return 0, errors.Wrapf(err, "[storageData]") + if recordFile.SegmentInfo == nil { + resegmentInfo, reHash, err := n.ShardedEncryptionProcessing(filepath.Join(n.GetDirs().FileDir, roothash), "") + if err != nil { + return errors.Wrapf(err, "[ShardedEncryptionProcessing]") + } + if reHash != reHash { + return errors.Wrapf(err, "The re-stored file hash is not consistent, please store it separately and specify the original encryption key.") + } + recordFile.SegmentInfo = resegmentInfo + } + + n.storageData(recordFile.Roothash, recordFile.SegmentInfo, storageOrder.CompleteList) + time.Sleep(time.Minute) } - return uint8(storageOrder.Count), nil } -func (n *Node) storageData(roothash string, segment []pattern.SegmentDataInfo, minerTaskList []pattern.MinerTaskList, completeList []types.AccountID) error { +func (n *Node) storageData(roothash string, segment []pattern.SegmentDataInfo, completeList []types.AccountID) error { var err error - var fpath string var failed bool - var complete bool - // query all assigned miner multiaddr - peerids, accs, err := n.QueryAssignedMiner(minerTaskList) - if err != nil { - return errors.Wrapf(err, "[%s] [QueryAssignedMiner]", roothash) + var dataGroup = make(map[uint8][]string, len(segment[0].FragmentHash)) + for index := 0; index < len(segment[0].FragmentHash); index++ { + dataGroup[uint8(index+1)] = make([]string, len(segment[0].FragmentHash)) + for i := 0; i < len(segment); i++ { + for j := 0; j < len(segment[i].FragmentHash); j++ { + if index == j { + dataGroup[uint8(index+1)][index] = segment[i].FragmentHash[j] + break + } + } + } } - basedir := filepath.Dir(segment[0].FragmentHash[0]) - for i := 0; i < len(peerids); i++ { - complete = false - t, ok := n.HasBlacklist(peerids[i]) - if ok { - if time.Since(time.Unix(t, 0)).Hours() >= 1 { - n.DelFromBlacklist(peerids[i]) - } else { + allpeers := n.GetAllPeerId() + for index, v := range dataGroup { + n.Track("info", fmt.Sprintf("[%s] Prepare to store the %d batch of fragments", roothash, index)) + failed = false + for i := 0; i < len(allpeers); i++ { + t, ok := n.HasBlacklist(allpeers[i]) + if ok { + if time.Since(time.Unix(t, 0)).Hours() >= 1 { + n.DelFromBlacklist(allpeers[i]) + } continue } - } - addr, ok := n.GetPeer(peerids[i]) - if !ok { - addr, err = n.DHTFindPeer(peerids[i]) - if err != nil { - failed = true - n.Track("err", fmt.Sprintf("[%s] No assigned miner found: [%s] [%s]", roothash, accs[i], peerids[i])) + addr, ok := n.GetPeer(allpeers[i]) + if !ok { continue } - } - for j := 0; j < 3; j++ { err = n.Connect(n.GetCtxQueryFromCtxCancel(), addr) if err != nil { - failed = true - n.Track("err", fmt.Sprintf("[%s] Connect to miner [%s] failed: [%s]", roothash, accs[i], err)) - time.Sleep(pattern.BlockInterval) + n.AddToBlacklist(allpeers[i]) continue } - failed = false - break - } - - if failed { - n.AddToBlacklist(peerids[i]) - continue - } - - for j := 0; j < len(minerTaskList[i].Hash); j++ { - for k := 0; k < len(completeList); k++ { - if sutils.CompareSlice(minerTaskList[i].Account[:], completeList[k][:]) { - complete = true - break - } - } - if complete { - break - } - fpath = filepath.Join(basedir, string(minerTaskList[i].Hash[j][:])) - _, err = os.Stat(fpath) - if err != nil { - err = utils.CopyFile(filepath.Join(basedir, roothash), filepath.Join(n.GetDirs().FileDir, roothash)) - if err != nil { - failed = true - return errors.Wrapf(err, "[CopyFile]") - } - _, _, err = n.ProcessingData(filepath.Join(basedir, roothash)) + n.Track("info", fmt.Sprintf("[%s] Will transfer to %s", roothash, allpeers[i])) + for _, f := range v { + err = n.WriteFileAction(addr.ID, roothash, f) if err != nil { failed = true - return errors.Wrapf(err, "[ProcessingData]") + n.AddToBlacklist(allpeers[i]) + n.Track("err", fmt.Sprintf("[%s] [WriteFileAction] [%s] err: %v", roothash, allpeers[i], err)) + break } } - err = n.WriteFileAction(addr.ID, roothash, fpath) - if err != nil { - failed = true - n.AddToBlacklist(peerids[i]) - n.Track("err", fmt.Sprintf("[%s] [WriteFileAction] [%s] [%s] err: %v", roothash, accs[i], peerids[i], err)) + if !failed { + n.Track("info", fmt.Sprintf("[%s] The %d batch of data transfer was successful", roothash, index)) break } - n.Track("info", fmt.Sprintf("[%s] [%s] transfer to [%s] ", roothash, string(minerTaskList[i].Hash[j][:]), accs[i])) } } - if failed { - return errors.New("File storage failure") - } - return nil -} -func (n *Node) QueryAssignedMiner(minerTaskList []pattern.MinerTaskList) ([]string, []string, error) { - var peerids = make([]string, len(minerTaskList)) - var accs = make([]string, len(minerTaskList)) - for i := 0; i < len(minerTaskList); i++ { - minerInfo, err := n.QueryStorageMiner(minerTaskList[i].Account[:]) - if err != nil { - return peerids, accs, err - } - peerids[i] = base58.Encode([]byte(string(minerInfo.PeerId[:]))) - accs[i], _ = sutils.EncodePublicKeyAsCessAccount(minerTaskList[i].Account[:]) - } - return peerids, accs, nil + return nil } diff --git a/node/types.go b/node/types.go index 9fadda1..f29cb14 100644 --- a/node/types.go +++ b/node/types.go @@ -78,7 +78,8 @@ const ( ERR_HeadOperation = "HeaderErr_Invalid_Operation" - ERR_NotFound = "Not found" + ERR_NotFound = "Not found" + ERR_Forbidden = "no permission" ERR_BodyFormat = "BodyErr_InvalidDataFormat" ERR_BodyFieldAccount = "BodyErr_InvalidField_account" diff --git a/pkg/confile/conf_test.yaml b/pkg/confile/conf_test.yaml index 3624e49..d07551b 100644 --- a/pkg/confile/conf_test.yaml +++ b/pkg/confile/conf_test.yaml @@ -14,5 +14,13 @@ Workspace: / P2P_Port: 4001 # Service listening port HTTP_Port: 8080 +# Access mode: public / private +# In public mode, only users in accounts can't access it. +# In private mode, only users in accounts can access it. +Access: public +# Account black/white list +accounts: + - cXjeCHQW3totBGhQXdAUAqjCNqk1NhiR3UK37czSeUak2pqGV + - cXgaee2N8E77JJv9gdsGAckv1Qsf3hqWYf7NL4q6ZuQzuAUtB # If you want to expose your oss service, please configure its domain name Domain: "http://deoss-pub-gateway.cess.cloud" \ No newline at end of file diff --git a/pkg/confile/confile.go b/pkg/confile/confile.go index 691b0e5..21ad8ff 100644 --- a/pkg/confile/confile.go +++ b/pkg/confile/confile.go @@ -12,7 +12,9 @@ import ( "os" "path" + "github.com/CESSProject/DeOSS/configs" "github.com/CESSProject/cess-go-sdk/core/pattern" + "github.com/CESSProject/cess-go-sdk/core/utils" sutils "github.com/CESSProject/cess-go-sdk/core/utils" "github.com/centrifuge/go-substrate-rpc-client/v4/signature" "github.com/pkg/errors" @@ -39,6 +41,14 @@ Workspace: / P2P_Port: 4001 # Service listening port HTTP_Port: 8080 +# Access mode: public / private +# In public mode, only users in Accounts can't access it. +# In private mode, only users in Accounts can access it. +Access: public +# Account black/white list +Accounts: + - cX... + - cX... # If you want to expose your oss service, please configure its domain name Domain: ""` ) @@ -53,6 +63,8 @@ type Confile interface { GetMnemonic() string GetPublickey() ([]byte, error) GetAccount() string + GetAccess() string + GetAccounts() []string GetDomainName() string } @@ -63,6 +75,8 @@ type confile struct { Workspace string `name:"Workspace" toml:"Workspace" yaml:"Workspace"` P2P_Port int `name:"P2P_Port" toml:"P2P_Port" yaml:"P2P_Port"` HTTP_Port int `name:"HTTP_Port" toml:"HTTP_Port" yaml:"HTTP_Port"` + Access string `name:"Access" toml:"Access" yaml:"Access"` + Accounts []string `name:"Accounts" toml:"Accounts" yaml:"Accounts"` Domain string `name:"Domain" toml:"Domain" yaml:"Domain"` } @@ -114,6 +128,24 @@ func (c *confile) Parse(fpath string) error { return errors.New("The port number cannot exceed 65535") } + if c.Access != configs.Access_Public && c.Access != configs.Access_Private { + return errors.New("Invalid Access") + } + + var accounts = make(map[string]struct{}, 0) + for _, v := range c.Accounts { + err = sutils.VerityAddress(v, utils.CessPrefix) + if err != nil { + continue + } + accounts[v] = struct{}{} + } + var accountList = make([]string, 0) + for k, _ := range accounts { + accountList = append(accountList, k) + } + c.Accounts = accountList + err = sutils.CheckDomain(c.Domain) if err != nil { return errors.New("Invalid domain name") @@ -230,3 +262,11 @@ func (c *confile) GetAccount() string { func (c *confile) GetDomainName() string { return c.Domain } + +func (c *confile) GetAccess() string { + return c.Access +} + +func (c *confile) GetAccounts() []string { + return c.Accounts +}