-
Notifications
You must be signed in to change notification settings - Fork 1
/
reposclass.py
310 lines (237 loc) · 8.24 KB
/
reposclass.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
################################################################################
#
#
#
#
################################################################################
import os
import re
import logging
import inspect
import subprocess
from ordereddict import OrderedDict
from debian_bundle import debfile
class RepositoryException(Exception):
    """
    Raised when an external command run against the repository (reprepro or
    dpkg-deb) exits with a non-zero status.
    """
    pass
class ConfigurationException(Exception):
    """
    Error type for problems in a repository configuration file.
    """
    pass
def config_reader(file_path):
    """
    Parse a reprepro style configuration file (e.g. conf/distributions).

    The file is read as stanzas of "Key: value" lines:
      * a line starting with '#' is a comment and is skipped,
      * a non-blank line starting with whitespace continues the value of the
        previous key (joined with a single space),
      * any other line without a ':' (typically a blank line) ends the
        current stanza.
    Only the first ':' separates key and value, so values may themselves
    contain colons.

    file_path - path of the configuration file.

    Returns - A list of OrderedDict objects, one per non-empty stanza, in file
    order, with keys in the order they were declared.

    Raises - IOError if the file cannot be opened.
    """
    stanzas = []
    settings = OrderedDict()
    last_key = None

    # 'with' guarantees the handle is closed even if parsing blows up.
    with open(file_path) as handle:
        for line in handle:
            if line.startswith('#'):
                continue

            text = line.strip()

            # Continuation line: fold it into the value of the previous key.
            if text and line[:1] in (' ', '\t') and last_key is not None:
                settings[last_key] = ('%s %s' % (settings[last_key], text)).strip()
                continue

            key, sep, value = line.partition(':')
            if sep:
                last_key = key.strip()
                settings[last_key] = value.strip()
                continue

            # Anything else terminates the stanza; empty stanzas (repeated
            # blank lines) are not reported.
            if settings:
                stanzas.append(settings)
            settings = OrderedDict()
            last_key = None

    # The file does not have to end with a blank line.
    if settings:
        stanzas.append(settings)
    return stanzas
# NOTE(review): this runs at import time against a hard-coded path and throws
# the result away; it looks like leftover debugging -- confirm before removing.
config_reader("/mnt/tech/repositories/apt/auto-lucid/conf/distributions")
class reprepro(object):
    """
    Thin wrapper around the ``reprepro`` command line tool.

    settings - configuration object. The attributes ``verbose``,
    ``components`` and ``architectures`` and the mapping
    ``settings['basedir']`` are read from it when a command is built.
    """

    def __init__(self, settings):
        self.configuration = settings
        self.listformat = '${$identifier} ${package} ${version}\n'

    @staticmethod
    def _filter_value(values):
        """
        Return a component/architecture filter as ONE command line argument.

        reprepro expects several values for -C / -A to be joined with '|';
        passing them as separate arguments would turn the second value into
        the action name.
        """
        if isinstance(values, (list, tuple)):
            return '|'.join(values)
        return values

    def _command(self, action, *params):
        """
        Build the argument vector for ``reprepro <options> <action> <params>``.
        """
        cmd = ['reprepro']
        if self.configuration.verbose:
            cmd.append('-V')
        if self.configuration.components:
            cmd.extend(['-C', self._filter_value(self.configuration.components)])
        if self.configuration.architectures:
            cmd.extend(['-A', self._filter_value(self.configuration.architectures)])
        cmd.extend(['-b', self.configuration.settings['basedir']])
        cmd.append(action)
        cmd.extend(params)
        return cmd

    def action(self, action, *params):
        """
        Run a reprepro action and return its output.

        action - name of the reprepro command (e.g. 'list', 'check').
        params - positional arguments passed after the action.

        Returns - the combined stdout/stderr of the command, stripped and
        split into a list of lines.

        Raises - RepositoryException if the command exits non-zero.
        """
        return self._exec(self._command(action, *params))

    def _exec(self, cmd):
        """
        Execute ``cmd`` (an argument list), echoing it to stdout first.
        """
        print(" ".join(cmd))
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        # communicate() drains the pipe while waiting; calling wait() first can
        # deadlock once the child fills the pipe buffer.
        out, _ = p.communicate()
        if p.returncode:
            raise RepositoryException('not a reprepro repository')
        if not isinstance(out, str):
            # Python 3 hands back bytes; Python 2 already gives str.
            out = out.decode('utf-8', 'replace')
        return out.strip().split('\n')
