# myplugs/bork.py
#
# 

__copyright__ = 'this file is in the public domain'

from gozerbot.generic import rlog, geturl, toenc, fromenc, waitforqueue, \
strippedtxt, handle_exception, now, geturl2, getpostdata, useragent, \
exceptionmsg, lockdec
from gozerbot.rsslist import rsslist
from gozerbot.datadir import datadir
from gozerbot.pdod import Pdod
from gozerbot.persiststate import PersistState
from gozerbot.persistconfig import PersistConfig
from gozerbot.commands import cmnds
from gozerbot.plugins import plugins
from gozerbot.users import users
from gozerbot.fleet import fleet
from gozerbot.thr import start_new_thread
from gozerbot.plughelp import plughelp
from gozerbot.examples import examples
from gozerbot.dol import Dol
from gozerbot.config import config
from gozerbot.ircevent import Ircevent
from gozerbot.threadloop import ThreadLoop
from gozerbot.plugins import plugins
from gozerbot.periodical import periodical
from gozerbot.rest.server import RestServer, RestServerAsync, RestRequestHandler
from gozerbot.rest.client import RestClient, RestResult, RestClientAsync
from gozerbot.contrib.simplejson import loads, dumps
from gozerbot.contrib.BeautifulSoup import BeautifulSoup
from gozerplugs.plugs.infoitem import info
from gozerplugs.plugs.quote import quotes
from asyncore import dispatcher
from xml.sax.saxutils import unescape
import Queue, time, socket, re, os, types, urllib, httplib, asyncore, \
asynchat, select, sys, thread, random

# register the help text of this plugin
plughelp.add('bork', 'enter the bork!')

# persistent plugin state stored under <datadir>/bork
# NOTE(review): the 'nodes' and 'names' entries are only defined here; the
# visible part of this file keeps its node bookkeeping in BorkNet instead
bork = PersistState(datadir + os.sep + 'bork')
bork.define('nodes', [])
bork.define('names', {})

# plugin configuration .. the second argument of define() is presumably the
# default value (TODO confirm against PersistConfig)
borkcfg = PersistConfig()
borkcfg.define('enable', 1)
borkcfg.define('wait', 5)
# note: both lines below do a hostname lookup while the module is imported
borkcfg.define('host' , socket.gethostbyname(socket.getfqdn()))
borkcfg.define('name' , socket.getfqdn())
borkcfg.define('port' , 9999)

# module wide lock and a wrapper built from it with lockdec
# (neither is referenced in the visible part of this file)
borklock = thread.allocate_lock()
locked = lockdec(borklock)

# make sure the 'web' user exists .. bork_dispatch_POST runs commands with
# the userhost 'web@web'
if not users.getname('web@web'):
    users.add('web', ['web@web', ], ['WEB', ])

# matches ' wait <digits>', case insensitive (not referenced in the visible
# part of this file)
waitre = re.compile(' wait (\d+)', re.I)
# the running REST server .. set by init()
borkserver = None

# "host:port" and base url of this node, taken from the config at import time
borkhp = "%s:%s" % (borkcfg.get('host'), borkcfg.get('port'))
borkurl = "http://%s" % borkhp

def bork_nodes_GET(server, request):
    """ GET handler .. return the (name, url) list of known nodes, JSON encoded """
    known = borknet.getnodes()
    return dumps(known)

def bork_ping_GET(server, request):
    """ GET handler .. answer a ping with a JSON encoded 'pong' """
    answer = 'pong'
    return dumps(answer)

def bork_join_POST(server, request):
    """ POST handler .. register the calling node in the borknet.

        the node url is built from the client address of the request and the
        'port' field of the post data; 'name' is the node name. the node is
        handed to borknet.addifping(), so it is only added once it answers a
        ping. every return value is JSON encoded with dumps().
    """
    try:
        (host, port) = request.client_address
    except (AttributeError, TypeError, ValueError):
        # same payload as before, but JSON encoded like every other reply of
        # this handler instead of a raw python list
        return dumps(["can't determine host/port", ])
    try:
        postdata = getpostdata(request)
        borkport = postdata['port']
        borkname = postdata['name']
    except KeyError:
        rlog(10, host, 'no port number or name provided')
        return dumps('no port number/name provided')
    if borknet.byname(borkname):
        rlog(10, host, '%s name already taken' % borkname)
        return dumps('node name already taken')
    # the port the request came from is not used .. the node tells us on
    # which port its own server listens
    url = 'http://%s:%s/' % (host, borkport)
    borknet.addifping(borkname, url)
    return dumps('node added')

