Migrate to Python 3, refactoring
Ported to Python 3; tested on 3.5, not tested on Python 2.
Moved FirehoseDownloader to the wpfeed package.
kngenie committed Mar 13, 2020
1 parent 8343647 commit aebab7c
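
The port follows the standard six pattern: stdlib modules that were renamed in Python 3 are imported through six.moves, and values that cross a socket or file boundary are encoded to bytes explicitly. A minimal sketch of the two idioms as they appear in this commit:

import json

# Renamed stdlib modules resolve through six.moves: httplib/urllib2/
# ConfigParser on Python 2, http.client/urllib.request/configparser on 3.
from six.moves.http_client import BadStatusLine
import six.moves.urllib.request as urllib2
from six.moves.urllib.error import URLError
from six.moves.configparser import ConfigParser

# str/bytes boundaries are made explicit before writing to sockets/files.
payload = json.dumps({'verb': 'post'}).encode('utf-8')
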
Showing 4 changed files with 137 additions and 108 deletions.
134 changes: 27 additions & 107 deletions wordpress/process-firehose.py
@@ -8,35 +8,30 @@

import os
import sys
import re
from optparse import OptionParser

import gevent
from gevent import socket
from gevent.event import Event
from gevent.queue import Queue, Empty

-import httplib
-import urllib2
+from six.moves.http_client import BadStatusLine
+import six.moves.urllib.request as urllib2
+from six.moves.urllib.error import URLError
import json
import base64
import logging
import time
from datetime import datetime
from gzip import GzipFile
-from ConfigParser import ConfigParser
+from six.moves.configparser import ConfigParser

+from wpfeed.firehose import FirehoseDownloader

"""
end point for WP-hosted blogs: 'http://xmpp.wordpress.com:8008/posts.json'
end point for self-hosted blogs: 'http://xmpp.wordpress.com:8008/posts.org.json'
"""

-def printsafe(s):
-    if s is None: return None
-    if isinstance(s, unicode):
-        s = s.encode('utf-8')
-    return repr(s)[1:-1]

# TODO: these two classes were copied from Twitter archiver.
# move to reusable library (widecrawl?)
class ArchiveFile(object):
@@ -53,16 +53,16 @@ def close(self):
self.f = None
try:
os.rename(self.fn + '.open', self.fn)
-        except Exception, ex:
-            logging.warn('failed to rename %s.open to %s (%s)',
-                         self.fn, self.fn, ex)
+        except Exception as ex:
+            logging.warning('failed to rename %s.open to %s (%s)',
+                            self.fn, self.fn, ex)
def write_record(self, message):
"""message must be one whole streaming message."""
if self.f is None:
-            raise IOError, "attempted to write into closed file %s" % self.fn
+            raise IOError("attempted to write into closed file %s" % self.fn)
z = GzipFile(fileobj=self.f, mode='wb', compresslevel=self.complevel)
z.write(message)
-        z.write('\r\n')
+        z.write(b'\r\n')
z.close()
self.f.flush()

@@ -97,8 +92,11 @@ def _makefn(self):
return os.path.join(self.destdir, fn)

def archive_message(self, message):
"""
:type message: bytes
"""
if isinstance(message, dict):
message = json.dumps(message)
message = json.dumps(message).encode('utf-8')

if self.arc and self.rolldate is not None:
cdate = self.arc.ctime.strftime(self.rolldate)
@@ -123,7 +121,7 @@ def __init__(self, epbase, job):
def put(self, curls):
ep = '{}/{}/mdiscovered'.format(self.epbase, self.job)
headers = { 'Content-Type': 'text/json' }
-        data = json.dumps(curls)
+        data = json.dumps(curls).encode('utf-8')
req = urllib2.Request(ep, data, headers)
f = urllib2.urlopen(req, timeout=10)
if f.code != 200:
@@ -196,10 +194,10 @@ def run(self):
self.receiver.put(data)
# TODO: write to journal
self.items = None
-                except Exception, ex:
-                    logging.warn('%s.put() failed (%s), retrying...',
-                                 self.receiver, ex, exc_info=1)
-                    gevent.sleep(5*60)
+                except Exception as ex:
+                    logging.warning('%s.put() failed (%s), retrying...',
+                                    self.receiver, ex, exc_info=1)
+                    time.sleep(5*60)
except Exception as ex:
logging.error('Pipeline.run exiting by error', exc_info=1)

@@ -213,104 +211,26 @@ def __init__(self, carbon, basename, stats):

def run(self):
while 1:
-            gevent.sleep(5*60.0)
+            time.sleep(5*60.0)
msg = []
now = time.time()
for stat, value in self.stats.items():
msg.append('{}.{} {:.3f} {:.0f}\n'.format(
-                self.basename, stat, value, now))
+                    self.basename, stat, value, now))

sock = socket.socket()
try:
sock.connect(self.server)
except:
logging.warn("couldn't connect to carbon server %s:%d",
*self.server)
logging.warning("couldn't connect to carbon server %s:%d",
*self.server)
continue
try:
-                sock.sendall(''.join(msg))
+                sock.sendall(''.join(msg).encode('ascii'))
sock.close()
-            except Exception, ex:
-                logging.warn("error writing to carbon server (%s)", ex)
-
-class FirehoseDownloader(object):
-    RETRY_BACKOFF_FACTOR = 1.5
-    INITIAL_RETRY_INTERVAL = 10
-    MAX_RETRY_INTERVAL = 120 * 60
-
-    def __init__(self, endpoint, archiver, pipelines, auth=None):
-        self.endpoint = endpoint
-        self.archiver = archiver
-        self.pipelines = pipelines
-        self.auth = auth
+            except Exception as ex:
+                logging.warning("error writing to carbon server (%s)", ex)

-        self.retry_interval = self.INITIAL_RETRY_INTERVAL
-
-        # stats
-        self.stats = {
-            'connection.success': 0,
-            'connection.failure': 0,
-            'downloaded': 0
-        }
-
-    def run(self):
-        while 1:
-            # we cannot use HTTPBasicAuthHandler because server does not
-            # request authentication.
-            headers = {}
-            req = urllib2.Request(endpoint)
-            if self.auth:
-                auth = 'Basic {}'.format(base64.b64encode(self.auth).strip())
-                req.add_header('Authorization', auth)
-            opener = urllib2.build_opener(
-                )
-            try:
-                f = opener.open(req)
-                logging.info('firehose stream opened')
-                self.stats['connection.success'] += 1
-                self.retry_interval = self.INITIAL_RETRY_INTERVAL
-            except (urllib2.URLError, httplib.BadStatusLine, socket.error) as ex:
-                self.stats['connection.failure'] += 1
-                logging.warn('failed to open firehose stream (%s), '
-                             'holding off %d seconds',
-                             ex, self.retry_interval)
-                gevent.sleep(self.retry_interval)
-                self.retry_interval = min(
-                    self.retry_interval * self.RETRY_BACKOFF_FACTOR,
-                    self.MAX_RETRY_INTERVAL
-                )
-                logging.info('retrying connection')
-                continue
-
-            for line in f:
-                if line == '\n': continue
-                self.stats['downloaded'] += 1
-                self.archiver.archive_message(line)
-
-                try:
-                    j = json.loads(line.rstrip())
-                except ValueError, ex:
-                    logging.warn('JSON decode failed: %r', line)
-                    continue
-
-                # TODO: make this one of pipelines?
-                verb = j.get('verb')
-                published = j.get('published')
-                blog = j.get('target')
-                blogurl = blog and blog.get('url')
-                post = j.get('object')
-                posturl = post and post.get('permalinkUrl')
-
-                print "{} {} {} {}".format(
-                    published, verb,
-                    printsafe(blogurl), printsafe(posturl)
-                )
-
-                for pl in self.pipelines:
-                    pl.put(j)
-
-            logging.warn('firehose stream closed')
-            self.archiver.close()

opt = OptionParser('%prog URL')
opt.add_option('--endpoint', dest='endpoint', default=None)
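With FirehoseDownloader gone from this file, the script is left with option parsing and wiring. A hedged sketch of the wiring (using the test doubles from wpfeed.firehose; a real run would pass the Archiver and Pipeline objects built from the parsed options):

from wpfeed.firehose import FirehoseDownloader, NullArchiver, PrintPipeline

# WP-hosted endpoint from the module docstring above; use posts.org.json
# for self-hosted blogs. auth, if given, is a 'user:password' string that
# run() base64-encodes itself.
endpoint = 'http://xmpp.wordpress.com:8008/posts.json'
downloader = FirehoseDownloader(endpoint, NullArchiver(), [PrintPipeline()])
downloader.run()  # blocks; reconnects with exponential backoff on failure
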
3 changes: 2 additions & 1 deletion wordpress/setup.py
@@ -7,10 +7,11 @@
author_email='kenji@archive.org',
description=("application to read wordpress update feed and schedule"
" permalink to crawler"),
-    #packages=find_packages(),
+    packages=find_packages(),
scripts=['process-firehose.py'],
install_requires=[
# requires libevent
'gevent>=0.13.6',
+        'six'
]
)
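
Restoring packages=find_packages() is what actually ships the new wpfeed package; with only scripts=['process-firehose.py'], the import at the top of the script would fail after installation. A quick post-install sanity check (assuming pip install . in the wordpress/ directory):

from wpfeed.firehose import FirehoseDownloader
print(FirehoseDownloader.INITIAL_RETRY_INTERVAL)  # 10
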
Empty file added wordpress/wpfeed/__init__.py
108 changes: 108 additions & 0 deletions wordpress/wpfeed/firehose.py
@@ -0,0 +1,108 @@
from __future__ import print_function

from six.moves.http_client import BadStatusLine
import six.moves.urllib.request as urllib2
from six.moves.urllib.error import URLError

import time
import socket
import base64
import json

import logging

class NullArchiver(object):
    """Do-nothing archiver for testing.
    """
    def archive_message(self, line):
        pass

    def close(self):
        # run() calls archiver.close() when the stream ends; provide a no-op
        # so this test double satisfies the full archiver interface.
        pass

class PrintPipeline(object):
"""Pipline for testing.
"""
def put(self, item):
print(item)

def printsafe(s):
    """Return an ASCII-safe, repr-escaped copy of s for printing."""
    if s is None: return None
    if not isinstance(s, bytes):
        s = s.encode('utf-8')
    # On Python 3, repr() of bytes is "b'...'"; strip the b'' wrapper.
    return repr(s)[2:-1]

class FirehoseDownloader(object):
RETRY_BACKOFF_FACTOR = 1.5
INITIAL_RETRY_INTERVAL = 10
MAX_RETRY_INTERVAL = 120 * 60

def __init__(self, endpoint, archiver, pipelines, auth=None):
self.endpoint = endpoint
self.archiver = archiver or NullArchiver()
self.pipelines = pipelines
self.auth = auth

self.retry_interval = self.INITIAL_RETRY_INTERVAL

# stats
self.stats = {
'connection.success': 0,
'connection.failure': 0,
'downloaded': 0
}

def run(self):
while 1:
# we cannot use HTTPBasicAuthHandler because server does not
# request authentication.
req = urllib2.Request(self.endpoint)
if self.auth:
auth = b'Basic ' + base64.b64encode(self.auth.encode('ascii'))
req.add_header('Authorization', auth)
            opener = urllib2.build_opener()
try:
f = opener.open(req)
logging.info('firehose stream %s opened', self.endpoint)
self.stats['connection.success'] += 1
self.retry_interval = self.INITIAL_RETRY_INTERVAL
except (URLError, BadStatusLine, socket.error) as ex:
self.stats['connection.failure'] += 1
logging.warning('failed to open firehose stream (%s), '
'holding off %d seconds',
ex, self.retry_interval)
time.sleep(self.retry_interval)
self.retry_interval = min(
self.retry_interval * self.RETRY_BACKOFF_FACTOR,
self.MAX_RETRY_INTERVAL
)
logging.info('retrying connection')
continue

for line in f:
if line == b'\n': continue
self.stats['downloaded'] += 1
self.archiver.archive_message(line)

try:
j = json.loads(line.rstrip().decode('utf-8'))
except ValueError as ex:
logging.warning('JSON decode failed: %r', line)
continue

# TODO: make this one of pipelines?
verb = j.get('verb')
published = j.get('published')
blog = j.get('target')
blogurl = blog and blog.get('url')
post = j.get('object')
posturl = post and post.get('permalinkUrl')

print("{} {} {} {}".format(
published, verb,
printsafe(blogurl), printsafe(posturl)
))

for pl in self.pipelines:
pl.put(j)

logging.warning('firehose stream closed')
self.archiver.close()
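
The retry constants above give a concrete backoff schedule: each failed connect multiplies the hold-off by 1.5, capped at 120 minutes, and any successful connect resets it to 10 seconds. A small worked example of the schedule:

from wpfeed.firehose import FirehoseDownloader

interval = FirehoseDownloader.INITIAL_RETRY_INTERVAL
schedule = []
for _ in range(6):
    schedule.append(interval)
    interval = min(interval * FirehoseDownloader.RETRY_BACKOFF_FACTOR,
                   FirehoseDownloader.MAX_RETRY_INTERVAL)
print(schedule)  # [10, 15.0, 22.5, 33.75, 50.625, 75.9375]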
