-
-
Save egorf/66d88056a9d703928f93 to your computer and use it in GitHub Desktop.
| # ReachView code is placed under the GPL license. | |
| # Written by Egor Fedorov (egor.fedorov@emlid.com) | |
| # Copyright (c) 2015, Emlid Limited | |
| # All rights reserved. | |
| # If you are interested in using ReachView code as a part of a | |
| # closed source project, please contact Emlid Limited (info@emlid.com). | |
| # This file is part of ReachView. | |
| # ReachView is free software: you can redistribute it and/or modify | |
| # it under the terms of the GNU General Public License as published by | |
| # the Free Software Foundation, either version 3 of the License, or | |
| # (at your option) any later version. | |
| # ReachView is distributed in the hope that it will be useful, | |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
| # GNU General Public License for more details. | |
| # You should have received a copy of the GNU General Public License | |
| # along with ReachView. If not, see <http://www.gnu.org/licenses/>. | |
| import time | |
| import pexpect | |
| import subprocess | |
| import sys | |
class BluetoothctlError(Exception):
    """Signal that the bluetoothctl child process could not be used.

    Raised by the wrapper when waiting for bluetoothctl's output ends in
    end-of-file instead of the expected prompt text.
    """
class Bluetoothctl:
    """A wrapper for the bluetoothctl utility.

    An instance owns one interactive ``bluetoothctl`` child process
    (``self.child``, a ``pexpect.spawn`` object). Every public method sends
    a single command to that process and interprets the text it prints.

    Error convention: methods that talk to bluetoothctl catch
    ``BluetoothctlError``, print it, and return ``None`` instead of raising.
    """

    def __init__(self):
        """Unblock the bluetooth radio and start the bluetoothctl process.

        Raises:
            subprocess.CalledProcessError: if ``rfkill unblock bluetooth``
                exits with a non-zero status.
        """
        subprocess.check_output("rfkill unblock bluetooth", shell=True)
        self.child = pexpect.spawn("bluetoothctl", echo=False)

    def get_output(self, command, pause=0):
        """Run a command in the bluetoothctl prompt and return its output.

        Args:
            command: the bluetoothctl command line, without trailing newline.
            pause: seconds to sleep after sending the command and before
                starting to read the output.

        Returns:
            The text printed before the next occurrence of "bluetooth"
            (presumably the prompt), split on "\\r\\n" into a list of lines.

        Raises:
            BluetoothctlError: if the child process reached end-of-file
                instead of printing "bluetooth" again.
        """
        self.child.send(command + "\n")
        time.sleep(pause)
        # expect() returns the index of the pattern that matched:
        # 0 -> "bluetooth" was seen again, 1 -> the process hit EOF.
        start_failed = self.child.expect(["bluetooth", pexpect.EOF])
        if start_failed:
            raise BluetoothctlError(
                "Bluetoothctl failed after running " + command
            )
        output = self.child.before
        # On Python 3 pexpect hands back bytes when spawn() was given no
        # encoding; decode so callers can keep working with text lines.
        if not isinstance(output, str):
            output = output.decode("utf-8", "replace")
        return output.split("\r\n")

    def _run_quietly(self, command, pause=0):
        """Run a command; print the error and return None if it fails."""
        try:
            return self.get_output(command, pause)
        except BluetoothctlError as e:
            print(e)
            return None

    def _list_devices(self, command):
        """Run a listing command and parse every device line it prints."""
        out = self._run_quietly(command)
        if out is None:
            return None
        parsed = (self.parse_device_info(line) for line in out)
        # parse_device_info returns {} for lines that are not devices.
        return [device for device in parsed if device]

    def _run_and_check(self, command, pause, failure_text, success_text):
        """Run a command, then wait for its outcome message.

        Returns True when success_text is seen first, False when
        failure_text or end-of-file is seen first, and None when the
        command itself could not be run.
        """
        if self._run_quietly(command, pause) is None:
            return None
        res = self.child.expect([failure_text, success_text, pexpect.EOF])
        return res == 1

    def start_scan(self):
        """Start the bluetooth scanning process ("scan on").

        Returns None in every case; a BluetoothctlError is printed.
        """
        self._run_quietly("scan on")
        return None

    def make_discoverable(self):
        """Make the device discoverable ("discoverable on").

        Returns None in every case; a BluetoothctlError is printed.
        """
        self._run_quietly("discoverable on")
        return None

    def parse_device_info(self, info_string):
        """Parse one output line describing a device.

        Args:
            info_string: a single line of bluetoothctl output.

        Returns:
            ``{"mac_address": ..., "name": ...}`` when the line contains
            "Device <mac> [<name>]" ("name" is "" when the line carries no
            name), otherwise an empty dict. Lines containing the escape
            sequence "[\\x1b[0;" or the word "removed" are always rejected.
        """
        block_list = ["[\x1b[0;", "removed"]
        if any(keyword in info_string for keyword in block_list):
            return {}

        device_position = info_string.find("Device")
        if device_position == -1:
            return {}

        # "Device <mac> <name with spaces>" -> at most three fields.
        attribute_list = info_string[device_position:].split(" ", 2)
        if len(attribute_list) < 2 or not attribute_list[1]:
            # The word "Device" with no address after it is not a device.
            return {}

        return {
            "mac_address": attribute_list[1],
            "name": attribute_list[2] if len(attribute_list) > 2 else "",
        }

    def get_available_devices(self):
        """Return the devices listed by "devices" as a list of dicts.

        Each dict has the keys "mac_address" and "name". Returns None when
        bluetoothctl failed.
        """
        return self._list_devices("devices")

    def get_paired_devices(self):
        """Return the devices listed by "paired-devices" as a list of dicts.

        Each dict has the keys "mac_address" and "name". Returns None when
        bluetoothctl failed.
        """
        return self._list_devices("paired-devices")

    def get_discoverable_devices(self):
        """Return the available devices that are not paired.

        Returns None when either underlying listing failed, so a failed
        query is never mistaken for "nothing is paired".
        """
        available = self.get_available_devices()
        paired = self.get_paired_devices()
        if available is None or paired is None:
            return None
        return [d for d in available if d not in paired]

    def get_device_info(self, mac_address):
        """Return the output lines of "info <mac_address>".

        Returns None when bluetoothctl failed.
        """
        return self._run_quietly("info " + mac_address)

    def pair(self, mac_address):
        """Try to pair with a device by mac address.

        Returns True on "Pairing successful", False on "Failed to pair" or
        end-of-file, None when the command could not be run.
        """
        return self._run_and_check(
            "pair " + mac_address, 4,
            "Failed to pair", "Pairing successful",
        )

    def remove(self, mac_address):
        """Remove a paired device by mac address.

        Returns True on "Device has been removed", False on "not available"
        or end-of-file, None when the command could not be run.
        """
        return self._run_and_check(
            "remove " + mac_address, 3,
            "not available", "Device has been removed",
        )

    def connect(self, mac_address):
        """Try to connect to a device by mac address.

        Returns True on "Connection successful", False on
        "Failed to connect" or end-of-file, None when the command could not
        be run.
        """
        return self._run_and_check(
            "connect " + mac_address, 2,
            "Failed to connect", "Connection successful",
        )

    def disconnect(self, mac_address):
        """Try to disconnect from a device by mac address.

        Returns True on "Successful disconnected", False on
        "Failed to disconnect" or end-of-file, None when the command could
        not be run.
        """
        return self._run_and_check(
            "disconnect " + mac_address, 2,
            "Failed to disconnect", "Successful disconnected",
        )
