System Design. Part09. 웹 크롤러 설계
Part09. 웹 크롤러 설계
URL Frontier 핵심 개념
- URL Frontier는 단순한 FIFO 큐가 아닌 크롤링할 URL들을 저장하고 관리하는 우선순위, 예의, 신선도를 모두 고려하는 시스템이다.
1. 우선순위 관리
- PageRank, 트래픽, 업데이트 빈도를 종합한 스코어링
- 신선도 보장: 뉴스 사이트, 소셜 미디어 우선 처리
- 도메인별 균형: 특정 도메인의 큐 독점 방지
2. 정중함 보장
- 도메인별 요청 간격 제어 (예: 70초 대기)
- robots.txt 준수
- 동시 연결 수 제한으로 서버 부하 방지
3. 중복 방지
- Bloom Filter 활용한 메모리 효율적 중복 감지
- URL 정규화: 동일 콘텐츠의 다양한 URL 형태 통합
- Canonical URL 처리: rel=”canonical” 태그 기반
이중 큐 아키텍처: 전면 큐 + 후면 큐
URL Frontier는 전면 큐(Front Queue)와 후면 큐(Back Queue)로 구성된 이중 구조를 가진다.
전면 큐 (Front Queue) - 우선순위 처리
- 순위결정장치: PageRank, 트래픽, 업데이트 빈도를 종합하여 우선순위 계산
- 우선순위별 큐: 계산된 우선 순위 별로 f1 ~ fn URL 분류 저장
- 큐 선택기: 높은 우선순위 큐에서 더 자주 선택 (f1: 40%, f2: 30%, f3: 20%, f4: 7%, f5: 3% …)
후면 큐 (Back Queue) - 예의 보장
- 큐 라우터: 같은 호스트의 URL을 동일한 큐로 라우팅
- 매핑 테이블: 호스트명과 큐 인덱스 관계 저장
- 호스트별 FIFO 큐: 동일 호스트 URL을 순차 처리, 요청 간 지연시간 적용
- 큐 선택기: 큐 순회하며 크롤링 가능한 URL 선택하여 작업 스레드에 전달
구현을 위한 메시지 큐 선택
URL Frontier 구조를 구현하기 위해서는 적절한 메시지 큐 기술을 선택해야한다.
기술별 특성 비교
구분 | Kafka | RabbitMQ | Celery + Redis |
---|---|---|---|
전면 큐 | 우선순위별 토픽 분리 | 메시지 priority 속성 | 태스크 priority 파라미터 |
후면 큐 | 호스트별 파티션 | 라우팅 키 + 호스트별 큐 | Redis TTL 기반 제한 |
politeness | 파티션 내 순차 처리 | 큐별 독립적 소비 | Redis 키 만료 시간 |
장점 | 대용량, 순서 보장 | 유연한 라우팅 | 간단한 구현 |
단점 | 복잡한 설정 | 큐 관리 오버헤드 | 확장성 제한 |
어떤식으로 구현이 되는지 이해를 하기위해 AI의 도움을 받아 코드로 구현해달라고 했다. 파이썬을 잘 모르므로 할루시네이션이 있을 수 있으니 그저 참고용으로만 봐주시길..
Kafka - 대규모 배치 처리
URL Frontier 구현 방식
전면 큐: 우선순위별 토픽
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 우선순위별로 URL을 분류해 놓는 Kafka 토픽들
topics = {
'priority-0': 'high-priority-urls', # 뉴스, 중요 페이지
'priority-1': 'medium-priority-urls', # 일반 웹사이트
'priority-2': 'low-priority-urls' # 아카이브, 개인 블로그
}
# URL 발행자 (순위결정장치 역할)
def publish_url_by_priority(url, priority_score):
if priority_score > 0.8:
topic = topics['priority-0']
elif priority_score > 0.5:
topic = topics['priority-1']
else:
topic = topics['priority-2']
producer.send(topic, {'url': url, 'score': priority_score})
후면 큐: 호스트별 파티션
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 컨슈머 (워커 스레드 역할)
class KafkaCrawlerWorker:
def __init__(self, partition_id):
self.partition_id = partition_id
self.host_delays = {} # hostname -> last_crawl_time
def consume_urls(self):
# 특정 파티션만 소비 (같은 호스트의 URL들만 처리)
consumer = KafkaConsumer(
'crawl-tasks',
group_id='crawler-workers',
enable_auto_commit=False
)
consumer.assign([TopicPartition('crawl-tasks', self.partition_id)])
for message in consumer:
url = message.value['url']
hostname = extract_hostname(url)
# politeness 체크
if self.can_crawl_now(hostname):
self.crawl_url(url)
self.host_delays[hostname] = time.time()
consumer.commit() # 성공 시에만 커밋
else:
# 아직 대기 시간, 나중에 다시 처리
time.sleep(1)
동작 흐름:
- 전면 큐: 우선순위 높은 토픽부터 컨슈머가 더 자주 폴링
- 후면 큐: 같은 호스트 URL들이 동일 파티션으로 집중되어 순서 보장
RabbitMQ - 복잡한 라우팅
URL Frontier 구현 방식
전면 큐: 우선순위 큐 기능 활용
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# RabbitMQ 우선순위 큐 설정
queue_config = {
'high-priority': {'x-max-priority': 10, 'priority': 8},
'normal': {'x-max-priority': 10, 'priority': 5},
'low-priority': {'x-max-priority': 10, 'priority': 2}
}
# 큐 생성 시 우선순위 설정
channel.queue_declare(
queue='url-frontier',
arguments={'x-max-priority': 10} # 0~10 범위 우선순위
)
# URL 발행 시 우선순위 헤더 추가
def publish_with_priority(url, priority_score):
priority = int(priority_score * 10) # 0.8 → 8
channel.basic_publish(
exchange='',
routing_key='url-frontier',
body=json.dumps({'url': url}),
properties=pika.BasicProperties(priority=priority)
)
후면 큐: 라우팅 키로 호스트별 분산
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# Exchange와 라우팅 키로 도메인별 큐 분리
def setup_host_routing():
# Topic Exchange 생성
channel.exchange_declare(
exchange='crawl-router',
exchange_type='topic'
)
# 호스트별 큐 생성 및 바인딩
hostnames = ['naver.com', 'google.com', 'github.com']
for hostname in hostnames:
queue_name = f"crawl-{hostname.replace('.', '-')}"
routing_key = f"crawl.{hostname}"
channel.queue_declare(queue=queue_name)
channel.queue_bind(
exchange='crawl-router',
queue=queue_name,
routing_key=routing_key
)
# URL 라우팅
def route_url_by_host(url):
hostname = extract_hostname(url)
routing_key = f"crawl.{hostname}" # crawl.naver.com, crawl.google.com
channel.basic_publish(
exchange='crawl-router',
routing_key=routing_key,
body=json.dumps({'url': url})
)
예시 흐름:
- URL 생성기/스코어러가 URL에 점수를 붙이고, 도메인 기반 라우팅 키(
crawl.example.com
)와 함께 우선순위 큐로 publish - Consumer는 도메인별 politeness 상태를 보고, 해당 도메인의 back queue에서 메시지를 꺼내 처리
- 예:
next_allowed_time
이 지나야crawl.naver.com
에서 꺼냄
- 예:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class RabbitMQCrawlerWorker:
def __init__(self, hostname):
self.hostname = hostname
self.queue_name = f"crawl-{hostname.replace('.', '-')}"
self.last_crawl = 0
self.politeness_delay = 70 # 70초 간격
def process_queue(self):
def callback(ch, method, properties, body):
if time.time() - self.last_crawl >= self.politeness_delay:
url_data = json.loads(body)
self.crawl_url(url_data['url'])
self.last_crawl = time.time()
ch.basic_ack(delivery_tag=method.delivery_tag)
else:
# 아직 대기 시간, Nack으로 다시 큐에 넣기
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
channel.basic_consume(
queue=self.queue_name,
on_message_callback=callback
)
channel.start_consuming()
Celery + Redis - 빠른 프로토타이핑
URL Frontier 구현 방식
전면 큐: 태스크 우선순위 활용
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from celery import Celery
app = Celery('crawler', broker='redis://localhost:6379')
@app.task(bind=True, priority=5) # 높은 우선순위 (숫자 클수록 높음)
def crawl_high_priority_url(self, url):
"""뉴스, 실시간 콘텐츠 등 중요한 URL 처리"""
return process_url(url)
@app.task(bind=True, priority=1) # 낮은 우선순위
def crawl_low_priority_url(self, url):
"""개인 블로그, 아카이브 등 덜 중요한 URL 처리"""
return process_url(url)
# URL 분류 및 태스크 할당
def enqueue_url(url, priority_score):
if priority_score > 0.7:
crawl_high_priority_url.delay(url)
else:
crawl_low_priority_url.delay(url)
후면 큐: Redis로 호스트별 제한
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import redis
import time
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def rate_limit_by_host(hostname, delay=70):
"""호스트별 politeness 제어"""
key = f"crawl_limit:{hostname}"
if redis_client.exists(key):
return False # 아직 대기 시간
# delay 초 동안 해당 호스트 크롤링 금지
redis_client.setex(key, delay, 1)
return True
# 실제 크롤링 태스크에서 사용
@app.task(bind=True)
def safe_crawl_url(self, url):
hostname = extract_hostname(url)
# politeness 체크
if not rate_limit_by_host(hostname):
# 아직 대기 시간이면 일정 시간 후 재시도
raise self.retry(countdown=10, max_retries=5)
# 실제 크롤링 수행
try:
html_content = fetch_url(url)
return process_html(url, html_content)
except Exception as exc:
# 실패 시 재시도
raise self.retry(exc=exc, countdown=30, max_retries=3)
# URL 처리 예시
def process_discovered_urls(urls):
for url in urls:
hostname = extract_hostname(url)
priority = calculate_priority(url)
# 우선순위에 따라 다른 태스크로 분기
if priority > 0.8:
crawl_high_priority_url.apply_async(
args=[url],
priority=9,
countdown=0 if rate_limit_by_host(hostname, 0) else 70
)
else:
crawl_low_priority_url.apply_async(
args=[url],
priority=3,
countdown=0 if rate_limit_by_host(hostname, 0) else 70
)
도메인별 정중함 제어
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# Redis를 활용한 정교한 politeness 제어
class CeleryPolitenessManager:
def __init__(self, redis_client):
self.redis = redis_client
def can_crawl_host(self, hostname, default_delay=70):
"""호스트별 크롤링 가능 여부 확인"""
key = f"crawl_limit:{hostname}"
last_crawl = self.redis.get(key)
if last_crawl is None:
return True
elapsed = time.time() - float(last_crawl)
return elapsed >= default_delay
def mark_crawled(self, hostname):
"""크롤링 완료 시점 기록"""
key = f"crawl_limit:{hostname}"
self.redis.set(key, time.time(), ex=300) # 5분 TTL
def get_wait_time(self, hostname, default_delay=70):
"""다음 크롤링까지 대기 시간 계산"""
key = f"crawl_limit:{hostname}"
last_crawl = self.redis.get(key)
if last_crawl is None:
return 0
elapsed = time.time() - float(last_crawl)
return max(0, default_delay - elapsed)
# 사용 예시
politeness_manager = CeleryPolitenessManager(redis_client)
@app.task(bind=True)
def polite_crawl_url(self, url):
hostname = extract_hostname(url)
if not politeness_manager.can_crawl_host(hostname):
wait_time = politeness_manager.get_wait_time(hostname)
# 대기 시간만큼 재시도 지연
raise self.retry(countdown=int(wait_time) + 1)
try:
# 실제 크롤링
html = fetch_url(url)
politeness_manager.mark_crawled(hostname) # 완료 시점 기록
return process_html(url, html)
except Exception as exc:
raise self.retry(exc=exc, countdown=60, max_retries=3)
장애 복구 및 상태 관리
Circuit Breaker 패턴
- Closed (정상): 모든 요청 통과, 실패율 모니터링
- Open (차단): 30초내 5회 연속 실패시 요청 차단
- Half-Open (테스트): 30-60초 후 제한적 요청으로 상태 확인
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class CrawlerCircuitBreaker:
def __init__(self):
self.state = 'CLOSED' # CLOSED → OPEN → HALF_OPEN
self.failure_count = 0
self.failure_threshold = 5
self.timeout = 60
def call(self, func, *args):
if self.state == 'OPEN':
if time.time() - self.last_failure > self.timeout:
self.state = 'HALF_OPEN'
else:
raise Exception("Circuit breaker is OPEN")
try:
result = func(*args)
self.on_success()
return result
except Exception as e:
self.on_failure()
raise e
체크포인트 기반 상태 복구
1
2
3
4
5
6
7
8
9
10
11
12
13
# 5분마다 URL Frontier 상태 저장
checkpoint_data = {
'front_queue_sizes': [len(q) for q in front_queues], # 전면 큐 상태
'back_queue_states': { # 후면 큐 상태
hostname: {
'queue_size': len(queue),
'last_crawl_time': last_crawl,
'pending_urls': list(queue)[:100] # 샘플만 저장
} for hostname, queue in host_queues.items()
},
'processed_count': total_processed,
'error_count': total_errors
}
분산 환경에서의 URL 재분산
1
2
3
4
5
6
7
8
# Kafka Consumer Group의 자동 리밸런싱
def handle_node_failure(failed_node_id):
# 실패한 노드의 호스트들을 다른 노드에 재할당
failed_hosts = get_hosts_by_node(failed_node_id)
for hostname in failed_hosts:
new_node = consistent_hash(hostname, active_nodes)
migrate_host_queue(hostname, new_node)
읽어보면 좋을 글
실제 구현하며 겪은 문제와 해결 과정들 : https://andrewkchan.dev/posts/crawler.html
실제로 설계하고 구현한 경험담 (아키텍처 중점): https://benbernardblog.com/the-tale-of-creating-a-distributed-web-crawler/
kafka와 rabbit mq의 차이 : https://aws.amazon.com/ko/compare/the-difference-between-rabbitmq-and-kafka/
circuit breaker pattern : https://learn.microsoft.com/en-us/azure/architecture/patterns/circuit-breaker
cloudflare의 crawl 과금 정책 : https://blog.cloudflare.com/introducing-pay-per-crawl/