scheduler.yaml 进阶部分

openclaw openclaw中文博客 1

这些设置分散在项目的不同配置文件中,主要是 scheduler.yaml, interceptor.py, runtime.yaml 以及自定义的 pipelinemiddleware

scheduler.yaml 进阶部分-第1张图片-OpenClaw 中文版 - 真正能做事的 AI

以下是 OpenClaw 进阶设置的核心模块和配置指南:

任务调度高级配置 (scheduler.yaml)

这是进阶的核心,用于控制抓取任务的节奏、并发和去重。

  worker:
    max_running: 10                    # 全局最大并发任务数,根据机器和网站承受能力调整
    trigger_interval: 1                # 触发新任务检查的间隔(秒),频繁任务可调小
  # **重点:任务优先级与队列控制**
  priority_strategy: "fifo"           # 队列策略:fifo(默认), lifo, priority
  # 如果使用 priority,需要在任务中指定 priority 字段(值越小优先级越高)
  # **重点:智能速率限制(反爬关键)**
  request:
    delay:
      fixed_delay: 2                   # 固定延迟(秒),每个请求后等待时间,保守策略。
      # 或使用随机延迟,更模拟人类行为
      random_delay:
        min: 1
        max: 5
    # **域名并发控制**:防止对单一站点攻击性抓取
    concurrency_per_domain: 2          # 同一域名下同时运行的最大任务数
    queue_capacity_per_domain: 100     # 同一域名等待队列的最大任务数
  # **重点:持久化与断点续爬**
  persistence:
    enabled: true                      # 启用任务状态持久化
    type: "json"                       # 存储类型:json, sqlite
    file_path: "./data/task_status.json" # 状态文件路径
    # 服务重启后,会根据此文件恢复未完成的任务
  # **布隆过滤器(高效内存去重)**
  deduplication:
    enabled: true
    type: "bloom_filter"              # 使用布隆过滤器,适用于海量URL去重
    filter_capacity: 1000000          # 期望处理的URL总数
    error_rate: 0.001                 # 可接受的误判率
    # 重启后会丢失,适合一次性抓取,如需持久化,需使用 `redis` 类型。

运行时动态配置 (runtime.yaml)

用于在爬虫运行过程中动态调整参数,无需修改代码。

# runtime.yaml
variables:
  # 1. 动态分页控制
  max_page: 50                        # 爬虫运行时可修改的最大翻页数
  # 在 interceptor 中可通过 `context.runtime_variables.get('max_page')` 获取
  # 2. 代理配置轮换
  proxy_enabled: true
  proxy_list:
    - "http://proxy1.example.com:8080"
    - "http://proxy2.example.com:8080"
  current_proxy_index: 0              # 可在失败时通过拦截器动态切换
  # 3. 请求头轮换池(反爬)
  user_agents:
    - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ..."
    - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 ..."
  # 4. 可重试的状态码列表
  retry_status_codes: [500, 502, 503, 504, 429, 408]

拦截器高级应用 (interceptor.py)

拦截器是实现复杂逻辑和反爬策略的“大脑”

# interceptor.py 进阶示例
from openclaw.interceptor.base import BaseInterceptor
class AdvancedInterceptor(BaseInterceptor):
    async def before_request(self, context):
        """请求前:动态配置请求参数"""
        # 1. 动态切换User-Agent
        import random
        ua_list = context.runtime_variables.get('user_agents', [])
        if ua_list:
            context.request.headers['User-Agent'] = random.choice(ua_list)
        # 2. 智能代理切换(当遇到429/403时)
        if hasattr(context, 'last_response_status') and context.last_response_status in [429, 403]:
            self._rotate_proxy(context)
        # 3. 为特定站点添加Cookies或签名(应对反爬)
        if "example.com" in context.request.url:
            context.request.cookies.update({'session_id': 'dynamic_value'})
            # 或计算动态参数
            import time, hashlib
            timestamp = int(time.time())
            context.request.params['_t'] = timestamp
            context.request.params['_sign'] = self._generate_sign(timestamp)
    async def after_response(self, context):
        """响应后:复杂处理与状态判断"""
        # 1. 保存上一次响应状态,供before_request使用
        context.last_response_status = context.response.status
        # 2. 检查响应内容是否被封(如出现“验证码”、“访问过于频繁”等关键词)
        if context.response and context.response.text:
            if "验证码" in context.response.text or "access denied" in context.response.text.lower():
                context.logger.warning("触发反爬机制,尝试减缓速度或更换代理")
                # 可以主动抛出特定异常,触发重试或任务暂停
                # raise RequestBlockedError("页面被拦截")
        # 3. 动态更新任务优先级(发现重要链接提升其优先级)
        if "detail" in context.request.url:
            context.task.priority = 1  # 高优先级
    def _rotate_proxy(self, context):
        """轮换代理IP"""
        proxy_list = context.runtime_variables.get('proxy_list', [])
        if proxy_list:
            current = context.runtime_variables.get('current_proxy_index', 0)
            next_index = (current + 1) % len(proxy_list)
            context.runtime_variables['current_proxy_index'] = next_index
            context.request.proxy = proxy_list[next_index]
            context.logger.info(f"切换代理至: {proxy_list[next_index]}")
    def _generate_sign(self, timestamp):
        """生成请求签名(示例)"""
        secret = "your_secret_key"
        string_to_sign = f"{timestamp}{secret}"
        return hashlib.md5(string_to_sign.encode()).hexdigest()

