#!/usr/bin/env python
"""Facilitates the migration of users from one server to another.

More comment here...
"""

# These standard libs should all load fine
import binascii
import ConfigParser
import functools
import logging
import os
import string
import struct
import sys
import threading
import time
import traceback

try:
    import threadpool
except ImportError:
    print "Threadpool library is missing."
    print "See http://www.chrisarndt.de/projects/threadpool for instructions on how"
    print "to install it."
    print "Or try: easy_install threadpool."
    print ""
    sys.exit(1)

try:
    import MAPI
    from MAPI.Util import *
except ImportError:
    print "The MAPI bindings appear to be absent. Make sure to install the"
    print "Zarafa python binding package."
    print ""
    sys.exit(1)


__author__ = "Mark Swaanenburg"
__copyright__ = "Copyright 2010, Zarafa"
__credits__ = ["Mark Swaanenburg", "Others"]
__license__ = "Proprietary"
__version__ = "0.1.0"
__maintainer__ = "Mark Swaanenburg"
__email__ = "m.swaanenburg@zarafa.com"
__status__ = "Beta/Unstable"


# GUID of the Zarafa address book provider (MUIDECSAB).
MUIDECSAB = DEFINE_GUID(0x50a921ac, 0xd340, 0x48ee, 0xb3, 0x19, 0xfb, 0xa7, 0x53, 0x30, 0x44, 0x25)
def DEFINE_ABEID(type, id):
    # Pack an address book entry identifier: four zero flag bytes, the 16-byte
    # provider GUID, then three ULONGs (presumably version, object type,
    # object id -- TODO confirm layout) and four trailing padding bytes.
    return struct.pack("4B16s3I4B", 0, 0, 0, 0, MUIDECSAB, 0, type, id, 0, 0, 0, 0)
# Well-known entry id of the built-in 'Everyone' distribution list (id 1).
EID_EVERYONE = DEFINE_ABEID(MAPI_DISTLIST, 1)


class NoSuchStoreError(Exception):
    """Raised when a user has no store on the resolved server.

    The previous no-op __init__ override prevented attaching a message;
    inheriting Exception.__init__ keeps no-arg construction working and
    additionally allows an optional message.
    """
    pass

class PathError(Exception):
    """Raised when a storage path exists but is not a directory.

    Inherits Exception.__init__ (the old no-op override disallowed messages).
    """
    pass

class ServerCollisionError(Exception):
    """Raised when source and destination resolve to the same server.

    Inherits Exception.__init__ (the old no-op override disallowed messages).
    """
    pass
    
class Cancelled(Exception):
    """Raised inside a worker when the user requested an exit.

    Inherits Exception.__init__ (the old no-op override disallowed messages).
    """
    pass
    
    
def synchronized(lock):
    """Synchronization decorator factory.

    lock: Any object with acquire()/release() (e.g. threading.Lock). All
          calls to the decorated function are serialized through it.

    functools.wraps is applied so the decorated function keeps its original
    name and docstring (the old version returned an anonymous 'newFunction').
    """

    def wrap(f):
        @functools.wraps(f)
        def newFunction(*args, **kw):
            lock.acquire()
            try:
                return f(*args, **kw)
            finally:
                # Release even when f raises.
                lock.release()
        return newFunction
    return wrap