class dpkg_deb(object):
    """
    Thin wrapper around the ``dpkg-deb`` command line tool.
    """

    def __init__(self):
        object.__init__(self)

    def action(self, action, *args, **params):
        """
        Run ``dpkg-deb --showformat <fmt> <action> <params> <args>``.

        action - the dpkg-deb action flag (e.g. '--show').
        args   - positional arguments appended last (e.g. the .deb path).
        params - extra option/value pairs; each key and its value are added
                 to the command line as two consecutive arguments.

        Returns - the stripped command output split on ';'.

        Raises - RepositoryException if dpkg-deb exits non-zero.
        """
        cmd = ['dpkg-deb']
        cmd.extend(['--showformat', '${Package}\t${Version}\t"${Description}",\t${Replaces},\t${Maintainer}'])
        cmd.append(action)
        for item in params.items():
            cmd.extend(item)
        cmd.extend(args)
        return self._exec(cmd)

    def _exec(self, cmd):
        """
        Execute ``cmd`` (an argument list) with stderr folded into stdout.
        """
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        # communicate() drains the pipe while waiting; calling wait() first can
        # deadlock once the child fills the pipe buffer.
        out, _ = p.communicate()
        if p.returncode:
            raise RepositoryException('dpkg-deb fail')
        if not isinstance(out, str):
            # Python 3 hands back bytes; Python 2 already gives str.
            out = out.decode('utf-8', 'replace')
        return out.strip().split(';')
def deb_listchanges(path):
    """
    Return the changelog entries of a .deb file as a list of strings.

    path - path of the .deb file to inspect.

    Returns - one string per changelog block; an empty list when the package
    provides no changelog.
    """
    deb = debfile.DebFile(path)
    chg = deb.changelog()
    # When no changelog has been provided there is no object carrying
    # '_blocks'; treat that (and an empty block list) as "no entries" instead
    # of hiding every possible error behind a bare except.
    entries = getattr(chg, '_blocks', None) or []
    return [str(entry) for entry in entries]
class DefaultSetup(object):
    """
    Configuration class. Provides options and settings used by reprepro cli.

    Every entry of ``settings`` is also readable as an attribute, e.g.
    ``setup.basedir`` is ``setup.settings['basedir']``.
    """
    verbose = True
    # Empty class-level fallback only; each instance gets its own dict in
    # __init__ so that two repositories never overwrite each other's paths.
    settings = {}

    def __init__(self, **kwargs):
        object.__init__(self)
        self.settings = {}
        self.set_path(**kwargs)
        self.set_filters(**kwargs)

    def set_path(self, **paths):
        """
        Set/Update all paths for the repository. If only basedir is given all other
        paths will be set relative to this path.

        Recognised keywords: basedir, dbdir, outdir, logdir, confdir, distdir,
        listdir, morgedir, methoddir. Unknown keywords are ignored.
        """
        # Set first, since the others depend on it.
        basedir = paths.get('basedir', '.')
        self.settings['basedir'] = basedir

        # And then all the rest.
        self.settings['dbdir'] = paths.get('dbdir', '%s/db' % basedir)
        self.settings['outdir'] = paths.get('outdir', basedir)
        self.settings['logdir'] = paths.get('logdir', '%s/logs' % basedir)
        self.settings['confdir'] = paths.get('confdir', '%s/conf' % basedir)
        self.settings['distdir'] = paths.get('distdir', '%s/dists' % basedir)
        self.settings['listdir'] = paths.get('listdir', '%s/lists' % basedir)
        # NOTE(review): 'morgedir' is probably meant to be reprepro's
        # 'morguedir'; the key is kept as-is because callers may pass it.
        self.settings['morgedir'] = paths.get('morgedir', '')
        self.settings['methoddir'] = paths.get('methoddir', '')

    def set_filters(self, **filters):
        """
        Set global filter options.

        Recognised keywords: components, architectures (both default to an
        empty list, meaning "no filter").
        """
        self.settings['components'] = filters.get('components', [])
        self.settings['architectures'] = filters.get('architectures', [])

    def __getattr__(self, attr):
        # Only reached when normal attribute lookup fails: fall back to the
        # settings dictionary.
        try:
            return self.settings[attr]
        except KeyError:
            raise AttributeError("%s has no attribute '%s'" % (self, attr))

    def __repr__(self):
        return str(self.settings)
