From ec753061dc569f178ff3660272211f33adfe2efc Mon Sep 17 00:00:00 2001 From: luoyuedong Date: Thu, 29 Dec 2022 16:31:51 +0800 Subject: [PATCH] fix append bug (#974) * fix: hdfs readdir bug * fix append error --- pkg/fs/client/ufs/hdfs.go | 62 +++++++++---- pkg/fs/client/ufs/hdfs_test.go | 156 +++++++++++++++++++++++++++++++++ pkg/fs/client/ufs/s3.go | 13 ++- 3 files changed, 210 insertions(+), 21 deletions(-) diff --git a/pkg/fs/client/ufs/hdfs.go b/pkg/fs/client/ufs/hdfs.go index 3e68edd6a..59b3c6b3a 100644 --- a/pkg/fs/client/ufs/hdfs.go +++ b/pkg/fs/client/ufs/hdfs.go @@ -359,12 +359,45 @@ func (fs *hdfsFileSystem) Open(name string, flags uint32, size uint64) (FileHand fs.Lock() defer fs.Unlock() - return &hdfsFileHandle{ - name: name, - reader: nil, - fs: fs, - writer: nil, - }, nil + // read only + if flag&syscall.O_ACCMODE == syscall.O_RDONLY { + reader, err := fs.client.Open(fs.GetPath(name)) + if err != nil { + log.Errorf("hdfs client open err: %v", err) + return nil, err + } + + return &hdfsFileHandle{ + name: name, + reader: reader, + fs: fs, + writer: nil, + }, nil + } + + // append only + + if flag&syscall.O_APPEND != 0 { + // hdfs nameNode maybe not release fh, has error: org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException + for i := 0; i < 3; i++ { + writer, err := fs.client.Append(fs.GetPath(name)) + if err != nil { + if fs.shouldRetry(err) { + time.Sleep(100 * time.Millisecond * time.Duration(i*i)) + continue + } + log.Errorf("hdfs client append err: %v", err) + return nil, err + } + return &hdfsFileHandle{ + name: name, + reader: nil, + fs: fs, + writer: writer, + }, nil + } + } + return nil, syscall.ENOTSUP } func (fs *hdfsFileSystem) Create(name string, flags, mode uint32) (fd FileHandle, err error) { @@ -455,12 +488,9 @@ var _ FileHandle = &hdfsFileHandle{} func (fh *hdfsFileHandle) Read(buf []byte, off uint64) (int, error) { log.Tracef("hdfs read: fh.name[%s], offset[%d]", fh.name, off) if fh.reader == nil { - reader, err := fh.fs.client.Open(fh.fs.GetPath(fh.name)) - if err != nil { - log.Errorf("hdfs client open err: %v", err) - return 0, err - } - fh.reader = reader + err := fmt.Errorf("hdfs read: file[%s] bad file descriptor reader==nil", fh.name) + log.Errorf(err.Error()) + return 0, err } n, err := fh.reader.ReadAt(buf, int64(off)) if err != nil && err != io.EOF { @@ -482,11 +512,9 @@ func (fh *hdfsFileHandle) Write(data []byte, off uint64) (uint32, error) { log.Tracef("hdfs write: fh.name[%s], dataLength[%d], offset[%d], fh[%+v]", fh.name, len(data), off, fh) var err error if fh.writer == nil { - fh.writer, err = fh.fs.client.Append(fh.fs.GetPath(fh.name)) - if err != nil { - log.Errorf("hdfs client append err: %v", err) - return 0, err - } + err = fmt.Errorf("hdfs write: file[%s] bad file descriptor writer==nil", fh.name) + log.Errorf(err.Error()) + return 0, err } n, err := fh.writer.Write(data) diff --git a/pkg/fs/client/ufs/hdfs_test.go b/pkg/fs/client/ufs/hdfs_test.go index f5c101159..a008bc30f 100644 --- a/pkg/fs/client/ufs/hdfs_test.go +++ b/pkg/fs/client/ufs/hdfs_test.go @@ -23,6 +23,7 @@ import ( "os" "reflect" "sync" + "syscall" "testing" "github.com/agiledragon/gomonkey/v2" @@ -262,3 +263,158 @@ func Test_hdfsFileHandle_Write(t *testing.T) { }) } } + +func Test_hdfsFileSystem_OpenRead(t *testing.T) { + type fields struct { + client *hdfs.Client + subpath string + blockSize int64 + replication int + Mutex sync.Mutex + } + type args struct { + name string + flags uint32 + size uint64 + } + tests := []struct { + name string + fields fields + args args + want FileHandle + wantErr assert.ErrorAssertionFunc + }{ + { + name: "want read open err", + fields: fields{ + client: &hdfs.Client{}, + subpath: "./", + }, + args: args{ + name: "test", + flags: uint32(1), + }, + want: nil, + wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { + return true + }, + }, + { + name: "want read open nil", + fields: fields{ + client: &hdfs.Client{}, + subpath: "./", + }, + args: args{ + name: "test", + flags: uint32(1), + }, + want: nil, + wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { + return false + }, + }, + } + + var p1 = gomonkey.ApplyMethod(reflect.TypeOf(&hdfs.Client{}), "Open", func(_ *hdfs.Client, name string) (*hdfs.FileReader, error) { + return nil, fmt.Errorf("open fail") + }) + defer p1.Reset() + + var p4 = gomonkey.ApplyMethod(reflect.TypeOf(&hdfsFileSystem{}), "GetOpenFlags", func(_ *hdfsFileSystem, name string, flags uint32) int { + return syscall.O_RDONLY + }) + defer p4.Reset() + + for _, tt := range tests { + if tt.name == "want read open nil" { + var p2 = gomonkey.ApplyMethod(reflect.TypeOf(&hdfs.Client{}), "Open", func(_ *hdfs.Client, name string) (*hdfs.FileReader, error) { + return nil, nil + }) + defer p2.Reset() + } + t.Run(tt.name, func(t *testing.T) { + fs := &hdfsFileSystem{ + client: tt.fields.client, + subpath: tt.fields.subpath, + blockSize: tt.fields.blockSize, + replication: tt.fields.replication, + Mutex: tt.fields.Mutex, + } + got, err := fs.Open(tt.args.name, tt.args.flags, tt.args.size) + if !tt.wantErr(t, err, fmt.Sprintf("Open(%v, %v, %v)", tt.args.name, tt.args.flags, tt.args.size)) { + return + } + assert.Equalf(t, tt.want, got, "Open(%v, %v, %v)", tt.args.name, tt.args.flags, tt.args.size) + }) + } +} + +func Test_hdfsFileSystem_Open(t *testing.T) { + type fields struct { + client *hdfs.Client + subpath string + blockSize int64 + replication int + Mutex sync.Mutex + } + type args struct { + name string + flags uint32 + size uint64 + } + tests := []struct { + name string + fields fields + args args + want FileHandle + wantErr assert.ErrorAssertionFunc + }{ + { + name: "want retry err", + fields: fields{ + client: &hdfs.Client{}, + subpath: "./", + }, + args: args{ + name: "test", + flags: uint32(1), + }, + want: nil, + wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { + return true + }, + }, + } + + var p1 = gomonkey.ApplyMethod(reflect.TypeOf(&hdfs.Client{}), "Open", func(_ *hdfs.Client, name string) (*hdfs.FileReader, error) { + return nil, nil + }) + defer p1.Reset() + var p2 = gomonkey.ApplyMethod(reflect.TypeOf(&hdfs.Client{}), "Append", func(_ *hdfs.Client, name string) (*hdfs.FileWriter, error) { + return nil, fmt.Errorf("org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException") + }) + defer p2.Reset() + + var p4 = gomonkey.ApplyMethod(reflect.TypeOf(&hdfsFileSystem{}), "GetOpenFlags", func(_ *hdfsFileSystem, name string, flags uint32) int { + return syscall.O_WRONLY | syscall.O_APPEND + }) + defer p4.Reset() + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fs := &hdfsFileSystem{ + client: tt.fields.client, + subpath: tt.fields.subpath, + blockSize: tt.fields.blockSize, + replication: tt.fields.replication, + Mutex: tt.fields.Mutex, + } + got, err := fs.Open(tt.args.name, tt.args.flags, tt.args.size) + if !tt.wantErr(t, err, fmt.Sprintf("Open(%v, %v, %v)", tt.args.name, tt.args.flags, tt.args.size)) { + return + } + assert.Equalf(t, tt.want, got, "Open(%v, %v, %v)", tt.args.name, tt.args.flags, tt.args.size) + }) + } +} diff --git a/pkg/fs/client/ufs/s3.go b/pkg/fs/client/ufs/s3.go index d9796636d..80bf952a2 100644 --- a/pkg/fs/client/ufs/s3.go +++ b/pkg/fs/client/ufs/s3.go @@ -941,10 +941,15 @@ func (fs *s3FileSystem) Get(name string, flags uint32, off, limit int64) (io.Rea request.Range = &r } - response, err := fs.s3.GetObject(request) - if err != nil { - log.Errorf("s3 get: s3.GetObject[%s] off[%d] limit[%d] err: %v ", name, off, limit, err) - return nil, err + var response *s3.GetObjectOutput + var err error + for i := 0; i < 3; i++ { + response, err = fs.s3.GetObject(request) + if err != nil { + log.Errorf("s3 get[%v]: s3.GetObject[%s] off[%d] limit[%d] err: %v ", i, name, off, limit, err) + } else { + break + } } return response.Body, err }