support fnmatch() args match
[dbuscron] / dbuscron / bus.py
1 from __future__ import with_statement
2
3 import dbus, os
4 from fnmatch import fnmatch
5
6 from dbuscron.logger import Logger
7 log = Logger(__name__)
8
def dbus_to_str(value):
    """Render a D-Bus value as a plain string.

    Conversion depends on the dbus type of ``value``:

    * ``dbus.Byte``       -- decimal text of its integer value;
    * ``dbus.ByteArray``  -- comma-joined ``ord()`` of every element;
    * ``dbus.Array``      -- comma-joined recursive conversion of the items;
    * ``dbus.Dictionary`` -- comma-joined ``key:value`` pairs, where the value
      is converted recursively and the key is formatted with ``%s``;
    * ``dbus.String``     -- the UTF-8 encoded text;
    * anything else       -- ``str(value)``.

    Any exception raised during conversion is logged and then re-raised
    unchanged.
    """
    try:
        if isinstance(value, dbus.Byte):
            # Go through int() so the byte is rendered as a number.
            return str(int(value))
        if isinstance(value, dbus.ByteArray):
            return ','.join(str(ord(v)) for v in value)
        if isinstance(value, dbus.Array):
            return ','.join(dbus_to_str(v) for v in value)
        if isinstance(value, dbus.Dictionary):
            return ','.join('%s:%s' % (k, dbus_to_str(v))
                            for k, v in value.iteritems())
        if isinstance(value, dbus.String):
            return value.encode('utf-8')
        return str(value)
    except Exception:
        # Fetch the active exception through sys.exc_info() so this handler
        # does not depend on version-specific "except ..., e" syntax.
        import sys
        error = sys.exc_info()[1]
        log.error('convert exception', error)
        # Bare raise keeps the original traceback; "raise e" would restart
        # the traceback at this line and hide where the failure happened.
        raise
27
def get_dbus_message_type(message):
    """Derive a lower-case, underscore-separated type name from a message.

    The class name of ``message`` loses its last seven characters, every
    ASCII capital letter is turned into ``_`` plus its lower-case form, and
    underscores are trimmed from both ends of the result (for example a
    class called ``MethodCallMessage`` yields ``method_call``).
    """
    stem = message.__class__.__name__[0:-7]
    converted = ''.join(
        ('_' + ch.lower()) if 'A' <= ch <= 'Z' else ch
        for ch in stem
    )
    return converted.strip('_')
33
class DbusBus(object):
    """Shared access point to the D-Bus system and session buses.

    ``__new__`` hands out one cached instance, so every ``DbusBus()`` call
    yields the same object.  Bus connections are created on first access to
    the ``system`` / ``session`` properties.
    """

    __bus = None
    __system_bus = None
    __session_bus = None

    def __new__(cls, *args, **kw):
        """Return the cached instance, creating it on the first call."""
        instance = cls.__bus
        if not instance:
            instance = cls.__bus = super(DbusBus, cls).__new__(cls)
        return instance

    def __init__(self, session_bus_address=None):
        """Publish or discover the session bus address, then install the
        GLib main loop as the default D-Bus main loop.

        ``session_bus_address`` -- address to export; when empty or omitted
        the address is looked up via ``get_session_bus_address()``.
        """
        if not session_bus_address:
            self.get_session_bus_address()
        else:
            self.set_session_bus_address(session_bus_address)

        from dbus.mainloop.glib import DBusGMainLoop
        DBusGMainLoop(set_as_default=True)

    def set_session_bus_address(self, value):
        """Store ``str(value)`` in the DBUS_SESSION_BUS_ADDRESS variable."""
        address = str(value)
        os.environ['DBUS_SESSION_BUS_ADDRESS'] = address

    def get_session_bus_address(self):
        """Return the session bus address.

        The DBUS_SESSION_BUS_ADDRESS environment variable wins when set.
        Otherwise the first line of ``/tmp/session_bus_address.user`` is
        read, the text after its first ``=`` is taken with surrounding
        quotes removed, and that value is exported to the environment
        before being returned.
        """
        environ = os.environ
        if 'DBUS_SESSION_BUS_ADDRESS' in environ:
            return environ['DBUS_SESSION_BUS_ADDRESS']

        with open('/tmp/session_bus_address.user', 'rb') as f:
            first_line = f.readline().strip()
            session_bus_address = first_line.split('=', 1)[-1].strip("'\"")
            environ['DBUS_SESSION_BUS_ADDRESS'] = session_bus_address
            log('session bus address aquired', session_bus_address)
            return session_bus_address

    session_bus_address = property(get_session_bus_address, set_session_bus_address)

    @property
    def system(self):
        """System bus connection, opened on first use."""
        if self.__system_bus:
            return self.__system_bus
        self.__system_bus = dbus.SystemBus()
        return self.__system_bus

    @property
    def session(self):
        """Session bus connection, opened on first use."""
        if self.__session_bus:
            return self.__session_bus
        self.__session_bus = dbus.SessionBus()
        return self.__session_bus

    def attach_handler(self, handler):
        """Add ``handler`` as a message filter on each bus opened so far.

        Buses that have not been accessed yet are left alone.
        """
        for connection in (self.__system_bus, self.__session_bus):
            if connection:
                connection.add_message_filter(handler)

    def listen(self):
        """Run a gobject main loop; returns only when that loop stops."""
        from gobject import MainLoop
        MainLoop().run()