def bork_dispatch_POST(server, request):
    """ POST handler .. run the posted 'cmnd' on the first bot of the fleet.

        the command is executed as user web@web in channel 'web'; the output
        collected from the event queue (waiting up to 60 seconds) is returned
        as a JSON encoded list of strings.
    """
    try:
        postdata = getpostdata(request)
        cmnd = postdata['cmnd']
    except KeyError:
        return dumps(['need cmnd value', ])
    origin = request.client_address
    rlog(10, 'bork', 'dispatch request %s from <%s>' % (cmnd, origin))
    bot = fleet.getfirstbot()
    # build the event that stands in for the web request
    ievent = Ircevent()
    ievent.txt = cmnd
    ievent.nick = 'web'
    ievent.userhost = 'web@web'
    ievent.channel = 'web'
    replies = Queue.Queue()
    ievent.queues.append(replies)
    ievent.speed = 3
    ievent.bot = bot
    if not plugins.woulddispatch(bot, ievent):
        return dumps(["can't dispatch %s" % cmnd, ])
    start_new_thread(plugins.trydispatch, (bot, ievent))
    # output of the command arrives on the queue appended above
    result = waitforqueue(replies, 60)
    if not result:
        return dumps(["can't dispatch %s" % cmnd, ])
    return dumps([str(item) for item in result])

class BorkNode(object):

    """ one node of the borknet, identified by a name and a base url. """

    def __init__(self, name, url):
        self.name = name
        self.url = url
        # three timestamps, each taken at construction time
        self.seen = time.time()
        self.synced = time.time()
        self.regtime = time.time()
        self.filter = []
        self.catch = []
        self.lasterrortime = None
        self.lasterror = ""
        self.insocket = None

    def __str__(self):
        # NOTE(review): the value printed after 'seen=' is regtime, not seen
        fields = (self.name, self.url, self.regtime)
        return "name=%s url=<%s> seen=%s" % fields

    def _client(self, path, cb):
        """ make a BorkClient for <url>borknet/<path> with cb attached """
        return BorkClient(self.url + 'borknet/' + path, self.name).addcb(cb)

    def query(self, what, cb):
        """ GET <what> below the borknet/ path of this node """
        if what.startswith('/'):
            # drop one leading slash .. the node url supplies the separator
            what = what[1:]
        self._client(what, cb).get()

    def dispatch(self, cmnd, cb):
        """ post <cmnd> to the +dispatch/ resource of this node """
        self._client('+dispatch/', cb).dispatch(cmnd)

    def ping(self, cb):
        """ GET the +ping/ resource of this node """
        self._client('+ping/', cb).get()

