X-Git-Url: https://vcs.maemo.org/git/?a=blobdiff_plain;f=dbuscron%2Fparser.py;h=6937e149273551c1e70eee0b1c2a08e97d0dd2fe;hb=4801eafb397f98ed34b37324f31d706bd95d06e4;hp=5f8f97b098b6119d5d4cf7e337bf4d1e2f15030a;hpb=cae5cf923f5076e31f9de843cd5012f798462baf;p=dbuscron diff --git a/dbuscron/parser.py b/dbuscron/parser.py index 5f8f97b..6937e14 100644 --- a/dbuscron/parser.py +++ b/dbuscron/parser.py @@ -1,23 +1,60 @@ +# encoding: utf-8 from __future__ import with_statement import re from dbuscron.bus import DbusBus -try: - from itertools import product -except ImportError: - def product(*args): - if args: - head, tail = args[0], args[1:] - for h in head: - for t in product(*tail): - yield (h,) + t - +def unescape_(): + h = '[0-9A-Fa-f]' + r = re.compile(r'\\x('+h+r'{2})|\\u('+h+'{4})') + def unescape(value): + if not (value and \ + (r'\x' in value or r'\u' in value)): + return value + + return r.sub(\ + lambda m: chr(int(m.group(1), 16)) \ + if m.group(1) is not None else \ + unichr(int(m.group(2), 16))\ + .encode('utf-8'),\ + value) + return unescape +unescape = unescape_() + +def product(*args): + if args: + head, tail = args[0], args[1:] + for h in head: + for t in product(*tail): + yield (h,) + t + + else: + yield () + +class CrontabParserError(SyntaxError): + def __init__(self, message, lineno, expected=None): + if expected: + if isinstance(expected, (tuple, list)): + exp = ' (expected %s or %s)' % (', '.join(expected[:-1]), expected[-1]) else: - yield () + exp = '' + + msg = '%s%s at line %d' % (message, exp, lineno) + + SyntaxError.__init__(self, msg) class CrontabParser(object): __fields_sep = re.compile(r'\s+') __envvar_sep = re.compile(r'\s*=\s*') + __fields_chk = { + 'bus_' : None, + 'type_' : ('signal','method_call','method_return','error'), + 'sender_' : None, + 'interface_' : re.compile(r'^[a-zA-Z][a-zA-Z0-9_.]+$'), + 'path_' : re.compile(r'^/[a-zA-Z0-9_/]+$'), + 'member_' : re.compile(r'^[a-zA-Z][a-zA-Z0-9_]+$'), + 'destination_' : None, + 'args_' : None, + } __fields = [ 'bus_', 'type_', @@ -27,7 +64,6 @@ class CrontabParser(object): 'member_', 'destination_', 'args_', - #'command' ] def __init__(self, fname): @@ -41,8 +77,10 @@ class CrontabParser(object): def __iter__(self): # bus type sender interface path member destination args command + lineno = 0 with open(self.__filename) as f: for line in f: + lineno += 1 line = line.strip() if not line or line.startswith('#'): @@ -50,34 +88,56 @@ class CrontabParser(object): parts = self.__fields_sep.split(line, 8) if len(parts) < 9: - parts = self.__envvar_sep(line, 1) + parts = self.__envvar_sep.split(line, 1) if len(parts) == 2: self.__environ[parts[0]] = parts[1] - continue + continue - rule = [(None,), (None,), (None,), (None,), (None,), (None,), (None,), (None,)] + raise CrontabParserError('Unexpected number of records', lineno) - if parts[1] = '*': - parts[1] = 'signal,method_call,method_return,error' + rule = [('s','S'), self.__fields_chk['type_'], (None,), (None,), (None,), (None,), (None,), (None,)] - for p in range(1, 8): + for p in range(0, 8): if parts[p] != '*': rule[p] = parts[p].split(',') command = parts[8] - - if parts[0] == '*' or parts[0] == 'S,s' or parts[0] == 's,S': - rule[0] = (self.__bus.system, self.__bus.session) - elif parts[0] == 's': - rule[0] = (self.__bus.session,) - elif parts[0] == 'S': - rule[0] = (self.__bus.system,) - + for r in product(*rule): + r = list(r) + if r[0] == 'S': + r[0] = self.__bus.system + elif r[0] == 's': + r[0] = self.__bus.session + else: + raise CrontabParserError('Unexpected bus value', lineno, expected=('S', 's', '*')) + if r[7]: - r[7] = r[7].split(';') + r[7] = map(unescape, r[7].split(';')) + ruled = dict() for i, f in enumerate(self.__fields): + if r[i] is not None and self.__fields_chk[f]: + if isinstance(self.__fields_chk[f], tuple): + if r[i] not in self.__fields_chk[f]: + raise CrontabParserError('Unexpected %s value' % (f.strip('_')), lineno, + expected=self.__fields_chk[f]) + else: + if not self.__fields_chk[f].match(r[i]): + raise CrontabParserError('Incorrect %s value' % (f.strip('_')), lineno) ruled[f] = r[i] + yield ruled, command +def OptionsParser(args=None, help=u'', **opts): + + from optparse import OptionParser + import dbuscron + parser = OptionParser(usage=help, version="%prog "+dbuscron.__version__) + for opt, desc in opts.iteritems(): + names = desc.pop('names') + desc['dest'] = opt + parser.add_option(*names, **desc) + + return parser.parse_args(args)[0] +