if __name__ == "__main__":
    # Demo: scan for ten seconds, then list the devices that are visible
    # but not yet paired.
    print("Init bluetooth...")
    bl = Bluetoothctl()
    print("Ready!")

    bl.start_scan()
    print("Scanning for 10 seconds...")
    # Print a running counter, one tick per second, while the scan runs.
    for second in range(10):
        print(second)
        time.sleep(1)

    print(bl.get_discoverable_devices())
Why do you have to wait after sending a pair or connect request to bluetoothctl? It returns instantly, as far as I can see, and pexpect.expect() will wait for the result anyway.
I'm getting this error. How can I resolve it?
Traceback (most recent call last):
File "/home/pi/Desktop/bluetoothctl.py", line 190, in
bl = Bluetoothctl()
File "/home/pi/Desktop/bluetoothctl.py", line 38, in init
out = subprocess.check_output("rfkill unblock bluetooth", shell = True)
File "/usr/lib/python2.7/subprocess.py", line 573, in check_output
raise CalledProcessError(retcode, cmd, output=output)
CalledProcessError: Command 'rfkill unblock bluetooth' returned non-zero exit status 1
... thank you, it's very helpful :)
I appreciate you sharing this :).
I am unable to figure out how to pair an end device that has a PIN code and no GUI. Any suggestions would be helpful.
To fix the error "CalledProcessError: Command 'rfkill unblock bluetooth' returned non-zero exit status 1",
change line 42 to:
out = subprocess.check_output("PATH=/usr/sbin:$PATH; rfkill unblock bluetooth", shell = True)
@Taufeeq25 Typically such a device has a hardcoded default PIN, e.g. 0000 or 1234. Once paired, some devices accept a command to set a custom one, if desired.
How do I stop the scan? I copied the start_scan function and modified it to send the "scan off" command, but it does not work and throws an exception.
def stop_scan(self):
    """Stop the bluetooth scanning process.

    Sends "scan off" through ``self.get_output``. Any exception raised by
    that call is printed rather than propagated.

    Returns:
        None in every case.
    """
    try:
        self.get_output("scan off")
    except Exception as e:  # "except X, e" is a SyntaxError on Python 3
        print(e)
    return None
Hello, thanks for the code.
When I run "agent on", a pairing code is sent to the phone. How can I confirm the pairing code from the Raspberry Pi?
Any suggestions would be appreciated!
Hi @egor,
Is there any chance you could include RSSI as a JSON field as well? The result would look like:
{"name":"Example","mac_address":"00:00:00:00:00","RSSI":"100"}. That would help with distance estimation.
Thanks in advance.
Thanks a lot. It saved me time.
Hello,
Thank you very much for this code.
I am using Python 3 and get some errors when running it.
Is it possible to get this wrapper for Python 3?
Thanks in advance