class BorkNet(object):

    """ cache of known borknet nodes plus the calls to talk to them.

        nodes   .. dict url -> BorkNode, kept in memory only
        state   .. Pdod 'bork.state' with the 'ignore' list and the 'names'
                   mapping (node name -> url)
        startup .. Pdod 'bork.startup' with the 'start' mapping (node name ->
                   url) of the nodes that start() adds again

        requests to other nodes are made with BorkClient and answered through
        callbacks of the form cb(client, result).
    """

    # node that join() and boot() contact when no url is given
    defaultnode = 'http://213.196.7.59:9999'

    def __init__(self):
        self.nodes = {}
        self.state = Pdod(datadir + os.sep + 'bork.state')
        self.startup = Pdod(datadir + os.sep + 'bork.startup')
        if not self.state.has_key('ignore'):
            self.state['ignore'] = []
        if not self.state.has_key('names'):
            self.state['names'] = {}
        if not self.startup.has_key('start'):
            self.startup['start'] = {}

    def addifping(self, name, url):
        """ ping the node at <url> and add it when it answers with a pong """
        if not url.endswith('/'):
            url += '/'
        node = BorkNode(name, url)
        def cb(client, result):
            if result.error:
                rlog(10, url, 'failed to receive pong .. not adding: %s' % \
result.error)
                return
            # result.data may be empty .. treat that as an invalid answer
            if result.data and 'pong' in result.data:
                self.add(name, url)
            else:
                rlog(10, url, 'invalid ping data')
        node.ping(cb)

    def add(self, name, url, persist=True):
        """ add (or replace) the node <name> at <url> and return it.

            the url is stored with a trailing slash. with persist set the
            node is also written to the startup list.
        """
        if not url.endswith('/'):
            url += '/'
        node = BorkNode(name, url)
        self.nodes[url] = node
        self.state.set('names', name, url)
        if persist:
            self.persist(name, url)
        rlog(10, 'bork', 'added %s node <%s>' % (name, url))
        return node

    def start(self):
        """ boot from the default node, add the persisted nodes, join them """
        self.boot()
        # iterate over a snapshot .. add() writes to the startup mapping and
        # so can the callbacks of the sync started by boot()
        for name, url in list(self.startup['start'].items()):
            self.add(name, url)
        self.joinall()

    def persist(self, name, url):
        """ store name -> url in the startup list and save it """
        self.startup.set('start', name, url)
        self.startup.save()

    def get(self, url):
        """ return the node registered for <url> .. KeyError when unknown """
        return self.nodes[url]

    def byname(self, name):
        """ return the node called <name> or None when there is none """
        try:
            url = self.state['names'][name]
            if url:
                return self.get(url)
        except KeyError:
            pass
        return None

    def remove(self, id):
        """ forget the node registered for the url <id>.

            unknown urls are ignored (this used to raise KeyError because
            the lookup went through get()).
        """
        target = self.nodes.get(id)
        if not target:
            return
        try:
            del self.state['names'][target.name]
        except KeyError:
            # name mapping already gone .. still drop the node itself
            pass
        del self.nodes[id]

    def unpersist(self, url):
        """ remove the node with <url> from the startup list.

            the startup list maps name -> url (see persist), so entries are
            matched on their url; a trailing slash is not significant. an
            entry whose name equals the argument is removed as well.
        """
        start = self.startup['start']
        wanted = url.rstrip('/')
        stale = [name for name, target in list(start.items()) \
if name == url or str(target).rstrip('/') == wanted]
        if not stale:
            return
        for name in stale:
            del start[name]
        self.startup.save()

    def ignore(self, url):
        """ put <url> on the ignore list and save it, like unignore does """
        self.state['ignore'].append(url)
        self.state.save()

    def unignore(self, url):
        """ take <url> off the ignore list .. ValueError when not on it """
        self.state.data['ignore'].remove(url)
        self.state.save()

    def stop(self):
        """ nothing to stop """
        pass

    def queryall(self, what, cb):
        """ query <what> on every known node """
        # snapshot .. callbacks may add nodes while we iterate
        for node in list(self.nodes.values()):
            node.query(what, cb)

    def dispatch(self, cmnd, cb):
        """ dispatch <cmnd> on every known node and start an asyncore loop """
        for node in list(self.nodes.values()):
            node.dispatch(cmnd, cb)
        start_new_thread(asyncore.loop, (), {'use_poll': True })

    def getnodes(self):
        """ return a list of (name, url) tuples of all known nodes """
        return [(node.name, url) for url, node in list(self.nodes.items())]

    def getname(self, url):
        """ return the name of the node at <url> .. KeyError when unknown """
        return self.get(url).name

    def join(self, url=None):
        """ announce name and port of this node to the node at <url>.

            without url the default node is used. the outcome is only
            logged by the callback; nothing is returned.
        """
        def cb(client, result):
            if result.error:
                rlog(10, client.url, 'join error: %s' % result.error)
            else:
                rlog(10, client.url, 'joined')
        if url:
            # node urls are stored with a trailing slash .. avoid '//borknet'
            target = url.rstrip('/')
        else:
            target = self.defaultnode
        client = BorkClient("%s/borknet/+join/" % target).addcb(cb)
        client.add(name=borkcfg.get('name'), port=borkcfg.get('port'))

    def joinall(self):
        """ join every known node and start an asyncore loop """
        for item in list(self.nodes.values()):
            self.join(item.url)
        start_new_thread(asyncore.loop, (), {'use_poll': True })

    def boot(self, node=None):
        """ sync the node cache with <node> or with the default node """
        if node:
            self.sync(node)
        else:
            self.sync(self.defaultnode)
        # the sync answer is handled in a callback, so the size logged here
        # is the size at the moment of the call
        rlog(10, 'bork', 'booted %s nodes' % self.size())

    def fullboot(self):
        """ sync with every known node .. returns the number of syncs started """
        teller = 0
        for node in list(self.nodes.values()):
            self.sync(node.url, False)
            teller += 1
        return teller

    def sync(self, url, throw=True):
        """ sync cache with node.

            fetches the node list of <url> and adds every entry to the cache.
            throw is accepted for compatibility with existing callers and is
            not used.
        """
        def cb(client, result):
            if result.error:
                # was self.url, which BorkNet does not have
                rlog(10, url, "can't sync: %s" % result.error)
                return
            for node in result.data:
                borknode = self.add(node[0], node[1])
                borknode.synced = time.time()
        BorkClient('%s/borknet/nodes/' % url.rstrip('/')).addcb(cb).get()

    def size(self):
        """ number of nodes in the cache """
        return len(self.nodes)

    def list(self):
        """ string representation of every node in the cache """
        return [str(node) for node in list(self.nodes.values())]

    def names(self):
        """ names of all nodes that have a name -> url mapping """
        return self.state['names'].keys()

