# -*- coding: utf-8 -*-

# ****************************************************************************
# Copyright (c) 2008 INdT/Fucapi.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
#  This program is distributed in the hope that it will be useful,
#  but WITHOUT ANY WARRANTY; without even the implied warranty of
#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#  GNU Lesser General Public License for more details.
#
#  You should have received a copy of the GNU Lesser General Public License
#  along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
# ============================================================================
# Project Name : PC Remote
# Author       : Jônatas Isvi
# Email        : jonatas.nona@gmail.com
# Reviewer     :
# Email        :
# Version      : 1.0
# Package      : connection
# Description  : BluetoothConnectionManager
# ============================================================================

import bluetooth

from exceptions import *
from genericconnectionmanager import *


def _is_valid_address(addr):
    """Return True when *addr* has six colon-separated fields.

    Only the field count is checked; the content of each field is not
    validated.
    """
    return len(addr.split(":")) == 6


class BluetoothConnectionManager(GenericConnectionManager):
    """Manage the objects and operations of a bluetooth connection.

    Subclass of GenericConnectionManager.

    Attributes set by this class:
        sock           -- socket created by create_socket()
        port           -- port used by bind() and connect()
        address        -- remote device address used by connect()
        client_sock    -- socket returned by accept()
        client_address -- address information returned by accept()
    """

    def __init__(self):
        GenericConnectionManager.__init__(self)
        self.sock = None
        self.port = None
        self.address = None
        self.client_sock = None
        self.client_address = None

    # fast way to create a simple server
    def create_server(self, protocol, port):
        """Create a socket, bind it to *port*, listen and accept one client.

        Raises BluetoothConnectionError if *protocol* is not recognised
        (see create_socket).
        """
        self.create_socket(protocol)
        self.set_port(port)
        self.bind()
        self.listen()
        self.accept()

    # fast way to create a simple client
    def create_client(self, protocol, address, port):
        """Create a socket and connect it to *address* on *port*.

        Raises BluetoothConnectionError if *protocol* is not recognised or
        *address* is malformed.
        """
        self.create_socket(protocol)
        self.set_address(address)
        self.set_port(port)
        self.connect()

    # search for all devices
    def find_devices(self, time=8):
        """Discover nearby devices and return them with their names.

        *time* is forwarded to bluetooth.discover_devices() as ``duration``.
        The result is whatever discover_devices(lookup_names=True) returns;
        find_devices_only_names() consumes it as (address, name) pairs.

        Raises BluetoothConnectionError when nothing is discovered.
        """
        devices = bluetooth.discover_devices(lookup_names=True, duration=time)
        if not devices:
            raise BluetoothConnectionError("Device were not found.")
        return devices

    # search the device port
    def find_port(self, addr):
        """Return the port of the last service found on device *addr*.

        Raises BluetoothConnectionError if *addr* is malformed or if no
        port could be obtained.
        """
        if not _is_valid_address(addr):
            raise BluetoothConnectionError("Invalid address.")

        port = None
        # Every service overwrites the previous value, so the port of the
        # last service in the result is the one that is returned.
        for service in bluetooth.find_service(address=addr):
            port = service['port']

        if port is None:
            raise BluetoothConnectionError("Port not found.")
        return port

    # search device services
    def find_services(self, service=None, addr=None):
        """Search for services, optionally filtered by name and/or address.

        With no filter the raw result of bluetooth.find_service() is
        returned, even when it is empty.  With at least one filter an empty
        result raises BluetoothConnectionError.

        Raises BluetoothConnectionError if *addr* is given but malformed,
        or if a filtered search finds nothing.
        """
        # The address is validated before any lookup is attempted.
        if addr is not None and not _is_valid_address(addr):
            raise BluetoothConnectionError("Invalid address.")

        if service is None and addr is None:
            return bluetooth.find_service()

        # Only pass the filters that were actually supplied.
        filters = {}
        if service is not None:
            filters['name'] = service
        if addr is not None:
            filters['address'] = addr

        found = bluetooth.find_service(**filters)
        if found:
            return found

        if addr is None:
            raise BluetoothConnectionError(
                "Name of the service does not exist.")
        raise BluetoothConnectionError("Services not found.")

    # search the device indicated by name
    def find_device_address_by_name(self, device_name):
        """Return the address of the first discovered device named *device_name*.

        Raises BluetoothConnectionError when no discovered device has that
        name.
        """
        for address in bluetooth.discover_devices():
            if device_name == bluetooth.lookup_name(address):
                # An empty address is treated as "not found", as before.
                if address:
                    return address
                break
        raise BluetoothConnectionError("Device name not found.")

    # search only device names
    def find_devices_only_names(self):
        """Return the names of all discovered devices.

        Raises BluetoothConnectionError when no device is discovered.
        """
        names = [name for _address, name in self.find_devices()]
        if not names:
            raise BluetoothConnectionError("Devices were not found.")
        return names

    # get the client address
    def get_client_address(self):
        """Return the client address stored by accept()."""
        return self.client_address

    # set the port to communicate
    def set_port(self, port):
        """Store the port used by bind() and connect()."""
        self.port = port

    # get the port to communicate
    def get_port(self):
        """Return the stored port."""
        return self.port

    # set the device address
    def set_address(self, address):
        """Store the remote device address used by connect().

        Raises BluetoothConnectionError if *address* is malformed.
        """
        if not _is_valid_address(address):
            raise BluetoothConnectionError("Invalid address.")
        self.address = address

    # get the device address
    def get_address(self):
        """Return the stored device address."""
        return self.address

    # create a socket with a determinated protocol
    def create_socket(self, protocol=None):
        """Create the socket for *protocol* and store it in ``self.sock``.

        Accepted values are 'rfcomm'/'RFCOMM' and 'l2cap'/'L2CAP'.

        Raises BluetoothConnectionError for any other value.
        """
        if protocol in ('rfcomm', 'RFCOMM'):
            self.sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
        elif protocol in ('l2cap', 'L2CAP'):
            self.sock = bluetooth.BluetoothSocket(bluetooth.L2CAP)
        else:
            raise BluetoothConnectionError("Undefined Protocol.")

    # bind the communication
    def bind(self):
        """Bind the socket to the stored port with an empty address string."""
        self.sock.bind(("", self.get_port()))

    # just listen the tube, only to server
    def listen(self):
        """Put the socket in listening mode with a backlog of 1."""
        self.sock.listen(1)

    # accept the client communication
    def accept(self):
        """Accept a connection and store the client socket and address."""
        self.client_sock, self.client_address = self.sock.accept()

    # connect devices
    def connect(self):
        """Connect the socket to the stored address and port."""
        self.sock.connect((self.get_address(), self.get_port()))

    # send string message
    def send_message(self, msg=None):
        """Send *msg* through ``self.sock``.

        NOTE(review): this uses ``self.sock`` while received_message() uses
        ``self.client_sock`` -- confirm this asymmetry is intended.
        """
        self.sock.send(msg)

    # receive string message
    def received_message(self):
        """Return up to 1024 bytes read from ``self.client_sock``."""
        return self.client_sock.recv(1024)

    # close connection
    def close(self):
        """Close the client socket and the main socket, whichever exist.

        Does nothing when neither socket has been created.  The main socket
        is closed even if closing the client socket raises.
        """
        try:
            if self.client_sock is not None:
                self.client_sock.close()
        finally:
            if self.sock is not None:
                self.sock.close()