class Repository(object):
    """
    Top level class. Used for grabbing a repository instance and handling it
    gracefully.

    basedir - base directory of the reprepro repository; ``name`` is its last
    path component and ``options`` the DefaultSetup built from it.
    """

    def __init__(self, basedir):
        object.__init__(self)
        self.name = os.path.basename(basedir)
        self.options = DefaultSetup(basedir=basedir)

    def dist(self, codename):
        """Return a Dist for ``codename`` that shares this repository's options."""
        # Build the Dist on our own basedir rather than its '.' default, so
        # creating it never points settings at the current directory.
        d = Dist(codename, self.options.basedir)
        d.options = self.options  # inherit our current settings.
        return d

    def list_dists(self):
        """
        Read conf/distributions through config_reader.

        Returns - A list of dictionaries which contain the configuration for the
        dists.
        """
        return config_reader("%s/distributions" % self.options.confdir)

    def _dist_field(self, codename, field, label):
        """Return the whitespace separated values of ``field`` for one codename."""
        for dist in self.list_dists():
            if dist.get('Codename') == codename:
                return [{label: value} for value in dist.get(field, '').split()]
        return []

    def list_components(self, codename):
        """
        Returns - a list of {'component': name} dictionaries for ``codename``;
        empty when the codename is unknown or declares no components.
        """
        return self._dist_field(codename, 'Components', 'component')

    def list_architectures(self, codename):
        """
        Returns - a list of {'architecture': name} dictionaries for
        ``codename``; empty when the codename is unknown or declares none.
        """
        return self._dist_field(codename, 'Architectures', 'architecture')

    def list_changes(self, debfile):
        """Return the changelog entries of the given .deb file as strings."""
        return deb_listchanges(debfile)

    ### reprepro actions
    def check(self):
        """Run 'reprepro check' and return its output lines."""
        return reprepro(self.options).action('check')

    @staticmethod
    def _parse_reference(raw):
        """
        Split one dumpreferences line ("codename|component|arch path") into an
        ordered mapping, or return None when the line has another shape.
        """
        try:
            codename, component, arch, deb = re.split(r'\|+| ', raw)
        except (TypeError, ValueError) as e:
            logging.warning(e)
            logging.warning('unparseable result 1=> %s', raw)
            return None
        # A list of pairs (not a dict literal) keeps the declared key order.
        return OrderedDict([('codename', codename), ('component', component),
                            ('arch', arch), ('deb', deb)])

    def _describe_reference(self, raw):
        """
        Parse one dumpreferences line and add the package details reported by
        dpkg-deb. Returns None for anything that cannot be described.
        """
        fields = self._parse_reference(raw)
        if fields is None:
            return None

        package_info = None  # bound up front so the error path can log it
        try:
            package_info = dpkg_deb().action('--show', os.path.join(self.options.basedir, fields['deb']))[0]
            package, version, description, replaces, maintainer = package_info.split('\t')
        except RepositoryException as e:
            logging.warning(e)
            logging.warning('dpkg fail')
            return None
        except Exception as e:
            logging.warning(e)
            logging.warning(package_info)
            logging.warning('unparseable result 2=> %s', raw)
            return None

        fields['package'] = package
        fields['version'] = version
        fields['description'] = description
        fields['replaces'] = replaces
        fields['maintainer'] = maintainer
        return fields

    def dumpreferences(self):
        """
        Print out which files are marked to be needed by whom.

        Returns - a list of ordered mappings with the keys codename,
        component, arch, deb, package, version, description, replaces and
        maintainer; lines that cannot be parsed or inspected are skipped.
        """
        described = []
        for raw in reprepro(self.options).action('dumpreferences'):
            # Describe each line once; every call spawns a dpkg-deb process.
            fields = self._describe_reference(raw)
            if fields:
                described.append(fields)
        return described

    def ls(self, package_name):
        """List the versions of the the specified package in all distributions."""
        return reprepro(self.options).action('ls', package_name)

    @staticmethod
    def _parse_list_line(raw):
        """
        Parse one "codename|component|architecture: package version" line.

        Only the first ':' ends the identifier, so versions that carry an
        epoch ("1:2.3") stay intact. Returns {} for unparseable lines.
        """
        try:
            identifier, sep, rest = raw.partition(':')
            if not sep:
                raise ValueError("missing ':' after identifier")
            codename, component, architecture = re.split(r'\|+', identifier)
            package, version = rest.split()
        except (AttributeError, TypeError, ValueError) as e:
            logging.warning(e)
            logging.warning('unscriptable package => %s', raw)
            return {}
        return OrderedDict([('codename', codename), ('component', component),
                            ('architecture', architecture),
                            ('package', package), ('version', version)])

    def list(self, codename, package_name=''):
        """
        List the packages of a distribution, optionally limited to one name.

        Returns - one mapping per output line with the keys codename,
        component, architecture, package and version ({} for lines that could
        not be parsed).
        """
        params = [codename]
        if package_name:
            params.append(package_name)
        package_strings = reprepro(self.options).action('list', *params)
        return [self._parse_list_line(p) for p in package_strings]

    def listmatched(self, codename, glob):
        """Run 'reprepro listmatched <codename> <glob>' and return its output lines."""
        return reprepro(self.options).action('listmatched', codename, glob)
class Dist(Repository):
    """
    A single distribution (codename) of a repository. Behaves like a
    Repository whose ``listmatched`` is already bound to ``codename``.
    """

    def __init__(self, codename, basedir='.'):
        super(Dist, self).__init__(basedir)
        self.codename = codename

    def listmatched(self, glob):
        """Run 'reprepro listmatched' for this distribution's codename."""
        runner = reprepro(self.options)
        return runner.action('listmatched', self.codename, glob)
class Package(object):
    """
    Package object that can be initialised from a 'deb' keyword argument.

    NOTE(review): info_from_deb() is not defined in the visible source, so
    passing deb=... currently ends in an AttributeError -- confirm whether the
    method lives elsewhere or is still to be written.
    """
    def __init__(self, **kwargs):
        # Only the 'deb' keyword is looked at; any other keyword is ignored.
        # Presumably the value is a path to a .deb file -- verify against callers.
        if 'deb' in kwargs:
            self.info_from_deb(kwargs['deb'])