Skip to content

Commit

Permalink
watchdog functionality in monitor script
Browse files Browse the repository at this point in the history
  • Loading branch information
vinaygoel committed Nov 22, 2013
1 parent 9fda6ad commit a8a5784
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 48 deletions.
146 changes: 100 additions & 46 deletions wikipedia/producer/monitor.js
Expand Up @@ -6,6 +6,7 @@ var app = express();
var server = http.createServer(app);
var $ = require('cheerio');
var wikipedias = require('./wikipedias.js');
//var kafka = require('kafka');

// whether to monitor the 1,000,000+ articles Wikipedias
var MONITOR_SHORT_TAIL_WIKIPEDIAS = true;
Expand Down Expand Up @@ -63,7 +64,13 @@ if (MONITOR_WIKIDATA) {
});
}

var client = new irc.Client(
var client = null;
var lastSeenMessageTimestamp = null;
var MAX_WAIT_FOR_RESET = 120000;

function newClient() {
console.log("Monitor: Starting a new IRC client");
client = new irc.Client(
IRC_SERVER,
IRC_NICK,
{
Expand All @@ -74,55 +81,58 @@ var client = new irc.Client(
stripColors: true
});

client.addListener('registered', function(message) {
console.log('Connected to IRC server ' + IRC_SERVER);
// connect to IRC channels
IRC_CHANNELS.forEach(function(channel) {
console.log('Joining channel ' + channel);
client.join(channel);
});
});
client.addListener('registered', function(message) {
console.log('Connected to IRC server ' + IRC_SERVER);
// connect to IRC channels
IRC_CHANNELS.forEach(function(channel) {
console.log('Joining channel ' + channel);
client.join(channel);
});
});

// fired whenever the client connects to an IRC channel
client.addListener('join', function(channel, nick, message) {
console.log(nick + ' joined channel ' + channel);
});
// fired whenever someone parts a channel
client.addListener('part', function(channel, nick, reason, message) {
console.log('User ' + nick + ' has left ' + channel + ' (' + reason + ')');
});
// fired whenever someone quits the IRC server
client.addListener('quit', function(nick, reason, channels, message) {
console.log('User ' + nick + ' has quit ' + channels + ' (' + reason + ')');
});
// fired whenever someone sends a notice
client.addListener('notice', function(nick, to, text, message) {
console.log('Notice from ' + (nick === undefined? 'server' : nick) + ' to ' +
to + ': ' + text);
});
// fired whenever someone gets kicked from a channel
client.addListener('kick', function(channel, nick, by, reason, message) {
console.warn('User ' + (by === undefined? 'server' : by) + ' has kicked ' +
nick + ' from ' + channel + ' (' + reason + ')');
});
// fired whenever someone is killed from the IRC server
client.addListener('kill', function(nick, reason, channels, message) {
console.warn('User ' + nick + ' was killed from ' + channels + ' (' +
reason + ')');
});
// fired whenever the client encounters an error
client.addListener('error', function(message) {
console.warn('IRC error: ' + message);
});
// fired whenever the client connects to an IRC channel
client.addListener('join', function(channel, nick, message) {
console.log(nick + ' joined channel ' + channel);
});
// fired whenever someone parts a channel
client.addListener('part', function(channel, nick, reason, message) {
console.log('User ' + nick + ' has left ' + channel + ' (' + reason + ')');
});
// fired whenever someone quits the IRC server
client.addListener('quit', function(nick, reason, channels, message) {
console.log('User ' + nick + ' has quit ' + channels + ' (' + reason + ')');
});
// fired whenever someone sends a notice
client.addListener('notice', function(nick, to, text, message) {
console.log('Notice from ' + (nick === undefined? 'server' : nick) + ' to ' +
to + ': ' + text);
});
// fired whenever someone gets kicked from a channel
client.addListener('kick', function(channel, nick, by, reason, message) {
console.warn('User ' + (by === undefined? 'server' : by) + ' has kicked ' +
nick + ' from ' + channel + ' (' + reason + ')');
});
// fired whenever someone is killed from the IRC server
client.addListener('kill', function(nick, reason, channels, message) {
console.warn('User ' + nick + ' was killed from ' + channels + ' (' +
reason + ')');
});
// fired whenever the client encounters an error
client.addListener('error', function(message) {
console.warn('IRC error: ' + message);
});

// start the monitoring process
monitorWikipedia();
}
function parseMessage(message, to) {
// get the editor's username or IP address
// the IRC log format is as follows (with color codes removed):
// rc-pmtpa: [[Juniata River]] http://en.wikipedia.org/w/index.php?diff=516269072&oldid=514659029 * Johanna-Hypatia * (+67) Category:Place names of Native American origin in Pennsylvania

//logging all rc-pmta messages for research purposes
//console.log("Wiki-IRC-Message:" + message);

console.log("Wiki-IRC-Message:" + message);
var messageComponents = message.split(' * ');
var articleRegExp = /\[\[(.+?)\]\].+?$/;
var article = messageComponents[0].replace(articleRegExp, '$1');
Expand Down Expand Up @@ -196,6 +206,7 @@ function parseMessage(message, to) {
};
}


function monitorWikipedia() {
// fires whenever a new IRC message arrives on any of the IRC rooms
client.addListener('message', function(from, to, message) {
Expand All @@ -208,6 +219,15 @@ function monitorWikipedia() {
return;
}

//send message to Kafka topic - wiki-irc
/*producer.send(message, "wiki-irc", 0, function(err){
if (err){
console.log("Kafka: send error to topic - wiki-irc: ", err);
} else {
console.log("Kafka: message sent to wiki-irc");
}
});
*/
var article = components.article;
var editor = components.editor;
var delta = components.delta;
Expand All @@ -216,7 +236,10 @@ function monitorWikipedia() {
var jsonUrl = components.jsonUrl;
var now = Date.now();

request.get({
//set the last seen message TS
lastSeenMessageTimestamp = now;

request.get({
uri: jsonUrl,
headers: {'User-Agent': USER_AGENT}
},
Expand All @@ -235,8 +258,11 @@ function monitorWikipedia() {
var resultsObj = getResultsObject(article, editor, delta, comment, language, now, links);
var resultsJson = JSON.stringify(resultsObj);
console.log('Wiki-Links-Results:' + resultsJson);
//console.error('Wiki-Links-Results:' + resultsJson);
}
});


});
}

Expand All @@ -254,7 +280,6 @@ function getResultsObject(article, editor, delta, comment, language, now, links)

}

// Extract external links for "new" pages using the Wikipedia API
function getLinksFromParseUrl(error, body) {
if (!error) {
var json;
Expand All @@ -274,7 +299,7 @@ function getLinksFromParseUrl(error, body) {
}
}

// Extract external links from added lines

function getLinksFromCompareUrl(error, body) {
if (!error) {
var json;
Expand Down Expand Up @@ -309,8 +334,37 @@ function getLinksFromCompareUrl(error, body) {
}
}

// start the monitoring process upon a connection
monitorWikipedia();
/*
// Kafka producer
var producer = new kafka.Producer().connect();
producer.on('error', function(err){
console.log("Kafka: some general error occurred: ", err);
});
producer.on('brokerReconnectError', function(err){
console.log("Kafka: could not reconnect: ", err);
console.log("Kafka: will retry on next send()");
});
producer.send("wiki message1-fun", "wiki-irc", 0, function(err){
if (err){
console.log("send error: ", err);
} else {
console.log("message sent");
}
});
*/

newClient();

//watchdog
setInterval(function() {
var now = Date.now();
if (lastSeenMessageTimestamp === null || (now - lastSeenMessageTimestamp) > MAX_WAIT_FOR_RESET) {
// reset Wiki client
newClient();
}
}, 2 * MAX_WAIT_FOR_RESET);

// start the server
var port = process.env.PORT || 8080;
Expand Down
7 changes: 5 additions & 2 deletions wikipedia/producer/producer.py
Expand Up @@ -3,10 +3,13 @@
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

kafka = KafkaClient("kafka-setup.archive.org", 9092)
kafka = KafkaClient("crawl-db02.us.archive.org", 9092)
wikiIRCProducer = SimpleProducer(kafka, "wiki-irc", async=False,
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)

#wikiIRCProducer = SimpleProducer(kafka, "wiki-irc");
#wikiLinksProducer = SimpleProducer(kafka, "wiki-links");

wikiLinksProducer = SimpleProducer(kafka, "wiki-links", async=False,
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE)

Expand All @@ -33,7 +36,7 @@
if response and response[0].error == 0:
kafkaSuccess = True
if lineToConsider and not kafkaSuccess:
# FLUSH to disk
#print this line out so we can feed these lines from STDIN to this script at a later time (retry failed push)
print line

kafka.close()

0 comments on commit a8a5784

Please sign in to comment.