class BorkClient(RestClientAsync, asynchat.async_chat):

    """ REST client used for the requests to other borknet nodes. """

    def add(self, *args, **kwargs):
        """ post the keyword arguments .. positional arguments are ignored """
        self.sendpost(kwargs)

    def get(self):
        """ send a GET request """
        self.sendget()

    def dispatch(self, cmnd):
        """ post <cmnd> in the 'cmnd' field """
        self.sendpost({'cmnd': cmnd})

class BorkAggregator(ThreadLoop):

    """ collects the results of dispatched commands per dispatch id.

        results maps a dispatch id to a list of (client, cmnd, result)
        tuples; get() hands the list out and forgets it.
    """

    def __init__(self):
        ThreadLoop.__init__(self, 'borkaggregator')
        self.results = {}
        # NOTE(review): not used in the visible part of this file; '/S' and
        # 's' look like they were meant to be '\S' and '\s' .. confirm
        self.outre = re.compile('(/S+)\:s(/S)+s')

    def handle(self, id, client, cmnd, result):
        """ store one result for the dispatch <id>.

            presumably called by ThreadLoop with the arguments that were
            given to put() .. TODO confirm
        """
        self.results.setdefault(id, []).append((client, cmnd, result))

    def get(self, id):
        """ return the results collected for <id> and forget them.

            returns an empty list when nothing arrived for <id> (this used to
            raise KeyError) and no longer leaves an empty list behind for
            every finished dispatch.
        """
        return self.results.pop(id, [])

    def output(self, id, ievent):
        """ reply with the distinct result data collected for <id> """
        res = self.get(id)
        if not res:
            return
        out = []
        for client, cmnd, result in res:
            if result.data and result.data not in out:
                out.append(result.data)
        ievent.reply(out, dot=True)

    def aggregate(self, id, ievent):
        """ reply with the combined results collected for <id>.

            list items of the form 'name:number' are summed per name; all
            other data is gathered once each. when there are sums only the
            sums are shown. nothing is said when there are no results.
        """
        res = self.get(id)
        if not res:
            return
        agg = {}
        aggl = []
        for client, cmnd, result in res:
            data = result.data
            if not data:
                continue
            if not isinstance(data, list):
                if data not in aggl:
                    aggl.append(data)
                continue
            for item in data:
                try:
                    name, nr = item.split(':')
                    agg[name] = agg.get(name, 0) + int(nr)
                except (ValueError, AttributeError):
                    # not 'name:number' (or not a string at all)
                    if item not in aggl:
                        aggl.append(item)
        if agg:
            ievent.reply('results => ', agg)
        else:
            ievent.reply('results => ', aggl, dot=True)

# module wide instances used by the handlers in this file
# note: BorkNet() opens its two Pdod files when the module is imported
borknet = BorkNet()
borkaggregator = BorkAggregator()

