Distributed Web Crawling Made Easy: System and Architecture

September 9, 2022 · 16 min read

Looking for a guide to building a distributed crawler architecture and parser at scale? Learn how to implement a distributed crawler that scrapes pages, extracts content, and stores it in a scalable, fault-tolerant way. We'll combine everything from the previous posts to create a distributed crawling system.

First, we learned about pro techniques to scrape content, although we'll only use CSS selectors today. Then, tricks to avoid blocks, from which we'll take proxies, headers, and headless browsers. Lastly, we built a parallel crawler, and this post starts from that code.

If some part or snippet is unclear, it's probably explained in an earlier post. Brace yourselves; lengthy snippets are coming.

Prerequisites

For the code to work, you'll need Redis and Python 3 installed. Some systems have them preinstalled. After that, install the necessary libraries with pip and download the browsers Playwright needs.

Terminal
pip install requests beautifulsoup4 playwright "celery[redis]"
playwright install

Intro to Celery and Redis

Celery "is an open source asynchronous task queue." We created a simple parallel version in the last blog post. Celery takes it a step further by providing truly distributed queues. We'll use it to distribute our load among workers and servers. In a real-world case, we would have several nodes to make a distributed web crawler.

Redis "is an open source, in-memory data structure store, used as a database, cache, and message broker." Instead of keeping all the content in in-memory lists and sets, we'll use Redis as a database. Moreover, Celery can use Redis as a broker, so we won't need other software to run it.

Building a distributed web crawler in Python isn't an easy task, but you're brave enough to try! Let's go!

Simple Celery Task

Our first step will be to create a Celery task that prints the value it receives as a parameter. Save the snippet in a file called tasks.py and run it. If you run it as a regular Python file, only one string will be printed. If you run it with celery -A tasks worker, the console will print two lines.

The difference is in how demo is called. A direct call means "execute the task now," while delay means "enqueue it for a worker to process." Check Celery's API for more info on calling tasks.

tasks.py
from celery import Celery 
 
app = Celery('tasks', broker_url='redis://127.0.0.1:6379/1') 
 
@app.task 
def demo(text): 
	print(f'Str: {text}') 
 
demo('ZenRows') # Str: ZenRows (runs immediately in this process) 
demo.delay('ZenRows') # enqueued; a worker picks it up and prints Str: ZenRows

The celery command won't end; we need to stop it manually (e.g., Ctrl+C). We'll restart it several times because Celery doesn't reload after code changes.

Distributed Crawling from Task

The next step is to connect a Celery task with the crawling process. This time we'll be using a slightly altered version of the helper functions seen in the last post. extract_links will get all the links on the page except the nofollow ones. We'll add filtering options later.

We'll execute everything on a single node for simplicity. Celery makes it easy to run the distributed scraper on many nodes; stay until the end to find out how.

tasks.py
from celery import Celery 
import requests 
from bs4 import BeautifulSoup 
from urllib.parse import urljoin 
 
app = Celery('tasks', broker_url='redis://127.0.0.1:6379/1') 
 
@app.task 
def crawl(url): 
	html = get_html(url) 
	soup = BeautifulSoup(html, 'html.parser') 
	links = extract_links(url, soup) 
	print(links) 
 
def get_html(url): 
	try: 
		# without a timeout, a stalled server could block the worker forever 
		response = requests.get(url, timeout=10) 
		return response.content 
	except Exception as e: 
		print(e) 
 
	return '' 
 
def extract_links(url, soup): 
	return list({ 
		urljoin(url, a.get('href')) 
		for a in soup.find_all('a') 
		if a.get('href') and not(a.get('rel') and 'nofollow' in a.get('rel')) 
	}) 
 
starting_url = 'https://scrapeme.live/shop/page/1/' 
crawl.delay(starting_url)

We could loop over the retrieved links and enqueue them, but that would end up crawling the same pages repeatedly. We saw the basics of executing tasks, and now we'll start splitting the code into files and keeping track of the pages on Redis.

Redis for Tracking URLs

We already said that relying on in-memory variables isn't an option in a distributed system. We'll need to persist all that data: visited pages, the ones currently being crawled, a "to visit" list, and, later on, some content.

For all that, instead of enqueuing directly to Celery, we'll use Redis to avoid re-crawling pages and to enqueue each URL only once.


We won't go into further details on Redis, but we'll use lists, sets, and hashes.

Take the last snippet and remove the last two lines that call the task. Create a new file, main.py, with the following content.

We'll create a list named crawling:to_visit and push the starting URL. Then we'll go into a loop that pops items from that list, blocking for up to a minute until one is available. When an item is retrieved, we enqueue the crawl task for it with delay.

main.py
from redis import Redis 
from tasks import crawl 
 
connection = Redis(db=1) 
starting_url = 'https://scrapeme.live/shop/page/1/' 
 
connection.rpush('crawling:to_visit', starting_url) 
 
while True: 
	# timeout after 1 minute 
	item = connection.blpop('crawling:to_visit', 60) 
	if item is None: 
		print('Timeout! No more items to process') 
		break 
 
	url = item[1].decode('utf-8') 
	print('Pop URL', url) 
	crawl.delay(url)

It does almost the same as before but allows us to add items to the list, which will be processed automatically. We could do that easily by looping over links and pushing them all, but it's not a good idea without deduplication and a maximum number of pages. We'll keep track of all the queued and visited URLs using sets and exit once their sum exceeds the maximum allowed.

tasks.py
from redis import Redis 
# ... 
connection = Redis(db=1) 
 
@app.task 
def crawl(url): 
	connection.sadd('crawling:queued', url) # add URL to set 
	html = get_html(url) 
	soup = BeautifulSoup(html, 'html.parser') 
	links = extract_links(url, soup) 
	for link in links: 
		if allow_url_filter(link) and not seen(link): 
			print('Add URL to visit queue', link) 
			add_to_visit(link) 
 
	# atomically move a URL from queued to visited 
	connection.smove('crawling:queued', 'crawling:visited', url) 
 
def allow_url_filter(url): 
	return '/shop/page/' in url and '#' not in url 
    
def seen(url): 
	return connection.sismember('crawling:visited', url) or connection.sismember('crawling:queued', url) 
 
def add_to_visit(url): 
	# older redis-py versions have no lpos() helper, so send the raw LPOS command (Redis 6.0.6+) 
	if connection.execute_command('LPOS', 'crawling:to_visit', url) is None: 
		connection.rpush('crawling:to_visit', url) # add URL to the end of the list
main.py
maximum_items = 5 
 
while True: 
	visited = connection.scard('crawling:visited') # count URLs in visited 
	queued = connection.scard('crawling:queued') 
	if queued + visited > maximum_items: 
		print('Exiting! Over maximum') 
		break 
	# ...
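To follow the bookkeeping above without a running Redis server, here is a minimal in-memory sketch. It's not the project's code: a deque stands in for the crawling:to_visit list, and Python sets stand in for crawling:queued and crawling:visited. The class, its methods, and the extra seen set are hypothetical. The seen set replaces the separate seen() and LPOS checks with a single check-and-add; in Redis, the same idea maps to SADD, which returns 1 only when the member is new.

```python
from collections import deque


class InMemoryFrontier:
    """Hypothetical in-memory stand-in for the Redis keys used by tasks.py and main.py."""

    def __init__(self, maximum_items=5):
        self.to_visit = deque()  # like the crawling:to_visit list
        self.queued = set()      # like the crawling:queued set
        self.visited = set()     # like the crawling:visited set
        self.seen = set()        # every URL ever enqueued (assumption: one-step dedup)
        self.maximum_items = maximum_items

    def add_to_visit(self, url):
        """Append url once; return True only the first time (like SADD returning 1)."""
        if url in self.seen:
            return False
        self.seen.add(url)
        self.to_visit.append(url)  # like RPUSH: add to the end of the list
        return True

    def pop_to_visit(self):
        """Pop from the front like BLPOP, but return None instead of blocking."""
        return self.to_visit.popleft() if self.to_visit else None

    def start(self, url):
        self.queued.add(url)  # like SADD crawling:queued

    def finish(self, url):
        # like SMOVE crawling:queued crawling:visited (atomic in Redis)
        self.queued.discard(url)
        self.visited.add(url)

    def over_maximum(self):
        # same check as main.py: stop once queued + visited exceeds the maximum
        return len(self.queued) + len(self.visited) > self.maximum_items


def simulate(maximum_items=5):
    """Crawl fake pagination where page N links to pages N+1 and N+2 (no network)."""
    frontier = InMemoryFrontier(maximum_items)
    frontier.add_to_visit('https://scrapeme.live/shop/page/1/')
    while not frontier.over_maximum():
        url = frontier.pop_to_visit()
        if url is None:
            break  # nothing left, like the BLPOP timeout in main.py
        frontier.start(url)
        page = int(url.rstrip('/').rsplit('/', 1)[1])
        for n in (page + 1, page + 2):
            # links already seen are skipped, so no page is queued twice
            frontier.add_to_visit(f'https://scrapeme.live/shop/page/{n}/')
        frontier.finish(url)
    return frontier
```

With the default maximum of 5, simulate() visits pages 1 to 6 and leaves pages 7 and 8 waiting: the check uses >, just like main.py, so it only stops once the sum exceeds the maximum.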

After a run, everything stays in Redis, so running again won't work as expected. We need to clean the crawling keys manually. We can do that using redis-cli or a GUI like redis-commander. There are commands for deleting keys (e.g., DEL crawling:to_visit) or flushing the database (careful with this one).

Separate Responsibilities

We'll start to separate concepts before the project grows. We already have two files: tasks.py and main.py. We'll create another two to host crawler-related functions (crawler.py) and database access (repo.py). 

Take a look at the snippet below for the repository file, repo.py. It's not complete, but you get the idea. If you want to check the final content, you'll find it in the GitHub repository.

repo.py
from redis import Redis 
 
connection = Redis(db=1) 
 
to_visit_key = 'crawling:to_visit' 
visited_key = 'crawling:visited' 
queued_key = 'crawling:queued' 
 
def pop_to_visit_blocking(timeout=0): 
	return connection.blpop(to_visit_key, timeout) 
 
def count_visited(): 
	return connection.scard(visited_key) 
 
def is_queued(value): 
	return connection.sismember(queued_key, value)

And the crawler file will have functions for crawling, extracting links, and so on.

Allow Parser Customization

As mentioned above, we need some way to extract and store content and to add only a particular subset of links to the queue. We need a new concept for that: the default parser (parsers/defaults.py).

parsers/defaults.py
import repo 
 
def extract_content(url, soup): 
	return soup.title.string # extract page's title 
 
def store_content(url, content): 
	# store in a hash with the URL as the key and the title as the content 
	repo.set_content(url, content) 
 
def allow_url_filter(url): 
	return True # allow all by default 
 
def get_html(url): 
	# ... same as before

And in the repo.py file:

repo.py
content_key = 'crawling:content' 
# .. 
def set_content(key, value): 
	connection.hset(content_key, key=key, value=value)

There is nothing new here, but it'll allow us to abstract the link and content extraction. Instead of hardcoding it in the crawler, it'll be a set of functions passed as parameters. Now we can substitute the calls to these functions with an import (for the moment).

For it to be completely abstracted, we need a generator or factory. We'll create a new file to host it, parserlist.py. To simplify a bit, we allow one custom parser per domain. The demo includes two domains for testing: scrapeme.live and quotes.toscrape.com.

We haven't written anything domain-specific yet, so both domains use the default parser.

parserlist.py
from urllib.parse import urlparse 
from parsers import defaults 
 
parsers = { 
	'scrapeme.live': defaults, 
	'quotes.toscrape.com': defaults, 
} 
 
def get_parser(url): 
	hostname = urlparse(url).hostname # extract domain from URL 
	if hostname in parsers: 
		# use the dict above to return the custom parser if present 
		return parsers[hostname] 
	return defaults
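One detail worth knowing: urlparse(url).hostname returns the full, lowercased host, so a URL like https://www.scrapeme.live/... wouldn't match the 'scrapeme.live' key and would silently fall back to the default parser. If your targets mix both forms, one option (an assumption, not part of the original parserlist.py) is to strip a leading www. before the lookup. In this runnable sketch, SimpleNamespace objects are hypothetical stand-ins for the parser modules.

```python
from types import SimpleNamespace
from urllib.parse import urlparse

# Hypothetical stand-ins for the parsers.defaults and parsers.scrapemelive modules
defaults = SimpleNamespace(name='defaults')
scrapemelive = SimpleNamespace(name='scrapemelive')

parsers = {
    'scrapeme.live': scrapemelive,
    'quotes.toscrape.com': defaults,
}


def get_parser(url):
    # hostname is lowercased by urlparse; it is None for URLs without a host
    hostname = urlparse(url).hostname or ''
    if hostname.startswith('www.'):
        hostname = hostname[len('www.'):]  # treat www.scrapeme.live like scrapeme.live
    # dict.get falls back to the default parser for unknown domains
    return parsers.get(hostname, defaults)
```

Using dict.get with a default keeps the same behavior as the original if/return pair, just in one line.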

We can now modify the task to use the new per-domain parsers.

tasks.py
from parserlist import get_parser 
# ... 
@app.task 
def crawl(url): 
	parser = get_parser(url) # get the parser, either custom or the default one 
	html = parser.get_html(url) 
	# ... 
	for link in links: 
		if parser.allow_url_filter(link) and not seen(link): 
			# ...

Custom Parser

We'll use scrapeme first as an example. Check the repository for the final version and the other custom parser.

Knowledge of the page and its HTML is required for this part, so take a look at it to get a feel for it. To summarize, we'll get the product ID, name, and price for each item in the product list. Then we'll store each product as JSON in the content hash, using the ID as the key. As for links, only the pagination ones will pass the filter.

parsers/scrapemelive.py
import json 
from parsers import defaults 
import repo 
 
def extract_content(url, soup): 
	return [{ 
		'id': product.find('a', 
			attrs={'data-product_id': True})['data-product_id'], 
		'name': product.find('h2').text, 
		'price': product.find(class_='amount').text 
	} for product in soup.select('.product')] 
 
def store_content(url, content): 
	for item in content: 
		if item['id']: 
			repo.set_content(item['id'], json.dumps(item)) 
 
def allow_url_filter(url): 
	return '/shop/page/' in url and '#' not in url 
 
def get_html(url): 
	return defaults.get_html(url)
Crawler Products, Pokemon

In the quotes site, we need to handle it differently since there is no ID per quote. We'll extract the author and quote for each entry in the list. Then, in the store_content function, we'll create a list for each author and add that quote. Redis handles the creation of the lists when necessary.

Quotes parser (quotes.toscrape.com)
import repo 
 
def extract_content(url, soup): 
	return [{ 
		'quote': quote.find(class_='text').text, 
		'author': quote.find(class_='author').text 
	} for quote in soup.select('.quote')] 
 
def store_content(url, content): 
	for item in content: 
		if item['quote'] and item['author']: 
			# one Redis list per author 
			list_key = f"crawling:quote:{item['author']}" 
			repo.add_to_list(list_key, item['quote'])
Crawler Quotes in Redis

With the last couple of changes, we have introduced custom parsers that are easy to extend. When adding a new site, we must create one file for the new domain and add one line in parserlist.py referencing it. We could go a step further and "auto-discover" them, but there's no need to complicate things even more.

Get HTML: Headless Browsers

Until now, every page was fetched with requests.get, which isn't always adequate. Say we want to use a different library or a headless browser, but only for some cases or domains. Loading a browser is slow and memory-consuming, so we should avoid it when it isn't mandatory. The solution? Even more customization. New concept: collector.

We'll create a file named collectors/basic.py and paste the already known get_html function. Then change the defaults to use it by importing it. Next, create a new file, collectors/headless_chromium.py, for the new and shiny method of getting the target HTML. 

As in the previous post, we'll be using Playwright. And we'll also parametrize headers and proxies if we want to use them. Spoiler: we will.

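collectors/headless_chromium.py
from playwright.sync_api import sync_playwright 
 
def get_html(url, headers=None, proxy=None, timeout=10000): 
	html = '' 
	with sync_playwright() as p: 
		browser_type = p.chromium 
		# proxy uses Playwright's format, e.g. {'server': 'http://host:port'} 
		browser = browser_type.launch(proxy=proxy) 
		page = browser.new_page() 
		if headers: 
			# set_extra_http_headers requires a dict, so skip it when no headers are given 
			page.set_extra_http_headers(headers) 
		page.goto(url) 
		# fixed wait (in milliseconds) to give JavaScript time to render the content 
		page.wait_for_timeout(timeout) 
 
		html = page.content() 
 
		browser.close() 
 
	return html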
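One caveat about the proxy parameter: Playwright's launch() takes proxy as a dict with a 'server' key (plus optional 'bypass', 'username', and 'password'), not the {'http': ..., 'https': ...} mapping that requests uses. If you want both collectors to share one proxy list, you could convert between the two shapes. The helper name and its preference for the 'https' entry are assumptions made for this sketch.

```python
from urllib.parse import urlsplit


def to_playwright_proxy(requests_proxies):
    """Convert a requests-style proxies dict into Playwright launch() proxy settings.

    Hypothetical helper: prefers the 'https' entry and falls back to 'http'.
    """
    if not requests_proxies:
        return None  # launch(proxy=None) means no proxy
    proxy_url = requests_proxies.get('https') or requests_proxies.get('http')
    if not proxy_url:
        return None
    parts = urlsplit(proxy_url)
    server = f'{parts.scheme}://{parts.hostname}'
    if parts.port:
        server += f':{parts.port}'
    settings = {'server': server}
    # credentials embedded as scheme://user:pass@host:port go into separate keys
    if parts.username:
        settings['username'] = parts.username
        settings['password'] = parts.password or ''
    return settings
```

For example, headless_chromium.get_html(url, proxy=to_playwright_proxy({'https': 'http://203.193.131.74:3128'})) would launch Chromium with {'server': 'http://203.193.131.74:3128'}.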

If we want to use a headless Chromium for some domain, we simply modify the get_html for that parser (e.g., parsers/scrapemelive.py).

scrapemelive.py
from collectors import headless_chromium 
# ... 
def get_html(url): 
	return headless_chromium.get_html(url)

As you can see in the final repository, we also have a fake.py collector used in scrapemelive.py. Since we used that website for intense testing, we downloaded all the product pages the first time and stored them in a data folder. Just as we can swap in a headless browser, we can swap in a file reader; hence the name "fake."

fake.py
import time 
import re 
import random 
 
def get_html(url): 
	try: 
		# use the first number in the URL as the page, e.g. .../shop/page/3/ -> 3 
		page = int(re.search(r'\d+', url).group()) 
		with open('./data/' + str(page) + '.html') as fp: 
			# simulate network latency of 0.1 to 1 seconds 
			time.sleep(random.randint(1, 10) / 10) 
			return fp.read() 
	except Exception as e: 
		print(e) 
 
	return ''

Avoid Detection with Headers and Proxies

You guessed it: we want to add custom headers and use proxies. We'll start with the headers by creating a file, headers.py. We won't paste the entire content here: there are three different sets of headers for a Linux machine, and it gets pretty long. Check the repository for the details.

headers.py
import random 
 
chrome_linux_88 = { 
	# ... 
	'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.96 Safari/537.36', 
} 
 
chromium_linux_92 = { 
	# ... 
	'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36', 
} 
 
firefox_linux_88 = { 
	# ... 
	'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:88.0) Gecko/20100101 Firefox/88.0', 
} 
 
headers = [ 
	chrome_linux_88, 
	chromium_linux_92, 
	firefox_linux_88 
] 
 
def random_headers(): 
	return random.choice(headers)

We can import a concrete set of headers or call the random_headers to get one of the available options. We'll see a usage example in a moment.

The same applies to the proxies: create a new file, proxies.py. It'll contain lists of proxies grouped by provider. In our example, we'll include only free proxies. Add your paid ones to the proxies dictionary and change the default type to the one you prefer. If we wanted to go further, we could add a retry with a different provider in case of failure.

Note that these free proxies might not work for you. They're short-lived.

proxies.py
import random 
 
free_proxies = [ 
	{'http': 'http://62.33.210.34:58918', 'https': 'http://194.233.69.41:443'}, 
	{'http': 'http://190.64.18.177:80', 'https': 'http://203.193.131.74:3128'}, 
] 
 
proxies = { 
	'free': free_proxies, 
} 
 
def random_proxies(type='free'): 
	return random.choice(proxies[type])

And the usage in a parser:

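Example parser
from collectors import basic 
from headers import random_headers 
from proxies import random_proxies 
# ... 
def get_html(url): 
	# assumes basic.get_html accepts headers and proxies and passes them on to requests.get 
	return basic.get_html(url, headers=random_headers(), proxies=random_proxies())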
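The retry with a different provider mentioned above could look like the sketch below. It assumes a dict of proxy lists grouped by provider, like the one in proxies.py, and a fetch callable that raises on failure (for instance, a wrapper around requests.get that calls raise_for_status()). The function name, the provider order, and the number of attempts per provider are assumptions, not part of the repository.

```python
import random


def fetch_with_fallback(url, fetch, proxies_by_provider, order=('free',), attempts_per_provider=2):
    """Try random proxies from each provider in order; return the first successful result.

    `fetch(url, proxies)` is any callable that returns content or raises on failure.
    """
    errors = []
    for provider in order:
        candidates = proxies_by_provider.get(provider, [])
        # pick distinct proxies at random, at most attempts_per_provider of them
        for proxies in random.sample(candidates, min(attempts_per_provider, len(candidates))):
            try:
                return fetch(url, proxies)
            except Exception as exc:  # broad on purpose: any failure moves on to the next proxy
                errors.append((provider, proxies, exc))
    raise RuntimeError(f'All proxies failed for {url} after {len(errors)} attempts')
```

With order=('free', 'paid'), free proxies are tried first and a paid provider is only used once they fail, which keeps costs down.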

Bringing It All Together

It's been a long and eventful trip. It's time to put an end to it by completing the puzzle. We hope you now understand the process and the challenges of distributed web scraping and crawling.

We can't show the final code here, so take a look at the repository and don't hesitate to comment or contact us with any questions.

The two entry points are tasks.py for Celery and main.py to start queueing URLs. From there, we begin storing URLs in Redis to keep track and start crawling the first URL. A custom or the default parser will get the HTML, extract and filter links, and generate and store the appropriate content. We add those links to a list and start the process again.

Thanks to Celery, once more than one link is in the queue, the distributed web crawling process starts.

Crawler File Tree

Points Still Missing

We have already covered a lot of ground, but there is always a step more. Here are a few functionalities that we didn't include. Also, note that most of the code doesn't contain error handling or retries for brevity's sake.

Distributed Web Crawling

Celery offers us distributed scraping and crawling out-of-the-box. The code will be the same, but the execution will differ since there are several strategies for distributed crawling.

For local testing, we can start two different workers celery -A tasks worker --concurrency=20 -n worker and ... -n worker2. But this isn't an actual distributed web crawler design.

It's important to note that the worker's name is essential, especially when starting several on the same machine. If we execute the above command twice without changing the worker's name, Celery can't tell the two workers apart. Thus, launch the second one with -n worker2.

If the project grows, our only node would be the bottleneck. To properly make a distributed crawler, we would need multiple nodes. Each of them would run the same code and have access to the broker - in our case, Redis. Celery handles the workers and distributes the load.

Robots.txt

Along with the allow_url_filter part, we should also add a robots.txt checker. For that, Python's standard urllib.robotparser module can read a site's robots.txt and tell us whether we're allowed to crawl a given URL. We can add it to the default parser or as a standalone function, and then each scraper decides whether to use it. The project was complex enough already, so we didn't implement this functionality.

If you were to implement it, check when the file was last fetched with mtime() and reread it from time to time. Also, cache it per domain to avoid requesting it for every single URL.
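Following those two tips, a per-host cache around urllib.robotparser.RobotFileParser might look like this. It's a sketch with hypothetical names (RobotsCache, load, allowed) and an assumed one-hour TTL. read() makes one HTTP request per host per TTL window, and network errors from it propagate to the caller. The load() method seeds the cache from text so the logic can be tried without network access.

```python
import time
from urllib.parse import urlsplit
from urllib.robotparser import RobotFileParser


class RobotsCache:
    """Hypothetical per-host cache of robots.txt rules, re-read after `ttl` seconds."""

    def __init__(self, user_agent='*', ttl=3600):
        self.user_agent = user_agent
        self.ttl = ttl
        self._parsers = {}  # 'scheme://host' -> RobotFileParser

    @staticmethod
    def _base(url):
        parts = urlsplit(url)
        return f'{parts.scheme}://{parts.netloc}'

    def load(self, base, robots_txt):
        """Seed the cache from robots.txt text instead of fetching it (handy for tests)."""
        parser = RobotFileParser(base + '/robots.txt')
        parser.parse(robots_txt.splitlines())
        parser.modified()  # record when the rules were loaded
        self._parsers[base] = parser

    def _parser_for(self, url):
        base = self._base(url)
        parser = self._parsers.get(base)
        # mtime() is when robots.txt was last checked (0 if never)
        if parser is None or time.time() - parser.mtime() > self.ttl:
            parser = RobotFileParser(base + '/robots.txt')
            parser.read()      # HTTP request to base/robots.txt
            parser.modified()  # record the check even if read() got a 4xx and skipped parsing
            self._parsers[base] = parser
        return parser

    def allowed(self, url):
        return self._parser_for(url).can_fetch(self.user_agent, url)
```

A parser's allow_url_filter could then combine both checks, e.g. return '/shop/page/' in url and robots.allowed(url). Keep in mind that each worker process would hold its own cache.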

Rate Limiting the Distributed Crawler

Celery's rate limits are set per task type and enforced per worker instance, so we can't customize them per parameter (in our case, the domain). We can throttle a task, but not at the fine-grained level we'd like, and not across our distributed system as a whole.

There are several open issues and workarounds. From reading several of those, the takeaway is that we can't do it without keeping track of the requests ourselves.

We could easily rate-limit the crawl task to 30 requests per minute using @app.task(rate_limit="30/m"). But remember that it would affect the task, not the crawled domain.
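Keeping track of the requests ourselves could look like the sketch below: a fixed-window counter per domain. It's an in-memory, single-process illustration with hypothetical names. Since each Celery worker has its own memory, a real distributed version would keep the counters in Redis (for instance, INCR on a per-domain, per-window key plus EXPIRE) so all workers share them.

```python
import time
from urllib.parse import urlsplit


class DomainRateLimiter:
    """Hypothetical fixed-window limiter: at most `limit` requests per domain per `window` seconds."""

    def __init__(self, limit=30, window=60.0, clock=time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock   # injectable so the behavior can be tested without sleeping
        self._counters = {}  # domain -> (window_start, count)

    def try_acquire(self, url):
        """Return True and count the request if the domain is under its limit, else False."""
        domain = urlsplit(url).hostname
        now = self.clock()
        start, count = self._counters.get(domain, (now, 0))
        if now - start >= self.window:
            start, count = now, 0  # the previous window expired; start a new one
        if count >= self.limit:
            return False  # over the limit for this window; the caller should retry later
        self._counters[domain] = (start, count + 1)
        return True
```

In a task declared with @app.task(bind=True), a denied URL could be rescheduled with raise self.retry(countdown=10) instead of being fetched; note that these retries count toward the task's max_retries.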

Conclusion

Building a custom distributed web crawler and parser isn't easy. We provided some guidance and tips, hopefully helping you all with your day-to-day tasks.

Before developing something for large-scale production and high performance, think about some essential takeaways:

  1. Separate responsibilities.
  2. Use abstractions when necessary, but don't over-engineer.
  3. Don't be afraid of using specialized software instead of building everything.
  4. Think about scaling even if you don't need it now; just keep it in mind.

Thanks for joining us until the end. It's been a fun series to write, and we hope you've enjoyed it too. If you liked it, you might be interested in the JavaScript Web Scraping guide.

Don't forget to take a look at the rest of the posts in this series.
