Skip to content

Commit

Permalink
Add log for success tag check (#431)
Browse files Browse the repository at this point in the history
* Add log for success tag check

* fix

* fix

* fix
  • Loading branch information
piiswrong authored Nov 26, 2020
1 parent d306eb2 commit f44e2e7
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 13 deletions.
47 changes: 36 additions & 11 deletions fedlearner/data_join/data_portal_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,7 @@ def _update_portal_manifest(self, new_portal_manifest):

def _launch_new_portal_job(self):
assert self._sync_processing_job() is None
all_fpaths = self._list_input_dir()
rest_fpaths = []
for fpath in all_fpaths:
if fpath not in self._processed_fpath:
rest_fpaths.append(fpath)
rest_fpaths = self._list_input_dir()
if len(rest_fpaths) == 0:
logging.info("no file left for portal")
return
Expand Down Expand Up @@ -296,21 +292,50 @@ def _list_input_dir(self):
all_inputs = []
wildcard = self._portal_manifest.input_file_wildcard
dirs = [self._portal_manifest.input_base_dir]

num_dirs = 0
num_files = 0
num_target_files = 0
while len(dirs) > 0:
fdir = dirs[0]
dirs = dirs[1:]
has_succ = gfile.Exists(path.join(fdir, '_SUCCESS'))
fnames = gfile.ListDirectory(fdir)
for fname in fnames:
fpath = path.join(fdir, fname)
# OSS does not retain folder structure.
# For example, if we have file oss://test/1001/a.txt
# list(oss://test) returns 1001/a.txt instead of 1001
basename = path.basename(fpath)
if basename == '_SUCCESS':
continue
if gfile.IsDirectory(fpath):
dirs.append(fpath)
elif fname != '_SUCCESS' and (
len(wildcard) == 0 or fnmatch(fname, wildcard)):
if self._check_success_tag and not has_succ:
continue
num_dirs += 1
continue
num_files += 1
if len(wildcard) == 0 or fnmatch(basename, wildcard):
num_target_files += 1
if self._check_success_tag:
has_succ = gfile.Exists(
path.join(path.dirname(fpath), '_SUCCESS'))
if not has_succ:
logging.warning(
'File %s skipped because _SUCCESS file is '
'missing under %s',
fpath, fdir)
continue
all_inputs.append(fpath)
return all_inputs

rest_fpaths = []
for fpath in all_inputs:
if fpath not in self._processed_fpath:
rest_fpaths.append(fpath)
logging.info(
'Listing %s: found %d dirs, %d files, %d files matching wildcard, '
'%d files with success tag, %d new files to process',
self._portal_manifest.input_base_dir, num_dirs, num_files,
num_target_files, len(all_inputs), len(rest_fpaths))
return rest_fpaths

def _sync_job_part(self, job_id, partition_id):
if partition_id not in self._job_part_map or \
Expand Down
7 changes: 5 additions & 2 deletions test/data_join/test_data_portal_master.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,19 @@ def test_api(self):
if gfile.Exists(portal_input_base_dir):
gfile.DeleteRecursively(portal_input_base_dir)
gfile.MakeDirs(portal_input_base_dir)
all_fnames = ['{}.done'.format(i) for i in range(100)]
all_fnames = ['1001/{}.done'.format(i) for i in range(100)]
all_fnames.append('{}.xx'.format(100))
all_fnames.append('1001/_SUCCESS')
for fname in all_fnames:
fpath = os.path.join(portal_input_base_dir, fname)
gfile.MakeDirs(os.path.dirname(fpath))
with gfile.Open(fpath, "w") as f:
f.write('xxx')
portal_master_addr = 'localhost:4061'
portal_options = dp_pb.DataPotraMasterlOptions(
use_mock_etcd=True,
long_running=False
long_running=False,
check_success_tag=True,
)
data_portal_master = DataPortalMasterService(
int(portal_master_addr.split(':')[1]),
Expand Down

0 comments on commit f44e2e7

Please sign in to comment.