Sign up
internetarchive
/
crawling-for-nomore404
Browse files
producer.py: don't exit upon Kafka communication errors
producer.py: do logging by itself, without relying on /home/ia/bin/logger
master
kngenie committed on Mar 23, 2016

1 parent bdd82eb commit a66f5f56f0a5e4415604e067cd64760ca3f49299
Unified Split
Showing with 83 additions and 38 deletions.
114 wikipedia/producer/producer.py
@@ -3,49 +3,95 @@
import re
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer
import argparse
import logging
import logging.config

logging.basicConfig(level=logging.INFO)
parser = argparse.ArgumentParser()
parser.add_argument('--kafka', help='kafka server host:port',
default='crawl-db02.us.archive.org:9092')
parser.add_argument('--log', help='file to send logging output to')
parser.add_argument('--failures', help='file to write lines that could not '
'send to Kafka')

args = parser.parse_args()

if args.log:
loghandler = {
'class': 'logging.handlers.RotatingFileHandler',
'formatter': 'default',
'filename': args.log,
'maxBytes': 50*1024*1024,
'backupCount': 3
}
else:
loghandler = {
'class': 'logging.StreamHandler',
'formatter': 'default',
'stream': 'ext://sys.stderr'
}
logconfig = dict(
version=1,
formatters={'default': {
'format': '%(asctime)s %(levelname)s %(message)s'
}},
handlers={'default': loghandler},
root={
'level': 'INFO',
'handlers': ['default']
}
)

logging.config.dictConfig(logconfig)

failure_log = open(args.failures, "w") if args.failures else sys.stdout

TOPIC_IRC = "wiki-irc"
TOPIC_LINKS = "wiki-links"

kafka = KafkaClient("crawl-db02.us.archive.org:9092")
#kafka = KafkaClient("crawl-hdfs07.us.archive.org:9092")
wikiIRCProducer = SimpleProducer(kafka, async=False,
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)
kafka = KafkaClient(args.kafka)

#wikiIRCProducer = SimpleProducer(kafka, "wiki-irc");
#wikiLinksProducer = SimpleProducer(kafka, "wiki-links");
wikiIRCProducer = SimpleProducer(
kafka, async=False,
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)

# wikiLinksProducer = SimpleProducer(kafka, "wiki-links", async=False,
# req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)
wikiLinksProducer = wikiIRCProducer

#read in lines from wikipedia live monitor server
for line in sys.stdin:
try:
line.decode('utf-8')
except UnicodeDecodeError:
continue
line = line.strip()
if line:
kafkaSuccess = False
lineToConsider = False
if re.match(r'^Wiki-IRC-Message:.*$', line):
lineToConsider = True
message = line.replace("Wiki-IRC-Message:","",1);
response = wikiIRCProducer.send_messages(TOPIC_IRC, message);
if response and response[0].error == 0:
kafkaSuccess = True
elif re.match(r'^Wiki-Links-Results:.*$', line):
lineToConsider = True
message = line.replace("Wiki-Links-Results:","",1);
response = wikiLinksProducer.send_messages(TOPIC_LINKS, message);
if response and response[0].error == 0:
kafkaSuccess = True
if lineToConsider and not kafkaSuccess:
# FLUSH to disk
#print this line out so we can feed these lines from STDIN to this script at a later time (retry failed push)
print line
kafka.close()
try:
line.decode('utf-8')
except UnicodeDecodeError:
continue
line = line.strip()
if line:
kafkaSuccess = False
if line.startswith('Wiki-IRC-Message:'):
message = line.replace("Wiki-IRC-Message:","",1);
try:
response = wikiIRCProducer.send_messages(TOPIC_IRC, message);
if response and response[0].error == 0:
kafkaSuccess = True
except KafkaError as ex:
logging.warn("failed to send to %s (%r)",
TOPIC_IRC, ex)
elif line.startswith('Wiki-Links-Results:'):
message = line.replace("Wiki-Links-Results:","",1);
try:
response = wikiLinksProducer.send_messages(TOPIC_LINKS, message);
if response and response[0].error == 0:
kafkaSuccess = True
except KafkaError as ex:
logging.warn("failed to send to %s (%r)",
TOPIC_LINKS, ex)
else:
logging.warn("ignoring line: %r", line)
continue

if not kafkaSuccess:
# FLUSH to disk
# log failed lines so we can reprocess such lines by feeding
# this output to this script at a later time.
print >>failure_log, line

kafka.close()
5 wikipedia/producer/run-monitor.sh
@@ -4,6 +4,5 @@ WORKDIR=$BIN/..
export PATH=$BIN:$PATH
LOGDIR=$WORKDIR/logs
cd $WORKDIR
/home/ia/bin/logger -f $LOGDIR/retries.log -fe $LOGDIR/retries.err \
bash -c "shim $BIN/monitor.js | producer.py"

shim $BIN/monitor.js | producer.py --log $LOGDIR/retries.log \
--failures $LOGDIR/retries.err
2 wikipedia/producer/setup.py
@@ -13,7 +13,7 @@

setup(
name="No404WikipediaScraper",
version="0.1.0",
version="0.1.1",
install_requires=[
"nodeenv",
"kafka-python==0.9.3"
0 comments on commit a66f5f5
Please sign in to comment.
© 2021 GitHub, Inc.
Terms
Privacy
Security
Status
Docs
Contact GitHub · Pricing · API · Training · Blog · About
Code · Issues (2) · Pull requests (4) · Actions · Projects · Wiki · Security · Insights