Pass authentication credentials via constructor arguments.
Allow for running without HeadquarterSubmitter by giving empty HQ URL.
  (useful for testing)
kngenie committed Feb 14, 2016
1 parent 8f6e9bb commit bdd82eb
Showing 1 changed file with 27 additions and 27 deletions.
54 changes: 27 additions & 27 deletions wordpress/process-firehose.py
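
In effect, FirehoseDownloader now receives the 'user:password' string through an auth= constructor argument (default None) instead of reading the module-level options, the -A option loses its hard-coded default, and the HeadquarterSubmitter pipeline is only built when both an HQ URL and job name are given, so the script can run in archive-only mode for testing. A minimal sketch of the resulting wiring, assuming the classes defined in this file; endpoint, hq_url, hq_job, and the credential string are placeholders:

    archiver = Archiver(destdir='arcs', prefix='wp',
                        rollsize=int(1e9), rolldate='%Y%m%d')

    # Empty HQ URL -> no HeadquarterSubmitter; archive-only run for testing.
    pipelines = []
    if hq_url and hq_job:
        pipelines.append(Pipeline(HeadquarterSubmitter(hq_url, hq_job), 'hq', 1000))
    for pl in pipelines:
        gevent.spawn(pl.run)

    # Credentials are injected here; auth=None simply skips the Authorization header.
    downloader = FirehoseDownloader(endpoint, archiver, pipelines,
                                    auth='user:password')
    g = gevent.spawn(downloader.run)
    g.join()  # block until the downloader greenlet exits
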
@@ -175,19 +175,22 @@ def _get_batch(self):
         return items
 
     def run(self):
-        while 1:
-            if self.items is None:
-                self.items = self._get_batch()
-            if self.items:
-                try:
-                    data = [o[1] for o in self.items]
-                    self.receiver.put(data)
-                    # TODO: write to journal
-                    self.items = None
-                except Exception, ex:
-                    logging.warn('%s.put() failed (%s), retrying...',
-                                 self.receiver, ex, exc_info=1)
-            gevent.sleep(5*60)
+        try:
+            while 1:
+                if self.items is None:
+                    self.items = self._get_batch()
+                if self.items:
+                    try:
+                        data = [o[1] for o in self.items]
+                        self.receiver.put(data)
+                        # TODO: write to journal
+                        self.items = None
+                    except Exception, ex:
+                        logging.warn('%s.put() failed (%s), retrying...',
+                                     self.receiver, ex, exc_info=1)
+                gevent.sleep(5*60)
+        except Exception as ex:
+            logging.error('Pipeline.run exiting by error', exc_info=1)
 
 class StatSubmitter(object):
     def __init__(self, carbon, basename, stats):
@@ -224,10 +227,11 @@ class FirehoseDownloader(object):
     INITIAL_RETRY_INTERVAL = 10
     MAX_RETRY_INTERVAL = 120 * 60
 
-    def __init__(self, endpoint, archiver, pipelines):
+    def __init__(self, endpoint, archiver, pipelines, auth=None):
         self.endpoint = endpoint
         self.archiver = archiver
         self.pipelines = pipelines
+        self.auth = auth
 
         self.retry_interval = self.INITIAL_RETRY_INTERVAL
 
@@ -244,11 +248,10 @@ def run(self):
             # request authentication.
             headers = {}
             req = urllib2.Request(endpoint)
-            if options.auth:
-                auth = 'Basic {}'.format(base64.b64encode(options.auth).strip())
+            if self.auth:
+                auth = 'Basic {}'.format(base64.b64encode(self.auth).strip())
                 req.add_header('Authorization', auth)
             opener = urllib2.build_opener(
-                #urllib2.HTTPBasicAuthHandler(password_manager)
                 )
             try:
                 #f = opener.open(endpoint)
@@ -298,7 +301,7 @@ def run(self):
 
 opt = OptionParser('%prog URL')
 opt.add_option('--endpoint', dest='endpoint', default=None)
-opt.add_option('-A', dest='auth', default='internetarchive:BigData')
+opt.add_option('-A', dest='auth', default=None)
 opt.add_option('-d', dest='arcdir', default='arcs')
 opt.add_option('--carbon', dest='carbon', default=None)
 opt.add_option('-L', dest='logfile', default=None)
@@ -345,25 +348,22 @@ def run(self):
 else:
     endpoint = args[0]
 
-password_manager = urllib2.HTTPPasswordMgrWithDefaultRealm()
-if options.auth:
-    user, passwd = options.auth.split(':', 1)
-    password_manager.add_password(None, endpoint, user, passwd)
-
 arcdir = options.arcdir
 if not os.path.isdir(arcdir):
     opt.error('{}: no such directory'.format(arcdir))
 
 archiver = Archiver(destdir=arcdir, prefix=options.prefix,
                     rollsize=int(1e9), rolldate='%Y%m%d')
 
-pipelines = [
-    Pipeline(HeadquarterSubmitter(options.hq, options.hqjob), 'hq', 1000)
-    ]
+pipelines = []
+if options.hq and options.hqjob:
+    pipelines.append(
+        Pipeline(HeadquarterSubmitter(options.hq, options.hqjob), 'hq', 1000)
+        )
 for pl in pipelines:
     gevent.spawn(pl.run)
 
-downloader = FirehoseDownloader(endpoint, archiver, pipelines)
+downloader = FirehoseDownloader(endpoint, archiver, pipelines, auth=options.auth)
 
 g = gevent.spawn(downloader.run)
 if options.carbon:
