Post

System Design. Part09. 웹 크롤러 설계

System Design. Part09. 웹 크롤러 설계

Part09. 웹 크롤러 설계

URL Frontier 핵심 개념

  • URL Frontier는 단순한 FIFO 큐가 아닌 크롤링할 URL들을 저장하고 관리하는 우선순위, 예의, 신선도를 모두 고려하는 시스템이다.

1. 우선순위 관리

  • PageRank, 트래픽, 업데이트 빈도를 종합한 스코어링
  • 신선도 보장: 뉴스 사이트, 소셜 미디어 우선 처리
  • 도메인별 균형: 특정 도메인의 큐 독점 방지

2. 정중함 보장

  • 도메인별 요청 간격 제어 (예: 70초 대기)
  • robots.txt 준수
  • 동시 연결 수 제한으로 서버 부하 방지

3. 중복 방지

  • Bloom Filter 활용한 메모리 효율적 중복 감지
  • URL 정규화: 동일 콘텐츠의 다양한 URL 형태 통합
  • Canonical URL 처리: rel=”canonical” 태그 기반

이중 큐 아키텍처: 전면 큐 + 후면 큐

URL Frontier는 전면 큐(Front Queue)후면 큐(Back Queue)로 구성된 이중 구조를 가진다.

전면 큐 (Front Queue) - 우선순위 처리

  • 순위결정장치: PageRank, 트래픽, 업데이트 빈도를 종합하여 우선순위 계산
  • 우선순위별 큐: 계산된 우선 순위 별로 f1 ~ fn URL 분류 저장
  • 큐 선택기: 높은 우선순위 큐에서 더 자주 선택 (f1: 40%, f2: 30%, f3: 20%, f4: 7%, f5: 3% …)

후면 큐 (Back Queue) - 예의 보장

  • 큐 라우터: 같은 호스트의 URL을 동일한 큐로 라우팅
  • 매핑 테이블: 호스트명과 큐 인덱스 관계 저장
  • 호스트별 FIFO 큐: 동일 호스트 URL을 순차 처리, 요청 간 지연시간 적용
  • 큐 선택기: 큐 순회하며 크롤링 가능한 URL 선택하여 작업 스레드에 전달

구현을 위한 메시지 큐 선택

URL Frontier 구조를 구현하기 위해서는 적절한 메시지 큐 기술을 선택해야한다.

기술별 특성 비교

구분KafkaRabbitMQCelery + Redis
전면 큐우선순위별 토픽 분리메시지 priority 속성태스크 priority 파라미터
후면 큐호스트별 파티션라우팅 키 + 호스트별 큐Redis TTL 기반 제한
politeness파티션 내 순차 처리큐별 독립적 소비Redis 키 만료 시간
장점대용량, 순서 보장유연한 라우팅간단한 구현
단점복잡한 설정큐 관리 오버헤드확장성 제한

어떤식으로 구현이 되는지 이해를 하기위해 AI의 도움을 받아 코드로 구현해달라고 했다. 파이썬을 잘 모르므로 할루시네이션이 있을 수 있으니 그저 참고용으로만 봐주시길..

Kafka - 대규모 배치 처리

URL Frontier 구현 방식

전면 큐: 우선순위별 토픽

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 우선순위별로 URL을 분류해 놓는 Kafka 토픽들
topics = {
    'priority-0': 'high-priority-urls',     # 뉴스, 중요 페이지
    'priority-1': 'medium-priority-urls',   # 일반 웹사이트
    'priority-2': 'low-priority-urls'       # 아카이브, 개인 블로그
}

# URL 발행자 (순위결정장치 역할)
def publish_url_by_priority(url, priority_score):
    if priority_score > 0.8:
        topic = topics['priority-0']
    elif priority_score > 0.5:
        topic = topics['priority-1'] 
    else:
        topic = topics['priority-2']
        
    producer.send(topic, {'url': url, 'score': priority_score})

후면 큐: 호스트별 파티션

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 컨슈머 (워커 스레드 역할)
class KafkaCrawlerWorker:
    def __init__(self, partition_id):
        self.partition_id = partition_id
        self.host_delays = {}  # hostname -> last_crawl_time
        
    def consume_urls(self):
        # 특정 파티션만 소비 (같은 호스트의 URL들만 처리)
        consumer = KafkaConsumer(
            'crawl-tasks',
            group_id='crawler-workers',
            enable_auto_commit=False
        )
        consumer.assign([TopicPartition('crawl-tasks', self.partition_id)])
        
        for message in consumer:
            url = message.value['url']
            hostname = extract_hostname(url)
            
            # politeness 체크
            if self.can_crawl_now(hostname):
                self.crawl_url(url)
                self.host_delays[hostname] = time.time()
                consumer.commit()  # 성공 시에만 커밋
            else:
                # 아직 대기 시간, 나중에 다시 처리
                time.sleep(1)

동작 흐름:

  1. 전면 큐: 우선순위 높은 토픽부터 컨슈머가 더 자주 폴링
  2. 후면 큐: 같은 호스트 URL들이 동일 파티션으로 집중되어 순서 보장

