-
Notifications
You must be signed in to change notification settings - Fork 14
/
hb.py
82 lines (59 loc) · 2.29 KB
/
hb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
"""
An abstraction layer for pulling download metrics out of HBase.

The Thrift/HBase API is a mess only a Java programmer could love. This is not a
complete wrapper; pieces are implemented as needed.

It's assumed that the value of every TCell (returned as a byte array) holds a
big-endian unsigned long long, and it is converted to one.
"""
import struct
from thrift import Thrift
from thrift.transport import TSocket, TTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase, ttypes
# Every error type the Thrift layer or the HBase service definitions used here
# can raise, gathered in one tuple (presumably so callers can write
# ``except exceptions:``).
exceptions = (
    Thrift.TException,
    ttypes.IOError,
    ttypes.IllegalArgument,
    ttypes.AlreadyExists,
)
def convert(rows):
    """Unpack each value in the TCells to an unsigned long long, in place.

    Every entry of each row's ``columns`` dict is replaced by the integer
    decoded from its TCell's ``value`` as an 8-byte big-endian (network order)
    unsigned integer.

    :param rows: sequence of row results, each exposing a ``columns`` dict that
        maps column names to TCell-like objects with a ``value`` byte string.
    :returns: the same ``rows`` object, with its rows mutated in place.
    :raises struct.error: if a TCell value is not exactly 8 bytes long.
    """
    # It may be wiser to do this lazily.
    unpack_u64 = struct.Struct('!Q').unpack  # compile the format once per call
    for row in rows:
        columns = row.columns
        # Snapshot the items because the loop rebinds values in the dict it
        # walks. ``.items()`` works on Python 2 and 3; ``iteritems`` is 2-only.
        for key, tcell in list(columns.items()):
            columns[key] = unpack_u64(tcell.value)[0]
    return rows
class Client(object):
    """Wrapper around a Thrift ``Hbase.Client`` bound to a single table.

    The connection is opened on construction. Instances may also be used as
    context managers; leaving the ``with`` block closes the transport.
    """

    def __init__(self, host, port, table):
        self.host = host
        self.port = port
        self.table = table
        self.open()

    def open(self):
        """Connect a buffered, binary-protocol Thrift transport to host:port."""
        socket = TSocket.TSocket(self.host, self.port)
        self.transport = TTransport.TBufferedTransport(socket)
        protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
        self.client = Hbase.Client(protocol)
        self.transport.open()

    def close(self):
        """Close the underlying Thrift transport."""
        self.transport.close()

    def recycle(self):
        """Drop the current connection and open a fresh one."""
        self.close()
        self.open()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()

    def __del__(self):
        # open() may have raised before ``transport`` was assigned (e.g. in the
        # TSocket constructor); don't raise AttributeError during collection.
        transport = getattr(self, 'transport', None)
        if transport is not None:
            transport.close()

    def scanner(self, start='', columns=None):
        """Get a new scanner on the table.

        :param start: row key passed to ``scannerOpen`` as the scan start.
        :param columns: optional list of columns to restrict the scan to.
        :returns: a :class:`Scanner` wrapping the server-side scanner id.
        """
        scanner_id = self.client.scannerOpen(self.table, start, columns)
        return Scanner(self, scanner_id)

    def row(self, row_, columns=None):
        """Fetch the row_, optionally constrained to a list of columns.

        :returns: the converted row results, or ``[]`` when the server
            returns nothing (any falsy result).
        """
        rv = self.client.getRowWithColumns(self.table, row_, columns)
        return convert(rv) if rv else []
class Scanner(object):
    """A server-side HBase scanner opened through a :class:`Client`.

    Call :meth:`close` (or use the scanner as a context manager) when done so
    the scanner is released on the server explicitly.
    """

    def __init__(self, client, id):
        # ``id`` shadows the builtin, but the parameter and attribute names are
        # kept so existing keyword-argument callers and readers still work.
        self.client = client
        self.id = id
        self._closed = False

    def next(self):
        """Fetch the next row from the scanner.

        If ``scannerGet`` returns an empty list (presumably: no rows left),
        the indexing below raises IndexError.
        """
        return convert(self.client.client.scannerGet(self.id))[0]

    def list(self, num):
        """Fetch the next ``num`` rows from the scanner."""
        return convert(self.client.client.scannerGetList(self.id, num))

    def close(self):
        """Release the scanner on the server; later calls are no-ops."""
        if not self._closed:
            self.client.client.scannerClose(self.id)
            self._closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        self.close()