class UserContext:
    """Encapsulates all information about a user store on a particular server.

    Besides the attributes set in __init__, a number of computed read-only
    attributes are exposed via __getattr__: StoreEntryID, RootEntryID,
    UserEntryID, DisplayName, SyncRoot, SyncRootEntryID, Hierarchy, User,
    IsPublic and ServerGUID.
    """

    def __init__(self, session, store, user):
        """Initialize the context.

        session: The MAPI session on which the store was opened.
        store: The opened message store.
        user: The user name ('__public__' for the public store).
        """
        self.session = session
        self.store = store
        # Root folder of the store; anchor for all synchronization below.
        self.root = self.store.OpenEntry(None, IID_IMAPIFolder, MAPI_BEST_ACCESS|MAPI_DEFERRED_ERRORS)
        self.user = user
        # Dots would create logger hierarchies, so replace them.
        self.logger = logging.getLogger("UserContext[%s]" % self.user.replace('.', '_'))

    def __getattr__(self, attr):
        # Computed read-only attributes. Attributes set in __init__ never
        # reach this method (Python only calls __getattr__ on lookup misses).
        if attr == 'StoreEntryID':
            return self.store.GetProps([PR_ENTRYID], 0)[0].Value
        elif attr == 'RootEntryID':
            return self.root.GetProps([PR_ENTRYID], 0)[0].Value
        elif attr == 'UserEntryID':
            if self.user == '__public__':
                # The public store has no single owner; use the well-known
                # 'Everyone' distribution list entry id instead.
                return EID_EVERYONE
            return self.store.GetProps([PR_MAILBOX_OWNER_ENTRYID], 0)[0].Value
        elif attr == 'DisplayName':
            return self.store.GetProps([PR_DISPLAY_NAME], 0)[0].Value
        elif attr == 'SyncRoot':
            # Folder whose hierarchy and contents get synchronized.
            return self.root
        elif attr == 'SyncRootEntryID':
            return self.SyncRoot.GetProps([PR_ENTRYID], 0)[0].Value
        elif attr == 'Hierarchy':
            # Generator over every generic folder in the whole subtree under
            # the sync root (CONVENIENT_DEPTH), fetched in batches of 16.
            def gen():
                table = self.SyncRoot.GetHierarchyTable(CONVENIENT_DEPTH)
                table.SetColumns([PR_ENTRYID, PR_FOLDER_TYPE], TBL_BATCH)
                while True:
                    rows = table.QueryRows(16, 0)
                    if len(rows) == 0:
                        break
                    for row in rows:
                        # Only generic folders are yielded; other folder
                        # types (e.g. search folders) are skipped.
                        if row[1].Value == FOLDER_GENERIC:
                            yield self.root.OpenEntry(row[0].Value, IID_IMAPIFolder, MAPI_BEST_ACCESS|MAPI_DEFERRED_ERRORS)
            return gen()
        elif attr == 'User':
            return self.user
        elif attr == 'IsPublic':
            # A store is public when its provider GUID matches the public
            # store GUID.
            mdb_provider = self.store.GetProps([PR_MDB_PROVIDER], 0)[0]
            return mdb_provider.ulPropTag == PR_MDB_PROVIDER and mdb_provider.Value == ZARAFA_STORE_PUBLIC_GUID;
        elif attr == 'ServerGUID':
            # 0x67c20102: proprietary binary property holding the GUID of the
            # server this store lives on -- TODO confirm exact tag name.
            return self.store.GetProps([0x67c20102], 0)[0].Value
        else:
            raise AttributeError()

    def __repr__(self):
        return "%s: %s - %s" % (self.DisplayName, binascii.hexlify(self.StoreEntryID), binascii.hexlify(self.RootEntryID))

    def SyncHierarchyTo(self, other, streamstorage):
        """Synchronize the hierarchy from this contexts store to another.

        other: The UserContext containing the store to synchronize the hierarchy to.
        streamstorage: A PathBasedStreamStorage object that contains the state streams.
        """
        # Folder properties carried over along with the hierarchy.
        syncprops = [
            PR_ENTRYID,
            PR_CONTAINER_CLASS,
            PR_DISPLAY_NAME,
            PR_COMMENT,
            PR_IPM_APPOINTMENT_ENTRYID,
            PR_IPM_CONTACT_ENTRYID,
            PR_IPM_DRAFTS_ENTRYID,
            PR_IPM_JOURNAL_ENTRYID,
            PR_IPM_NOTE_ENTRYID,
            PR_IPM_TASK_ENTRYID,
            PR_REM_ONLINE_ENTRYID,
            PR_ADDITIONAL_REN_ENTRYIDS,
            PR_FREEBUSY_ENTRYIDS,
            PR_RULES_DATA,
            PR_FOLDER_XVIEWINFO_E,
            PR_FOLDER_DISPLAY_FLAGS,
            PR_IPM_OL2007_ENTRYIDS,
            PR_ACL_DATA
        ]

        # ICS: export hierarchy changes from this sync root, import them into
        # the other store's sync root.
        exporter = self.SyncRoot.OpenProperty(PR_HIERARCHY_SYNCHRONIZER, IID_IExchangeExportChanges, 0, 0)
        importer = other.SyncRoot.OpenProperty(PR_COLLECTOR, IID_IExchangeImportHierarchyChanges, 0, 0)
        # A previously saved state stream makes this run incremental; None
        # means a full initial synchronization.
        stream = streamstorage.LoadStateStream(binascii.hexlify(self.SyncRootEntryID))
        exporter.Config(stream, SYNC_NORMAL, importer, None, syncprops, None, 64)

        self.logger.debug("Starting hierarchy synchronization")
        step = 0
        while True:
            # Each call processes a batch and reports (total, current) progress.
            (steps, step) = exporter.Synchronize(step)
            self.logger.debug("Synchronized folder %d from %d" % (step, steps))
            if (steps == step):
                break;

        # Persist the new state so the next run is incremental.
        stream = IStream()
        exporter.UpdateState(stream)
        streamstorage.SaveStateStream(binascii.hexlify(self.SyncRootEntryID), stream)
        self.logger.debug("Hierarchy synchronized")

    def CopyReceiveFoldersTo(self, other):
        """Sets the receive folders of the other contexts store based on those of the current.

        other: The UserContext containing the other store.

        Folders must already exist in the other store (hierarchy synced first).
        """
        self.logger.debug("Copying receive folders")
        try:
            table = self.store.GetReceiveFolderTable(0)
            table.SetColumns([PR_MESSAGE_CLASS,PR_ENTRYID], 0)
            rows = table.QueryRows(16, 0)
            for row in rows:
                try:
                    self.logger.debug("Copying receive folder for class '%s'" % row[0].Value)
                    other.store.SetReceiveFolder(row[0].Value, 0, row[1].Value)
                except MAPIError, err:
                    # Best effort: log and continue with the next class.
                    self.logger.warning("Failed to set receive folder for message class '%s' (err: %s)" % (row[0].Value, err))
            self.logger.debug("Receive folders copied")
        except MAPIError, err:
            if err.hr != MAPI_E_NO_SUPPORT:
                raise
            # E.g. the public store has no receive folders.
            self.logger.debug("Store does not support receive folders")

    def CopyProps(self, fromObj, toObj):
        """Copy all properties of fromObj to toObj, streaming large ones.

        Failures are deliberately ignored (best effort).
        """
        tags = fromObj.GetPropList(0)
        # Only does something for the public store.
        tags.append(PR_EC_PUBLIC_IPM_SUBTREE_ENTRYID)
        props = fromObj.GetProps(tags, 0)

        # Copy all the small properties
        try:
            toObj.SetProps(props)
        except MAPIError:
            pass

        # Go through the properties to copy the large properties. Properties
        # too big for GetProps come back as PT_ERROR/MAPI_E_NOT_ENOUGH_MEMORY
        # and must be copied through streams.
        for prop in zip(tags, props):
            if PROP_TYPE(prop[1].ulPropTag) == PT_ERROR and prop[1].Value == MAPI_E_NOT_ENOUGH_MEMORY:
                try:
                    fromStream = fromObj.OpenProperty(prop[0], IID_IStream, None, 0)
                    toStream = toObj.OpenProperty(prop[0], IID_IStream, None, MAPI_CREATE|MAPI_MODIFY)
                    fromStream.CopyTo(toStream, 0xffffff)
                except MAPIError:
                    pass


    def CopyUnsyncablePropsTo(self, other):
        """Copies 'unsyncable' properties from the current context to another.

        other: The UserContext to sync the properties to.

        The 'unsyncable' properties are those found in the store and the root container.
        """
        self.logger.debug("Copying store properties")
        self.CopyProps(self.store, other.store)

        self.logger.debug("Copying root properties")
        self.CopyProps(self.root, other.root)

    def GetChangeAdvisor(self):
        """Get a change advisor for the current context.

        Used to receive notifications about changes in this store.
        """
        return self.store.OpenProperty(PR_EC_CHANGE_ADVISOR, IID_IECChangeAdvisor, 0, 0)

    def GetFolder(self, entryID):
        """Get the folder specified by the passed entryid.

        entryID: The entryid that specifies the folder to open.
        """
        return self.store.OpenEntry(entryID, IID_IMAPIFolder, MAPI_BEST_ACCESS|MAPI_DEFERRED_ERRORS)

    def GetImporterFor(self, entryID):
        """Get a change importer for the folder specified by the passed entryid.

        entryID: The entryid for the folder to get the change importer for.
        """
        folder = self.store.OpenEntry(entryID, IID_IMAPIFolder, MAPI_BEST_ACCESS|MAPI_DEFERRED_ERRORS)
        importer = folder.OpenProperty(PR_COLLECTOR, IID_IExchangeImportContentsChanges, 0, 0)
        return importer




