-
Notifications
You must be signed in to change notification settings - Fork 0
/
xargs.py
executable file
·168 lines (125 loc) · 6.29 KB
/
xargs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
#!/usr/bin/python3
'''
An implementation of xargs functionality for python.
'''
import logging as logging
import subprocess
import time
# Single-character whitespace constants (not referenced by the functions
# visible in this file).
tab, newline = '\t', '\n'
##################################
##################################
#
# FUNCTIONS
#
##################################
##################################
def _closeParentHandle(stream):
    '''
    Close the parent's copy of a stream that has been handed to a child.

    subprocess.DEVNULL (an int sentinel) and None (inherit the parent's own
    stream) have no close() and are ignored.
    '''
    if hasattr(stream, 'close'):
        stream.close()


def executePipeline(pipelineOfCommands, outputAppend):
    '''
    Start one pipeline of commands and return its processes without waiting.

    Each command's STDOUT is piped into the next command's STDIN.

    pipelineOfCommands is a list of argument lists. Two positions are special:
      * If the FIRST entry has exactly one element, it is an input redirect:
        [filename] feeds that file (opened in binary mode) to the first
        command, and [None] feeds it the parent's STDIN. Without an input
        redirect, the first command reads from DEVNULL.
      * If the LAST entry has exactly one element, it is an output redirect:
        [filename] sends the last command's STDOUT to that file (appended if
        outputAppend is true, otherwise overwritten), and [None] inherits the
        parent's STDOUT. Without an output redirect, the parent's STDOUT is
        inherited.
    Consequently, a one-word command such as ['cat'] in the first or last
    position is interpreted as a redirect, not as a command.

    Returns the list of subprocess.Popen objects, in pipeline order.
    Raises ValueError if no commands remain once redirects are removed.
    '''
    pipeline = list(pipelineOfCommands)
    logging.info("Executing: %s", pipeline)
    if not pipeline:
        raise ValueError("Pipeline contains no commands to run")
    # Work out the redirects before opening anything, so a malformed
    # pipeline cannot leave files open behind it.
    hasInputRedirect = len(pipeline[0]) == 1
    inputFile = pipeline.pop(0)[0] if hasInputRedirect else None
    hasOutputRedirect = bool(pipeline) and len(pipeline[-1]) == 1
    outputFile = pipeline.pop(-1)[0] if hasOutputRedirect else None
    if not pipeline:
        raise ValueError("Pipeline contains no commands to run")

    # By default, do NOT consume the input of the parent. It isn't insane to have
    # multiple processes pulling from STDIN of the parent process; it just doesn't
    # often make sense.
    nextInput = subprocess.DEVNULL
    lastOutput = None  # By default, inherit parent's STDOUT.
    processList = []
    try:
        if hasInputRedirect:
            if inputFile is not None:
                nextInput = open(inputFile, 'rb')
                logging.debug("Opened %s for reading", inputFile)
            else:
                nextInput = None  # Redirect STDIN of parent to the pipeline.
                logging.debug("Using STDIN as input for pipeline")
        if outputFile is not None:
            if outputAppend:
                lastOutput = open(outputFile, 'a')
                logging.debug("Opened %s for appending", outputFile)
            else:
                lastOutput = open(outputFile, 'w')
                logging.debug("Opened %s for overwriting", outputFile)
        for command in pipeline[:-1]:
            logging.debug("Starting process %s", command)
            process = subprocess.Popen(command, stdout=subprocess.PIPE, stdin=nextInput)
            processList.append(process)
            # The child now owns its STDIN, so drop the parent's copy. For a
            # file this avoids a descriptor leak. For a pipe it lets the
            # upstream process get SIGPIPE if this child exits early.
            _closeParentHandle(nextInput)
            # Set up the output of this process as the input for the next.
            nextInput = process.stdout
        # This is the last process, and will be redirected to the output file instead:
        logging.debug("Starting process %s", pipeline[-1])
        processList.append(
            subprocess.Popen(pipeline[-1], stdout=lastOutput, stdin=nextInput))
    finally:
        # Always release the parent's handles: the last process's STDIN
        # (input file or upstream pipe) and the output file. This also
        # happens when open()/Popen() raised part-way through.
        _closeParentHandle(nextInput)
        _closeParentHandle(lastOutput)
    logging.debug("Process list for this pipeline: %s", processList)
    return processList
