-
Notifications
You must be signed in to change notification settings - Fork 1
/
_client.py
121 lines (94 loc) · 3.86 KB
/
_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import argparse
import sys
import pyarrow as pa
import pyarrow.flight as fl
def get_by_ticket(args, client):
ticket_name = args.name
response = client.do_get(fl.Ticket(ticket_name)).read_all()
print_response(response)
def get_by_ticket_pandas(args, client):
ticket_name = args.name
response = client.do_get(fl.Ticket(ticket_name)).read_pandas()
print_response(response)
def get_schema(args, client):
path = args.path
response = client.get_schema(fl.FlightDescriptor.for_path(path))
print_response(response.schema)
def get_endpoints(args, client):
path = args.path
response = client.get_flight_info(fl.FlightDescriptor.for_path(path))
print_response(response.endpoints)
def do_put(args, client):
path = args.path
values = args.values.split(',')
table = pa.Table.from_arrays([pa.array(values)], names=['column1'])
writer, _ = client.do_put(fl.FlightDescriptor.for_path(path), table.schema)
writer.write_table(table, len(values))
writer.close()
def list_actions(args, client):
response = client.list_actions()
print_response(response)
def do_action(args, client):
action_type = args.type
response = client.do_action(pa.flight.Action(action_type, pa.allocate_buffer(0)))
print("=== Response ===")
for r in response:
print(r.body.to_pybytes())
print("================")
def list_flights(args, client):
response = client.list_flights()
print("=== Response ===")
for r in response:
print(r.descriptor)
print(r.schema)
print(r.endpoints)
print(r.total_records)
print("================")
def print_response(data):
print("=== Response ===")
print(data)
print("================")
def main():
parser = argparse.ArgumentParser()
subcommands = parser.add_subparsers()
cmd_get_by_t = subcommands.add_parser('get_by_ticket')
cmd_get_by_t.set_defaults(action='get_by_ticket')
cmd_get_by_t.add_argument('-n', '--name', type=str, help="Name of the ticket to fetch.")
cmd_get_by_tp = subcommands.add_parser('get_by_ticket_pandas')
cmd_get_by_tp.set_defaults(action='get_by_ticket_pandas')
cmd_get_by_tp.add_argument('-n', '--name', type=str, help="Name of the ticket to fetch.")
cmd_get_schema = subcommands.add_parser('get_schema')
cmd_get_schema.set_defaults(action='get_schema')
cmd_get_schema.add_argument('-p', '--path', type=str, help="Descriptor path.")
cmd_get_endpoints = subcommands.add_parser('get_endpoints')
cmd_get_endpoints.set_defaults(action='get_endpoints')
cmd_get_endpoints.add_argument('-p', '--path', type=str, help="Descriptor path.")
cmd_do_put = subcommands.add_parser('do_put')
cmd_do_put.set_defaults(action='do_put')
cmd_do_put.add_argument('-p', '--path', type=str, help="Descriptor path.")
cmd_do_put.add_argument('-v', '--values', type=str, help="Values to put on server.")
cmd_list_actions = subcommands.add_parser('list_actions')
cmd_list_actions.set_defaults(action='list_actions')
cmd_do_action = subcommands.add_parser('do_action')
cmd_do_action.set_defaults(action='do_action')
cmd_do_action.add_argument('-t', '--type', type=str, help="Type of action.")
cmd_list_flights = subcommands.add_parser('list_flights')
cmd_list_flights.set_defaults(action='list_flights')
args = parser.parse_args()
if not hasattr(args, 'action'):
parser.print_help()
sys.exit(1)
commands = {
'get_by_ticket': get_by_ticket,
'get_by_ticket_pandas': get_by_ticket_pandas,
'get_schema': get_schema,
'get_endpoints': get_endpoints,
'list_flights': list_flights,
'do_put': do_put,
'list_actions': list_actions,
'do_action': do_action,
}
client = fl.connect("grpc://0.0.0.0:8815")
commands[args.action](args, client)
if __name__ == '__main__':
main()