class AdminSession:
    """An administrative (SYSTEM user) MAPI session to one Zarafa server.

    Construction is serialized via a class-level lock -- presumably because
    OpenECSession is not safe to call concurrently; TODO confirm.
    """
    lock = threading.Lock()     # static lock object (shared between instances)

    @synchronized(lock)
    def __init__(self, server = None, **keywords):
        """Open a SYSTEM session to the given server.

        server: Socket/URI of the server; defaults to the ZARAFA_SOCKET
                environment variable when omitted.
        keywords: Must contain 'sslkey_file' and 'sslkey_pass'.
        """
        if server is None:
            server = os.getenv('ZARAFA_SOCKET')

        self.session = OpenECSession('SYSTEM', '', server, sslkey_file=keywords['sslkey_file'], sslkey_pass=keywords['sslkey_pass'], flags = 0)
        self.store = GetDefaultStore(self.session);
        # Dots would create logger hierarchies, so replace them.
        self.logger = logging.getLogger("AdminSession[%s]" % server.replace('.', '_'))

    def __getattr__(self, attr):
        if attr == 'ServerGUID':
            # 0x67c20102: proprietary binary property holding the GUID of the
            # server -- TODO confirm exact tag name.
            return self.store.GetProps([0x67c20102], 0)[0].Value
        else:
            raise AttributeError()

    def GetUserContext(self, user, default = True):
        """Open a user's store and wrap it in a UserContext.

        user: User name, or '__public__' for the public store.
        default: When True the store may resolve to the user's home server;
                 when False resolution is forced to this session's server.

        Raises NoSuchStoreError when no store exists for the user.
        """
        try:
            if user == '__public__':
                flags = 0
                if not default:
                    flags = EC_OVERRIDE_HOMESERVER

                sa = self.store.QueryInterface(IID_IECServiceAdmin)
                storeEid = sa.GetPublicStoreEntryID(flags)
            else:
                flags = 0
                if not default:
                    # 0x10: presumably the private-store equivalent of
                    # EC_OVERRIDE_HOMESERVER -- TODO confirm flag name.
                    flags = 0x10

                ema = self.store.QueryInterface(IID_IExchangeManageStore)
                storeEid = ema.CreateStoreEntryID(None, user, flags)
            return UserContext(self.session, self.session.OpenMsgStore(0, storeEid, IID_IMsgStore, MAPI_BEST_ACCESS), user)

        except MAPIError, err:
            if err.hr == MAPI_E_NOT_FOUND:
                self.logger.debug("Store for user '%s' not found" % user)
                raise NoSuchStoreError()
            self.logger.error("Failed to open store for user '%s'" % user)
            raise

    def CloneStoreFrom(self, srcContext, streamstorage):
        """Create an empty copy of srcContext's store on this server and prime
        it with the hierarchy, receive folders and unsyncable properties.

        Returns the UserContext for the newly created store.
        """
        sa = self.store.QueryInterface(IID_IECServiceAdmin)
        store_type = ECSTORE_TYPE_PRIVATE
        if srcContext.IsPublic:
            store_type = ECSTORE_TYPE_PUBLIC
        # Reuse the source's store/root entry ids for the new store.
        sa.CreateEmptyStore(store_type, srcContext.UserEntryID, EC_OVERRIDE_HOMESERVER, srcContext.StoreEntryID, srcContext.RootEntryID)

        # The store EntryID returned from CreateEmptyStore is the EntryID we passed in the first place. We can't
        # use that because it contains the pseudo URL to the wrong server.
        dstContext = self.GetUserContext(srcContext.User, False)

        # We can't set receivefolders to folders that don't exist. So we need to sync the hierarchy first.
        srcContext.SyncHierarchyTo(dstContext, streamstorage)
        srcContext.CopyReceiveFoldersTo(dstContext)

        srcContext.CopyUnsyncablePropsTo(dstContext)

        return dstContext


        