def waitForPipelineSize(processListList, targetSize, pollInterval=0.01):
    '''
    Block until at most targetSize pipelines remain in processListList.

    processListList is a list of pipelines, each a list of process objects
    with a poll() method (e.g. subprocess.Popen). It is modified IN PLACE:
    processes that exited with status 0 are removed from their pipeline, and
    pipelines with no remaining processes are removed from the list.

    targetSize: number of pipelines allowed to remain. Negative values are
        treated as 0; without this, the wait could never finish.
    pollInterval: seconds to sleep after a polling round that has not yet
        reached targetSize, so waiting does not spin a CPU core.

    Raises RuntimeError as soon as a polled process reports a non-zero exit
    status. That includes negative statuses, which Popen uses for death by
    signal. Other processes are left running.
    '''
    targetSize = max(targetSize, 0)
    logging.debug("There are %s pipelines running, waiting for %s to finish and drop down to %s.", len(processListList), len(processListList)-targetSize, targetSize)
    # Until enough processes have completed to get us to our target size...
    while len(processListList) > targetSize:
        for pipelineIndex, pipeline in enumerate(processListList):
            # Walk each pipeline backwards so popping doesn't shift unvisited indices.
            for processIndex in reversed(range(len(pipeline))):
                # poll() is None while running, 0 on success, anything else is an error.
                result = pipeline[processIndex].poll()
                if result == 0:
                    pipeline.pop(processIndex)
                    logging.debug("Completed pipeline %s process %s", pipelineIndex, processIndex)
                elif result is not None:
                    raise RuntimeError("Process exited with error code %s"%(result))
        # Remove any pipelines that were emptied, in reverse so indices stay valid.
        for pipelineIndex in reversed(range(len(processListList))):
            if not processListList[pipelineIndex]:
                processListList.pop(pipelineIndex)
                logging.debug("Pipeline %s empty", pipelineIndex)
        if len(processListList) > targetSize:
            # Nothing else to do until a child exits; yield the CPU briefly.
            time.sleep(pollInterval)
def xargs(listOfPipelinesOfCommands, concurrentPipelines, outputAppend=False):
    '''
    Run every pipeline, keeping at most concurrentPipelines running at once.

    Returns once all pipelines have finished.

    listOfPipelinesOfCommands: iterable of pipelines, each in the format
        accepted by executePipeline. Pipelines are started in iteration
        order, and any iterable works, including a generator.
    concurrentPipelines: maximum number of pipelines running simultaneously;
        must be at least 1.
    outputAppend: passed through to executePipeline for every pipeline.

    Raises ValueError if concurrentPipelines < 1; otherwise the wait could
    never make room for a pipeline. A RuntimeError raised by
    waitForPipelineSize for a failed process propagates, and pipelines still
    running at that point are left running.
    '''
    if concurrentPipelines < 1:
        raise ValueError("concurrentPipelines must be at least 1, got %s" % (concurrentPipelines,))
    # One entry per started pipeline: the list of its still-running processes.
    inProcessList = []
    for nextPipeline, pipelineOfCommands in enumerate(listOfPipelinesOfCommands):
        # Bring the number of running pipelines down to one less than the max,
        # making room for this one.
        waitForPipelineSize(inProcessList, concurrentPipelines - 1)
        logging.debug("Executing new pipeline number %s", nextPipeline)
        inProcessList.append(executePipeline(pipelineOfCommands, outputAppend))
    # Every pipeline has been started; just wait for them all to finish.
    waitForPipelineSize(inProcessList, 0)
##################################
##################################
#
# MAIN
#
##################################
##################################
if __name__ == "__main__":
    logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
    # Demo: run two independent pipelines, at most two at the same time.
    #
    # Shell equivalent, one pipeline per line:
    #   grep CCACTACTT < infile1 | fold -w 10 > outfile1
    #   grep CCACTACTT < infile2 | fold -w 10 > outfile2
    #
    # In Python, each pipeline is a list of argument lists. A one-element
    # list at the start names the input file, and one at the end names the
    # output file.
    filePairs = (('infile1', 'outfile1'), ('infile2', 'outfile2'))
    demoPipelines = [
        [[source], ['grep', 'CCACTACTT'], ['fold', '-w', '10'], [destination]]
        for source, destination in filePairs
    ]
    xargs(demoPipelines, 2, outputAppend=False)