89
class DbusRule(object):
    """A D-Bus message filter usable on the bus and inside the process.

    ``str(rule)`` yields the match-rule string handed to the bus by
    ``register()`` / ``unregister()``; ``match()`` applies the same criteria
    to an already received message.  Every criterion left as ``None`` is a
    wildcard.  Entries of ``args_`` are compared position by position with
    the message arguments and may be ``None`` (any value), a literal string,
    or an ``fnmatch`` pattern.
    """

    def __init__(self, bus_=None, type_=None, sender_=None, interface_=None, path_=None, member_=None, destination_=None, args_=None):
        self._bus         = bus_
        self._type        = type_
        self._sender      = sender_
        self._interface   = interface_
        self._path        = path_
        self._member      = member_
        self._destination = destination_
        # A fresh list per instance; a literal [] default would be one list
        # shared by every rule created without arguments.
        if args_ is None:
            args_ = []
        self._args        = args_
        # Set by register(); lets __del__ undo only what was really added.
        self._registered  = False

    @staticmethod
    def _is_glob(pattern):
        """Tell whether ``pattern`` contains fnmatch wildcard characters."""
        return any(char in pattern for char in '*?[')

    def register(self):
        """Add this rule's match string to the bus (no-op for an empty rule)."""
        rule = str(self)
        if rule:
            self._bus.add_match_string(rule)
            self._registered = True

    def unregister(self):
        """Remove this rule's match string from the bus (no-op for an empty rule)."""
        # Cleared first so a failing removal is not retried from __del__.
        self._registered = False
        rule = str(self)
        if rule:
            self._bus.remove_match_string(rule)

    def __del__(self):
        # Only undo a registration made through register(): a rule without a
        # bus, one never registered, or one already unregistered has nothing
        # to remove.  getattr covers an object whose __init__ did not finish.
        if getattr(self, '_registered', False):
            self.unregister()

    def __str__(self):
        """Build the comma-separated match-rule string for the bus.

        Argument entries that are ``None`` or contain fnmatch wildcards are
        left out: bus-side ``argN`` matching is a literal string comparison
        (D-Bus specification, "Match Rules"), so sending a pattern would make
        the bus drop the very messages ``match()`` is meant to accept.  Those
        entries are still enforced by ``match()``.  Remaining entries keep
        their original position number, and entries starting with ``/`` use
        the ``argNpath`` form.
        """
        rule = []
        for key in ('type', 'sender', 'interface', 'path', 'member', 'destination'):
            value = getattr(self, '_' + key)
            if value is not None:
                rule.append("%s='%s'" % (key, value))

        for i, arg in enumerate(self._args or ()):
            if arg is None or self._is_glob(arg):
                continue
            suffix = 'path' if arg.startswith('/') else ''
            rule.append("arg%d%s='%s'" % (i, suffix, arg))

        return ','.join(rule)

    def match(self, bus, message):
        """Return True when ``message`` received on ``bus`` satisfies the rule.

        Bus, type and header fields must equal the configured value unless
        that value is ``None``.  Message arguments are converted with
        ``dbus_to_str`` and must equal the configured entry or match it as an
        ``fnmatch`` pattern; ``None`` entries and positions present on only
        one side are not checked.
        """
        if self._bus not in (None, bus):
            return False

        if self._type is not None:
            if self._type != get_dbus_message_type(message):
                return False

        for field in ('sender', 'interface', 'path', 'member', 'destination'):
            expected = getattr(self, '_' + field)
            actual = getattr(message, 'get_' + field)()
            if expected not in (None, actual):
                return False

        if self._args:
            # zip() stops at the shorter sequence, so only positions present
            # in both the rule and the message are compared.
            for pattern, arg in zip(self._args, message.get_args_list()):
                if pattern is None:
                    # Wildcard: no need to convert the argument at all.
                    continue
                text = dbus_to_str(arg)
                if pattern != text and not fnmatch(text, pattern):
                    return False

        return True
162