class Migrator:
    """Migrates a single user's store to a target server and keeps it in sync
    afterwards via change notifications.
    """

    def __init__(self, args, streamstorage):
        """Initialize the migrator.

        args: Work item dict with 'user', 'server', 'sinkmonitor',
              'exitcontrol' and 'connection' entries (built in main()).
        streamstorage: PathBasedStreamStorage holding the sync state streams.
        """
        self.user = args['user']
        self.target = args['server']
        self.streamstorage = streamstorage
        self.advisesink = args['sinkmonitor'].GetChangeAdviseSink(self)
        self.exitcontrol = args['exitcontrol']
        self.connection = args['connection']
        # syncid -> source folder; filled by SynchronizeFolder so change
        # notifications can be mapped back to the folder they refer to.
        self.relations = {}
        self.logger = logging.getLogger("Migrator[%s]" % self.user.replace('.', '_'))

    def EnableMonitor(self):
        """Switch the advise sink live: buffered events go to the monitor."""
        self.advisesink.EnableMonitor()

    def LoadStateStream(self, entryid):
        """Load the persisted sync state for a folder (None when absent)."""
        return self.streamstorage.LoadStateStream(entryid)

    def SaveStateStream(self, entryid, stream):
        """Persist the sync state stream for a folder."""
        self.streamstorage.SaveStateStream(entryid, stream)

    def Initialize(self):
        """Open the source and destination stores, cloning the destination
        store onto the target server when it does not exist yet.

        Raises ServerCollisionError when source and destination resolve to
        the same server.
        """
        self.logger.debug("Opening default admin session")
        srcAdminSession = AdminSession(self.connection.get('serverpath', 'file:///var/run/zarafa'), sslkey_file=self.connection.get('sslkey_file', None), sslkey_pass=self.connection.get('sslkey_pass', None))

        self.logger.debug("Opening admin session to '%s'" % self.target)
        dstAdminSession = AdminSession(self.target, sslkey_file=self.connection.get('sslkey_file', None), sslkey_pass=self.connection.get('sslkey_pass', None))
        # State streams are stored per (source, destination) server pair.
        self.streamstorage.SetServerGuids(srcAdminSession.ServerGUID, dstAdminSession.ServerGUID)

        self.logger.debug("Opening source store")
        srcContext = srcAdminSession.GetUserContext(self.user)
        dstContext = None
        try:
            # Resolve user store without redirection
            self.logger.debug("Opening destination store")
            dstContext = dstAdminSession.GetUserContext(self.user, False)
        except NoSuchStoreError:
            # Create a new store on the target server.
            self.logger.debug("Opening destination store failed")
            self.logger.debug("Cloning store")
            dstContext = dstAdminSession.CloneStoreFrom(srcContext, self.streamstorage)

        # Migrating a store onto the server it already lives on is an error.
        if srcContext.ServerGUID == dstContext.ServerGUID:
            raise ServerCollisionError()

        # Update migrator state
        self.srcContext = srcContext
        self.dstContext = dstContext

    def SynchronizeFolder(self, srcFolder):
        """Synchronize the contents of one source folder to the destination.

        Returns the resulting SyncState. Raises Cancelled when an exit was
        requested while synchronizing.

        NOTE(review): uses self.advisor, which is only assigned in
        SyncOneWay() -- this method appears to assume SyncOneWay() ran first.
        """
        foldername = srcFolder.GetProps([PR_DISPLAY_NAME], 0)[0].Value
        self.logger.debug("Synchronizing folder: %s" % foldername)
        entryID = srcFolder.GetProps([PR_ENTRYID], 0)[0].Value
        stream = self.LoadStateStream(entryID)

        # ICS contents export from the source folder into the destination's
        # matching folder (located by the same entryid).
        importer = self.dstContext.GetImporterFor(entryID)
        exporter = srcFolder.OpenProperty(PR_CONTENTS_SYNCHRONIZER, IID_IExchangeExportChanges, 0, 0)
        exporter.Config(stream, SYNC_NORMAL | SYNC_ASSOCIATED | SYNC_READ_STATE, importer, None, [PR_ENTRYID], None, 64)

        try:
            step = 0
            while True:
                if self.exitcontrol.exit_pending:
                    raise Cancelled()
                steps, step = exporter.Synchronize(step);
                self.logger.debug("%s: %u / %u" % (foldername, step, steps))
                if steps == step:
                    break;

        finally:
            # Always persist progress, even when cancelled halfway through.
            if not stream:
                stream = IStream()
            exporter.UpdateState(stream)
            self.SaveStateStream(entryID, stream)   # Persist the state stream

        # Make sure to map the syncid to the entryID.
        state = SyncState(stream)
        self.relations[state.syncid] = srcFolder
        # Make sure the change advisor is monitoring the folder
        try:
            self.advisor.IsMonitoringSyncId(state.syncid)
        except MAPIError:
            self.logger.debug("Adding syncstate '%s' to change advisor" % state)
            self.advisor.AddKeys([state.key])

        return state

    def SynchronizeFolderById(self, syncid):
        """Re-synchronize the folder a change notification referred to.

        Unknown syncids are silently ignored.
        """
        if self.relations.has_key(syncid):
            srcFolder = self.relations[syncid]
            self.advisesink.UpdateSyncState(self.SynchronizeFolder(srcFolder))

    def SyncOneWay(self):
        """Perform the initial one-way sync: hierarchy first, then the
        contents of every folder, registering each folder with the change
        advisor as it completes."""
        # Get stream for change notifications
        stream = None

        self.advisor = self.srcContext.GetChangeAdvisor()
        self.advisor.Config(stream, None, self.advisesink, 0)

        self.srcContext.SyncHierarchyTo(self.dstContext, self.streamstorage)
        [self.SynchronizeFolder(folder) for folder in self.srcContext.Hierarchy]
            