def init():
    """ start the REST server and bring up the borknet.

        returns 1 right away when the plugin is disabled. errors are passed
        to handle_exception().
    """
    global borkserver
    if not borkcfg.get('enable'):
        return 1
    try:
        address = (borkcfg.get('host'), borkcfg.get('port'))
        borkserver = RestServerAsync(address, RestRequestHandler)
        if not borkserver:
            return
        rlog(10, 'bork', 'server running at %s:%s' % address)
        borkserver.start()
        routes = (
            ('/borknet/nodes/', 'GET', bork_nodes_GET),
            ('/borknet/+ping/', 'GET', bork_ping_GET),
            ('/borknet/+join/', 'POST', bork_join_POST),
            ('/borknet/+dispatch/', 'POST', bork_dispatch_POST),
        )
        for path, method, handler in routes:
            borkserver.addhandler(path, method, handler)
        # this node is part of its own node list
        borknet.add(borkhp, borkurl)
        borknet.start()
        borkaggregator.start()
        start_new_thread(asyncore.loop, (), {'use_poll': True })
        rlog(10, 'bork', 'total of %s nodes' % size())
    except Exception:
        handle_exception()

def shutdown():
    """ stop the aggregator and close the REST server .. always returns 1 """
    global borkserver
    if borkserver:
        try:
            borkaggregator.stop()
            borkserver.stop = True
            borkserver.server_close()
            time.sleep(2)
        except Exception:
            handle_exception()
    return 1

def size():
    """ number of nodes in the borknet cache """
    total = borknet.size()
    return total

def handle_borkping(bot, ievent):
    """ bork-ping <name> .. ping a borknet node.

        the answer arrives in a callback, which replies whether the node is
        alive, failed with an error or sent something other than a pong.
    """
    if not borkcfg.get('enable'):
        ievent.reply('borknet is not enabled')
        return
    try:
        name = ievent.args[0]
    except IndexError:
        ievent.missing('<name>')
        return
    node = borknet.byname(name)
    if not node:
        # wording matches the other handlers (was 'there is not node')
        ievent.reply('there is no node named %s' % name)
        return
    def cb(client, result):
        if result.error:
            ievent.reply('%s is not alive: %s' % (name, result.error))
            return
        # result.data may be empty .. guard the containment test
        if result.data and 'pong' in result.data:
            ievent.reply('%s is alive' % name)
        else:
            ievent.reply('%s sent an invalid ping reply' % name)
    node.ping(cb)

cmnds.add('bork-ping', handle_borkping, 'OPER')
examples.add('bork-ping', 'ping a borknet node', 'bork-ping gozerbot')

def handle_borklist(bot, ievent):
    """ bork-list .. show every node that is in the cache """
    if borkcfg.get('enable'):
        ievent.reply("borknet nodes: ", borknet.list(), dot=' \002||\002 ')
    else:
        ievent.reply('borknet is not enabled')

cmnds.add('bork-list', handle_borklist, 'OPER')
examples.add('bork-list', 'list nodes cache', 'bork-list')

def handle_borkenable(bot, ievent):
    """ bork-enable .. enable the borknet and reload this plugin """
    ievent.reply('enabling borknet')
    borkcfg.set('enable', 1)
    # NOTE(review): this saves the bork state object, not borkcfg .. confirm
    # that borkcfg.set() stores the new value by itself
    bork.save()
    # reload the plugin .. the 'enable' value is checked in init()
    plugins.reload('myplugs', 'bork')
    ievent.reply('done')

# only available to users with OPER permission
cmnds.add('bork-enable', handle_borkenable, 'OPER')
examples.add('bork-enable', 'enable the borknet', 'bork-enable')

def handle_borkdisable(bot, ievent):
    """ bork-disable .. disable the borknet and reload this plugin """
    borkcfg.set('enable', 0)
    # NOTE(review): this saves the bork state object, not borkcfg .. confirm
    # that borkcfg.set() stores the new value by itself
    bork.save()
    # reload the plugin .. the 'enable' value is checked in init()
    plugins.reload('myplugs', 'bork')
    ievent.reply('borknet disabled')

# only available to users with OPER permission
cmnds.add('bork-disable', handle_borkdisable, 'OPER')
examples.add('bork-disable', 'disable the borknet', 'bork-disable')

def handle_borksync(bot, ievent):
    """ bork-sync <url> .. sync the nodes cache with the node at <url>.

        waits 5 seconds for the answer and then reports how many nodes were
        synced since the command started.
    """
    if not borkcfg.get('enable'):
        ievent.reply('borknet is not enabled')
        return
    try:
        url = ievent.args[0]
    except IndexError:
        ievent.missing('<url>')
        return
    started = time.time()
    borknet.sync(url)
    # the sync answer is handled in a callback .. give it time to arrive
    time.sleep(5)
    fresh = [node for node in borknet.nodes.values() if node.synced > started]
    ievent.reply('%s nodes synced' % str(len(fresh)))

