X-Git-Url: https://vcs.maemo.org/git/?a=blobdiff_plain;f=dbuscron%2Fparser.py;h=6937e149273551c1e70eee0b1c2a08e97d0dd2fe;hb=4801eafb397f98ed34b37324f31d706bd95d06e4;hp=b004fb79ef0e7d2d50766d6e79d9903e2d55ee7b;hpb=aa6fc1c7232ba7256d27617d11f3c7bca43589d7;p=dbuscron diff --git a/dbuscron/parser.py b/dbuscron/parser.py index b004fb7..6937e14 100644 --- a/dbuscron/parser.py +++ b/dbuscron/parser.py @@ -1,7 +1,25 @@ +# encoding: utf-8 from __future__ import with_statement import re from dbuscron.bus import DbusBus +def unescape_(): + h = '[0-9A-Fa-f]' + r = re.compile(r'\\x('+h+r'{2})|\\u('+h+'{4})') + def unescape(value): + if not (value and \ + (r'\x' in value or r'\u' in value)): + return value + + return r.sub(\ + lambda m: chr(int(m.group(1), 16)) \ + if m.group(1) is not None else \ + unichr(int(m.group(2), 16))\ + .encode('utf-8'),\ + value) + return unescape +unescape = unescape_() + def product(*args): if args: head, tail = args[0], args[1:] @@ -12,9 +30,31 @@ def product(*args): else: yield () +class CrontabParserError(SyntaxError): + def __init__(self, message, lineno, expected=None): + if expected: + if isinstance(expected, (tuple, list)): + exp = ' (expected %s or %s)' % (', '.join(expected[:-1]), expected[-1]) + else: + exp = '' + + msg = '%s%s at line %d' % (message, exp, lineno) + + SyntaxError.__init__(self, msg) + class CrontabParser(object): __fields_sep = re.compile(r'\s+') __envvar_sep = re.compile(r'\s*=\s*') + __fields_chk = { + 'bus_' : None, + 'type_' : ('signal','method_call','method_return','error'), + 'sender_' : None, + 'interface_' : re.compile(r'^[a-zA-Z][a-zA-Z0-9_.]+$'), + 'path_' : re.compile(r'^/[a-zA-Z0-9_/]+$'), + 'member_' : re.compile(r'^[a-zA-Z][a-zA-Z0-9_]+$'), + 'destination_' : None, + 'args_' : None, + } __fields = [ 'bus_', 'type_', @@ -24,7 +64,6 @@ class CrontabParser(object): 'member_', 'destination_', 'args_', - #'command' ] def __init__(self, fname): @@ -38,8 +77,10 @@ class CrontabParser(object): def __iter__(self): # bus type sender interface path member destination args command + lineno = 0 with open(self.__filename) as f: for line in f: + lineno += 1 line = line.strip() if not line or line.startswith('#'): @@ -47,12 +88,14 @@ class CrontabParser(object): parts = self.__fields_sep.split(line, 8) if len(parts) < 9: - parts = self.__envvar_sep(line, 1) + parts = self.__envvar_sep.split(line, 1) if len(parts) == 2: self.__environ[parts[0]] = parts[1] - continue + continue - rule = [('s','S'), ('signal','method_call','method_return','error'), (None,), (None,), (None,), (None,), (None,), (None,)] + raise CrontabParserError('Unexpected number of records', lineno) + + rule = [('s','S'), self.__fields_chk['type_'], (None,), (None,), (None,), (None,), (None,), (None,)] for p in range(0, 8): if parts[p] != '*': @@ -67,44 +110,34 @@ class CrontabParser(object): elif r[0] == 's': r[0] = self.__bus.session else: - continue - + raise CrontabParserError('Unexpected bus value', lineno, expected=('S', 's', '*')) + if r[7]: - r[7] = r[7].split(';') + r[7] = map(unescape, r[7].split(';')) ruled = dict() for i, f in enumerate(self.__fields): + if r[i] is not None and self.__fields_chk[f]: + if isinstance(self.__fields_chk[f], tuple): + if r[i] not in self.__fields_chk[f]: + raise CrontabParserError('Unexpected %s value' % (f.strip('_')), lineno, + expected=self.__fields_chk[f]) + else: + if not self.__fields_chk[f].match(r[i]): + raise CrontabParserError('Incorrect %s value' % (f.strip('_')), lineno) ruled[f] = r[i] - yield ruled, command - -class OptionsParser(dict): - def __init__(self, args, opts): - super(OptionsParser, self).__init__() - from getopt import getopt - go, _ = getopt(args, opts) - - for o, v in go: - k = o.strip('-') - withval = k+':' in opts - if self.has_key(k): - if withval: - if isinstance(self[k], list): - self[k].append(v) - else: - self[k] = [ self[k], v ] - - else: - self[k] += 1 + yield ruled, command - else: - self[k] = v if withval else 1 +def OptionsParser(args=None, help=u'', **opts): - def __getitem__(self, k): - if not self.has_key(k): - return False - return super(OptionsParser, self).__getitem__(k) + from optparse import OptionParser + import dbuscron + parser = OptionParser(usage=help, version="%prog "+dbuscron.__version__) + for opt, desc in opts.iteritems(): + names = desc.pop('names') + desc['dest'] = opt + parser.add_option(*names, **desc) - def __getattr__(self, k): - return self[k] + return parser.parse_args(args)[0]