Skip to content

Commit

Permalink
fix append bug (#974)
Browse files Browse the repository at this point in the history
* fix: hdfs readdir bug

* fix append error
  • Loading branch information
luoyuedong authored Dec 29, 2022
1 parent 04a992a commit ec75306
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 21 deletions.
62 changes: 45 additions & 17 deletions pkg/fs/client/ufs/hdfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,12 +359,45 @@ func (fs *hdfsFileSystem) Open(name string, flags uint32, size uint64) (FileHand
fs.Lock()
defer fs.Unlock()

return &hdfsFileHandle{
name: name,
reader: nil,
fs: fs,
writer: nil,
}, nil
// read only
if flag&syscall.O_ACCMODE == syscall.O_RDONLY {
reader, err := fs.client.Open(fs.GetPath(name))
if err != nil {
log.Errorf("hdfs client open err: %v", err)
return nil, err
}

return &hdfsFileHandle{
name: name,
reader: reader,
fs: fs,
writer: nil,
}, nil
}

// append only

if flag&syscall.O_APPEND != 0 {
// hdfs nameNode maybe not release fh, has error: org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException
for i := 0; i < 3; i++ {
writer, err := fs.client.Append(fs.GetPath(name))
if err != nil {
if fs.shouldRetry(err) {
time.Sleep(100 * time.Millisecond * time.Duration(i*i))
continue
}
log.Errorf("hdfs client append err: %v", err)
return nil, err
}
return &hdfsFileHandle{
name: name,
reader: nil,
fs: fs,
writer: writer,
}, nil
}
}
return nil, syscall.ENOTSUP
}

func (fs *hdfsFileSystem) Create(name string, flags, mode uint32) (fd FileHandle, err error) {
Expand Down Expand Up @@ -455,12 +488,9 @@ var _ FileHandle = &hdfsFileHandle{}
func (fh *hdfsFileHandle) Read(buf []byte, off uint64) (int, error) {
log.Tracef("hdfs read: fh.name[%s], offset[%d]", fh.name, off)
if fh.reader == nil {
reader, err := fh.fs.client.Open(fh.fs.GetPath(fh.name))
if err != nil {
log.Errorf("hdfs client open err: %v", err)
return 0, err
}
fh.reader = reader
err := fmt.Errorf("hdfs read: file[%s] bad file descriptor reader==nil", fh.name)
log.Errorf(err.Error())
return 0, err
}
n, err := fh.reader.ReadAt(buf, int64(off))
if err != nil && err != io.EOF {
Expand All @@ -482,11 +512,9 @@ func (fh *hdfsFileHandle) Write(data []byte, off uint64) (uint32, error) {
log.Tracef("hdfs write: fh.name[%s], dataLength[%d], offset[%d], fh[%+v]", fh.name, len(data), off, fh)
var err error
if fh.writer == nil {
fh.writer, err = fh.fs.client.Append(fh.fs.GetPath(fh.name))
if err != nil {
log.Errorf("hdfs client append err: %v", err)
return 0, err
}
err = fmt.Errorf("hdfs write: file[%s] bad file descriptor writer==nil", fh.name)
log.Errorf(err.Error())
return 0, err
}

n, err := fh.writer.Write(data)
Expand Down
156 changes: 156 additions & 0 deletions pkg/fs/client/ufs/hdfs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"
"reflect"
"sync"
"syscall"
"testing"

"github.com/agiledragon/gomonkey/v2"
Expand Down Expand Up @@ -262,3 +263,158 @@ func Test_hdfsFileHandle_Write(t *testing.T) {
})
}
}

// Test_hdfsFileSystem_OpenRead verifies hdfsFileSystem.Open in read-only mode:
// an error from the underlying hdfs client Open must be propagated to the
// caller, and a nil error must produce a file handle without failing.
func Test_hdfsFileSystem_OpenRead(t *testing.T) {
	type fields struct {
		client      *hdfs.Client
		subpath     string
		blockSize   int64
		replication int
		Mutex       sync.Mutex
	}
	type args struct {
		name  string
		flags uint32
		size  uint64
	}
	tests := []struct {
		name    string
		fields  fields
		args    args
		want    FileHandle
		wantErr assert.ErrorAssertionFunc
	}{
		{
			name: "want read open err",
			fields: fields{
				client:  &hdfs.Client{},
				subpath: "./",
			},
			args: args{
				name:  "test",
				flags: uint32(1),
			},
			want: nil,
			wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
				// Open must fail when the hdfs client Open fails; continue to
				// the Equal check (got must be nil on error).
				return assert.Error(t, err, i...)
			},
		},
		{
			name: "want read open nil",
			fields: fields{
				client:  &hdfs.Client{},
				subpath: "./",
			},
			args: args{
				name:  "test",
				flags: uint32(1),
			},
			want: nil,
			wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
				// Open must succeed when the hdfs client Open succeeds.
				// Return false to skip the Equal check: Open returns a
				// non-nil handle here, which cannot equal want==nil.
				assert.NoError(t, err, i...)
				return false
			},
		},
	}

	// Default patch: client Open fails (used by the "want read open err" case).
	var p1 = gomonkey.ApplyMethod(reflect.TypeOf(&hdfs.Client{}), "Open", func(_ *hdfs.Client, name string) (*hdfs.FileReader, error) {
		return nil, fmt.Errorf("open fail")
	})
	defer p1.Reset()

	// Force the read-only code path regardless of the raw flag bits.
	var p4 = gomonkey.ApplyMethod(reflect.TypeOf(&hdfsFileSystem{}), "GetOpenFlags", func(_ *hdfsFileSystem, name string, flags uint32) int {
		return syscall.O_RDONLY
	})
	defer p4.Reset()

	for _, tt := range tests {
		if tt.name == "want read open nil" {
			// Re-patch client Open to succeed for the success case.
			// NOTE(review): the deferred Reset fires at function exit, after
			// p1's — acceptable here since no test runs after this loop.
			var p2 = gomonkey.ApplyMethod(reflect.TypeOf(&hdfs.Client{}), "Open", func(_ *hdfs.Client, name string) (*hdfs.FileReader, error) {
				return nil, nil
			})
			defer p2.Reset()
		}
		t.Run(tt.name, func(t *testing.T) {
			fs := &hdfsFileSystem{
				client:      tt.fields.client,
				subpath:     tt.fields.subpath,
				blockSize:   tt.fields.blockSize,
				replication: tt.fields.replication,
				Mutex:       tt.fields.Mutex,
			}
			got, err := fs.Open(tt.args.name, tt.args.flags, tt.args.size)
			if !tt.wantErr(t, err, fmt.Sprintf("Open(%v, %v, %v)", tt.args.name, tt.args.flags, tt.args.size)) {
				return
			}
			assert.Equalf(t, tt.want, got, "Open(%v, %v, %v)", tt.args.name, tt.args.flags, tt.args.size)
		})
	}
}