cmnds.add('bork-sync', handle_borksync, 'OPER', allowqueue=False)
examples.add('bork-sync', 'bork-sync <url> .. sync with provided node', \
'bork-sync http://gozerbot.org:9999')

def handle_borkdispatch(bot, ievent):
    """ dispatch [--node <nodename>] [--w <seconds>] [-e] [-d] <cmnd>.

        runs <cmnd> on one node (--node) or on all nodes of the borknet.

        -e   .. also show the errors that nodes return
        -d   .. reply per node as results arrive instead of waiting and
                aggregating them
        --w  .. seconds to wait before aggregating (3 when not given)
    """
    if not borkcfg.get('enable'):
        ievent.reply('borknet is not enabled')
        return
    if not ievent.rest:
        ievent.missing('[--node <nodename>] <path>')
        return
    try:
        name = ievent.options['--node']
    except KeyError:
        name = None
    try:
        wait = int(ievent.options['--w'])
    except KeyError:
        wait = 3
    except ValueError:
        ievent.reply("%s is not an integer" % ievent.options['--w'])
        return
    if wait < 0:
        # time.sleep does not take a negative value
        wait = 0
    cmnd = ievent.rest
    # key under which the aggregator collects the results of this dispatch
    jobid = ievent.nick + str(random.random())
    def cb(client, result):
        if result.error:
            if '-e' in ievent.optionset:
                ievent.reply("%s: %s => %s" % (client.name, cmnd, result.error))
            return
        if '-d' in ievent.optionset:
            ievent.reply("%s: %s => " % (client.name, cmnd), result.data, dot=True)
        else:
            borkaggregator.put(jobid, client, cmnd, result)
    if name:
        node = borknet.byname(name)
        if not node:
            ievent.reply('there is no node named %s' % name)
            return
        # only one node is addressed here (this used to report the size of
        # the whole borknet)
        ievent.reply('dispatching "%s" onto node %s' % (cmnd, name))
        node.dispatch(cmnd, cb)
    else:
        ievent.reply('dispatching "%s" onto %s nodes' % (cmnd, borknet.size()))
        borknet.dispatch(cmnd, cb)
    if '-d' not in ievent.optionset:
        ievent.reply('waiting %s seconds' % wait)
        time.sleep(wait)
        borkaggregator.aggregate(jobid, ievent)

cmnds.add('dispatch', handle_borkdispatch, ['USER', ], allowqueue=False, \
options={'--node': '', '-e': '', '--w': '3', '-d': ''})
examples.add('dispatch', 'dispatch <cmnd> .. execute <cmnd> in the borknet', \
'dispatch lq 3')

def handle_borkaddnode(bot, ievent):
    """ bork-add <name> <url> .. add a node to the cache and persist it.

        <url> has to start with http://; a missing trailing slash is added,
        so the url of the command example is accepted. the host of the url
        is resolved and the node is stored under its ip address.
    """
    if not borkcfg.get('enable'):
        ievent.reply('borknet is not enabled')
        return
    try:
        (name, url) = ievent.args
    except ValueError:
        ievent.missing('<name> <url>')
        return
    if not url.startswith('http://'):
        ievent.reply('invalid url')
        return
    if not url.endswith('/'):
        url += '/'
    client = RestClient(url)
    try:
        ip = socket.gethostbyname(client.host)
    except socket.error:
        ievent.reply("can't resolve %s: %s" % (client.host, sys.exc_info()[1]))
        return
    url = "http://%s:%s%s" % (ip, client.port, client.path)
    # add() persists the node itself (persist defaults to True), so the
    # extra borknet.persist() call that used to follow is not needed
    borknet.add(name, url)
    ievent.reply('%s added' % name)

cmnds.add('bork-add', handle_borkaddnode, 'OPER')
examples.add('bork-add', 'bork-add <name> <url> .. add a node to cache and \
persist it', 'bork-add gozerbot http://gozerbot.org:9999')

def handle_borkgetnode(bot, ievent):
    """ bork-getnode <name> .. show the node that is known as <name> """
    if not borkcfg.get('enable'):
        ievent.reply('borknet is not enabled')
        return
    try:
        name = ievent.args[0]
    except IndexError:
        ievent.missing('<name>')
        return
    node = borknet.byname(name)
    if node:
        ievent.reply(str(node))
    else:
        ievent.reply('there is no node named %s' % name)

