more robust syntax checks in crontab parser
authorKonstantin Stepanov <kstep@p-nut.info>
Sun, 12 Dec 2010 11:22:39 +0000 (13:22 +0200)
committerKonstantin Stepanov <kstep@p-nut.info>
Sun, 12 Dec 2010 11:25:43 +0000 (13:25 +0200)
dbuscron/parser.py

index e38f097..a6f1c1d 100644 (file)
@@ -13,11 +13,30 @@ def product(*args):
         yield ()
 
 class CrontabParserError(SyntaxError):
-    pass
+    def __init__(self, message, lineno, expected=None):
+        if expected:
+            if isinstance(expected, (tuple, list)):
+                exp = ' (expected %s or %s)' % (', '.join(expected[:-1]), expected[-1])
+        else:
+            exp = ''
+
+        msg = '%s%s at line %d' % (message, exp, lineno)
+
+        SyntaxError.__init__(self, msg)
 
 class CrontabParser(object):
     __fields_sep = re.compile(r'\s+')
     __envvar_sep = re.compile(r'\s*=\s*')
+    __fields_chk = {
+            'bus_'         : None,
+            'type_'        : ('signal','method_call','method_return','error'),
+            'sender_'      : None,
+            'interface_'   : re.compile(r'^[a-zA-Z][a-zA-Z0-9_.]+$'),
+            'path_'        : re.compile(r'^/[a-zA-Z0-9_/]+$'),
+            'member_'      : re.compile(r'^[a-zA-Z][a-zA-Z0-9_]+$'),
+            'destination_' : None,
+            'args_'        : None,
+            }
     __fields = [
             'bus_',
             'type_',
@@ -27,7 +46,6 @@ class CrontabParser(object):
             'member_',
             'destination_',
             'args_',
-            #'command'
             ]
 
     def __init__(self, fname):
@@ -57,9 +75,9 @@ class CrontabParser(object):
                         self.__environ[parts[0]] = parts[1]
                         continue
 
-                    raise SyntaxError('Unexpected number of records at line #%d.' % (lineno))
+                    raise CrontabParserError('Unexpected number of records', lineno)
 
-                rule = [('s','S'), ('signal','method_call','method_return','error'), (None,), (None,), (None,), (None,), (None,), (None,)]
+                rule = [('s','S'), self.__fields_chk['type_'], (None,), (None,), (None,), (None,), (None,), (None,)]
 
                 for p in range(0, 8):
                     if parts[p] != '*':
@@ -74,14 +92,23 @@ class CrontabParser(object):
                     elif r[0] == 's':
                         r[0] = self.__bus.session
                     else:
-                        continue
-       
+                        raise CrontabParserError('Unexpected bus value', lineno, expected=('S', 's', '*'))
+
                     if r[7]:
                         r[7] = r[7].split(';')
 
                     ruled = dict()
                     for i, f in enumerate(self.__fields):
+                        if self.__fields_chk[f]:
+                            if isinstance(self.__fields_chk[f], tuple):
+                                if r[i] not in self.__fields_chk[f]:
+                                    raise CrontabParserError('Unexpected %s value' % (f.strip('_')), lineno,
+                                            expected=self.__fields_chk[f])
+                            else:
+                                if not self.__fields_chk[f].match(r[i]):
+                                    raise CrontabParserError('Incorrect %s value' % (f.strip('_')), lineno)
                         ruled[f] = r[i]
+
                     yield ruled, command
 
 class OptionsParser(dict):