package/debian/files
package/debian/hermes/
*~
- *.pyc
*.pyo
++*.pyc
++.project
++.settings
++.pydevproject
++.mica_settings
++.gitignore
+ core
</buildSpec>
<natures>
<nature>org.python.pydev.pythonNature</nature>
+ <nature>org.eclipse.wst.common.project.facet.core.nature</nature>
+ <nature>org.maemo.mica.python.project.core.pythonNature</nature>
+ <nature>org.maemo.esbox.python.project.pythonNature</nature>
</natures>
</projectDescription>
<pydev_project>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 2.5</pydev_property>
- <pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
-<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">FREMANTLE_X86 - Python Interpreter</pydev_property>
++<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">python</pydev_property>
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
<path>/Hermes/src</path>
- </pydev_pathproperty>
- <pydev_pathproperty name="org.python.pydev.PROJECT_EXTERNAL_SOURCE_PATH">
- <path>/home/ceda/pyworkspace/maemo-python-site-packages</path>
++<path>/Hermes/test</path>
</pydev_pathproperty>
</pydev_project>
-import gnome.gconf
import org.maemo.hermes.engine.service
-from facebook import Facebook,FacebookError
from org.maemo.hermes.engine.names import canonical
+ from org.maemo.hermes.engine.friend import Friend
class Service(org.maemo.hermes.engine.service.Service):
"""Facebook backend for Hermes.
- This requires two gconf paths to contain Facebook application keys:
- /apps/maemo/hermes/key_app
- /apps/maemo/hermes/key_secret
-
Copyright (c) Andrew Flegg <andrew@bleb.org> 2010.
+ Copyright (c) Fredrik Wendt <fredrik@wendt.se> 2010.
Released under the Artistic Licence."""
-
+ attrs = ['uid', 'name', 'pic_big', 'birthday_date', 'profile_url', 'first_name', 'last_name', 'website']
-
++
++
# -----------------------------------------------------------------------
- def __init__(self, facebook, autocreate = False, gui_callback = None):
- def __init__(self, autocreate = False, gui_callback = None):
++ def __init__(self, facebook, autocreate=False, gui_callback=None):
"""Initialise the Facebook service, finding Facebook API keys in gconf and
having a gui_callback available."""
- self._gc = gnome.gconf.client_get_default()
self._gui = gui_callback
self._autocreate = autocreate
+ self.fb = facebook
- self._friends_by_id = {}
- # -- Check the environment is going to work...
- #
- if (self._gc.get_string('/desktop/gnome/url-handlers/http/command') == 'epiphany %s'):
- raise Exception('Browser in gconf invalid (see NB#136012). Installation error.')
-
- key_app = self._gc.get_string('/apps/maemo/hermes/key_app')
- key_secret = self._gc.get_string('/apps/maemo/hermes/key_secret')
- if key_app is None or key_secret is None:
- raise Exception('No Facebook application keys found. Installation error.')
-
- self.fb = Facebook(key_app, key_secret)
- self.fb.desktop = True
-
- self._friends = None
++ self._friends_by_name = {}
self._friends_by_url = {}
self._friends_by_contact = {}
- self._should_create = set()
-
- self.get_friends()
+ self._contacts_by_friend = {}
+ self._friends_without_contact = set()
+ self._known_urls = set()
# -----------------------------------------------------------------------
def get_name(self):
return "Facebook"
-
+
# -----------------------------------------------------------------------
- def _do_fb_login(self):
- """Perform authentication against Facebook and store the result in gconf
- for later use. Uses the 'need_auth' and 'block_for_auth' methods on
- the callback class. The former allows a message to warn the user
- about what is about to happen to be shown; the second is to wait
- for the user to confirm they have logged in."""
- self.fb.session_key = None
- self.fb.secret = None
- self.fb.uid = None
-
- if self._gui:
- self._gui.need_auth()
-
- self.fb.auth.createToken()
- self.fb.login()
-
- if self._gui:
- self._gui.block_for_auth()
-
- session = self.fb.auth.getSession()
- self._gc.set_string('/apps/maemo/hermes/session_key', session['session_key'])
- self._gc.set_string('/apps/maemo/hermes/secret_key', session['secret'])
- self._gc.set_string('/apps/maemo/hermes/uid', str(session['uid']))
+ def get_friends(self):
+ return self._contacts_by_friend.keys()
+
+
+ def get_contacts_with_match(self):
+ return self._friends_by_contact
+
+ def get_unmatched_friends(self):
- return self._friends_by_id.values()
++ return self._friends_by_name.values()
++
-
# -----------------------------------------------------------------------
- def get_friends(self):
- """Return a list of friends from this service, or 'None' if manual mapping
- is not supported."""
-
- if self._friends:
- return self._friends.values()
-
- if self.fb.session_key is None:
- self.fb.session_key = self._gc.get_string('/apps/maemo/hermes/session_key')
- self.fb.secret = self._gc.get_string('/apps/maemo/hermes/secret_key')
- self.fb.uid = self._gc.get_string('/apps/maemo/hermes/uid')
+ def pre_process_contact(self, contact):
+ """Registers URLs of all previous mappings, and makes sure that any Friends with those
+ URLs don't get match by name."""
+ for url in contact.get_urls():
+ self._known_urls.add(url)
+
+
+ # -----------------------------------------------------------------------
+ def process_friends(self):
- # Check the available session is still valid...
- while True:
- try:
- if self.fb.users.getLoggedInUser() and self.fb.session_key:
- break
- except FacebookError:
- pass
- self._do_fb_login()
-
-
def if_defined(data, key, callback):
if key in data and data[key]:
callback(data[key])
if_defined(data, 'profile_url', friend.add_url)
if_defined(data, 'birthday_date', friend.set_birthday_date)
- # exception from the rule:
-# if_defined(data, 'pic_big', friend.set_photo_url)
- friend.set_photo_url(data[attrs[2]])
+ if_defined(data, 'pic_big', friend.set_photo_url)
- if friend.has_birthday_date():
- self._should_create.add(friend)
-
- return self._friends.values()
+ if friend.has_birthday_date(): # FIXME: remove this, either you want to add your contacts or not?
+ self._friends_without_contact.add(friend)
+ url = data['profile_url']
+ self._friends_by_url[url] = friend
+
+ if url not in self._known_urls:
- self._friends_by_id[key] = friend
++ self._friends_by_name[key] = friend
+
# -----------------------------------------------------------------------
- def pre_process_contact(self, contact):
-
- if not self._friends:
- self.get_friends()
-
- for url in contact.get_urls():
- if url in self._friends_by_url:
- matched_friend = self._friends_by_url[url]
- self._friends_by_contact[contact] = matched_friend
- self._should_create.discard(matched_friend)
-
-
- # -----------------------------------------------------------------------
- def process_contact(self, contact, friend):
-
- if not self._friends:
- self.get_friends()
-
+ def process_contact(self, contact):
if self._friends_by_contact.has_key(contact):
- friend.update(self._friends_by_contact[contact])
return
matched_friend = None
if not matched_friend:
for id in contact.get_identifiers():
- if id in self._friends_by_id:
- matched_friend = self._friends_by_id.pop(id)
- if id in self._friends:
- matched_friend = self._friends[id]
- self._friends_by_contact[contact] = matched_friend
++ if id in self._friends_by_name:
++ matched_friend = self._friends_by_name.pop(id)
+ self._register_match(contact, matched_friend)
+ print contact.get_name(), " -> match by name -> ", matched_friend
break
-
- if matched_friend:
- print contact.get_name(), " -> ", matched_friend
- self._should_create.discard(matched_friend)
-
-
-
+
# -----------------------------------------------------------------------
- def finalise(self, updated, overwrite = False):
- """Once all contacts have been processed, allows for any tidy-up/additional
- enrichment. If any contacts are updated at this stage, 'updated' should
- be added to."""
+ def _register_match(self, contact, friend):
+ self._friends_without_contact.discard(friend)
+ self._friends_by_contact[contact] = friend
+ self._contacts_by_friend[friend] = contact
- if True or self._autocreate: # FIXME - remove True or
- for friend in self._should_create:
- print "Need to autocreate: %s (%s)" % (friend._attributes, friend._multi_attributes)
+ # -----------------------------------------------------------------------
+ def _get_friends_data(self):
+ """Returns a list of dicts, where each dict represents a friend/contact"""
+
+ return self.fb.users.getInfo(self.fb.friends.get(), Service.attrs)
# public accessors -----------------
+ def get_name(self):
+ return self._attributes['fn']
+
++ def get_nickname(self):
++ return self._attributes["nickname"]
++
def add_url(self, url):
self._add('url', url)
self.consumer = oauth.OAuthConsumer(api_key, secret_key)
self.sig_method = oauth.OAuthSignatureMethod_HMAC_SHA1()
-- self._friends = None
++ self._friends_by_name = None
self._friends_by_url = {}
self._friends_by_contact = {}
"""Return a list of friends from this service, or 'None' if manual mapping
is not supported."""
-- if self._friends:
-- return self._friends.values()
++ if self._friends_by_name:
++ return self._friends_by_name.values()
# FIXME: Check the available OAuth session is still valid...
# get data
-- self._friends = {}
++ self._friends_by_name = {}
xml = self._make_api_request(self.LI_CONN_API_URL)
dom = parseString(xml)
self._parse_dom(dom)
-- print "get_friends found", len(self._friends)
-- return self._friends.values()
++ print "get_friends found", len(self._friends_by_name)
++ return self._friends_by_name.values()
# -----------------------------------------------------------------------
def finalise(self, updated):
if self._autocreate or True:
-- for f in self._friends.values():
++ for f in self._friends_by_name.values():
if f not in self._friends_by_contact.values():
# FIXME: create friends as contact
print "Could/should create contact here for %s" % f
# -----------------------------------------------------------------------
def _match_contact_by_identifiers(self, contact, friend):
for id in contact.get_identifiers():
-- if id in self._friends:
-- matched_friend = self._friends[id]
++ if id in self._friends_by_name:
++ matched_friend = self._friends_by_name[id]
if matched_friend in self._friends_by_contact.values():
print "Avoiding assigning same friend to two contacts: %s " % matched_friend
else:
if photo_url: friend.set_photo_url(photo_url)
key = canonical(name)
-- self._friends[key] = friend
++ self._friends_by_name[key] = friend
self._friends_by_url[public_url] = friend
# except:
# continue
import gnome.gconf
import org.maemo.hermes.engine.provider
import org.maemo.hermes.engine.twitter.service
++import twitter
class Provider(org.maemo.hermes.engine.provider.Provider):
"""Twitter provider for Hermes.
username = self._gconf.get_string("/apps/maemo/hermes/twitter_user") or ''
password = self._gconf.get_string("/apps/maemo/hermes/twitter_pwd") or ''
--
++
++ api = twitter.Api(username=username, password=password)
++
return org.maemo.hermes.engine.twitter.service.Service(username, password, gui_callback)
--import org.maemo.hermes.engine.service
--
--import twitter
from org.maemo.hermes.engine.names import canonical
--from org.maemo.hermes.engine.friend import Friend
++import org.maemo.hermes.engine.service
class Service(org.maemo.hermes.engine.service.Service):
"""Twitter backend for Hermes.
Copyright (c) Andrew Flegg <andrew@bleb.org> 2010.
++ Copyright (c) Fredrik Wendt <fredrik@wendt.se> 2010.
Released under the Artistic Licence."""
# -----------------------------------------------------------------------
-- def __init__(self, username, password, gui_callback = None):
-- """Initialise the Twitter service, using the given credentials and
-- having a gui_callback available."""
++ def __init__(self, twitterApi):
++ self._twitter = twitterApi
-- self._gui = gui_callback
-- self._username = username
-- self._password = password
--
-- self._friends = None
++ self._friends_by_name = {}
self._friends_by_url = {}
self._friends_by_contact = {}
++ self._friends = []
++ self._known_urls = set()
# -----------------------------------------------------------------------
# -----------------------------------------------------------------------
- def get_friends(self):
- """Return a list of friends from this service, or 'None' if manual mapping
- is not supported."""
-
- if self._friends:
- return self._friends.values()
-
- self._friends = {}
-
- api = twitter.Api(username=self._username, password=self._password)
- for tweeter in api.GetFriends():
- key = canonical(tweeter.name)
- url = 'http://twitter.com/%s' % (tweeter.screen_name)
- friend = Friend(tweeter.name)
++ def pre_process_contact(self, contact):
++ """Registers URLs of all previous mappings, and makes sure that any Friends with those
++ URLs don't get match by name."""
++ for url in contact.get_urls():
++ self._known_urls.add(url)
++
++
++ # -----------------------------------------------------------------------
+ def process_friends(self):
- """Return a list of friends from this service, or 'None' if manual mapping
- is not supported."""
-
- if self._friends:
- return self._friends.values()
-
- self._friends = {}
-
- api = twitter.Api(username=self._username, password=self._password)
- for tweeter in api.GetFriends():
- key = canonical(tweeter.name)
- url = 'http://twitter.com/%s' % (tweeter.screen_name)
- friend = Friend(tweeter.name)
++ for tweeter in self._get_tweeters():
++ key = canonical(tweeter.name)
++ url = 'http://twitter.com/%s' % (tweeter.screen_name)
++ friend = self._create_friend(tweeter.name)
friend.set_nickname(tweeter.screen_name)
friend.add_url(url)
friend.add_url(tweeter.url)
if '/default_profile' not in tweeter.profile_image_url:
friend.set_photo_url(tweeter.profile_image_url)
-- self._friends[key] = friend
++ self._friends.append(friend)
self._friends_by_url[url] = friend
--
-- return self._friends.values()
++ if url not in self._known_urls:
++ self._friends_by_name[key] = friend
# -----------------------------------------------------------------------
-- def pre_process_contact(self, contact):
-- if not self._friends:
-- self.get_friends()
--
-- for url in contact.get_urls():
-- if url in self._friends_by_url:
-- matched_friend = self._friends_by_url[url]
-- self._friends_by_contact[contact] = matched_friend
--
--
-- # -----------------------------------------------------------------------
-- def process_contact(self, contact, friend):
-- """Called for each contact in the address book. Any friends linked to
-- from the contact should have their matching updated. The backend should
-- enrich the contact with any meta-data it can; and return 'True' if any
-- information was updated. If 'friend' is provided, the information should
-- be retrieved from friend. This will be one of the entries from
-- 'get_friends' when manual mapping is being done."""
--
-- if not self._friends:
-- self.get_friends()
--
++ def process_contact(self, contact):
if self._friends_by_contact.has_key(contact):
-- friend.update(self._friends_by_contact[contact])
return
-- for id in contact.get_identifiers():
-- if id in self._friends:
-- matched_friend = self._friends[id]
-- print contact.get_name(), " -> ", matched_friend
-- if matched_friend in self._friends_by_contact.values():
-- print "COLLISSION avoided for %s" % friend
-- else:
-- print "Match found %s" % friend
-- self._friends_by_contact[contact] = matched_friend
-- friend.update(matched_friend)
--
++ if self._match_contact_to_friend_by_urls(contact):
++ return
++
++ self._match_contact_to_friend_by_identifiers(contact)
# -----------------------------------------------------------------------
-- def finalise(self, updated, overwrite = False):
-- """Once all contacts have been processed, allows for any tidy-up/additional
-- enrichment. If any contacts are updated at this stage, 'updated' should
-- be added to."""
--
++ def finalise(self, updated, overwrite=False):
pass
++
++
++ # -----------------------------------------------------------------------
++ def get_contacts_with_match(self):
++ return self._friends_by_contact
++
++
++ # -----------------------------------------------------------------------
++ def get_unmatched_friends(self):
++ return self._friends
++
++
++ # -----------------------------------------------------------------------
++ def _get_tweeters(self):
++ return self._twitter.GetFriends()
++
++
++ def _match_contact_to_friend_by_urls(self, contact):
++ for url in contact.get_urls():
++ if url in self._friends_by_url:
++ matched_friend = self._friends_by_url[url]
++ self._register_match(contact, matched_friend)
++ print contact.get_name(), " -> url -> ", matched_friend
++ return True
++
++ return False
++
++
++ def _match_contact_to_friend_by_identifiers(self, contact):
++ for id in contact.get_identifiers():
++ if id in self._friends_by_name:
++ matched_friend = self._friends_by_name[id]
++ print contact.get_name(), " -> name -> ", matched_friend
++ self._register_match(contact, matched_friend)
++ return True
++
++ return False
++
++
++ def _register_match(self, contact, friend):
++ self._friends_by_contact[contact] = friend
++ self._friends.remove(friend)
++
--- /dev/null
--- /dev/null
++#!/usr/bin/python2.5
++
++import evolution
++from org.maemo.hermes.engine.contact import Contact
++from org.maemo.hermes.engine.friend import Friend
++from org.maemo.hermes.gui.console import ConsoleUICallback
++import org.maemo.hermes.engine.facebook.provider
++import org.maemo.hermes.engine.twitter.provider
++import org.maemo.hermes.engine.gravatar.provider
++import org.maemo.hermes.engine.linkedin.provider
++
++
++providers = [
++ org.maemo.hermes.engine.facebook.provider.Provider(),
++ org.maemo.hermes.engine.twitter.provider.Provider(),
++ org.maemo.hermes.engine.gravatar.provider.Provider(),
++ org.maemo.hermes.engine.linkedin.provider.Provider(),
++]
++
++ui = ConsoleUICallback()
++
++# assemble services, least important (prioritized) first
++print "+++ Initialising Services"
++services = []
++for provider in providers:
++ print "Using %s" % (provider.get_name())
++ services.append(provider.service(ui))
++
++contacts = []
++updated_contacts = set()
++unmatched_contacts = set()
++overwrite = False
++
++print "\n+++ Pre processing"
++addresses = evolution.ebook.open_addressbook('default')
++for econtact in addresses.get_all_contacts():
++ contact = Contact(addresses, econtact)
++ contacts.append(contact)
++ for service in services:
++ print " Pre processing %s with %s" % (contact, service.get_name())
++ service.pre_process_contact(contact)
++
++print "\n+++ Processing"
++for contact in contacts:
++ friend = Friend(contact.get_name())
++ for service in services:
++ print " Processing %s with %s" % (friend, service.get_name())
++ service.process_contact(contact, friend)
++ if friend.is_empty():
++ print "No service matched %s\n" % friend
++ unmatched_contacts.add(contact)
++ else:
++ print " Updating contact data for %s\n" % friend
++ friend.update_contact(contact, overwrite)
++ updated_contacts.add(contact)
++
++print "\n+++ Finalising"
++for service in services:
++ print " Finalising service %s" % service.get_name()
++ service.finalise(updated_contacts)
--- /dev/null
--- /dev/null
++given
++ clean addressbook
++ friend "Adam" on linkedin
++ friend "Adam" on twitter
++when
++ full run
++then
++ addressbook should contain one newly created contact, with data from both services
++
++
++given
++ clean addressbook
++ 100 friends on linkedin
++ 100 friends on twitter, out of which 50 are the same people (have the same name) as on linkedin
++when
++ full run
++then
++ addressbook should contain 150 contacts (not 200)
++
++
++# this will probably not be too uncommon if a person really has an empty address book and runs this the first
++# time. We don't want that person to end up with many duplicates
++
++
++given
++ contact "Fredrik Wendt"
++ friend "Fredrik L. Wendt" on facebook
++when
++ full processing the contact
++then
++ the contact should be matched with the friend
++
++
++given
++ clean addressbook
++ friend "twitter" on twitter
++ friend "linkedin" on linkedin
++ friend "facebook" on facebook
++ friend "plaxo" on plaxo
++when
++ full run
++then
++ addressbook should contain 4 contacts
++
++# process_contact should not rely on pre_process_contact being called
++
++
++
++given
++ clean addressbook
++ friend "adam" on twitter, with homepage set to "http://facebook/user/adam"
++ friend "adam" on facebook (with ID url "http://facebook/user/adam")
++when
++ twitter is processed
++ and facebook is processed
++then
++ addressbook should contain one contact with two website URLs (twitter and facebook)
++
++
--- /dev/null
--- /dev/null
++given
++ clean addressbook
++ friend "Adam" on linkedin
++ friend "Adam" on twitter
++when
++ full run
++then
++ addressbook should contain one newly created contact, with data from both services
++
++
++given
++ clean addressbook
++ 100 friends on linkedin
++ 100 friends on twitter, out of which 50 are the same people (have the same name) as on linkedin
++when
++ full run
++then
++ addressbook should contain 150 contacts (not 200)
++
++
++# this will probably not be too uncommon if a person really has an empty address book and runs this the first
++# time. We don't want that person to end up with many duplicates
++
++
++given
++ contact "Fredrik Wendt"
++ friend "Fredrik L. Wendt" on facebook
++when
++ full processing the contact
++then
++ the contact should be matched with the friend
++
++
++given
++ clean addressbook
++ friend "twitter" on twitter
++ friend "linkedin" on linkedin
++ friend "facebook" on facebook
++ friend "plaxo" on plaxo
++when
++ full run
++then
++ addressbook should contain 4 contacts
++
++# process_contact should not rely on pre_process_contact being called
++
++
++
++given
++ clean addressbook
++ friend "adam" on twitter, with homepage set to "http://facebook/user/adam"
++ friend "adam" on facebook (with ID url "http://facebook/user/adam")
++when
++ twitter is processed
++ and facebook is processed
++then
++ addressbook should contain one contact with two website URLs (twitter and facebook)
++
++
--- /dev/null
--- /dev/null
++from org.maemo.hermes.engine.facebook.service import Service
++from org.maemo.hermes.engine.names import canonical
++import unittest
++
++
++class FakeContact():
++ id_counter = 0
++ def __init__(self, name, addr, id=None):
++ self.name = name;
++ self.urls = addr
++ self.id = id or FakeContact.id_counter
++ FakeContact.id_counter = FakeContact.id_counter + 1;
++ def get_urls(self):
++ return self.urls
++ def get_name(self):
++ return self.name
++ def get_identifiers(self):
++ return [canonical(self.name)]
++
++class TestFacebookService(unittest.TestCase):
++
++ def setUp(self):
++ self.testee = Service(None)
++
++ def test_main_flow_one_match_by_url_one_by_name(self):
++ # arrange
++ self.existing_address = 'http://www.facebook.com/profile.php?id=123456'
++ self.existing_contact = FakeContact("Facebook Person", [self.existing_address])
++ existing_fake = {'uid':'123456','name':'Name Doesnt Match but URL Does'}
++
++ self.other_address = 'http://twitter.com/not/correct/site'
++ self.other_contact = FakeContact("Twitter Person", [self.other_address])
++ other_fake = {'uid':'123','name':self.other_contact.get_name()}
++
++ self.none_contact = FakeContact("No URLson", [])
++
++ fake_data = [existing_fake, other_fake]
++ self._fake_server_response(fake_data)
++
++ # act
++ self._run_service([self.existing_contact, self.other_contact, self.none_contact])
++
++ # assert
++ friends = self.testee.get_friends()
++ contacts = self.testee.get_contacts_with_match()
++ assert len(friends) == 2
++ assert len(contacts) == 2
++ assert self.other_contact in contacts
++ assert self.existing_contact in contacts
++ assert self.none_contact not in contacts
++
++
++ def test_name_collision_avoided_by_previous_matching(self):
++ contact_do_match = FakeContact("Same Name", ["http://www.facebook.com/profile.php?id=123"], 1);
++ contact_no_match = FakeContact("Same Name", [None], 2)
++
++ data = [{'uid':'123','name':'Same Name'}]
++ self._fake_server_response(data)
++
++ self._run_service([contact_no_match, contact_do_match])
++
++ assert len(self.testee.get_unmatched_friends()) == 0
++ matchers = self.testee.get_contacts_with_match().keys()
++ assert len(matchers) == 1
++ assert matchers[0].id == 1
++
++ def test_name_collision_avoided_only_one_person_matched(self):
++ contact_do_match = FakeContact("Same Name", ["http://twitter.com/same_name"]);
++ contact_no_match = FakeContact("Same Name", [None])
++
++ data = [{'uid':'123','name':'Same Name'}]
++ self._fake_server_response(data)
++
++ self._run_service([contact_no_match, contact_do_match])
++
++ matchers = self.testee.get_contacts_with_match().keys()
++ assert len(matchers) == 1
++ assert len(self.testee.get_unmatched_friends()) == 0
++
++
++ def _run_service(self, contacts):
++ for contact in contacts:
++ self.testee.pre_process_contact(contact)
++ self.testee.process_friends()
++ for contact in contacts:
++ self.testee.process_contact(contact)
++
++ def _fake_server_response(self, data):
++ self.testee._get_friends_data = self._get_friends_data
++ self._server_response = data
++
++ def _get_friends_data(self):
++ return self._server_response
++
++
++
++if __name__ == '__main__':
++ unittest.main()
--- /dev/null
--- /dev/null
++from org.maemo.hermes.engine.gravatar.service import Service
++from org.maemo.hermes.engine.friend import Friend
++import unittest
++
++class FakeContact():
++ def __init__(self, addr):
++ self.urls = addr
++ def get_emails(self):
++ return self.urls
++ def get_name(self):
++ return self.urls[0]
++
++class TestGravatarService(unittest.TestCase):
++
++ def setUp(self):
++ self._setUp('maemohermes@wendt.se', 'b14ec179822b')
++
++ def test_main_flow(self):
++ self.testee.pre_process_contact(self.existing_contact)
++ self.testee.pre_process_contact(self.missing_contact)
++ self.testee.process_friends()
++ self.testee.process_contact(self.existing_contact)
++ self.testee.process_contact(self.missing_contact)
++
++ friends = self.testee.get_friends()
++ contacts = self.testee.get_contacts_with_match()
++ assert len(friends) == 1
++ assert len(contacts) == 1
++ assert self.missing_contact not in contacts.keys()
++ assert self.existing_contact in contacts.keys()
++ assert friends[0].get_name() == self.existing_contact.get_name()
++
++
++ def test_that_a_person_with_two_addresses_and_one_gravatar_works(self):
++ self._fake_server_response({self.missing_address: None, self.existing_address: "http://url.to.img/"})
++
++ self.testee.pre_process_contact(self.multiple_contact)
++ self.testee.process_friends()
++ self.testee.process_contact(self.multiple_contact)
++
++ friends = self.testee.get_friends()
++ contacts = self.testee.get_contacts_with_match()
++ assert len(friends) == 1
++ assert len(contacts) == 1
++ assert self.multiple_contact in contacts
++ assert self.missing_contact not in contacts
++ assert self.existing_contact not in contacts.keys()
++ assert friends[0].get_name() == self.existing_contact.get_name()
++
++
++ def _fake_server_response(self, map):
++ self.testee._get_hash_info_from_server = self._get_hash_info_from_server
++ # in real results the addresses hashed, so we'll add that here, and keep originals for easier debugging/inspection
++ for key in map.keys():
++ hash = self.testee._get_hash_for_address(key)
++ map[hash] = map[key]
++ self._server_response = map
++
++ def _get_hash_info_from_server(self, args, url):
++ self._server_args = args
++ self._server_url = url
++ return self._server_response
++
++ def _setUp(self, email, key):
++ self.testee = Service(email, key)
++
++ self.existing_address = 'fredrik@wendt.se'
++ self.existing_contact = FakeContact([self.existing_address])
++ self.existing_friend = Friend("Fredrik Wendt")
++
++ self.missing_address = 'will_not_ever_exist_i_truly_hope_at_least@wendt.se'
++ self.missing_contact = FakeContact([self.missing_address])
++ self.missing_friend = Friend("Unknown Person")
++
++ self.multiple_contact = FakeContact([self.existing_address, self.missing_address])
++ self.multiple_friend = Friend("Another Person")
++
++
++if __name__ == '__main__':
++ unittest.main()
--- /dev/null
--- /dev/null
++import unittest
++from syncjob import Syncjob
++
++class FakeContact():
++ def __init__(self, name, addr):
++ self.name = name
++ self.urls = [addr]
++ def get_emails(self):
++ return self.urls
++ def get_name(self):
++ return self.name
++
++class FakeService():
++ def __init__(self):
++ self.contacts_preprocessed = []
++ self.contacts_processed = []
++ def get_name(self):
++ return "fake service"
++ def pre_process_contact(self, contact):
++ self.contacts_preprocessed.append(contact)
++ def process_friends(self):
++ pass
++ def process_contact(self, contact):
++ self.contacts_processed.append(contact)
++
++
++class TestSyncJob(unittest.TestCase):
++
++ def setUp(self):
++ self.service = FakeService()
++ self.services = [self.service]
++ self.contact = FakeContact('Fredrik Wendt', 'fredrik@wendt.se')
++ self.contacts = [self.contact]
++ self.testee = Syncjob(self.services, self.contacts)
++
++ def test_main_flow(self):
++ self.testee.run()
++ assert self.contact in self.service.contacts_preprocessed
++ assert self.contact in self.service.contacts_processed
++
++if __name__ == '__main__':
++ unittest.main()
--- /dev/null
--- /dev/null
++from org.maemo.hermes.engine.twitter.service import Service
++from org.maemo.hermes.engine.names import canonical
++import unittest
++
++class FakeContact():
++ def __init__(self, name, addr):
++ self.name = name
++ self.urls = addr
++ def get_urls(self):
++ return self.urls
++ def get_name(self):
++ return self.name
++ def get_identifiers(self):
++ return [canonical(self.name)]
++
++class FakeTweeter():
++ def __init__(self, name, screen_name, props=None):
++ self._props = {}
++ self["name"] = name
++ self["screen_name"] = screen_name
++ if props:
++ for key in props:
++ self[key] = props[key]
++ def __setitem__(self, key, value):
++ self._props[key] = value
++ def __getattr__(self, key):
++ try:
++ return self._props[key]
++ except:
++ return ""
++
++
++class TestTwitterService(unittest.TestCase):
++
++ def setUp(self):
++ self.testee = Service(None)
++
++
++ def test_main_flow(self):
++ twitterName = "Twitter Name"
++ self._fake_server([FakeTweeter(twitterName, "wendtse", {"url":"http://wendt.se"})])
++ contact = FakeContact("Fredrik Wendt", ["http://twitter.com/wendtse"])
++
++ self._exercise_service([contact])
++
++ assert len(self.testee.get_unmatched_friends()) == 0
++ matchers = self.testee.get_contacts_with_match()
++ assert contact in matchers
++ assert twitterName == matchers[contact].get_name()
++
++
++ def test_name_collision_avoided_by_previous_matching(self):
++ twitterName = "Same Name"
++ known_tweeter = FakeTweeter(twitterName, "samename")
++ other_tweeter = FakeTweeter(twitterName, "otherscreenname")
++ self._fake_server([other_tweeter, known_tweeter])
++ contact = FakeContact(twitterName, [])
++
++ self._exercise_service([contact])
++
++ assert len(self.testee.get_unmatched_friends()) == 1
++ matchers = self.testee.get_contacts_with_match()
++ assert contact in matchers
++
++
++ def test_name_collision_avoided_only_one_person_matched(self):
++ twitter_name = "Same Name"
++ screen_name = "samename"
++ known_tweeter = FakeTweeter(twitter_name, screen_name)
++ other_tweeter = FakeTweeter(twitter_name, "otherscreenname")
++ self._fake_server([other_tweeter, known_tweeter])
++ contact = FakeContact(twitter_name, ["http://twitter.com/" + screen_name])
++
++ self._exercise_service([contact])
++
++ assert len(self.testee.get_unmatched_friends()) == 1
++ matchers = self.testee.get_contacts_with_match()
++ assert contact in matchers
++ assert screen_name == matchers[contact].get_nickname()
++
++
++ def _exercise_service(self, contacts):
++ for contact in contacts:
++ self.testee.pre_process_contact(contact)
++ self.testee.process_friends()
++ for contact in contacts:
++ self.testee.process_contact(contact)
++
++ def _fake_server(self, data):
++ self._fake_data = data
++ self.testee._get_tweeters = self._get_tweeters
++
++ def _get_tweeters(self):
++ return self._fake_data
++
++if __name__ == '__main__':
++ unittest.main()