def ReadUInt(stream):
    """Read a single unsigned 32-bit integer (native byte order) from a
    binary stream."""
    raw = stream.Read(4)
    (value,) = struct.unpack("I", raw)
    return value


class SyncState:
    """Represent a syncstate.

    A syncstate is a four byte syncid followed by a four byte changeid,
    concatenated in native byte order ('II' struct format).
    """
    def __init__(self, arg):
        """Initialize a SyncState object.

        arg: Either a packed 8-byte binary string containing the syncid and
             changeid, or an IStream object holding at least eight bytes.
        """
        # Fix: the old code evaluated type(arg) == type(IStream()), creating
        # a throwaway IStream on every construction. In Python 2 bytes is an
        # alias for str, so packed binary states are detected directly.
        if isinstance(arg, bytes):
            self.syncid, self.changeid = struct.unpack("II", arg)
        else:
            # Clone so the caller's stream position is left untouched, then
            # read from the start.
            clone = arg.Clone()
            clone.Seek(0, BOOKMARK_BEGINNING)
            self.syncid = ReadUInt(clone)
            self.changeid = ReadUInt(clone)

    def __getattr__(self, attr):
        # Only 'key' is computed on demand. 'syncid' and 'changeid' are real
        # instance attributes set in __init__, so handling them here (as the
        # old code did) was dead code that would recurse infinitely if they
        # were ever actually missing.
        if attr == 'key':
            return struct.pack("II", self.syncid, self.changeid)
        raise AttributeError(attr)

    def __repr__(self):
        return "SyncState: syncid=%u, changeid=%u" % (self.syncid, self.changeid)



