1 # Simple UPnP control point using BRisa framework
3 from brisa.core.reactors import install_default_reactor
4 reactor = install_default_reactor()
6 from brisa.core.network import parse_url
7 from brisa.core.threaded_call import run_async_function
9 from brisa.upnp.control_point.control_point import ControlPoint
10 from brisa.upnp.control_point.control_point_av import ControlPointAV
12 service = ('u','urn:schemas-upnp-org:service:PlayList:1')
13 zukebox_type = 'urn:schemas-upnp-org:device:ZukeBoxServer:1'
15 class ZukeboxControlPoint(ControlPointAV):
20 ControlPointAV.__init__(self)
25 self.cp = self.create_control_point()
27 print 'Create control point'
28 #reactor.add_after_stop_func(self.cp.stop)
30 #print 'Reactor main() criado'
31 self.cp.start_search(300, 'upnp:rootdevice')
34 if self.set_zukebox_server(self.cp):
35 print 'Searching Zukebox Server'
38 print 'After searching'
39 print self.cp.get_current_server().services()
43 def create_control_point(self):
44 """ Creates the control point and binds callbacks to device events.
47 cp.subscribe('new_device_event', self.on_new_device)
48 cp.subscribe('removed_device_event', self.on_removed_device)
52 def on_new_device(self, dev):
60 def on_removed_device(self, udn):
67 def get_switch_service(self, device):
68 return device.services[service[1]]
71 def set_zukebox_server(self, cp):
73 for device in cp.get_devices():
74 print 'set_zukebox_server'
75 if device.values().friendly_name == 'Zukebox-Server':
76 cp.set_current_server(device)
82 def get_playlist(self, cp):
86 device = cp.get_current_server()
87 service = device.get_service_by_type('urn:schemas-upnp-org:service:Playlist:1')
88 playlist = service.GetPlaylist()
91 print 'Service not discovered. Exception: %s' %e
93 print 'Error in get current playlist. Exception: %s' %e
101 def searchGenreList(self, cp, genreType):
105 device = cp.get_current_server()
106 service = device.get_service_by_type('urn:schemas-upnp-org:service:Search:1')
107 GenreType = {'GenreType':genreType}
108 genre = service.SearchGenreList(GenreType)
111 print 'Service not discovered. Exception: %s' %e
113 print 'Error in get genre type list. Exception: %s' %e
121 def searchMusicList(self, cp, music_name):
125 device = cp.get_current_server()
126 service = device.get_service_by_type('urn:schemas-upnp-org:service:Search:1')
127 MusicName = {'MusicName':music_name}
128 music = service.SearchMusicList(MusicName)
131 print 'Service not discovered. Exception: %s' %e
133 print 'Error in get music list by name. Exception: %s' %e
141 def searchArtistList(self, cp, artist_name):
145 device = cp.get_current_server()
146 service = device.get_service_by_type('urn:schemas-upnp-org:service:Search:1')
147 ArtistName = {'ArtistName':artist_name}
148 artist = service.SearchMusicList(ArtistName)
151 print 'Service not discovered. Exception: %s' %e
153 print 'Error in get artist list by name. Exception: %s' %e
161 def searchAllAudioList(self, cp):
165 device = cp.get_current_server()
166 service = device.get_service_by_type('urn:schemas-upnp-org:service:Search:1')
167 all_audio = service.SearchMusicList()
170 print 'Service not discovered. Exception: %s' %e
172 print 'Error in get music list by name. Exception: %s' %e
180 def content_directory(self, cp):
181 print 'Testing content_directory acess '