3e6dd5cc8a5f7c95e31f57ba7f0ec0722a27a9ab
[pywienerlinien] / gotovienna / realtime.py
1 # -*- coding: utf-8 -*-
2
3 from gotovienna.BeautifulSoup import BeautifulSoup
4 from urllib2 import urlopen
5 from datetime import time
6 import re
7 import collections
8 from errors import LineNotFoundError, StationNotFoundError
9
10 from gotovienna import defaults
11
12 class ITipParser:
13     def __init__(self):
14         self._stations = {}
15         self._lines = {}
16
17     def get_stations(self, name):
18         """ Get station by direction
19         {'Directionname': [('Station name', 'url')]}
20         """
21         if not self._stations.has_key(name):
22             st = {}
23
24             if not self.lines.has_key(name):
25                 return None
26
27             bs = BeautifulSoup(urlopen(self.lines[name]))
28             tables = bs.findAll('table', {'class': 'text_10pix'})
29             for i in range(2):
30                 dir = tables[i].div.contents[-1].strip()[6:-6]
31
32                 sta = []
33                 for tr in tables[i].findAll('tr', {'onmouseout': 'obj_unhighlight(this);'}):
34                     if tr.a:
35                         sta.append((tr.a.text, defaults.line_overview + tr.a['href']))
36                     else:
37                         sta.append((tr.text.strip(' '), None))
38
39                 st[dir] = sta
40             self._stations[name] = st
41
42         return self._stations[name]
43
44     @property
45     def lines(self):
46         """ Dictionary of Line names with url as value
47         """
48         if not self._lines:
49             bs = BeautifulSoup(urlopen(defaults.line_overview))
50             # get tables
51             lines = bs.findAll('td', {'class': 'linie'})
52
53             for line in lines:
54                 if line.a:
55                     href = defaults.line_overview + line.a['href']
56                     if line.text:
57                         self._lines[line.text] = href
58                     elif line.img:
59                         self._lines[line.img['alt']] = href
60
61         return self._lines
62
63     def get_url_from_direction(self, line, direction, station):
64         stations = self.get_stations(line)
65
66         for stationname, url in stations.get(direction, []):
67             if stationname == station:
68                 return url
69
70         return None
71
72     def get_departures(self, url):
73         """ Get list of next departures
74         integer if time until next departure
75         time if time of next departure
76         """
77
78         #TODO parse line name and direction for station site parsing
79
80         if not url:
81             # FIXME prevent from calling this method with None
82             return []
83
84         # open url for 90 min timeslot / get departure for next 90 min
85         bs = BeautifulSoup(urlopen(url + "&departureSizeTimeSlot=90"))
86         result_lines = bs.findAll('table')[-1].findAll('tr')
87
88         dep = []
89         for tr in result_lines[1:]:
90             th = tr.findAll('th')
91             if len(th) < 2:
92                 #TODO replace with logger
93                 print "[DEBUG] Unable to find th in:\n%s" % str(tr)
94             elif len(th) == 2:
95                 t = th[-1]
96             else:
97                 t = th[-2]
98             # parse time
99             time = t.text.split(' ')
100             if len(time) < 2:
101                 #print 'Invalid time: %s' % time
102                 # TODO: Issue a warning OR convert "HH:MM" format to countdown
103                 continue
104
105             time = time[1]
106
107             if time.find('rze...') >= 0:
108                     dep.append(0)
109             elif time.isdigit():
110                 # if time to next departure in cell convert to int
111                 dep.append(int(time))
112             else:
113                 # check if time of next departue in cell
114                 t = time.strip('&nbsp;').split(':')
115                 if len(t) == 2 and all(map(lambda x: x.isdigit(), t)):
116                     t = map(int, t)
117                     dep.append(time(*t))
118                 else:
119                     # Unexpected content
120                     #TODO replace with logger
121                     print "[DEBUG] Invalid data:\n%s" % time
122
123         return dep
124
125
126 UBAHN, TRAM, BUS, NIGHTLINE, OTHER = range(5)
127 LINE_TYPE_NAMES = ['U-Bahn', 'Strassenbahn', 'Bus', 'Nightline', 'Andere']
128
129 def get_line_sort_key(name):
130     """Return a sort key for a line name
131
132     >>> get_line_sort_key('U6')
133     ('U', 6)
134
135     >>> get_line_sort_key('D')
136     ('D', 0)
137
138     >>> get_line_sort_key('59A')
139     ('A', 59)
140     """
141     txt = ''.join(x for x in name if not x.isdigit())
142     num = ''.join(x for x in name if x.isdigit()) or '0'
143
144     return (txt, int(num))
145
146 def get_line_type(name):
147     """Get the type of line for the given name
148
149     >>> get_line_type('U1')
150     UBAHN
151     >>> get_line_type('59A')
152     BUS
153     """
154     if name.isdigit():
155         return TRAM
156     elif name.endswith('A') or name.endswith('B') and name[1].isdigit():
157         return BUS
158     elif name.startswith('U'):
159         return UBAHN
160     elif name.startswith('N'):
161         return NIGHTLINE
162     elif name in ('D', 'O', 'VRT', 'WLB'):
163         return TRAM
164
165     return OTHER
166
167 def categorize_lines(lines):
168     """Return a categorized version of a list of line names
169
170     >>> categorize_lines(['U4', 'U3', '59A'])
171     [('U-Bahn', ['U3', 'U4']), ('Bus', ['59A'])]
172     """
173     categorized_lines = collections.defaultdict(list)
174
175     for line in sorted(lines):
176         line_type = get_line_type(line)
177         categorized_lines[line_type].append(line)
178
179     for lines in categorized_lines.values():
180         lines.sort(key=get_line_sort_key)
181
182     return [(LINE_TYPE_NAMES[key], categorized_lines[key])
183             for key in sorted(categorized_lines)]
184
185
186 class Line:
187     def __init__(self, name):
188         self._stations = None
189         self.parser = ITipParser()
190         if name.strip() in self.parser.lines():
191             self.name = name.strip()
192         else:
193             raise LineNotFoundError('There is no line "%s"' % name.strip())
194
195     @property
196     def stations(self):
197         if not self._stations:
198             self._stations = parser.get_stations(self.name)
199         return self._stations
200
201     def get_departures(self, stationname):
202         stationname = stationname.strip().lower()
203         stations = self.stations
204
205         found = false
206
207         for direction in stations.keys():
208             # filter stations starting with stationname
209             stations[direction] = filter(lambda station: station[0].lower().starts_with(stationname), stations)
210             found = found or bool(stations[direction])
211
212         if found:
213             # TODO return departures
214             raise NotImplementedError()
215         else:
216             raise StationNotFoundError('There is no stationname called "%s" at route of line "%s"' % (stationname, self.name))