class AdviseSink(ECChangeAdviseSink):
    """Change-notification sink for a single Migrator.

    Events arriving before EnableMonitor() is called are buffered locally;
    after that they are handed straight to the AdviseSinkMonitor collection.
    """
    def __init__(self, migrator, collection):
        ECChangeAdviseSink.__init__(self, [IID_IECChangeAdviseSink])
        self.migrator = migrator
        self.collection = collection
        self.lock = threading.Lock()
        self.monitored = False
        self.events = {}    # buffered {syncid: changeid} until monitoring starts

    def OnNotify(self, flags, entries):
        """Advisor callback: record the notified sync states."""
        if entries:
            pending = {}
            for entry in entries:
                state = SyncState(entry)
                pending[state.syncid] = state.changeid
            self.lock.acquire()
            try:
                if self.monitored:
                    self.collection.AddEvents(self, pending)
                else:
                    self.events.update(pending)
            finally:
                self.lock.release()
        return 0

    def EnableMonitor(self):
        """Flush buffered events to the monitor and go live."""
        self.lock.acquire()
        try:
            if not self.monitored:
                self.monitored = True
                self.collection.AddEvents(self, self.events)
                self.events = None
        finally:
            self.lock.release()

    def UpdateSyncState(self, state):
        """Report a processed sync state back to the monitor."""
        self.collection.UpdateState(self, state)

    def SynchronizeFolder(self, syncid):
        """Synchronize one folder by syncid.

        The lock makes sure the Migrator is never used on multiple worker
        threads simultaneously.
        """
        self.lock.acquire()
        try:
            self.migrator.SynchronizeFolderById(syncid)
        finally:
            self.lock.release()


class AdviseSinkMonitor:
    """Central collector of change events from all AdviseSinks.

    Run() waits for events and fans them out to worker threads via the
    threadpool; AddEvents/UpdateState maintain the per-sink pending-event
    maps under the condition variable.
    """
    def __init__(self):
        self.condition = threading.Condition()  # guards doExit and sinkevents
        self.doExit = False
        self.sinkevents = {}    # sink -> {syncid: changeid} pending events
        self.logger = logging.getLogger("AdviseSinkMonitor")

    def GetChangeAdviseSink(self, migrator):
        """Create a new AdviseSink that reports into this monitor."""
        return AdviseSink(migrator, self)

    def AddEvents(self, sink, newevents):
        """Merge newevents ({syncid: changeid}) into the sink's pending set
        and wake up Run()."""
        # Fix: the old code logged the bound method object 'sink.__hash__'
        # instead of the hash value.
        self.logger.debug("Adding %u new events from sink %s", len(newevents), hash(sink))
        self.condition.acquire()
        try:
            events = self.sinkevents.setdefault(sink, {})
            events.update(newevents)
            self.condition.notify()
        finally:
            self.condition.release()

    def UpdateState(self, sink, state):
        """Drop a pending event once the folder has been synchronized up to
        (at least) the notified change id."""
        self.logger.debug("Updating state for sink %s, syncid=%u, changeid=%u", hash(sink), state.syncid, state.changeid)
        self.condition.acquire()
        try:
            events = self.sinkevents.get(sink, None)
            if events and state.syncid in events and events[state.syncid] <= state.changeid:
                del events[state.syncid]
                if not events:
                    del self.sinkevents[sink]
        finally:
            self.condition.release()

    def Cancel(self):
        """Ask Run() to leave its wait loop."""
        self.logger.debug("Canceling AdviseSinkMonitor...")
        self.condition.acquire()
        try:
            self.doExit = True
            self.condition.notify()
        finally:
            self.condition.release()

    def Run(self, pool):
        """Event pump: wait until events arrive, then queue one ProcessSink
        task per sink on the threadpool. Exits when Cancel() is called."""
        self.logger.debug("AdviseSinkMonitor started")
        self.condition.acquire()
        try:
            while not self.doExit:
                while not self.doExit and not self.sinkevents:
                    self.logger.debug("Waiting for events...")
                    self.condition.wait()
                if not self.doExit:
                    self.logger.debug("Get events for %u sinks", len(self.sinkevents))
                    requests = threadpool.makeRequests(self.ProcessSink, ([item] for item in self.sinkevents.items()))
                    self.sinkevents = {}
                    for req in requests:
                        pool.putRequest(req)
        finally:
            self.condition.release()

    def ProcessSink(self, args):
        """Worker entry point: synchronize every folder with a pending event."""
        sink, events = args[0]
        self.logger.debug("Processing %u events for sink %s", len(events), hash(sink))
        for syncid in events.keys():
            sink.SynchronizeFolder(syncid)
                
            


