--- /dev/null
+import httplib
+import gnome.gconf
+from oauth import oauth
+from xml.dom.minidom import parseString
+from org.maemo.hermes.engine.friend import Friend
+
+# httplib.HTTPSConnection.debuglevel = 1
+
+class LinkedInApi():
+ """LinkedIn API for Hermes.
+
+ Copyright (c) Fredrik Wendt <fredrik@wendt.se> 2010.
+ Released under the Artistic Licence."""
+
+
+ LI_SERVER = "api.linkedin.com"
+ LI_API_URL = "https://api.linkedin.com"
+ LI_CONN_API_URL = LI_API_URL + "/v1/people/~/connections"
+
+ REQUEST_TOKEN_URL = LI_API_URL + "/uas/oauth/requestToken"
+ AUTHORIZE_URL = LI_API_URL + "/uas/oauth/authorize"
+ ACCESS_TOKEN_URL = LI_API_URL + "/uas/oauth/accessToken"
+
+
+ # -----------------------------------------------------------------------
+ def __init__(self):
+ """Initialize the LinkedIn service, finding LinkedIn API keys in gconf and
+ having a gui_callback available."""
+
+ self._gc = gnome.gconf.client_get_default()
+
+ # -- Check the environment is going to work...
+ #
+ if (self._gc.get_string('/desktop/gnome/url-handlers/http/command') == 'epiphany %s'):
+ raise Exception('Browser in gconf invalid (see NB#136012). Installation error.')
+
+ api_key = self._gc.get_string('/apps/maemo/hermes/linkedin_api_key')
+ secret_key = self._gc.get_string('/apps/maemo/hermes/linkedin_key_secret')
+
+ # FIXME: remove this
+ api_key = '1et4G-VtmtqNfY7gF8PHtxMOf0KNWl9ericlTEtdKJeoA4ubk4wEQwf8lSL8AnYE'
+ secret_key = 'uk--OtmWcxER-Yh6Py5p0VeLPNlDJSMaXj1xfHILoFzrK7fM9eepNo5RbwGdkRo_'
+
+ if api_key is None or secret_key is None:
+ raise Exception('No LinkedIn application keys found. Installation error.')
+
+ self.api_key = api_key
+ self.secret_key = secret_key
+
+ # FIXME: move this
+ token_str = "oauth_token_secret=60f817af-6437-4015-962f-cc3aefee0264&oauth_token=f89c2b7b-1c12-4f83-a469-838e78901716"
+ self.access_token = oauth.OAuthToken.from_string(token_str)
+
+ self.consumer = oauth.OAuthConsumer(api_key, secret_key)
+ self.sig_method = oauth.OAuthSignatureMethod_HMAC_SHA1()
+
+ # -----------------------------------------------------------------------
+ def get_friends(self):
+ """ Returns a Friend object for each LinkedIn connection."""
+
+ xml = self._make_api_request(self.LI_CONN_API_URL)
+ dom = parseString(xml)
+ friends = self._parse_dom(dom)
+
+ return friends
+
+ # -----------------------------------------------------------------------
+ def _make_api_request(self, url):
+ print "_make_api_request", url
+ oauth_request = oauth.OAuthRequest.from_consumer_and_token(self.consumer, token=self.access_token, http_url=url)
+ oauth_request.sign_request(self.sig_method, self.consumer, self.access_token)
+ connection = httplib.HTTPSConnection(self.LI_SERVER)
+ try:
+ connection.request(oauth_request.http_method, url, headers=oauth_request.to_header())
+ return connection.getresponse().read()
+ except:
+ raise Exception("Failed to contact LinkedIn at " + url)
+
+
+ # -----------------------------------------------------------------------
+ def _parse_dom(self, dom):
+ print "parse_dom", dom
+ def get_first_tag(node, tagName):
+ tags = node.getElementsByTagName(tagName)
+ if tags and len(tags) > 0:
+ return tags[0]
+
+ def extract(node, tagName):
+ tag = get_first_tag(node, tagName)
+ if tag:
+ return tag.firstChild.nodeValue
+
+ def extract_public_url(node):
+ tag = get_first_tag(node, 'site-standard-profile-request')
+ if tag:
+ url = extract(tag, 'url')
+ return url.replace("&", "&")
+
+ # look for errors
+ errors = dom.getElementsByTagName('error')
+ if (len(errors) > 0):
+ print "Error" # FIXME: handle this better
+ return []
+
+ friends = []
+ people = dom.getElementsByTagName('person')
+ for p in people:
+ try:
+ fn = extract(p, 'first-name')
+ ln = extract(p, 'last-name')
+ photo_url = extract(p, 'picture-url')
+ id = extract(p, 'id')
+ public_url = extract_public_url(p)
+
+ name = fn + " " + ln
+ friend = Friend(name)
+ friend.add_url(public_url)
+ if photo_url:
+ friend.set_photo_url(photo_url)
+
+ friends.append(friend)
+
+ except:
+ pass
+
+
+ # -----------------------------------------------------------------------
+ def _get_access_token(self, token, verifier):
+ """user provides the verifier, which was displayed in the browser window"""
+
+ oauth_request = oauth.OAuthRequest.from_consumer_and_token(self.consumer, token=token, verifier=verifier, http_url=self.ACCESS_TOKEN_URL)
+ oauth_request.sign_request(self.sig_method, self.consumer, token)
+
+ connection = httplib.HTTPSConnection(self.LI_SERVER)
+ connection.request(oauth_request.http_method, self.ACCESS_TOKEN_URL, headers=oauth_request.to_header())
+ response = connection.getresponse()
+ return oauth.OAuthToken.from_string(response.read())
+
+
+ # -----------------------------------------------------------------------
+ def _get_authorize_url(self, token):
+ """The URL that the user should browse to, in order to authorize the application to acess data"""
+
+ oauth_request = oauth.OAuthRequest.from_token_and_callback(token=token, http_url=self.AUTHORIZE_URL)
+ return oauth_request.to_url()
+
+
+ # -----------------------------------------------------------------------
+ def _get_request_token(self, callback):
+ """Get a request token from LinkedIn"""
+
+ oauth_consumer_key = self.api_key
+ oauth_request = oauth.OAuthRequest.from_consumer_and_token(self.consumer, callback=callback, http_url=self.REQUEST_TOKEN_URL)
+ oauth_request.sign_request(self.sig_method, self.consumer, None)
+
+ connection = httplib.HTTPSConnection(self.LI_SERVER)
+ connection.request(oauth_request.http_method, self.REQUEST_TOKEN_URL, headers=oauth_request.to_header())
+ response = self.connection.getresponse().read()
+
+ token = oauth.OAuthToken.from_string(response)
+ return token
+
+
+ # -----------------------------------------------------------------------
+ def _do_li_login(self):
+ """Perform authentication against LinkedIn and store the result in gconf
+ for later use. Uses the 'need_auth' and 'block_for_auth' methods on
+ the callback class. The former allows a message to warn the user
+ about what is about to happen to be shown; the second is to wait
+ for the user to confirm they have logged in."""
+ # FIXME
-import httplib
-import gnome.gconf
-from oauth import oauth
-from xml.dom.minidom import parseString
-
-import org.maemo.hermes.engine.service
-from org.maemo.hermes.engine.friend import Friend
+import org.maemo.hermes.engine.facebook.service
from org.maemo.hermes.engine.names import canonical
-# httplib.HTTPSConnection.debuglevel = 1
-class Service(org.maemo.hermes.engine.service.Service):
+class Service(org.maemo.hermes.engine.facebook.service.Service):
"""LinkedIn backend for Hermes.
- This sets up two gconf paths to contain LinkedIn OAuth keys:
- /apps/maemo/hermes/linkedin_oauth
- /apps/maemo/hermes/linkedin_verifier
-
Copyright (c) Fredrik Wendt <fredrik@wendt.se> 2010.
Released under the Artistic Licence."""
- LI_SERVER = "api.linkedin.com"
- LI_API_URL = "https://api.linkedin.com"
- LI_CONN_API_URL = LI_API_URL + "/v1/people/~/connections"
-
- REQUEST_TOKEN_URL = LI_API_URL + "/uas/oauth/requestToken"
- AUTHORIZE_URL = LI_API_URL + "/uas/oauth/authorize"
- ACCESS_TOKEN_URL = LI_API_URL + "/uas/oauth/accessToken"
-
-
# -----------------------------------------------------------------------
- def __init__(self, autocreate=False, gui_callback=None):
+ def __init__(self, linkedInApi):
"""Initialize the LinkedIn service, finding LinkedIn API keys in gconf and
having a gui_callback available."""
- self._gc = gnome.gconf.client_get_default()
- self._gui = gui_callback
- self._autocreate = autocreate
-
- # -- Check the environment is going to work...
- #
- if (self._gc.get_string('/desktop/gnome/url-handlers/http/command') == 'epiphany %s'):
- raise Exception('Browser in gconf invalid (see NB#136012). Installation error.')
-
- api_key = self._gc.get_string('/apps/maemo/hermes/linkedin_api_key')
- secret_key = self._gc.get_string('/apps/maemo/hermes/linkedin_key_secret')
-
- # FIXME: remove this
- api_key = '1et4G-VtmtqNfY7gF8PHtxMOf0KNWl9ericlTEtdKJeoA4ubk4wEQwf8lSL8AnYE'
- secret_key = 'uk--OtmWcxER-Yh6Py5p0VeLPNlDJSMaXj1xfHILoFzrK7fM9eepNo5RbwGdkRo_'
+ org.maemo.hermes.engine.facebook.service.Service.__init__(self, None)
+ self.linkedInApi = linkedInApi
- if api_key is None or secret_key is None:
- raise Exception('No LinkedIn application keys found. Installation error.')
-
- self.api_key = api_key
- self.secret_key = secret_key
-
- # FIXME: move this
- token_str = "oauth_token_secret=60f817af-6437-4015-962f-cc3aefee0264&oauth_token=f89c2b7b-1c12-4f83-a469-838e78901716"
- self.access_token = oauth.OAuthToken.from_string(token_str)
-
- self.consumer = oauth.OAuthConsumer(api_key, secret_key)
- self.sig_method = oauth.OAuthSignatureMethod_HMAC_SHA1()
- self._friends_by_name = None
- self._friends_by_url = {}
- self._friends_by_contact = {}
-
- self.get_friends()
-
-
# -----------------------------------------------------------------------
- def get_name(self):
- return "LinkedIn"
-
-
- # -----------------------------------------------------------------------
- def get_friends(self):
- """Return a list of friends from this service, or 'None' if manual mapping
- is not supported."""
-
- if self._friends_by_name:
- return self._friends_by_name.values()
-
- # FIXME: Check the available OAuth session is still valid...
-
- # get data
- self._friends_by_name = {}
-
- xml = self._make_api_request(self.LI_CONN_API_URL)
- dom = parseString(xml)
-
- self._parse_dom(dom)
+ def process_friends(self):
+ friends = self.linkedInApi.get_friends()
+ for friend in friends:
+ for url in friend.get_urls():
+ self._friends_by_url[url] = friend
+ if self._allow_friend_to_match_by_name(friend):
+ key = canonical(friend.get_name())
+ self._friends_by_name[key] = friend
- print "get_friends found", len(self._friends_by_name)
- return self._friends_by_name.values()
-
-
# -----------------------------------------------------------------------
- def pre_process_contact(self, contact):
- """Makes sure that if the contact has been mapped to a friend before,
- remove the friend from "new friends" list."""
-
- for url in contact.get_urls():
- if url in self._friends_by_url:
- matched_friend = self._friends_by_url[url]
- print "Contact is known as a friend on this service (by URL) %s - %s" % (url, matched_friend)
- self._friends_by_contact[contact] = matched_friend
-
+ def _allow_friend_to_match_by_name(self, friend):
+ for url in friend.get_urls():
+ if url in self._known_urls:
+ return False
+ return True
- # -----------------------------------------------------------------------
- def process_contact(self, contact, friend):
- """Updates friend if the contact can be mapped to a friend on LinkedIn,
- either by previous mapping or by identifiers."""
-
- if self._friends_by_contact.has_key(contact):
- friend.update(self._friends_by_contact[contact])
- else:
- self._match_contact_by_identifiers(contact, friend)
- # -----------------------------------------------------------------------
- def finalise(self, updated):
- if self._autocreate or True:
- for f in self._friends_by_name.values():
- if f not in self._friends_by_contact.values():
- # FIXME: create friends as contact
- print "Could/should create contact here for %s" % f
-
-
- # -----------------------------------------------------------------------
- def _get_access_token(self, token, verifier):
- """user provides the verifier, which was displayed in the browser window"""
-
- oauth_request = oauth.OAuthRequest.from_consumer_and_token(self.consumer, token=token, verifier=verifier, http_url=self.ACCESS_TOKEN_URL)
- oauth_request.sign_request(self.sig_method, self.consumer, token)
-
- connection = httplib.HTTPSConnection(self.LI_SERVER)
- connection.request(oauth_request.http_method, self.ACCESS_TOKEN_URL, headers=oauth_request.to_header())
- response = connection.getresponse()
- return oauth.OAuthToken.from_string(response.read())
-
-
- # -----------------------------------------------------------------------
- def _get_authorize_url(self, token):
- """The URL that the user should browse to, in order to authorize the application to acess data"""
-
- oauth_request = oauth.OAuthRequest.from_token_and_callback(token=token, http_url=self.AUTHORIZE_URL)
- return oauth_request.to_url()
-
-
- # -----------------------------------------------------------------------
- def _get_request_token(self, callback):
- """Get a request token from LinkedIn"""
-
- oauth_consumer_key = self.api_key
- oauth_request = oauth.OAuthRequest.from_consumer_and_token(self.consumer, callback=callback, http_url=self.REQUEST_TOKEN_URL)
- oauth_request.sign_request(self.sig_method, self.consumer, None)
-
- connection = httplib.HTTPSConnection(self.LI_SERVER)
- connection.request(oauth_request.http_method, self.REQUEST_TOKEN_URL, headers=oauth_request.to_header())
- response = self.connection.getresponse().read()
-
- token = oauth.OAuthToken.from_string(response)
- return token
-
-
- # -----------------------------------------------------------------------
- def _make_api_request(self, url):
- print "_make_api_request", url
- oauth_request = oauth.OAuthRequest.from_consumer_and_token(self.consumer, token=self.access_token, http_url=url)
- oauth_request.sign_request(self.sig_method, self.consumer, self.access_token)
- connection = httplib.HTTPSConnection(self.LI_SERVER)
- try:
- connection.request(oauth_request.http_method, url, headers=oauth_request.to_header())
- return connection.getresponse().read()
- except:
- raise Exception("Failed to contact LinkedIn at " + url)
-
-
- # -----------------------------------------------------------------------
- def _do_li_login(self):
- """Perform authentication against LinkedIn and store the result in gconf
- for later use. Uses the 'need_auth' and 'block_for_auth' methods on
- the callback class. The former allows a message to warn the user
- about what is about to happen to be shown; the second is to wait
- for the user to confirm they have logged in."""
- # FIXME
-
-
- # -----------------------------------------------------------------------
- def _match_contact_by_identifiers(self, contact, friend):
- for id in contact.get_identifiers():
- if id in self._friends_by_name:
- matched_friend = self._friends_by_name[id]
- if matched_friend in self._friends_by_contact.values():
- print "Avoiding assigning same friend to two contacts: %s " % matched_friend
- else:
- self._friends_by_contact[contact] = matched_friend
- friend.update(matched_friend)
-
-
- # -----------------------------------------------------------------------
- def _parse_dom(self, dom):
- print "parse_dom", dom
- def get_first_tag(node, tagName):
- tags = node.getElementsByTagName(tagName)
- if tags and len(tags) > 0:
- return tags[0]
-
- def extract(node, tagName):
- tag = get_first_tag(node, tagName)
- if tag:
- return tag.firstChild.nodeValue
-
- def extract_public_url(node):
- tag = get_first_tag(node, 'site-standard-profile-request')
- if tag:
- url = extract(tag, 'url')
- return url.replace("&", "&")
-
- # FIXME: look for <error>
- people = dom.getElementsByTagName('person')
- for p in people:
- # try:
- if True:
- fn = extract(p, 'first-name')
- ln = extract(p, 'last-name')
- photo_url = extract(p, 'picture-url')
- id = extract(p, 'id')
- public_url = extract_public_url(p)
-
- name = fn + " " + ln
- friend = Friend(name)
- friend.add_url(public_url)
- if photo_url: friend.set_photo_url(photo_url)
-
- key = canonical(name)
- self._friends_by_name[key] = friend
- self._friends_by_url[public_url] = friend
-# except:
-# continue
--- /dev/null
+from org.maemo.hermes.engine.linkedin.service import Service
+from org.maemo.hermes.engine.names import canonical
+from org.maemo.hermes.engine.friend import Friend
+import unittest
+
+
+class FakeContact():
+ id_counter = 0
+ def __init__(self, name, addr, id=None):
+ self.name = name;
+ self.urls = addr
+ self.id = id or FakeContact.id_counter
+ FakeContact.id_counter = FakeContact.id_counter + 1;
+ def get_urls(self):
+ return self.urls
+ def get_name(self):
+ return self.name
+ def get_identifiers(self):
+ return [canonical(self.name)]
+
+class FakeLinkedInApi():
+ def get_friends(self):
+ return self._friends
+ def set_friends(self, data):
+ self._friends = data
+
+class UrlFriend(Friend):
+ def __init__(self, name, url):
+ Friend.__init__(self, name)
+ self.add_url(url)
+
+class TestLinkedInService(unittest.TestCase):
+
+ def setUp(self):
+ self.linkedInApi = FakeLinkedInApi()
+ self.testee = Service(self.linkedInApi)
+
+ def test_main_flow_one_match_by_url_one_by_name(self):
+ known_url = "http://linkedin/id=1"
+ contact_by_url = FakeContact("By Url", [known_url], 1)
+ friend_by_url = UrlFriend("Unimportant", known_url)
+
+ known_name = "By Name"
+ contact_by_name = FakeContact(known_name, [], 2)
+ friend_by_name = Friend(known_name)
+
+ self._fake_server_response([friend_by_name, friend_by_url])
+
+ self._run_service([contact_by_name, contact_by_url])
+
+ assert len(self.testee.get_unmatched_friends()) == 0
+ matchers = self.testee.get_contacts_with_match()
+ assert contact_by_name in matchers
+ assert contact_by_url in matchers
+ assert known_url in matchers[contact_by_url].get_urls()
+
+ def _run_service(self, contacts):
+ for contact in contacts:
+ self.testee.pre_process_contact(contact)
+ self.testee.process_friends()
+ for contact in contacts:
+ self.testee.process_contact(contact)
+
+ def _fake_server_response(self, data):
+ self.linkedInApi.set_friends(data)
+
+
+if __name__ == '__main__':
+ unittest.main()