1 # Simple UPnP control point using BRisa framework
3 from brisa.core.reactors import install_default_reactor
4 reactor = install_default_reactor()
6 from brisa.core.network import parse_url
7 from brisa.core.threaded_call import run_async_function
9 from brisa.upnp.control_point.control_point import ControlPoint
10 from brisa.upnp.control_point.control_point_av import ControlPointAV
# (prefix, service type URN) pair. get_switch_service() uses element [1] as
# the key into a device's ``services`` mapping. Element [0] ('u') is not
# used in the visible code -- presumably a namespace prefix; TODO confirm.
service = ('u', 'urn:schemas-upnp-org:service:PlayList:1')

# Device type URN of the ZukeBox server device.
zukebox_type = 'urn:schemas-upnp-org:device:ZukeBoxServer:1'
def on_new_device(dev):
    """ Callback for the control point's 'new_device_event'.

    Prints the UDN of the newly announced device and a hint about the
    'list' command.

    dev -- device object; only its ``udn`` attribute is read here.
    """
    # Single-argument print(...) behaves the same on Python 2 and 3. The
    # '%s %s' join reproduces the separator the old print statement put
    # between the label (which already ends with a space) and the value.
    print('%s %s' % ('Got new device: ', dev.udn))
    print("Type 'list' to see the whole list")
def on_removed_device(udn):
    """ Callback for the control point's 'removed_device_event'.

    Prints the UDN of the device that left.

    udn -- identifier of the removed device, printed as-is.
    """
    # Single-argument print(...) behaves the same on Python 2 and 3. The
    # '%s %s' join reproduces the separator the old print statement put
    # between the label (which already ends with a space) and the value.
    print('%s %s' % ('Device is gone: ', udn))
def get_switch_service(device):
    """ Returns the entry of device.services keyed by the service type URN.

    The key is element [1] of the module-level ``service`` pair, i.e. the
    PlayList service type (despite the "switch" in this function's name).

    device -- object exposing a ``services`` mapping.
    """
    available_services = device.services
    return available_services[service[1]]
37 def create_control_point():
38 """ Creates the control point and binds callbacks to device events.
41 c.subscribe('new_device_event', on_new_device)
42 c.subscribe('removed_device_event', on_removed_device)
47 """ Main loop iteration receiving input commands.
49 c = create_control_point()
51 run_async_function(_handle_cmds, (c, ))
52 reactor.add_after_stop_func(c.stop)
58 input= raw_input('(in doubt use help)command: ')
59 except KeyboardInterrupt, EOFError:
66 print 'Available commands: '
67 for x in ['help', 'search', 'set_zukebox <dev number>', 'get_playlist', 'stop', 'list', 'exit']:
73 elif input == 'search':
75 c.start_search(300, 'upnp:rootdevice')
80 for d in c.get_devices().values():
81 print 'Device no.:', k
83 print 'Name: ', d.friendly_name
84 print 'Device type', d.device_type
85 print 'Services: ', d.services.keys()
86 print 'Embedded devices:', [dev.friendly_name for dev in d.devices.values()]
89 elif input.startswith('set_zukebox'):
91 device_number = int(input.split(' ')[1])
92 c.set_current_server(devices[device_number])
94 print 'Zukebox number not found. Please run list and check again. Exception: \n %s' % e
95 c.set_current_server(None)
96 elif input == 'get_playlist':
98 print c.get_current_server().services
99 serviceTest = get_switch_service(c.get_current_server())
100 print dir(serviceTest.get_actions.__doc__)
101 playlist = serviceTest.get_actions()
102 print 'Playlist: %s' % playlist
103 #for music in playlist:
104 #print '%s', music.name
106 if not hasattr(c, 'get_current_server()') or not c.get_current_server():
107 print 'Zukebox device not set. Please use set zukebox <n>. Exception: %s' % e
109 print 'Erro in get_playlist: %s' % e
110 elif input == 'exit':
115 if __name__ == '__main__':