class PathBasedStreamStorage:
    """Persists synchronization state streams as files on disk.

    Streams live in <basepath>/<srcGuid>-<dstGuid>/<entryid>.state, keeping
    states separate per (source, destination) server pair.
    """
    def __init__(self, basepath):
        """Create the storage rooted at basepath, creating it if needed.

        Raises PathError when basepath exists but is not a directory.
        """
        self.logger = logging.getLogger("PathBasedStreamStorage")
        self.basepath = os.path.abspath(basepath)
        if os.path.exists(self.basepath) and not os.path.isdir(self.basepath):
            self.logger.critical("'%s' exists but is not a directory!" % self.basepath)
            raise PathError()
        elif not os.path.exists(self.basepath):
            self.logger.debug("Creating '%s'" % self.basepath)
            os.mkdir(self.basepath)
        # Set by SetServerGuids; streams cannot be loaded/saved before that.
        self.curpath = None

    def SetServerGuids(self, srcGuid, dstGuid):
        """Select (and create if needed) the state directory for a server pair.

        Raises PathError when the path exists but is not a directory.
        """
        self.logger.debug("srcGuid=%s" % binascii.hexlify(srcGuid))
        self.logger.debug("dstGuid=%s" % binascii.hexlify(dstGuid))
        self.curpath = '%s/%s-%s' % (self.basepath, binascii.hexlify(srcGuid), binascii.hexlify(dstGuid))
        if os.path.exists(self.curpath) and not os.path.isdir(self.curpath):
            self.logger.critical("'%s' exists but is not a directory!" % self.curpath)
            raise PathError()
        elif not os.path.exists(self.curpath):
            self.logger.debug("Creating '%s'" % self.curpath)
            os.mkdir(self.curpath)

    def LoadStateStream(self, entryid):
        """Load the persisted state for entryid into an IStream.

        Returns None when no state has been saved yet; callers treat that as
        'start a full synchronization'.
        """
        self.logger.debug("Loading state for '%s'" % binascii.hexlify(entryid))
        filename = '%s/%s.state' % (self.curpath, binascii.hexlify(entryid))
        stream = None
        try:
            f = open(filename, 'rb')
            try:
                bin = f.read()
            finally:
                f.close()   # fix: close the handle instead of leaking it
            stream = IStream()
            stream.Write(bin)
            # NOTE(review): the stream is not rewound after Write; presumably
            # the ICS exporter seeks it itself -- confirm.
        except IOError:
            pass
        return stream

    def SaveStateStream(self, entryid, stream):
        """Persist the state stream for entryid to disk."""
        self.logger.debug("Saving state for '%s'" % binascii.hexlify(entryid))
        filename = '%s/%s.state' % (self.curpath, binascii.hexlify(entryid))
        clone = stream.Clone()
        # Fix: arguments were swapped (Seek takes offset, then origin); both
        # values are zero so behavior is unchanged, but this now matches the
        # Seek call in SyncState.
        clone.Seek(0, BOOKMARK_BEGINNING)
        bin = clone.Read(0xffffff)
        f = open(filename, 'wb')
        try:
            f.write(bin)
        finally:
            f.close()   # fix: flush/close so state isn't lost at teardown



class ExitControl:
    """Shared flag used to tell worker threads the user wants to quit."""
    def __init__(self):
        self.doExit = False
        self.logger = logging.getLogger("ExitControl")

    def initiate_exit(self):
        """Request that all workers stop at their next cancellation check."""
        self.logger.debug("Exit initiated...")
        self.doExit = True

    def __getattr__(self, attr):
        # 'exit_pending' is a computed read-only alias for doExit.
        if attr != 'exit_pending':
            raise AttributeError()
        return self.doExit
        
        
        