RabbitMQ - 복잡한 라우팅

URL Frontier 구현 방식

전면 큐: 우선순위 큐 기능 활용

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# RabbitMQ 우선순위 큐 설정
queue_config = {
    'high-priority': {'x-max-priority': 10, 'priority': 8},
    'normal': {'x-max-priority': 10, 'priority': 5},  
    'low-priority': {'x-max-priority': 10, 'priority': 2}
}

# 큐 생성 시 우선순위 설정
channel.queue_declare(
    queue='url-frontier',
    arguments={'x-max-priority': 10}  # 0~10 범위 우선순위
)

# URL 발행 시 우선순위 헤더 추가
def publish_with_priority(url, priority_score):
    priority = int(priority_score * 10)  # 0.8 → 8
    
    channel.basic_publish(
        exchange='',
        routing_key='url-frontier',
        body=json.dumps({'url': url}),
        properties=pika.BasicProperties(priority=priority)
    )

후면 큐: 라우팅 키로 호스트별 분산

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# Exchange와 라우팅 키로 도메인별 큐 분리
def setup_host_routing():
    # Topic Exchange 생성
    channel.exchange_declare(
        exchange='crawl-router',
        exchange_type='topic'
    )
    
    # 호스트별 큐 생성 및 바인딩
    hostnames = ['naver.com', 'google.com', 'github.com']
    
    for hostname in hostnames:
        queue_name = f"crawl-{hostname.replace('.', '-')}"
        routing_key = f"crawl.{hostname}"
        
        channel.queue_declare(queue=queue_name)
        channel.queue_bind(
            exchange='crawl-router',
            queue=queue_name,
            routing_key=routing_key
        )

# URL 라우팅
def route_url_by_host(url):
    hostname = extract_hostname(url)
    routing_key = f"crawl.{hostname}"  # crawl.naver.com, crawl.google.com
    
    channel.basic_publish(
        exchange='crawl-router',
        routing_key=routing_key,
        body=json.dumps({'url': url})
    )

예시 흐름:

  1. URL 생성기/스코어러가 URL에 점수를 붙이고, 도메인 기반 라우팅 키(crawl.example.com)와 함께 우선순위 큐로 publish
  2. Consumer는 도메인별 politeness 상태를 보고, 해당 도메인의 back queue에서 메시지를 꺼내 처리
    • 예: next_allowed_time이 지나야 crawl.naver.com에서 꺼냄
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class RabbitMQCrawlerWorker:
    def __init__(self, hostname):
        self.hostname = hostname
        self.queue_name = f"crawl-{hostname.replace('.', '-')}"
        self.last_crawl = 0
        self.politeness_delay = 70  # 70초 간격
        
    def process_queue(self):
        def callback(ch, method, properties, body):
            if time.time() - self.last_crawl >= self.politeness_delay:
                url_data = json.loads(body)
                self.crawl_url(url_data['url'])
                self.last_crawl = time.time()
                ch.basic_ack(delivery_tag=method.delivery_tag)
            else:
                # 아직 대기 시간, Nack으로 다시 큐에 넣기
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
                
        channel.basic_consume(
            queue=self.queue_name,
            on_message_callback=callback
        )
        channel.start_consuming()

Celery + Redis - 빠른 프로토타이핑

URL Frontier 구현 방식

전면 큐: 태스크 우선순위 활용

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from celery import Celery

app = Celery('crawler', broker='redis://localhost:6379')

@app.task(bind=True, priority=5)  # 높은 우선순위 (숫자 클수록 높음)
def crawl_high_priority_url(self, url):
    """뉴스, 실시간 콘텐츠 등 중요한 URL 처리"""
    return process_url(url)

@app.task(bind=True, priority=1)  # 낮은 우선순위  
def crawl_low_priority_url(self, url):
    """개인 블로그, 아카이브 등 덜 중요한 URL 처리"""
    return process_url(url)

# URL 분류 및 태스크 할당
def enqueue_url(url, priority_score):
    if priority_score > 0.7:
        crawl_high_priority_url.delay(url)
    else:
        crawl_low_priority_url.delay(url)

후면 큐: Redis로 호스트별 제한

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import redis
import time

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def rate_limit_by_host(hostname, delay=70):
    """호스트별 politeness 제어"""
    key = f"crawl_limit:{hostname}"
    
    if redis_client.exists(key):
        return False  # 아직 대기 시간
    
    # delay 초 동안 해당 호스트 크롤링 금지
    redis_client.setex(key, delay, 1)
    return True

# 실제 크롤링 태스크에서 사용
@app.task(bind=True)
def safe_crawl_url(self, url):
    hostname = extract_hostname(url)
    
    # politeness 체크
    if not rate_limit_by_host(hostname):
        # 아직 대기 시간이면 일정 시간 후 재시도
        raise self.retry(countdown=10, max_retries=5)
    
    # 실제 크롤링 수행
    try:
        html_content = fetch_url(url)
        return process_html(url, html_content)
    except Exception as exc:
        # 실패 시 재시도
        raise self.retry(exc=exc, countdown=30, max_retries=3)