cmnds.add('bork-getnode', handle_borkgetnode, 'OPER')
examples.add('bork-getnode', 'bork-getnode <name> .. get node of <name>', \
'bork-getnode gozerbot')

def handle_borknames(bot, ievent):
    """ bork-names .. show the names of the nodes in the cache """
    if not borkcfg.get('enable'):
        # same message as every other bork command (was 'collective is not
        # enabled')
        ievent.reply('borknet is not enabled')
        return
    ievent.reply("borknet node names: ", borknet.names(), dot=True)

cmnds.add('bork-names', handle_borknames, 'OPER')
examples.add('bork-names', 'show all node names', 'bork-names')

def handle_borkboot(bot, ievent):
    """ bork-boot [<url>] .. fill the node cache from the node at <url>.

        without argument http://213.196.7.59:9999 is used. waits 5 seconds
        and then reports how many nodes were synced since the command
        started.
    """
    if not borkcfg.get('enable'):
        ievent.reply('borknet is not enabled')
        return
    try:
        url = ievent.args[0]
    except IndexError:
        url = 'http://213.196.7.59:9999'
    started = time.time()
    borknet.boot(url)
    # the answer is handled in a callback .. give it time to arrive
    time.sleep(5)
    fresh = len([node for node in borknet.nodes.values() \
if node.synced > started])
    if not fresh:
        ievent.reply("no new nodes added from %s" % url)
    else:
        ievent.reply('borknet added %s nodes' % fresh)

cmnds.add('bork-boot', handle_borkboot, 'OPER')
examples.add('bork-boot', 'sync borknet nodes list with provided host', \
'1) bork-boot 2) bork-boot http://localhost:8888')

def handle_borkfullboot(bot, ievent):
    """ bork-fullboot .. boot from all nodes in cache """
    if not borkcfg.get('enable'):
        ievent.reply('borknet is not enabled')
        return
    # fullboot() returns the number of nodes it started a sync with; the
    # result used to be dropped, which left 'teller' undefined below
    teller = borknet.fullboot()
    ievent.reply('%s nodes checked .. current %s nodes in list' % (teller, \
size()))

cmnds.add('bork-fullboot', handle_borkfullboot, 'OPER')
examples.add('bork-fullboot', 'do a boot on every node in the borknet node \
list', 'bork-fullboot')

def handle_borkremove(bot, ievent):
    """ bork-remove <name> .. remove the node <name> from the borknet.

        the node is taken off the startup list and out of the cache.
    """
    if not borkcfg.get('enable'):
        ievent.reply('borknet is not enabled')
        return
    if not ievent.rest:
        ievent.missing('<name>')
        return
    try:
        url = borknet.state['names'][ievent.rest]
    except KeyError:
        # unknown name .. say so instead of showing the bare KeyError
        ievent.reply('there is no node named %s' % ievent.rest)
        return
    if not url:
        ievent.reply('error removing %s node' % ievent.rest)
        return
    try:
        borknet.unpersist(url)
        borknet.remove(url)
    except Exception:
        ievent.reply('error removing %s: %s' % (ievent.rest, \
str(sys.exc_info()[1])))
        return
    ievent.reply('%s node removed' % ievent.rest)

cmnds.add('bork-remove', handle_borkremove, 'OPER')
examples.add('bork-remove', 'remove node with <name> from borknet' , \
'bork-remove gozerbot')

def handle_borkjoin(bot, ievent):
    """ bork-join <name> .. send a join request to the node <name>.

        borknet.join() returns nothing; the outcome of the request is logged
        by its callback. this handler only confirms that the request was
        made (it used to read .data from the missing return value).
    """
    if not borkcfg.get('enable'):
        ievent.reply('borknet is not enabled')
        return
    if not ievent.rest:
        ievent.missing('<name>')
        return
    try:
        url = borknet.state['names'][ievent.rest]
    except KeyError:
        ievent.reply('there is no node named %s' % ievent.rest)
        return
    try:
        borknet.join(url)
    except Exception:
        ievent.reply('error joining %s: %s' % (ievent.rest, \
str(sys.exc_info()[1])))
        return
    ievent.reply('join request sent to %s' % ievent.rest)

cmnds.add('bork-join', handle_borkjoin, 'OPER')
examples.add('bork-join', 'join node with <name>' , 'bork-join gozerbot')