class ProcessTracker:
    """Tracks migration progress; also the threadpool callback target."""
    def __init__(self):
        self.lock = threading.Lock()    # guards the counters below
        self.started = 0
        self.completed = 0
        self.failed = 0
        self.canceled = 0

    def __getattr__(self, attr):
        # 'completed', 'failed' and 'canceled' are real instance attributes
        # set in __init__, so __getattr__ is never invoked for them; handling
        # them here (as the old code did) was dead code that would recurse
        # infinitely if they were ever actually missing. Only the derived
        # 'finished' count is computed on demand.
        if attr == 'finished':
            return self.completed + self.failed + self.canceled
        raise AttributeError(attr)

    def start_migration(self, args):
        """Threadpool worker: perform the initial migration for one user and
        switch it to live monitoring afterwards."""
        self.lock.acquire()
        try:
            self.started += 1
        finally:
            self.lock.release()

        ss = PathBasedStreamStorage(os.getenv('HOME') + '/.zarafa-migrate-store')
        logging.info("Starting to migrate user '%s' to '%s'" % (args['user'], args['server']))

        m = Migrator(args, ss)
        m.Initialize()
        m.SyncOneWay()

        logging.info("Maintaining sync for user '%s'..." % args['user'])
        m.EnableMonitor()

    def migration_completed(self, request, result):
        """Threadpool callback: start_migration finished without error."""
        self.lock.acquire()
        try:
            self.completed += 1
        finally:
            self.lock.release()

        logging.info("Initial migration completed for user '%s'" % request.args[0]['user'])

    def migration_exception(self, request, exc_info):
        """Threadpool callback: start_migration raised an exception.

        A Cancelled exception counts as a cancellation; anything else is a
        failure and gets its traceback logged.
        """
        if exc_info[0] is Cancelled:
            self.lock.acquire()
            try:
                self.canceled += 1
            finally:
                self.lock.release()

            logging.info("Migration for user '%s' was canceled" % request.args[0]['user'])
        else:
            self.lock.acquire()
            try:
                self.failed += 1
            finally:
                self.lock.release()

            # Fix: log outside the lock (formatting a traceback is slow and
            # needs no locking), matching the canceled branch above.
            logging.error("Migration for user '%s' failed:" % request.args[0]['user'])
            lines = traceback.format_exception(exc_info[0], exc_info[1], exc_info[2])
            for line in lines:
                for part in line.rstrip('\n').split('\n'):
                    logging.error("exc_info: %s" % part)
            


def main(argv = None):
    """Entry point: read the config file, start one migration per mapped user
    and keep migrated stores in sync until interrupted.

    argv: Command line arguments; defaults to sys.argv. Expects exactly one
          argument: the path to the config file.

    Returns the process exit code (1 on usage error).
    """
    if argv is None:
        argv = sys.argv

    if len(argv) != 2:
        print >> sys.stderr, 'Usage: <config file>'
        return 1

    sinkmonitor = AdviseSinkMonitor()
    exitcontrol = ExitControl()
    tracker = ProcessTracker()

    config = ConfigParser.SafeConfigParser()
    config.read(argv[1])

    # One work item per entry in [Mapping]; target server names are expanded
    # through the [Servers] section, connection settings come from
    # [Connection].
    userlist = ({'user': x[0], 'server': x[1] % dict(config.items('Servers')), 'sinkmonitor': sinkmonitor, 'exitcontrol': exitcontrol, 'connection': dict(config.items('Connection'))} for x in config.items('Mapping', True))
    requests = threadpool.makeRequests(tracker.start_migration, userlist, tracker.migration_completed, tracker. migration_exception)

    # Full debug log goes to the log file; INFO and up is mirrored to the
    # console via the extra handler below.
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(levelname)-8s %(name)s: %(message)s',
                        datefmt='%m-%d %H:%M',
                        filename='./migrate-store.log',
                        filemode='a')

    console = logging.StreamHandler()
    console.setLevel(logging.INFO)

    formatter = logging.Formatter('%(levelname)-8s: %(message)s')
    console.setFormatter(formatter)

    logging.getLogger('').addHandler(console)

    logging.debug("Creating threadpool with 4 workers")
    pool = threadpool.ThreadPool(4)

    logging.debug("Adding %u tasks to the threadpool" % len(requests))
    [pool.putRequest(req) for req in requests]
    # The sink monitor's event pump runs on one of the pool workers as well.
    pool.putRequest(threadpool.WorkRequest(sinkmonitor.Run, [pool], None))

    # Main loop: poll the pool and report aggregate progress until all work
    # is done or the user interrupts with ^C.
    message_displayed = False
    while 1:
        try:
            time.sleep(0.5)
            pool.poll()
            if tracker.failed == len(requests):
                logging.critical("All migrations have failed, exiting!")
                sinkmonitor.Cancel()     # Cancel if all initial migrations failed
                break
            elif tracker.completed == len(requests):
                if message_displayed == False:
                    message_displayed = True
                    logging.info("All migrations have completed successfully, maintaining sync.")
            elif tracker.finished == len(requests):
                # Mixed outcome: report each category once.
                if message_displayed == False:
                    message_displayed = True
                    if tracker.failed > 0:
                        logging.warning("%u migrations have failed!" % tracker.failed)
                    if tracker.canceled > 0:
                        logging.warning("%u migrations were canceled!" % tracker.canceled)
                    if tracker.completed > 0:
                        logging.info("%u migrations have completed successfully, maintaining sync." % tracker.completed)

        except KeyboardInterrupt:
            print ""    # Get a newline after the ^C
            logging.info("User requested exit...")
            exitcontrol.initiate_exit()
            sinkmonitor.Cancel()
        except threadpool.NoResultsPending:
            break

    logging.debug("Threadpool ready, waiting for workers to exit")
    pool.dismissWorkers(4, True)
    logging.debug("Done")


if __name__ == '__main__':
    sys.exit(main())
