diff --git a/core/src/main/java/com/digitalpebble/stormcrawler/bolt/FetcherBolt.java b/core/src/main/java/com/digitalpebble/stormcrawler/bolt/FetcherBolt.java index b97b1bdae..6316ea750 100644 --- a/core/src/main/java/com/digitalpebble/stormcrawler/bolt/FetcherBolt.java +++ b/core/src/main/java/com/digitalpebble/stormcrawler/bolt/FetcherBolt.java @@ -76,9 +76,12 @@ public class FetcherBolt extends StatusEmitterBolt { */ public static final String QUEUED_TIMEOUT_PARAM_KEY = "fetcher.timeout.queue"; - /** Key name of the custom crawl delay for a page that may be present in metadata */ + /** Key name of the custom crawl delay for a queue that may be present in the metadata */ private static final String CRAWL_DELAY_KEY_NAME = "crawl.delay"; + /** Key name of the custom max number of threads that may be present in the metadata */ + private static final String CRAWL_MAX_THREAD_KEY_NAME = "max.threads.queue"; + private final AtomicInteger activeThreads = new AtomicInteger(0); private final AtomicInteger spinWaiting = new AtomicInteger(0); @@ -318,11 +321,12 @@ public synchronized void finishFetchItem(FetchItem it, boolean asap) { public synchronized FetchItemQueue getFetchItemQueue(String id, Metadata metadata) { FetchItemQueue fiq = queues.get(id); - // custom crawl delay from metadata? - final long customCrawlDelay = + // custom min crawl delay from metadata? + final long minDelay = metadata != null && metadata.getFirstValue(CRAWL_DELAY_KEY_NAME) != null ? Long.parseLong(metadata.getFirstValue(CRAWL_DELAY_KEY_NAME)) : minCrawlDelay; + if (fiq == null) { int customThreadVal = defaultMaxThread; // custom maxThread value? @@ -332,16 +336,24 @@ public synchronized FetchItemQueue getFetchItemQueue(String id, Metadata metadat break; } } + + // overridden at URL level + // custom thread number from metadata? + if (metadata != null) { + final String val = metadata.getFirstValue(CRAWL_MAX_THREAD_KEY_NAME); + if (val != null) { + customThreadVal = Integer.parseInt(val); + } + } + // initialize queue - fiq = - new FetchItemQueue( - customThreadVal, crawlDelay, customCrawlDelay, maxQueueSize); + fiq = new FetchItemQueue(customThreadVal, crawlDelay, minDelay, maxQueueSize); queues.put(id, fiq); } // in cases where we have different pages with the same key that will fall in the same // queue, each one with a custom crawl delay, we take the less aggressive - if (fiq.minCrawlDelay < customCrawlDelay) { - fiq.minCrawlDelay = customCrawlDelay; + if (fiq.minCrawlDelay < minDelay) { + fiq.minCrawlDelay = minDelay; } return fiq; }