# URL 처리 예시
def process_discovered_urls(urls):
    for url in urls:
        hostname = extract_hostname(url) 
        priority = calculate_priority(url)
        
        # 우선순위에 따라 다른 태스크로 분기
        if priority > 0.8:
            crawl_high_priority_url.apply_async(
                args=[url], 
                priority=9,
                countdown=0 if rate_limit_by_host(hostname, 0) else 70
            )
        else:
            crawl_low_priority_url.apply_async(
                args=[url],
                priority=3,
                countdown=0 if rate_limit_by_host(hostname, 0) else 70
            )

도메인별 정중함 제어

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# Redis를 활용한 정교한 politeness 제어
class CeleryPolitenessManager:
    def __init__(self, redis_client):
        self.redis = redis_client
        
    def can_crawl_host(self, hostname, default_delay=70):
        """호스트별 크롤링 가능 여부 확인"""
        key = f"crawl_limit:{hostname}"
        last_crawl = self.redis.get(key)
        
        if last_crawl is None:
            return True
            
        elapsed = time.time() - float(last_crawl)
        return elapsed >= default_delay
        
    def mark_crawled(self, hostname):
        """크롤링 완료 시점 기록"""
        key = f"crawl_limit:{hostname}"
        self.redis.set(key, time.time(), ex=300)  # 5분 TTL
        
    def get_wait_time(self, hostname, default_delay=70):
        """다음 크롤링까지 대기 시간 계산"""
        key = f"crawl_limit:{hostname}"
        last_crawl = self.redis.get(key)
        
        if last_crawl is None:
            return 0
            
        elapsed = time.time() - float(last_crawl)
        return max(0, default_delay - elapsed)

# 사용 예시
politeness_manager = CeleryPolitenessManager(redis_client)

@app.task(bind=True)
def polite_crawl_url(self, url):
    hostname = extract_hostname(url)
    
    if not politeness_manager.can_crawl_host(hostname):
        wait_time = politeness_manager.get_wait_time(hostname)
        # 대기 시간만큼 재시도 지연
        raise self.retry(countdown=int(wait_time) + 1)
    
    try:
        # 실제 크롤링
        html = fetch_url(url)
        politeness_manager.mark_crawled(hostname)  # 완료 시점 기록
        return process_html(url, html)
        
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60, max_retries=3)

장애 복구 및 상태 관리

Circuit Breaker 패턴

  • Closed (정상): 모든 요청 통과, 실패율 모니터링
  • Open (차단): 30초내 5회 연속 실패시 요청 차단
  • Half-Open (테스트): 30-60초 후 제한적 요청으로 상태 확인
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class CrawlerCircuitBreaker:
    def __init__(self):
        self.state = 'CLOSED'  # CLOSED → OPEN → HALF_OPEN
        self.failure_count = 0
        self.failure_threshold = 5
        self.timeout = 60
        
    def call(self, func, *args):
        if self.state == 'OPEN':
            if time.time() - self.last_failure > self.timeout:
                self.state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")
                
        try:
            result = func(*args)
            self.on_success()
            return result
        except Exception as e:
            self.on_failure()
            raise e

체크포인트 기반 상태 복구

1
2
3
4
5
6
7
8
9
10
11
12
13
# 5분마다 URL Frontier 상태 저장
checkpoint_data = {
    'front_queue_sizes': [len(q) for q in front_queues],  # 전면 큐 상태
    'back_queue_states': {  # 후면 큐 상태
        hostname: {
            'queue_size': len(queue),
            'last_crawl_time': last_crawl,
            'pending_urls': list(queue)[:100]  # 샘플만 저장
        } for hostname, queue in host_queues.items()
    },
    'processed_count': total_processed,
    'error_count': total_errors
}

분산 환경에서의 URL 재분산

1
2
3
4
5
6
7
8
# Kafka Consumer Group의 자동 리밸런싱
def handle_node_failure(failed_node_id):
    # 실패한 노드의 호스트들을 다른 노드에 재할당
    failed_hosts = get_hosts_by_node(failed_node_id)
    
    for hostname in failed_hosts:
        new_node = consistent_hash(hostname, active_nodes)
        migrate_host_queue(hostname, new_node)

읽어보면 좋을 글

실제 구현하며 겪은 문제와 해결 과정들 : https://andrewkchan.dev/posts/crawler.html

실제로 설계하고 구현한 경험담 (아키텍처 중점): https://benbernardblog.com/the-tale-of-creating-a-distributed-web-crawler/

kafka와 rabbit mq의 차이 : https://aws.amazon.com/ko/compare/the-difference-between-rabbitmq-and-kafka/

circuit breaker pattern : https://learn.microsoft.com/en-us/azure/architecture/patterns/circuit-breaker

cloudflare의 crawl 과금 정책 : https://blog.cloudflare.com/introducing-pay-per-crawl/

This post is licensed under CC BY 4.0 by the author.