X-Git-Url: https://vcs.maemo.org/git/?a=blobdiff_plain;f=dbuscron%2Fparser.py;h=23654230606dda616428707e9df7c5a429cacf6e;hb=b8c53d42eef70e97791f9fc1f74ae70f5070161c;hp=e38f097c86d9fa276bb0bee63e27237837f203cd;hpb=78113c793b69920f281e1ad53daf6128f7e0047e;p=dbuscron diff --git a/dbuscron/parser.py b/dbuscron/parser.py index e38f097..2365423 100644 --- a/dbuscron/parser.py +++ b/dbuscron/parser.py @@ -1,7 +1,15 @@ +# encoding: utf-8 from __future__ import with_statement import re from dbuscron.bus import DbusBus +def unescape(value): + if not value or r'\x' not in value: + return value + + r = re.compile(r'\\x([0-9A-Fa-f]{2})') + return r.sub(lambda m: chr(int(m.group(1), 16)), value) + def product(*args): if args: head, tail = args[0], args[1:] @@ -13,11 +21,30 @@ def product(*args): yield () class CrontabParserError(SyntaxError): - pass + def __init__(self, message, lineno, expected=None): + if expected: + if isinstance(expected, (tuple, list)): + exp = ' (expected %s or %s)' % (', '.join(expected[:-1]), expected[-1]) + else: + exp = '' + + msg = '%s%s at line %d' % (message, exp, lineno) + + SyntaxError.__init__(self, msg) class CrontabParser(object): __fields_sep = re.compile(r'\s+') __envvar_sep = re.compile(r'\s*=\s*') + __fields_chk = { + 'bus_' : None, + 'type_' : ('signal','method_call','method_return','error'), + 'sender_' : None, + 'interface_' : re.compile(r'^[a-zA-Z][a-zA-Z0-9_.]+$'), + 'path_' : re.compile(r'^/[a-zA-Z0-9_/]+$'), + 'member_' : re.compile(r'^[a-zA-Z][a-zA-Z0-9_]+$'), + 'destination_' : None, + 'args_' : None, + } __fields = [ 'bus_', 'type_', @@ -27,7 +54,6 @@ class CrontabParser(object): 'member_', 'destination_', 'args_', - #'command' ] def __init__(self, fname): @@ -52,14 +78,14 @@ class CrontabParser(object): parts = self.__fields_sep.split(line, 8) if len(parts) < 9: - parts = self.__envvar_sep(line, 1) + parts = self.__envvar_sep.split(line, 1) if len(parts) == 2: self.__environ[parts[0]] = parts[1] continue - raise SyntaxError('Unexpected number of records at line #%d.' % (lineno)) + raise CrontabParserError('Unexpected number of records', lineno) - rule = [('s','S'), ('signal','method_call','method_return','error'), (None,), (None,), (None,), (None,), (None,), (None,)] + rule = [('s','S'), self.__fields_chk['type_'], (None,), (None,), (None,), (None,), (None,), (None,)] for p in range(0, 8): if parts[p] != '*': @@ -74,49 +100,34 @@ class CrontabParser(object): elif r[0] == 's': r[0] = self.__bus.session else: - continue - + raise CrontabParserError('Unexpected bus value', lineno, expected=('S', 's', '*')) + if r[7]: - r[7] = r[7].split(';') + r[7] = map(unescape, r[7].split(';')) ruled = dict() for i, f in enumerate(self.__fields): + if r[i] is not None and self.__fields_chk[f]: + if isinstance(self.__fields_chk[f], tuple): + if r[i] not in self.__fields_chk[f]: + raise CrontabParserError('Unexpected %s value' % (f.strip('_')), lineno, + expected=self.__fields_chk[f]) + else: + if not self.__fields_chk[f].match(r[i]): + raise CrontabParserError('Incorrect %s value' % (f.strip('_')), lineno) ruled[f] = r[i] - yield ruled, command -class OptionsParser(dict): - def __init__(self, opts, args=None): - super(OptionsParser, self).__init__() - - if args is None: - import sys - args = sys.argv[1:] - - from getopt import getopt - go, _ = getopt(args, opts) - - for o, v in go: - k = o.strip('-') - withval = k+':' in opts - - if self.has_key(k): - if withval: - if isinstance(self[k], list): - self[k].append(v) - else: - self[k] = [ self[k], v ] - - else: - self[k] += 1 + yield ruled, command - else: - self[k] = v if withval else 1 +def OptionsParser(args=None, help=u'', **opts): - def __getitem__(self, k): - if not self.has_key(k): - return False - return super(OptionsParser, self).__getitem__(k) + from optparse import OptionParser + import dbuscron + parser = OptionParser(usage=help, version="%prog "+dbuscron.__version__) + for opt, desc in opts.iteritems(): + names = desc.pop('names') + desc['dest'] = opt + parser.add_option(*names, **desc) - def __getattr__(self, k): - return self[k] + return parser.parse_args(args)[0]