producer.py: don't exit upon Kafka communication errors
producer.py: do logging by itself, without relying on /home/ia/bin/logger
kngenie committed Mar 23, 2016
1 parent bdd82eb commit a66f5f5
Showing 3 changed files with 83 additions and 38 deletions.
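
The core of the producer.py change is to catch Kafka errors instead of letting them kill the process, and to record any line that could not be sent so it can be replayed later. A minimal sketch of that pattern follows (broker address, topic, and file name are placeholders; the KafkaError import path assumes kafka-python 0.9.x as pinned in setup.py):

import sys
import logging
from kafka.client import KafkaClient
from kafka.common import KafkaError  # kafka-python 0.9.x location
from kafka.producer import SimpleProducer

logging.basicConfig(level=logging.INFO)
kafka = KafkaClient("localhost:9092")              # placeholder broker
producer = SimpleProducer(kafka, async=False,
                          req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)
failures = open("failed-lines.txt", "w")           # hypothetical failures file

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue
    try:
        producer.send_messages("wiki-irc", line)
    except KafkaError as ex:
        # don't exit: note the failure and keep the line for a later replay
        logging.warn("send failed (%r); saving line for retry", ex)
        print >>failures, line
kafka.close()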
114 changes: 80 additions & 34 deletions wikipedia/producer/producer.py
@@ -3,49 +3,95 @@
 import re
 from kafka.client import KafkaClient
 from kafka.producer import SimpleProducer
+import argparse
 import logging
+import logging.config


-logging.basicConfig(level=logging.INFO)
+parser = argparse.ArgumentParser()
+parser.add_argument('--kafka', help='kafka server host:port',
+                    default='crawl-db02.us.archive.org:9092')
+parser.add_argument('--log', help='file to send logging output to')
+parser.add_argument('--failures', help='file to write lines that could not '
+                    'send to Kafka')
+
+args = parser.parse_args()
+
+if args.log:
+    loghandler = {
+        'class': 'logging.handlers.RotatingFileHandler',
+        'formatter': 'default',
+        'filename': args.log,
+        'maxBytes': 50*1024*1024,
+        'backupCount': 3
+    }
+else:
+    loghandler = {
+        'class': 'logging.StreamHandler',
+        'formatter': 'default',
+        'stream': 'ext://sys.stderr'
+    }
+logconfig = dict(
+    version=1,
+    formatters={'default': {
+        'format': '%(asctime)s %(levelname)s %(message)s'
+    }},
+    handlers={'default': loghandler},
+    root={
+        'level': 'INFO',
+        'handlers': ['default']
+    }
+)
+
+logging.config.dictConfig(logconfig)
+
+failure_log = open(args.failures, "w") if args.failures else sys.stdout


 TOPIC_IRC = "wiki-irc"
 TOPIC_LINKS = "wiki-links"


-kafka = KafkaClient("crawl-db02.us.archive.org:9092")
-#kafka = KafkaClient("crawl-hdfs07.us.archive.org:9092")
-wikiIRCProducer = SimpleProducer(kafka, async=False,
-    req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)
+kafka = KafkaClient(args.kafka)


-#wikiIRCProducer = SimpleProducer(kafka, "wiki-irc");
-#wikiLinksProducer = SimpleProducer(kafka, "wiki-links");
+wikiIRCProducer = SimpleProducer(
+    kafka, async=False,
+    req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)


+# wikiLinksProducer = SimpleProducer(kafka, "wiki-links", async=False,
+#     req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)
 wikiLinksProducer = wikiIRCProducer


 #read in lines from wikipedia live monitor server
 for line in sys.stdin:
     try:
         line.decode('utf-8')
     except UnicodeDecodeError:
         continue
     line = line.strip()
     if line:
         kafkaSuccess = False
-        lineToConsider = False
-        if re.match(r'^Wiki-IRC-Message:.*$', line):
-            lineToConsider = True
-            message = line.replace("Wiki-IRC-Message:","",1);
-            response = wikiIRCProducer.send_messages(TOPIC_IRC, message);
-            if response and response[0].error == 0:
-                kafkaSuccess = True
-        elif re.match(r'^Wiki-Links-Results:.*$', line):
-            lineToConsider = True
-            message = line.replace("Wiki-Links-Results:","",1);
-            response = wikiLinksProducer.send_messages(TOPIC_LINKS, message);
-            if response and response[0].error == 0:
-                kafkaSuccess = True
-        if lineToConsider and not kafkaSuccess:
-            # FLUSH to disk
-            #print this line out so we can feed these lines from STDIN to this script at a later time (retry failed push)
-            print line
+        if line.startswith('Wiki-IRC-Message:'):
+            message = line.replace("Wiki-IRC-Message:","",1);
+            try:
+                response = wikiIRCProducer.send_messages(TOPIC_IRC, message);
+                if response and response[0].error == 0:
+                    kafkaSuccess = True
+            except KafkaError as ex:
+                logging.warn("failed to send to %s (%r)",
+                             TOPIC_IRC, ex)
+        elif line.startswith('Wiki-Links-Results:'):
+            message = line.replace("Wiki-Links-Results:","",1);
+            try:
+                response = wikiLinksProducer.send_messages(TOPIC_LINKS, message);
+                if response and response[0].error == 0:
+                    kafkaSuccess = True
+            except KafkaError as ex:
+                logging.warn("failed to send to %s (%r)",
+                             TOPIC_LINKS, ex)
+        else:
+            logging.warn("ignoring line: %r", line)
+            continue
+
+        if not kafkaSuccess:
+            # FLUSH to disk
+            # log failed lines so we can reprocess such lines by feeding
+            # this output to this script at a later time.
+            print >>failure_log, line
+
 kafka.close()
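
With the new command-line flags, lines that fail to reach Kafka are written to the --failures file, and the script can later be re-run with that file on stdin to retry them. A possible replay invocation (the output paths here are illustrative, not from the repository; retries.err is the failures file written by run-monitor.sh below):

producer.py --kafka crawl-db02.us.archive.org:9092 \
    --log logs/retry.log --failures logs/still-failing.txt \
    < logs/retries.err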
5 changes: 2 additions & 3 deletions wikipedia/producer/run-monitor.sh
@@ -4,6 +4,5 @@ WORKDIR=$BIN/..
 export PATH=$BIN:$PATH
 LOGDIR=$WORKDIR/logs
 cd $WORKDIR
-/home/ia/bin/logger -f $LOGDIR/retries.log -fe $LOGDIR/retries.err \
-    bash -c "shim $BIN/monitor.js | producer.py"
+shim $BIN/monitor.js | producer.py --log $LOGDIR/retries.log \
+    --failures $LOGDIR/retries.err

2 changes: 1 addition & 1 deletion wikipedia/producer/setup.py
@@ -13,7 +13,7 @@

 setup(
     name="No404WikipediaScraper",
-    version="0.1.0",
+    version="0.1.1",
     install_requires=[
         "nodeenv",
         "kafka-python==0.9.3"
