-
-
Notifications
You must be signed in to change notification settings - Fork 35
/
jm_dbus.py
159 lines (134 loc) · 5.28 KB
/
jm_dbus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""
This module handles interacting with Bluez over DBus for JoustMania
"""
import os
import dbus
from xml.etree import ElementTree
BUS = dbus.SystemBus()
ORG_BLUEZ = 'org.bluez'
ORG_BLUEZ_PATH = '/org/bluez'
def get_hci_dict():
"""Get dictionary mapping hci number to address"""
proxy = get_root_proxy()
hcis = get_node_child_names(proxy)
hci_dict = {}
for hci in hcis:
proxy2 = get_adapter_proxy(hci)
addr = get_adapter_attrib(proxy2, 'Address')
hci_dict[hci] = str(addr)
return hci_dict
def get_attached_addresses(hci):
"""Get the addresses of devices known by hci"""
proxy = get_adapter_proxy(hci)
devices = get_node_child_names(proxy)
known_devices = []
for dev in devices:
proxy2 = get_device_proxy(hci, dev)
dev_addr = str(get_device_attrib(proxy2, 'Address'))
known_devices.append(dev_addr)
return known_devices
def get_bus():
"""Get DBus hook"""
return BUS
def get_root_proxy():
"""Get root Bluez DBus node"""
return BUS.get_object(ORG_BLUEZ, ORG_BLUEZ_PATH)
def enable_pairable(hci):
"""Allow devices to pair with the HCI"""
proxy = get_adapter_proxy(hci)
iface = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
if not iface.Get('org.bluez.Adapter1', 'Pairable'):
iface.Set('org.bluez.Adapter1', 'Pairable', True)
def disable_pairable(hci):
"""Prevent devices from pairing with the HCI"""
proxy = get_adapter_proxy(hci)
iface = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
if iface.Get('org.bluez.Adapter1', 'Pairable'):
iface.Set('org.bluez.Adapter1', 'Pairable', False)
def get_discovery_filters(hci):
"""Get information about discovery options"""
proxy = get_adapter_proxy(hci)
iface = dbus.Interface(proxy, 'org.bluez.Adapter1')
return iface.GetDiscoveryFilters()
def start_discovery(hci):
"""Start scanning for devices"""
proxy = get_adapter_proxy(hci)
iface = dbus.Interface(proxy, 'org.bluez.Adapter1')
if not get_adapter_attrib(proxy, 'Discovering').real:
try:
return iface.StartDiscovery()
except dbus.exceptions.DBusException as e:
if "InProgress" in str(e) or "NotReady" in str(e):
pass
else:
raise e
def stop_discovery(hci):
"""Stop scanning for devices"""
proxy = get_adapter_proxy(hci)
iface = dbus.Interface(proxy, 'org.bluez.Adapter1')
if get_adapter_attrib(proxy, 'Discovering').real:
try:
return iface.StopDiscovery()
except dbus.exceptions.DBusException as e:
if "InProgress" in str(e) or "NotReady" in str(e):
pass
else:
raise e
def remove_device(hci, dev):
hci_proxy = get_adapter_proxy(hci)
dev_proxy = get_device_proxy(hci, dev)
iface = dbus.Interface(hci_proxy, 'org.bluez.Adapter1')
return iface.RemoveDevice(dev_proxy)
def enable_adapter(hci):
"""Set the HCI's Powered attribute to true"""
proxy = get_adapter_proxy(hci)
iface = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
if iface.Get('org.bluez.Adapter1', 'Powered').real:
return False
else:
try:
print('Enabling adapter')
iface.Set('org.bluez.Adapter1', 'Powered', True)
return True
except dbus.exceptions.DBusException as e:
if "rfkill" in str(e):
rfkill_unblock(hci)
# Recurse after unblocking the bluetooth adapter
return enable_adapter(hci)
else:
raise e
def rfkill_unblock(hci):
hci_id = os.popen('rfkill list | grep {0} | cut -d ":" -f 1'.format(hci)).read().split('\n')[0]
os.popen('rfkill unblock {0}'.format(hci_id)).read()
def disable_adapter(hci):
"""Set the HCI's Powered attribute to false"""
proxy = get_adapter_proxy(hci)
iface = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
if iface.Get('org.bluez.Adapter1', 'Powered').real:
iface.Set('org.bluez.Adapter1', 'Powered', False)
return True
else:
return False
def get_adapter_proxy(hci):
"""Abstract getting Bluez DBus adapter nodes"""
hci_path = os.path.join(ORG_BLUEZ_PATH, hci)
return BUS.get_object(ORG_BLUEZ, hci_path)
def get_device_proxy(hci, dev):
"""Abstract getting Bluez DBus device nodes"""
device_path = os.path.join(ORG_BLUEZ_PATH, hci, dev)
return BUS.get_object(ORG_BLUEZ, device_path)
def get_bluez_attrib(proxy, kind, attrib):
"""Abstract getting attributes from Bluez DBus Interfaces"""
iface = dbus.Interface(proxy, 'org.freedesktop.DBus.Properties')
return iface.Get('org.bluez.{0}'.format(kind), attrib)
def get_adapter_attrib(proxy, attrib):
"""Abstract getting attributes from Bluez Adapter1 Interfaces"""
return get_bluez_attrib(proxy, 'Adapter1', attrib)
def get_device_attrib(proxy, attrib):
"""Abstract getting attributes from Bluez Device1 Interfaces"""
return get_bluez_attrib(proxy, 'Device1', attrib)
def get_node_child_names(proxy):
"""Abstract finding child nodes of a DBus Node"""
iface = dbus.Interface(proxy, 'org.freedesktop.DBus.Introspectable')
tree = ElementTree.fromstring(iface.Introspect())
return [child.attrib['name'] for child in tree if child.tag == 'node']