Skip to content

Commit

Permalink
tweetwarc: bug fix and improvements, additional operation support fea…
Browse files Browse the repository at this point in the history
…ture

FIX: last_offset is set to 0 after writing warcinfo
Add --seek PARTITION:OFFSET option for manually committing consumer offset
Set session_timeout_ms=60s to see if it reduces the chance of coordinator
heartbeat expiration.
Exit with code=2 if unexpected exception is detected (so that supervisor will
leave program offline)
  • Loading branch information
kngenie committed Jul 20, 2018
1 parent 7fa5f0d commit 3a1fce6
Showing 1 changed file with 56 additions and 10 deletions.
66 changes: 56 additions & 10 deletions twitter/tweetwarc.py
Expand Up @@ -15,7 +15,7 @@
import socket
import signal
from datetime import datetime
from time import time
import time
from hanzo.warctools import WarcRecord
from hanzo.warctools.warc import warc_datetime_str
from kafka import KafkaConsumer, TopicPartition
Expand Down Expand Up @@ -97,9 +97,7 @@ def __init__(self, directory):
os.remove(self.target_filename)
raise

self.start_time = time()
# last good offset
self.last_offset = 0
self.start_time = time.time()

def warc_filename(self, directory):
"""WARC filename example: /tmp/tweets-20170307100027-0001-fqdn.warc.gz.open
Expand All @@ -121,12 +119,12 @@ def rollback(self):

def should_rollover(self, size_limit, time_limit):
size = os.fstat(self.f.fileno()).st_size
return size > size_limit or (time() - self.start_time) > time_limit
return size > size_limit or (time.time() - self.start_time) > time_limit

def close(self):
self.f.close()
logging.debug('renaming %s to %s', self.target_filename,
self.target_filename[:-5])
logging.info('renaming %s to %s', self.target_filename,
self.target_filename[:-5])
os.rename(self.target_filename, self.target_filename[:-5])


Expand All @@ -139,6 +137,8 @@ def close(self):
help='override consumer group ID for testing')
parser.add_argument('-v', action='count', dest='loglevel', default=0,
help='generate DEBUG level logs')
parser.add_argument('--seek', default=None,
help='update consumer offset and exit (for failure recovery)')
args = parser.parse_args()

with open(args.config) as f:
Expand All @@ -160,11 +160,48 @@ def close(self):
group_id=args.group or config.get('kafka_group_id'),
# use small number not to exceed session_timeout.
max_poll_records=5,
auto_offset_reset='earliest'
session_timeout_ms=60*1000,
auto_offset_reset='earliest',
enable_auto_commit=False if args.seek else True
)
kafka_topic = config.get('kafka_topic')
consumer.subscribe([kafka_topic])

if args.seek:
# this is a rudimentary recovery aid for advancing consumer offset.
# currently this app experiences unexpected revocation of assigned
# partition and subsequent failure to commit consumer offsets. When
# this happens, stop the app and advance offset with this.
target_partition, _, target_offset = args.seek.partition(':')
if not target_offset:
parser.error('--seek expects PARTITION:OFFSET')
target_partition = TopicPartition(kafka_topic, int(target_partition))
target_offset = int(target_offset)

msgs = consumer.poll(max_records=1)
logging.info('msgs=%s', msgs)
while True:
a = consumer.assignment()
if a:
assert target_partition in a
break
logging.info('assignment=%s', a)
time.sleep(1.0)
offset = consumer.committed(target_partition)
# this must be identical to the committed offset.
position = consumer.position(target_partition)

logging.info('current committed offset=%s position=%s', offset, position)
if target_offset == position:
logging.info('no need to change position')
exit(0)

logging.info('seeking to %s', target_offset)
consumer.seek(target_partition, target_offset)
logging.info('new position=%s', consumer.position(target_partition))
consumer.commit()
exit(0)

time_limit = config.get('warc_time_limit')
size_limit = config.get('warc_size_limit')

Expand Down Expand Up @@ -206,7 +243,9 @@ def interrupt(sig, stack):
ex)
warc.rollback()
# re-read the same msg again
consumer.seek(-1, 1)
consumer.seek(TopicPartition(msg.topic, msg.partition),
msg.offset)
consumer.commit()

logging.info('pausing 5 seconds before continueing')
time.sleep(5.0)
Expand All @@ -221,11 +260,18 @@ def interrupt(sig, stack):
warc.close()
warc = None
consumer.commit()
logging.info('exiting by interrupt')
exitcode = 0
except Exception as ex:
logging.error('exiting by error', exc_info=1)
# 2 indicates serious error needing operator attention.
# by default supervisor won't restart if the program exits with code 2.
exitcode = 2
finally:
if warc is not None:
warc.close()
warc = None

consumer.close()

logging.info('exiting.')
exit(exitcode)

0 comments on commit 3a1fce6

Please sign in to comment.