// Test_hdfsFileSystem_Open verifies hdfsFileSystem.Open in append mode when
// the hdfs client Append keeps failing with a retryable
// AlreadyBeingCreatedException: Open must exhaust its retries and return an
// error (either the append error or ENOTSUP), never a handle.
func Test_hdfsFileSystem_Open(t *testing.T) {
	type fields struct {
		client      *hdfs.Client
		subpath     string
		blockSize   int64
		replication int
		Mutex       sync.Mutex
	}
	type args struct {
		name  string
		flags uint32
		size  uint64
	}
	tests := []struct {
		name    string
		fields  fields
		args    args
		want    FileHandle
		wantErr assert.ErrorAssertionFunc
	}{
		{
			name: "want retry err",
			fields: fields{
				client:  &hdfs.Client{},
				subpath: "./",
			},
			args: args{
				name:  "test",
				flags: uint32(1),
			},
			want: nil,
			wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
				// Append never succeeds, so Open must give up with an error;
				// continue to the Equal check (got must be nil on error).
				return assert.Error(t, err, i...)
			},
		},
	}

	var p1 = gomonkey.ApplyMethod(reflect.TypeOf(&hdfs.Client{}), "Open", func(_ *hdfs.Client, name string) (*hdfs.FileReader, error) {
		return nil, nil
	})
	defer p1.Reset()
	// Append always fails with the retryable nameNode error to exercise the
	// retry loop in Open.
	var p2 = gomonkey.ApplyMethod(reflect.TypeOf(&hdfs.Client{}), "Append", func(_ *hdfs.Client, name string) (*hdfs.FileWriter, error) {
		return nil, fmt.Errorf("org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException")
	})
	defer p2.Reset()

	// Force the append code path regardless of the raw flag bits.
	var p4 = gomonkey.ApplyMethod(reflect.TypeOf(&hdfsFileSystem{}), "GetOpenFlags", func(_ *hdfsFileSystem, name string, flags uint32) int {
		return syscall.O_WRONLY | syscall.O_APPEND
	})
	defer p4.Reset()

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			fs := &hdfsFileSystem{
				client:      tt.fields.client,
				subpath:     tt.fields.subpath,
				blockSize:   tt.fields.blockSize,
				replication: tt.fields.replication,
				Mutex:       tt.fields.Mutex,
			}
			got, err := fs.Open(tt.args.name, tt.args.flags, tt.args.size)
			if !tt.wantErr(t, err, fmt.Sprintf("Open(%v, %v, %v)", tt.args.name, tt.args.flags, tt.args.size)) {
				return
			}
			assert.Equalf(t, tt.want, got, "Open(%v, %v, %v)", tt.args.name, tt.args.flags, tt.args.size)
		})
	}
}
13 changes: 9 additions & 4 deletions pkg/fs/client/ufs/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,10 +941,15 @@ func (fs *s3FileSystem) Get(name string, flags uint32, off, limit int64) (io.Rea
request.Range = &r
}

response, err := fs.s3.GetObject(request)
if err != nil {
log.Errorf("s3 get: s3.GetObject[%s] off[%d] limit[%d] err: %v ", name, off, limit, err)
return nil, err
var response *s3.GetObjectOutput
var err error
for i := 0; i < 3; i++ {
response, err = fs.s3.GetObject(request)
if err != nil {
log.Errorf("s3 get[%v]: s3.GetObject[%s] off[%d] limit[%d] err: %v ", i, name, off, limit, err)
} else {
break
}
}
return response.Body, err
}
Expand Down

0 comments on commit ec75306

Please sign in to comment.