数据处理管道 (pipeline.py)

用于对抓取到的 Item 进行复杂的后处理。

# pipeline.py 进阶示例
class AdvancedPipeline:
    async def process_item(self, item, spider_name):
        """处理抓取到的数据项"""
        # 1. 数据清洗
        item = self._clean_data(item)
        # 2. 数据验证
        if not self._validate_item(item):
            raise DropItem(f"数据验证失败: {item}")
        # 3. 数据增强(如根据标题获取情感分类)
        item['sentiment'] = self._analyze_sentiment(item.get('title', ''))
        # 4. 关联外部数据或数据库查询
        item['category_id'] = await self._query_category_from_db(item['category_name'])
        # 5. 分派到不同的存储(根据类型)
        if item['type'] == 'news':
            await self._save_to_news_db(item)
        elif item['type'] == 'product':
            await self._save_to_es(item)  # 存到Elasticsearch
        return item
    def _clean_data(self, item):
        """高级清洗:去除HTML标签、空格、规范化日期等"""
        import re
        for key, value in item.items():
            if isinstance(value, str):
                # 去除HTML标签
                value = re.sub(r'<[^>]+>', '', value)
                # 规范化空白字符
                value = ' '.join(value.split())
                item[key] = value.strip()
        return item

扩展中间件 (Middleware)

用于在请求-响应生命周期中注入更底层的逻辑。

# 示例:自定义重试中间件,针对特定异常增加延迟
from openclaw.middleware.retry import RetryMiddleware
import asyncio
class CustomRetryMiddleware(RetryMiddleware):
    async def process_exception(self, request, exception, spider):
        # 如果是连接超时,等待更长时间再重试
        if "Timeout" in str(exception):
            await asyncio.sleep(10)  # 等待10秒
            return await self._retry(request, exception, spider)
        # 其他异常使用默认行为
        return await super().process_exception(request, exception, spider)

性能与稳定性调优建议

  1. 数据库优化

    • task_id, url 等字段建立索引。
    • 定期清理已完成的任务记录(scheduler 表)。
  2. 内存与队列监控

    • interceptor 中记录队列长度,当待处理任务过多时,暂停新增种子任务。
      if context.scheduler.get_waiting_count() > 10000:
      context.logger.error("任务队列堆积,暂停接收新任务")
  3. 分布式扩展(高级)

    • 修改 scheduler.persistence.typeredis,可以让多个 OpenClaw 实例共享任务队列和去重集合,实现分布式抓取。
  4. 日志与监控

    • 配置详细的日志级别 (logging.yaml) 来跟踪问题。
    • 将关键指标(如请求速度、成功率)发送到监控系统(如 Prometheus)。

进阶工作流

  1. 配置 scheduler.yaml:设定符合目标网站的速率限制、并发策略和持久化。
  2. 编写强大的 interceptor.py:实现动态请求头、代理轮换、反爬检测和绕过逻辑。
  3. 利用 runtime.yaml:将可能变化的参数外置,实现动态调参。
  4. 完善 pipeline.py:确保数据被正确清洗、验证和存储。
  5. 根据需要编写 middleware:处理底层网络异常或特殊重试逻辑。
  6. 启动前,使用 openclaw check 命令验证所有配置文件的正确性。

请注意:修改任何配置文件后,需要重启 OpenClaw 服务 (openclaw run) 才能使更改生效,对于 runtime.yaml 中的部分变量,如果拦截器支持热加载,则可能无需重启。

抱歉,评论功能暂时关闭!