Skip to content

Commit

Permalink
Fetcher: pass custom delay for queues via metadata, fix #1112
Browse files: browse the repository at this point in the history
Signed-off-by: Julien Nioche <julien@digitalpebble.com>
  • Loading branch information
jnioche committed Oct 20, 2023
1 parent c313b4f commit 5462006
Showing 1 changed file with 32 additions and 10 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -79,6 +79,12 @@ public class FetcherBolt extends StatusEmitterBolt {
/** Key name of the custom crawl delay for a queue that may be present in the metadata */
private static final String CRAWL_DELAY_KEY_NAME = "crawl.delay";

/**
* Key name of the custom minimum crawl delay for a queue that may be present in the metadata
* when multi-threading is allowed for a queue
*/
private static final String CRAWL_MIN_DELAY_KEY_NAME = "crawl.min.delay";

/** Key name of the custom max number of threads that may be present in the metadata */
private static final String CRAWL_MAX_THREAD_KEY_NAME = "max.threads.queue";

Expand Down Expand Up @@ -321,18 +327,29 @@ public synchronized void finishFetchItem(FetchItem it, boolean asap) {

public synchronized FetchItemQueue getFetchItemQueue(String id, Metadata metadata) {
FetchItemQueue fiq = queues.get(id);
// custom min crawl delay from metadata?
final long minDelay =
metadata != null && metadata.getFirstValue(CRAWL_DELAY_KEY_NAME) != null
? Long.parseLong(metadata.getFirstValue(CRAWL_DELAY_KEY_NAME))
: minCrawlDelay;

long delay = crawlDelay;
long minDelay = minCrawlDelay;

if (metadata != null) {
// custom crawl delay from metadata?
String v = metadata.getFirstValue(CRAWL_DELAY_KEY_NAME);
if (v != null) {
delay = Long.parseLong(v);
}
// custom min crawl delay from metadata?
v = metadata.getFirstValue(CRAWL_MIN_DELAY_KEY_NAME);
if (v != null) {
minDelay = Long.parseLong(v);
}
}

if (fiq == null) {
int customThreadVal = defaultMaxThread;
int threadVal = defaultMaxThread;
// custom maxThread value?
for (Entry<Pattern, Integer> p : customMaxThreads.entrySet()) {
if (p.getKey().matcher(id).matches()) {
customThreadVal = p.getValue();
threadVal = p.getValue();
break;
}
}
Expand All @@ -342,19 +359,24 @@ public synchronized FetchItemQueue getFetchItemQueue(String id, Metadata metadat
if (metadata != null) {
final String val = metadata.getFirstValue(CRAWL_MAX_THREAD_KEY_NAME);
if (val != null) {
customThreadVal = Integer.parseInt(val);
threadVal = Integer.parseInt(val);
}
}

// initialize queue
fiq = new FetchItemQueue(customThreadVal, crawlDelay, minDelay, maxQueueSize);
fiq = new FetchItemQueue(threadVal, delay, minDelay, maxQueueSize);
queues.put(id, fiq);
}

// in cases where we have different pages with the same key that will fall in the same
// queue, each one with a custom crawl delay, we take the less aggressive
// queue, each one with a custom min crawl delay, we take the less aggressive
if (fiq.minCrawlDelay < minDelay) {
fiq.minCrawlDelay = minDelay;
}
// same for the normal delay
if (fiq.crawlDelay < delay) {
fiq.crawlDelay = delay;
}
return fiq;
}

Expand Down

0 comments on commit 5462006

Please sign in to comment.