分布式爬取

分布式爬取

Yiuhang Chan

分布式概念和作用

分布式系统(Distributed Systems)

概念

分布式系统是由一组相互独立的计算机通过网络进行通信和协调工作的系统。每台计算机(通常称为“节点” Node)都有自己的内存和处理能力,它们共同协作以完成特定的任务或管理共享数据。

作用与特点

  1. 可扩展性(Scalability): 分布式系统可以通过增加更多的节点来提高性能和容量,而不是依赖单个更强大的机器。
  2. 容错性(Fault Tolerance): 由于系统分布在多个节点上,即使部分节点失败,整个系统仍然可以继续运行。
  3. 资源共享(Resource Sharing): 分布式系统允许不同地理位置的节点共享资源,如文件、数据或设备。
  4. 灵活性(Flexibility): 可以根据需要在不同的节点上部署不同的服务和应用。
  5. 并行处理(Parallel Processing): 多个节点可以同时处理任务,提高处理速度。

应用场景

  • 云计算(Cloud Computing)
  • 大数据处理(Big Data Processing)
  • 在线事务处理(Online Transaction Processing, OLTP)
  • 分布式文件系统(Distributed File Systems)

分布式系统与网络爬虫的结合

网络爬虫(Web Crawler)概念

网络爬虫,也称为网络蜘蛛(Web Spider)或网络机器人(Web Bot),是一种自动访问互联网并从网页中提取信息的程序。

结合分布式系统的优势

  1. 提高爬取效率: 通过多个节点并行爬取,可以显著提高数据收集的速度。
  2. 抗干扰能力强: 单点爬虫易被封禁,而分布式爬虫由于请求来自不同IP,更难被识别和封禁。
  3. 数据冗余与备份: 在多个节点上存储数据,增加数据安全性。
  4. 负载均衡(Load Balancing): 自动分配任务到不同节点,避免单个节点过载。

应用实例

  • 搜索引擎(如Google, Bing)使用分布式爬虫来索引网页。
  • 数据挖掘和市场分析,通过分布式爬虫收集大量数据进行分析。
  • 社交媒体分析,如通过爬取Twitter或Facebook的公开数据进行情感分析或趋势预测。

技术实现

  • 消息队列(Message Queue): 如RabbitMQ、Kafka,用于在不同节点间分配任务。
  • 分布式存储(Distributed Storage): 如Hadoop HDFS,用于存储爬取的数据。
  • 负载均衡器(Load Balancer): 均衡请求分配。
  • 分布式数据库(Distributed Database): 如Cassandra或MongoDB,用于高效存储和检索数据。

将分布式系统的概念应用于网络爬虫,可以极大地提高爬虫的效率和效能,同时降低被封禁的风险。这种结合在数据密集型应用,如搜索引擎和大数据分析中尤为重要。

Scrapy-Redis

概念

Scrapy-Redis是一个基于Redis的Scrapy扩展,用于支持Scrapy项目的分布式爬取和数据处理。

特点

  1. 分布式爬取: 通过共享Redis中的请求队列,支持多个Scrapy爬虫实例的协作,适合多域名、大规模网页的爬取。
  2. 分布式数据处理: 爬取到的数据(Items)被推送至Redis中,允许部署多个数据处理节点。
  3. 兼容性强: 可以轻松集成至现有的Scrapy项目中,不需大量修改原有代码。

应用架构

  • Scrapy爬虫节点: 多个Scrapy实例并行工作,共享任务队列。
  • Redis服务器: 作为中心节点,管理请求队列和数据存储。
  • 数据处理节点: 可以是Scrapy管道(Pipelines)或其他数据处理应用。

Redis的安装与使用

在Ubuntu/Debian上安装Redis

在Linux系统上安装Redis的过程可以通过不同的方法进行,包括使用包管理器或Snapcraft。

先决条件(Prerequisites)

对于运行极简发行版的系统(例如Docker容器),可能需要首先安装lsb-releasecurlgpg

sudo apt install lsb-release curl gpg

添加仓库(Repository)

将Redis的官方APT仓库添加到系统的apt索引中,然后进行更新:

  1. 导入GPG密钥(Import GPG Key):

curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg

  1. 添加Redis存储库(Add Redis Repository):

echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" | sudo tee /etc/apt/sources.list.d/redis.list

  1. 更新系统包列表(Update Package List):

sudo apt-get update

  1. 安装Redis(Install Redis):

sudo apt-get install redis

信息

如果找不到Redis路径可以先查看redis进程号, 查看命令: ps -ef | grep redis

拿到上面进程号####然后查询ll -l /proc/####/cwd即可

Redis 基本信息和操作

基本信息

Redis(Remote Dictionary Server)是一个开源的、支持网络、基于内存且可选持久化的键值对存储数据库。它通常被用作数据库、缓存或消息传递系统。它提供了丰富的数据结构,如字符串(strings)、哈希表(hashes)、列表(lists)、集合(sets)、有序集合(sorted sets)以及位图(bitmaps)等。

配置信息
  • 默认端口: 6379。
  • 数据库数量: 默认支持16个,编号从0到15。
redis命令
  • redis执行了make install后,redis的课执行文件都会自动复制到 /usr/local/bin 目录
    • redis-server redis服务器
    • redis-cli redis命令行客户端
    • redis-benchmark redis性能测试工具
    • redis-check-aof aof文件修复工具
    • redis-check-dump rdb文件检查工具
    • redis-cli shutdown 停止 redis
    • redis-server 启动 redis-server
    • redis-server /etc/redis/redis.conf 带配置文件启动
数据类型
  1. 字符串(String):这是 Redis 最基本的数据类型,它可以存储任何形式的字符串,包括文本、数字甚至二进制数据。字符串类型常用于缓存用户信息、会话数据等。
  2. 哈希(Hash):哈希是键值对的集合,类似于编程语言中的字典或哈希表。它适用于存储对象,每个对象可以包含多对字段和值。例如,可以用哈希存储用户的各种属性,如姓名、年龄、邮箱等。
  3. 列表(List):列表是一系列有序的字符串,类似于数组。Redis 列表是双向链表,因此在列表的头部或尾部添加或删除元素非常快速。它适用于实现队列、栈或其他有序集合。
  4. 集合(Set):集合是一组不重复的字符串元素。由于集合中的元素是无序的,可以快速执行添加、删除和检查元素是否存在等操作。集合适用于存储无序的唯一元素,例如标签、好友关系等。
  5. 有序集合(Sorted Set):有序集合和普通集合类似,但每个元素都会关联一个浮点数分数,这使得元素能按分数有序排列。有序集合适用于需要按某种顺序访问元素的场景,如排行榜。
  6. 位图(Bitmap):位图本质上是字符串类型的一种特殊用法,它允许对字符串的位进行操作。位图适用于处理大量的布尔值,如用户在线状态、特征标志等。
  7. 超级日志(HyperLogLog):超级日志是一种用于高效统计唯一元素个数(如独立访客数)的概率数据结构。它可以使用极小的内存空间来处理大量数据。
基本操作
字符串(String)操作

Redis 的字符串(String)是最基本的数据类型之一,它支持多种操作,涵盖了对单个或多个字符串值的处理。

  • 设置值:
    • SET key value: 设置指定键的值。
    • SETEX key seconds value: 设置键的值,并设置过期时间(以秒为单位)。
    • SETNX key value: 仅当键不存在时,设置键的值。
    • MSET key1 value1 [key2 value2 ...]: 同时设置一个或多个键值对。
  • 获取值:
    • GET key`: 获取指定键的值。
    • MGET key1 [key2 ...]: 获取一个或多个键的值。
    • GETSET key value: 设置指定键的值,并返回键的旧值。
  • 操作数值:
    • INCR key: 将键的整数值增加一。
    • DECR key: 将键的整数值减少一。
    • INCRBY key increment: 将键的整数值增加指定的量。
    • DECRBY key decrement: 将键的整数值减少指定的量。
    • INCRBYFLOAT key increment: 将键的浮点数值增加指定的量。
  • 处理子字符串:
    • GETRANGE key start end: 获取键值的子串。
    • SETRANGE key offset value: 从指定的偏移量开始替换键的值。
  • 键值长度和追加:
    • STRLEN key: 获取指定键值的长度。
    • APPEND key value: 将值追加到指定键的现有值的末尾。
  • 位操作:
    • SETBIT key offset value: 对键值的二进制表示的指定偏移量的位进行设置。
    • GETBIT key offset: 获取键值的二进制表示中指定偏移量的位值。
    • BITCOUNT key [start end]: 计算字符串被设置为1的位数。
    • BITOP operation destkey key1 [key2 ...]: 对一个或多个键执行位操作,并将结果存储在目标键中。
  • 过期和持久化:
    • EXPIRE key seconds: 设置键的过期时间(秒)。
    • PERSIST key: 移除键的过期时间,使其成为持久的键。
哈希表(Hash)操作

Redis 的哈希(Hash)是一种键值对集合的数据类型,非常适合用来存储对象。哈希表中的每个字段都是唯一的,并且它们映射到特定的值。

  • 添加和设置字段:
    • HSET key field value: 设置哈希表字段的值。
    • HMSET key field1 value1 [field2 value2 ...]: 同时设置哈希表的多个字段。
    • HSETNX key field value: 仅当字段不存在时,设置哈希表字段的值。
  • 获取字段值:
    • HGET key field: 获取存储在哈希表中的字段的值。
    • HMGET key field1 [field2 ...]: 获取所有给定字段的值。
    • HGETALL key: 获取在哈希表中指定键的所有字段和值。
    • HVALS key: 获取哈希表中所有字段的值。
  • 删除字段:
    • HDEL key field1 [field2 ...]: 删除一个或多个哈希表字段。
  • 字段计数:
    • HLEN key: 获取哈希表中字段的数量。
  • 检查字段存在:
    • HEXISTS key field: 查看哈希表的指定字段是否存在。
  • 增减数值字段:
    • HINCRBY key field increment: 为哈希表 key 中的指定字段的整数值加上增量 increment。
    • HINCRBYFLOAT key field increment: 为哈希表 key 中的指定字段的浮点数值加上增量 increment。
  • 获取所有字段名:
    • HKEYS key: 获取哈希表中的所有字段。
  • 扫描哈希表:
    • HSCAN key cursor [MATCH pattern] [COUNT count]: 迭代哈希表中的键值对。
  • 获取字段值的长度:
    • HSTRLEN key field: 获取存储在哈希表中的字段值的长度。
列表(List)操作

Redis 的列表(List)是一种线性数据结构,主要用于存储有序的字符串元素集合。Redis 列表是基于双向链表实现的,这意味着即使在很长的列表上,向列表的头部或尾部添加和移除元素仍然非常快速。

  • 添加元素:
    • LPUSH key value1 [value2 ...]: 将一个或多个值插入到列表头部。
    • RPUSH key value1 [value2 ...]: 将一个或多个值插入到列表尾部。
  • 移除元素:
    • LPOP key: 移除并返回列表头部的元素。
    • RPOP key: 移除并返回列表尾部的元素。
    • LREM key count value: 根据参数 count 的值,移除列表中与参数 value 相等的元素。
  • 列表长度:
    • LLEN key: 获取列表长度。
  • 获取列表元素:
    • LRANGE key start stop: 获取列表指定范围内的元素。
    • LINDEX key index: 获取列表中指定索引的元素。
  • 修改列表元素:
    • LSET key index value: 将列表中指定索引的元素的值设置为另一个值。
  • 插入元素:
    • LINSERT key BEFORE|AFTER pivot value: 在列表的元素前或后插入元素。
  • 截取列表:
    • LTRIM key start stop: 对一个列表进行修剪(trim),就是说,让列表只保留指定区间内的元素,不在指定区间之内的元素都将被删除。
  • 阻塞操作:
    • BLPOP key1 [key2 ...] timeout: 移除并获取列表的第一个元素,如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
    • BRPOP key1 [key2 ...] timeout: 移除并获取列表的最后一个元素,如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
    • BRPOPLPUSH source destination timeout: 从列表中弹出一个值,将它推入另一个列表并返回它;如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
  • 转移元素:
    • RPOPLPUSH source destination: 移除列表的最后一个元素,将该元素添加到另一个列表并返回。
集合(Set)操作

Redis 的集合(Set)是一种存储唯一元素且无序的字符串集合。集合非常适合用于存储不重复的数据,如标签、好友关系等。

  • 添加元素:
    • SADD key member1 [member2 ...]: 向集合添加一个或多个成员。
  • 移除元素:
    • SREM key member1 [member2 ...]: 移除集合中一个或多个成员。
  • 集合大小:
    • SCARD key: 获取集合的成员数。
  • 检查成员存在:
    • SISMEMBER key member: 判断成员元素是否是集合的成员。
  • 获取所有成员:
    • SMEMBERS key: 获取集合中的所有成员。
  • 随机元素:
    • SRANDMEMBER key [count]: 返回集合中一个或多个随机数。
    • SPOP key [count]: 移除并返回集合中的一个随机元素。
  • 集合运算:
    • SUNION key1 [key2 ...]: 返回所有给定集合的并集。
    • SINTER key1 [key2 ...]: 返回所有给定集合的交集。
    • SDIFF key1 [key2 ...]: 返回第一个集合与其他集合之间的差集。
  • 集合运算并存储结果:
    • SUNIONSTORE destination key1 [key2 ...]: 计算所有给定集合的并集,并存储在 destination 集合中。
    • SINTERSTORE destination key1 [key2 ...]: 计算所有给定集合的交集,并存储在 destination 集合中。
    • SDIFFSTORE destination key1 [key2 ...]: 计算第一个集合与其他集合的差集,并存储在 destination 集合中。
  • 集合的遍历:
    • SSCAN key cursor [MATCH pattern] [COUNT count]: 迭代集合中的元素。
  • 移动成员到另一个集合:
    • SMOVE source destination member: 将成员从一个集合移动到另一个集合。
有序集合(Sorted Set)操作

Redis 的有序集合(Sorted Set)是一种集合数据类型,它不仅存储成员(Member),还为每个成员关联了一个浮点数分数(Score)。这使得成员能够按照分数的大小排序,从而支持快速的插入、删除、查找和排名操作。

  • 添加和更新成员:
    • ZADD key score1 member1 [score2 member2 ...]: 向有序集合添加一个或多个成员,或更新已存在成员的分数。
  • 移除成员:
    • ZREM key member1 [member2 ...]: 移除有序集合中的一个或多个成员。
  • 获取成员数量:
    • ZCARD key: 获取有序集合的成员数量。
  • 获取成员分数:
    • ZSCORE key member: 获取有序集合中成员的分数。
  • 增加成员分数:
    • ZINCRBY key increment member: 增加有序集合中成员的分数。
  • 范围查询:
    • ZRANGE key start stop [WITHSCORES]: 返回有序集合中指定区间内的成员。
    • ZREVRANGE key start stop [WITHSCORES]: 返回有序集合中指定区间内的成员,按分数从高到低排序。
    • ZRANGEBYSCORE key min max [WITHSCORES]: 根据分数值获取有序集合中的成员。
  • 移除指定排名或分数范围内的成员:
    • ZREMRANGEBYRANK key start stop: 移除有序集合中指定排名(index)区间内的所有成员。
    • ZREMRANGEBYSCORE key min max: 移除有序集合中分数在给定区间内的成员。
  • 获取成员排名:
    • ZRANK key member: 获取有序集合中成员的排名(按分数值递增)。
    • ZREVRANK key member: 获取有序集合中成员的排名(按分数值递减)。
  • 有序集合间的运算:
    • ZUNIONSTORE destination numkeys key1 [key2 ...] [WEIGHTS weight1 [weight2 ...]]: 计算多个有序集合的并集,并存储在新的有序集合中。
    • ZINTERSTORE destination numkeys key1 [key2 ...] [WEIGHTS weight1 [weight2 ...]]: 计算多个有序集合的交集,并存储在新的有序集合中。
  • 遍历有序集合:
    • ZSCAN key cursor [MATCH pattern] [COUNT count]: 迭代有序集合中的元素。
键(Key)管理

Redis 提供了一系列命令用于管理和操作键(Key),这些命令允许你对存储在 Redis 中的数据进行各种操作,包括但不限于查询、移动、重命名和设置键的生命周期。

  • 键存在性检查:
    • EXISTS key1 [key2 ...]: 检查一个或多个键是否存在。
  • 删除键:
    • DEL key1 [key2 ...]: 删除一个或多个键。
  • 设置键的过期时间:
    • EXPIRE key seconds: 设置键的过期时间(秒)。
    • PEXPIRE key milliseconds: 设置键的过期时间(毫秒)。
    • EXPIREAT key timestamp: 设置键的过期时间为 UNIX 时间戳(秒)。
    • PEXPIREAT key milliseconds-timestamp: 设置键的过期时间为 UNIX 时间戳(毫秒)。
  • 查询键的剩余生存时间:
    • TTL key: 查询键的剩余生存时间(秒)。
    • PTTL key: 查询键的剩余生存时间(毫秒)。
  • 移除键的过期时间:
    • PERSIST key: 移除键的过期时间。
  • 查找符合给定模式的键:
    • KEYS pattern: 查找所有符合给定模式的键。
    • SCAN cursor [MATCH pattern] [COUNT count]: 迭代数据库中的数据库键。
  • 重命名键:
    • RENAME key newkey: 修改键名。
    • RENAMENX key newkey: 仅当 newkey 不存在时,将 key 改名为 newkey。
  • 随机返回一个键:
    • RANDOMKEY: 从数据库中随机返回一个键。
  • 键类型查询:
    • TYPE key: 返回键所储存的值的类型。
  • 移动键到另一个数据库:
    • MOVE key db: 将当前数据库的键移动到指定的数据库 db 当中。
  • 键的字节串长度查询:
    • STRLEN key: 获取指定键所储存的字符串值的长度。
  • 原子操作:
    • DUMP key: 返回存储在指定键的值的序列化版本。
    • RESTORE key ttl serialized-value [REPLACE]: 通过序列化值创建一个键,ttl 为存活时间,REPLACE 表示是否替换已存在的键。
事务(Transaction)

在 Redis 中,事务提供了一种将多个命令打包然后一次性、按顺序执行的机制。它通过一系列命令来实现事务的开始、执行和管理。

  • 开始事务:
    • MULTI: 标记一个事务块的开始。后续的命令将会被队列化,而不是立即执行。
  • 命令入队:
    • MULTI 命令之后输入的命令不会立即被执行,而是会被放入一个队列中。当执行 EXEC 命令时,队列中的所有命令会被连续执行。
  • 执行事务:
    • EXEC: 执行所有在 MULTI 之后入队的命令。如果在执行 EXEC 之前连接被断开,那么事务中的所有命令都不会被执行。
  • 放弃事务:
    • DISCARD: 取消事务,放弃执行事务块内的所有命令。
  • 监视键:
    • WATCH key1 [key2 ...]: 在执行事务之前监视一个或多个键。如果在调用 EXEC 命令之前这些键被其他命令改变了,那么事务将被放弃。
  • 取消监视键:
    • UNWATCH: 取消 WATCH 命令对所有键的监视。
  • Redis 事务的特点:
    • 原子性: Redis 的事务保证了事务内的命令要么全部执行,要么全部不执行。
    • 没有隔离级别: Redis 不支持传统数据库意义上的事务隔离级别。如果事务执行期间,有其他客户端修改了被 WATCH 命令监视的键,那么该事务将被取消。
    • 不支持回滚: 在 Redis 中,事务没有回滚的概念。如果事务中某个操作失败,其他操作仍会继续执行。
发布/订阅

Redis 的发布/订阅(pub/sub)是一种消息通信模式,它使得消息的发送者(发布者)不需要知道消息的接收者(订阅者),反之亦然。这种模式非常适合构建松耦合的应用程序和服务。

  • 发布消息:
    • PUBLISH channel message: 将消息发送到指定的频道。
  • 订阅频道:
    • SUBSCRIBE channel1 [channel2 ...]: 订阅一个或多个频道。订阅后,客户端将接收这些频道上发布的所有消息。
  • 退订频道:
    • UNSUBSCRIBE [channel1 [channel2 ...]]: 退订一个或多个频道。如果没有提供频道名,客户端将退订所有频道。
  • 按模式订阅频道:
    • PSUBSCRIBE pattern1 [pattern2 ...]: 根据给定的模式订阅频道。模式可以包含通配符。
  • 按模式退订频道:
    • PUNSUBSCRIBE [pattern1 [pattern2 ...]]: 根据给定的模式退订频道。如果没有提供模式,客户端将退订所有模式。
  • 列出当前活跃的频道:
    • PUBSUB CHANNELS [pattern]: 列出当前的活跃频道。可选的模式参数可以用来过滤频道名。
  • 列出特定频道的订阅者数量:
    • PUBSUB NUMSUB [channel1 [channel2 ...]]: 返回一个或多个频道的订阅者数量。
  • 列出按模式订阅的数量:
    • PUBSUB NUMPAT: 返回按模式订阅的数量。
地理位置

Redis 从 3.2 版本开始引入了地理空间支持,允许使用一组命令来存储、查询和操作地理位置信息。这些功能基于 Redis 的有序集合实现,每个元素都是一个带有经纬度的地理空间信息。

  • 添加地理位置信息:
    • GEOADD key longitude latitude member [longitude latitude member ...]: 将指定的地理空间位置(经度、纬度、成员)添加到指定的 key 中。
  • 地理位置查询:
    • GEOPOS key member [member ...]: 返回一个或多个成员位置的经纬度。
  • 计算两地距离:
    • GEODIST key member1 member2 [unit]: 计算两个成员之间的距离。单位可以是 m(米)、km(千米)、mi(英里)或 ft(英尺)。
  • 按半径查询成员:
    • GEORADIUS key longitude latitude radius m|km|mi|ft [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC] [STORE key] [STOREDIST key]: 以指定的经纬度为中心,查找在指定半径内的元素。可选参数提供了多种查询功能,例如排序、限制返回数量等。
    • GEORADIUSBYMEMBER key member radius m|km|mi|ft [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC] [STORE key] [STOREDIST key]: 以指定成员为中心,查找在指定半径内的元素。
  • 地理空间索引的哈希值:
    • GEOHASH key member [member ...]: 返回一个或多个位置对象的 Geohash 字符串。
基于概率的数据结构

Redis 支持几种基于概率的数据结构,允许在内存效率和准确性之间取得平衡。这些数据结构特别适用于处理大规模数据集,其中完全准确的结果要么不可行,要么成本过高。

  • HyperLogLog

    用于高效的基数估计(即估计唯一元素的数量)。

    • PFADD key element [element ...]: 向 HyperLogLog 中添加元素。
    • PFCOUNT key [key ...]: 返回 HyperLogLog 的近似基数估计值。
    • PFMERGE destkey sourcekey [sourcekey ...]: 合并多个 HyperLogLog 到一个新的 HyperLogLog 中。
  • 布隆过滤器(通过 RedisBloom 模块)

    布隆过滤器是一种空间效率很高的数据结构,用于测试一个元素是否是集合的一部分。它可能返回假阳性,但不会返回假阴性。

    • BF.ADD key element: 将元素添加到布隆过滤器。

    • BF.MADD key element [element ...]: 向布隆过滤器中添加多个元素。

    • BF.EXISTS key element: 检查元素是否可能存在于布隆过滤器中。

    • BF.MEXISTS key element [element ...]: 检查多个元素是否可能存在于布隆过滤器中。

  • 计数最小Sketch(通过 RedisBloom 模块)

    计数最小 Sketch 是一种用于频率估计的数据结构,它可以估计一个元素在数据流中出现的次数。

    • CMS.INITBYDIM key width depth: 通过指定宽度和深度初始化计数最小 Sketch。
    • CMS.INITBYPROB key error probability: 通过指定错误率和置信度初始化计数最小 Sketch。
    • CMS.INCRBY key item count [item count ...]: 增加一个或多个元素的计数值。
    • CMS.QUERY key item [item ...]: 查询一个或多个元素的计数值。
  • Top-K 结构(通过 RedisBloom 模块)

    Top-K 结构用于跟踪数据流中最常见的元素。

    • TOPK.RESERVE key topk width depth decay: 创建一个 Top-K 结构。
    • TOPK.ADD key item [item ...]: 向 Top-K 结构中添加一个或多个元素。
    • TOPK.QUERY key item [item ...]: 查询一个或多个元素是否在 Top-K 结构中。
    • TOPK.LIST key: 列出 Top-K 结构中的元素。
其他
  • PING:检查服务是否运行。
  • INFO:获取服务器的信息和统计。
数据持久化
RDB(Redis Database)

RDB是Redis的一种持久化机制,它会在指定的时间间隔内生成数据集的时间点快照。

  • 触发机制: 可以通过SAVE命令手动触发,或者配置在redis.conf文件中的规则自动触发。
  • 优点: 提供了一个非常紧凑的数据文件,适合灾难恢复,快照频率可配置,可以最大化性能和数据持久性之间的权衡。
  • 缺点: 在生成快照时可能会消耗大量的IO和CPU资源,如果Redis在快照之间崩溃,那么自上次快照以来的所有数据将丢失。
AOF(Append Only File)

AOF是另一种Redis持久化机制,记录了执行过的所有写操作命令,并在服务器启动时重新执行这些命令来重建数据集。

  • 触发机制: 每个写操作命令都会追加到AOF文件中。
  • 优点: 提供了更好的持久性和安全性,可以配置不同的同步方式,如每秒同步或每写操作同步。
  • 缺点: AOF文件通常比RDB文件大,且恢复速度可能慢于RDB方式。
应用场景
  • 会话缓存: 存储用户会话信息,如登录状态或用户的临时偏好设置,以便快速读取。
  • 队列: 利用Redis的列表数据结构实现消息队列,支持多种队列模式,如FIFO(先进先出),LIFO(后进先出)。
  • 排行榜: 使用Redis的有序集合数据类型,它可以保持元素插入的顺序,适合实时排行榜系统。
  • 计数器: 利用Redis的原子操作如INCR实现计数器,常用于统计点击数、用户在线数等。
  • 发布/订阅: Redis提供了PUB/SUB模式,允许客户端订阅频道并接收发布到这些频道的消息,适用于实时消息系统如聊天室。

Redis如何开启远程服务

Redis开启远程服务的操作方法:

  1. 打开redis的配置文件redis.conf
  2. bind 127.0.0.1注释掉
  3. protected-mode yes改成protected-mode no
  4. 添加daemonize no
  5. 重启redis服务

Scrapy-Redis 中存储的数据类型及其用途

  • List类型 (spidername:items)

    • 用途:存储爬虫获取到的数据项,数据项是JSON格式的字符串。
    • 例如:db250:items 存储豆瓣电影 Top 250 的爬取数据。
  • Set类型 (spidername:dupefilter)

    • 用途:用于存储已访问URL的哈希值,以实现去重。
    • 特点:存储的是URL的40字符哈希字符串。
  • List类型 (spidername:start_urls)

    • 用途:存储启动爬虫时的初始URL。
    • 例如:db250:start_urls 可以存储 https://www.movie.douban.com/top250 作为起始页面。
  • ZSet类型 (spidername:requests)

    • 用途:存储待调度的请求,内容是序列化的请求对象。
    • 特点:使用ZSet可以对请求进行优先级排序。

常用设置及其说明

  1. 调度器设置
    • SCHEDULER = "scrapy_redis.scheduler.Scheduler"
    • 功能:启用Scrapy-Redis的调度器,将请求存储到Redis中。
  2. 去重过滤器设置
    • DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
    • 功能:确保所有的Spider通过Redis共享相同的重复过滤器。
  3. 管道设置
    • ITEM_PIPELINES = {'scrapy_redis.pipelines.RedisPipeline': 300,}
    • 功能:启用Redis管道,将爬取的项存储到Redis中。
  4. Redis连接设置
    • REDIS_HOST = 'localhost'
    • REDIS_PORT = 6379
    • 功能:指定连接到Redis服务器的主机和端口。
  5. 调度器持久化设置
    • # SCHEDULER_PERSIST = True
    • 功能:设置为True时,不清理Redis队列,允许爬虫暂停和恢复,数据不会丢失。
    • 特点:这是一个可选设置,根据需求选择是否启用。

补充说明

  • 扩展性:Scrapy-Redis的设计使得Scrapy爬虫具有更好的扩展性,特别适用于大规模分布式爬取。
  • 负载均衡:通过Redis队列,多个爬虫实例可以共享URL请求,实现负载均衡。
  • 数据共享:所有的Spider可以通过Redis数据库共享去重信息和待抓取的URL,从而协同工作。

分布式项目演示

演示逻辑

  1. 将代码设置成可以进行分布式爬取代码
  2. 将分布式代码copy一份,一共两份
  3. 两份代码,两处内容存放的位置,两处爬取的内容之和是整体内容
项目配置及项目代码

spider作了简单改动,更重要的是在settings里作一些设置

Spider文件

代码作了简单改动

  1. 导出RedisSpider

  2. 类继承 RedisSpider

  3. 注销start_urls 设置 redis_key = “db:start_urls” 开启爬虫

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import scrapy
from ..items import DbItem
from scrapy_redis.spiders import RedisSpider

class Dbtest1Spider(RedisSpider):
print("#1 Spider on...")
name = "dbtest1"
allowed_domains = ["movie.douban.com"]
# start_urls = ["https://movie.douban.com/top250"] # 修改为开始爬取的页面
redis_key = "dbtest1:start_urls" # 3.开启爬虫的key
page_num = 0

# parse 里面我们只需要关注 怎么解析就行, 因为scrapy会自动帮我们去请求指定的网址
def parse(self, response):
node_list = response.xpath('//div[@class="info"]')
if node_list:
for node in node_list:
# 标题
movie_name = node.xpath('./div/a/span/text()').get()
# 导演
director = node.xpath('./div/p/text()').get().strip()
# 分数
score = node.xpath('.//span[@class="rating_num"]/text()').get()

item = DbItem()
item["movie_name"] = movie_name
item["director"] = director
item["score"] = score

# 电影详情页
detail_url = node.xpath('./div/a/@href').get()
yield scrapy.Request(detail_url, callback=self.get_detail, meta={"info":item})

# 发送新一页的请求
# 构造url
if response.meta.get("num"):
self.page_num = response.meta.get('num')
page_url = "https://movie.douban.com/top250?start={}&filter=".format(self.page_num*25)
yield scrapy.Request(page_url, callback=self.parse, meta={"num": self.page_num})
self.page_num +=1
if self.page_num ==6:
return

# 详情页解析
def get_detail(self,response):
item = DbItem()
info = response.meta.get("info")
item.update(info)
description_list = response.xpath('//div[@id="link-report"]/span[@class="all hidden"]/text()|//div[@id="link-report"]/span[@property="v:summary"]/text()').getall()
description = ''.join([des.strip() for des in description_list])

item["description"] = description
yield item
Items文件
1
2
3
4
5
6
7
8
9
10
import scrapy


class DbItem(scrapy.Item):
# define the fields for your item here like:
# name = scrapy.Field()
movie_name = scrapy.Field()
director = scrapy.Field()
score = scrapy.Field()
description = scrapy.Field()
Pipelines文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from itemadapter import ItemAdapter
import json


class DbPipeline:
def open_spider(self, spider):
self.f = open('film.txt', 'w', encoding='utf-8')

def process_item(self, item, spider):
json_str = json.dumps(dict(item), ensure_ascii=False) + '\n'
self.f.write(json_str)
return item

def close_spider(self, spider):
self.f.close()
Settings文件

settings文件需要配置

  • 公共的调度器 SCHEDULER

  • 公共的过滤器 DUPEFILTER_CLASS

  • 公共存储区域 redis

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# -*- coding: utf-8 -*-

BOT_NAME = 'db'

SPIDER_MODULES = ['db.spiders']
NEWSPIDER_MODULE = 'db.spiders'

# Obey robots.txt rules
ROBOTSTXT_OBEY = False

DOWNLOAD_DELAY = 1

# Override the default request headers:
DEFAULT_REQUEST_HEADERS = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en',
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36"
}

# 启用调度将请求存储进redis
# 必须
SCHEDULER = "scrapy_redis.scheduler.Scheduler"

# 确保所有spider通过redis共享相同的重复过滤。
# 必须
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"

#公共管道
ITEM_PIPELINES = {
'scrapy_redis.pipelines.RedisPipeline':300,
'db.pipelines.DbPipeline': 200,
}
# 指定连接到Redis时要使用的主机和端口。
# 必须
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
# 不清理redis队列,允许暂停/恢复抓取。
# 可选 允许暂停,redis数据不丢失
SCHEDULER_PERSIST = True

#日志文件配置
# LOG_FILE="db_redis.log"
LOG_ENABLED=True

LOG_FORMAT='%(asctime)s [%(name)s] %(levelname)s: %(message)s'
LOG_DATEFORMAT='%Y'

LOG_ENCODING="utf-8"
LOG_LEVEL="INFO"

执行演示

爬虫执行

可以发现爬虫1和爬虫2未有提交URL时进行了等待

注意

TypeError: ExecutionEngine.crawl() got an unexpected keyword argument 'spider'

Scrapy版本和scrapy-redis版本有概率不兼容,我的案例是修正Scrapy版本为2.8.0解决问题

添加URL给Redis开启爬取

执行命令在Redis添加列表添加URL

LPUSH dbtest1:start_urls "https://movie.douban.com/top250"

随后两个爬虫就会开始进行爬取即可

Scrapy-Redis 源码解读

connection.py

这个模块的作用是建立与Redis数据库的连接。它通常提供一个工厂方法,用于创建Redis连接对象,这使得Scrapy项目能够连接到Redis服务器并执行各种操作,如存储或检索数据。

defaults.py

这里定义了Scrapy-Redis用到的一些默认设置。这可能包括默认的Redis URL、连接参数、队列名称等。这些常量和默认值对于确保Scrapy-Redis的标准行为至关重要。

dupefilter.py

这个 dupefilter.py 源码是Scrapy-Redis扩展的一部分,专门用于处理请求去重。请求去重是网页爬虫中的一个关键步骤,用于确保不重复爬取相同的网页,从而提高效率和减少资源浪费。这个模块使用Redis作为存储后端来记录已经处理过的请求。

RFPDupeFilter

RFPDupeFilter 类继承自 Scrapy 的 BaseDupeFilter,是一个基于Redis的请求去重过滤器。

初始化方法 __init__

  • server: 接收一个 Redis 实例,用于与Redis数据库通信。
  • key: 指定在Redis中用于存储去重指纹(fingerprints)的键。
  • debug: 一个布尔值,用于确定是否打印日志信息,用于调试。

类方法 from_settings

这个方法用于从Scrapy的设置中创建一个 RFPDupeFilter 实例。它读取相关配置并实例化一个连接到Redis的去重过滤器。

类方法 from_crawler

这个方法从Scrapy的爬虫中创建 RFPDupeFilter 实例。它是 from_settings 的包装方法,提供了一个从 Crawler 对象获取设置的快捷方式。

方法 request_seen

这个方法用于检查一个请求是否已经被看到(即之前已经处理过)。它使用请求的指纹(fingerprint),如果这个指纹已经存在于Redis中,就认为这个请求是重复的。

  • request_fingerprint 方法用于生成请求的指纹,通常基于请求的URL和其他特征。

方法 closeclear

  • close 方法在过滤器关闭时被调用,用于执行清理工作。
  • clear 方法用于从Redis中删除所有的去重指纹数据。

方法 log

这个方法用于在启用调试模式时打印日志信息,记录被过滤掉的重复请求。

整体工作流程

  1. 当Scrapy爬虫启动时,RFPDupeFilter 会被实例化,并与Redis建立连接。
  2. 对于每个发出的请求,RFPDupeFilter 会计算其指纹并检查这个指纹是否已经存储在Redis中。
  3. 如果指纹已存在,表明请求是重复的,将不会再次处理该请求。
  4. 所有的指纹都存储在Redis中,这允许在分布式爬虫环境下跨多个爬虫实例共享去重信息。

picklecompat.py

这个模块是用于数据序列化和反序列化的。虽然您提到它类似于JSON转换,但它实际上更多使用的是pickle库,这是Python中一个常用的序列化方法。它允许复杂的数据结构如Python对象被转换为可以存储在Redis中的形式。

pipelines.py

在Scrapy框架中,pipelines用于处理爬取的数据。Scrapy-Redis的这一模块扩展了这个概念,允许将爬取的数据存储在Redis数据库中。这对于分布式爬虫来说特别重要,因为它允许多个爬虫实例共享和存储数据。

queue.py

这个模块维护了请求队列。在Scrapy-Redis中,通常有几种类型的队列,例如FIFO(先进先出)、LIFO(后进先出)等,用于控制请求的处理顺序。通过Redis来管理这些队列,可以实现跨多个爬虫实例的请求共享。

scheduler.py

这是Scrapy-Redis中的一个核心组件。调度器负责决定何时以及如何发送请求。在Scrapy-Redis中,这通常涉及从Redis队列中获取请求,并在分布式环境下有效地管理这些请求。

Scheduler

Scheduler 类是一个基于 Redis 的调度器。

初始化方法 __init__

  • server: Redis 服务器实例,用于和 Redis 数据库进行交互。
  • persist: 表示在关闭时是否保留请求数据。如果为 False,则在关闭时清空队列。
  • flush_on_start: 表示在启动时是否清空 Redis 队列。
  • queue_key: 请求队列在 Redis 中的键名。
  • queue_cls: 请求队列的类的路径,用于动态加载队列类。
  • dupefilter_key: 去重过滤器在 Redis 中的键名。
  • dupefilter_cls: 去重过滤器类的路径,用于动态加载去重类。
  • idle_before_close: 在关闭前等待的空闲时间,如果在这段时间内没有接收到消息,则关闭调度器。
  • serializer: 用于序列化请求的类或模块。

类方法 from_settings

这个方法从 Scrapy 的设置中创建一个 Scheduler 实例。它读取相关配置并基于这些配置创建一个实例。

类方法 from_crawler

这个方法从 Scrapy 的爬虫对象中创建一个 Scheduler 实例。它是 from_settings 的包装器,提供从爬虫对象获取设置的方式。

方法 open

在爬虫开启时调用,用于初始化请求队列和去重过滤器。

方法 close

在爬虫关闭时调用,根据 persist 参数决定是否清空队列。

方法 flush

清空队列和去重过滤器中的数据。

方法 enqueue_request

将新的请求加入队列。如果请求不需要过滤且之前没有见过,那么它将被加入队列。

方法 next_request

从队列中弹出下一个请求。

方法 has_pending_requests

检查是否还有等待处理的请求。

工作流程

  1. 当爬虫启动时,Scheduler 会被初始化。它会创建请求队列和去重过滤器。
  2. 如果设置了 flush_on_start,则在启动时清空 Redis 队列。
  3. 在爬虫运行期间,所有新的请求都会通过 enqueue_request 方法加入队列。如果请求被去重过滤器认为是重复的,它将不会被加入队列。
  4. next_request 方法用于获取下一个要处理的请求,这个过程可能会根据 idle_before_close 设置等待一段时间。
  5. 当爬虫关闭时,根据 persist 设置决定是否清空 Redis 队列。

通过扩展关闭分布式爬虫

在Scrapy和Scrapy-Redis中,利用扩展(Extensions)来智能地关闭分布式爬虫是一种有效的策略。这种方法主要依赖于Scrapy的信号系统和对 spider_idle 信号的处理。

Scrapy-Redis 的 spider_idle 方法

在Scrapy-Redis中,当一个爬虫耗尽其内部队列中的所有请求时,Scrapy的信号系统会触发 spider_idle 信号。Scrapy-Redis通过重写 spider_idle 方法,并在方法中主动从Redis队列中调度新的请求。由于这个方法抛出了 DontCloseSpider 异常,所以即使没有剩余的本地请求,爬虫也不会停止,而是会继续等待新的请求从Redis中被调度。

1
2
3
4
5
6
7
8
9
10
11
# The idle signal is called when the spider has no requests left,that's when we will schedule new requests from redis queue
爬虫程序没有剩余请求时,空闲信号就会被调用,也就是我们将从redis队列调度新请求的时候
crawler.signals.connect(self.spider_idle, signal=signals.spider_idle)
意思就非常明显了,空闲的信号被调用---> spide_idle


def spider_idle(self):
"""Schedules a request if available, otherwise waits."""
# XXX: Handle a sentinel to close the spider.
self.schedule_next_requests()
raise DontCloseSpider

因为抛出了DontCloseSpider异常,所以当触发spider_idle信号的时候整个爬虫也不会停止。

自定义扩展

这个自定义扩展RedisSpiderSmartClosedExtensions的目的是在某些条件下智能地关闭爬虫,而不是无限期地等待新的请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import time

import logging

from scrapy import signals # 引入信号
from scrapy.exceptions import NotConfigured # 在settings 设置是否开启这个拓展

logger = logging.getLogger(__name__)


class ResdisSpiderSmartClosedExtensions: # 高内聚 低耦合

def __init__(self, idle_number, crawler):
# 数量值
self.idle_number = idle_number
# 这个就是settings里面的一些配置项
self.crawler = crawler
# 专门记录触发信号的时间
self.idle_list = []
# 记录次数,进行对比
self.idle_count = 0

@classmethod
def from_crawler(cls, crawler):
if not crawler.settings.getbool('MYEXT_ENABLED'):
raise NotConfigured

# 判断一下 你是不是一个redis爬虫
if not 'redis_key' in crawler.spidercls.__dict__.keys():
raise NotConfigured('Only Support Redis Version')
idle_number = crawler.settings.getint('IDLE_NUMBER', 10)
ext = cls(idle_number, crawler)

crawler.signals.connect(ext.spider_idle, signal=signals.spider_idle)

return ext


def spider_idle(self, spider):
self.idle_count += 1
self.idle_list.append(time.time())
idle_list_len = len(self.idle_list)

# 长度大于二就证明已经空转很多次了, key有存在就证明你还没有抓完
if idle_list_len > 2 and spider.server.exists(spider.redis_key):
self.idle_list = [self.idle_list[-1]]
elif idle_list_len > self.idle_number:
logger.info('\n continued idle number exceed {} Times'
'\n meet the idle shutdown conditions, will close the reptile operation'
'\n idle start time: {},close spider time: {}'.format(self.idle_number,
self.idle_list[0],
self.idle_list[-1]))
self.crawler.engine.close_spider(spider, 'closespider_pagecount')

初始化方法 __init__

  • idle_number: 表示允许爬虫空闲的次数(没有新请求的情况)。
  • crawler: Scrapy的爬虫对象。
  • idle_list: 用于记录每次空闲时的时间戳。
  • idle_count: 记录空闲发生的次数。

方法 from_crawler

这个类方法用于从爬虫的配置中创建扩展实例,并连接到Scrapy的信号。

方法 spider_openedspider_closed

  • spider_opened: 在爬虫开启时调用,用于记录初始信息。
  • spider_closed: 在爬虫关闭时调用,用于记录关闭信息。

方法 spider_idle

这是关键方法。每次爬虫空闲时,都会增加 idle_count 的值,并记录当前时间。如果连续空闲次数超过设定的 idle_number,并且Redis队列中没有新的请求,那么爬虫将被关闭。

settings.py 中的相关设置

  • EXTENSIONS: 在这里配置自定义的扩展类。
  • MYEXT_ENABLED: 用于启用或禁用这个扩展。
  • IDLE_NUMBER: 设置允许的最大空闲次数。
1
2
3
4
5
6
EXTENSIONS = {
# 'scrapy.extensions.telnet.TelnetConsole': None,
'db.extensions.RedisSpiderSmartIdleClosedExensions': 500
}
MYEXT_ENABLED = True
IDLE_NUMBER = 5

工作流程

  1. 当爬虫开始运行时,RedisSpiderSmartClosedExtensions 被初始化,并连接到相应的Scrapy信号。
  2. 在爬虫运行期间,每当它空闲时(没有新的请求),spider_idle 方法会被触发。
  3. 如果连续空闲次数超过 IDLE_NUMBER 设置的阈值,并且Redis中没有新的请求,那么这个扩展会指示Scrapy关闭爬虫。
  4. 这种方法使得爬虫在完成其任务后不会无限期地等待,从而有效管理资源。

Scrapyd

Scrapyd是一个用于部署和运行Scrapy爬虫的应用,它提供了一个JSON API,通过该API可以实现爬虫的部署、启动、停止等操作。

安装 Scrapyd 和 Scrapyd-client

  1. 安装 Scrapyd

    使用命令 pip install scrapyd 来安装 Scrapyd。

    验证: 输入 scrapyd 命令。如果能够访问 http://localhost:6800/ 页面,表示安装成功。

  2. 安装 Scrapyd-client

    使用命令 pip install scrapyd-client 来安装 Scrapyd-client,这是一个用于与Scrapyd交互的命令行工具。

    验证: 在Scrapy项目目录下输入 scrapyd-deploy,如果出现提示 Unknown target: default,表示安装成功。

配置 Scrapy.cfg

scrapy.cfg 文件中,配置部署信息。例如:

1
2
3
4
5
[deploy:douban]  # 部署名称,可自定义
url = http://localhost:6800/ # Scrapyd服务器地址
project = db250 # 项目名称
username = xxx # 服务器访问用户名(可选)
password = xxx # 服务器访问密码(可选)

验证: 使用命令 scrapyd-deploy -l 可以查看设置的部署名称和URL。

执行打包命令

在包含 scrapy.cfg 的目录下执行部署命令:

scrapyd-deploy [部署名称] -p [项目名称]

例如:

scrapyd-deploy douban -p db250

成功部署后,终端会显示类似如下信息:

1
2
3
Deploying to project "db250" in http://localhost:6800/addversion.json
Server response (200):
{"node_name": "YNRBYA8RP4AT92A", "status": "ok", "project": "db250", "version": "1595508145", "spiders": 1}

启动爬虫

使用 curl 命令启动爬虫:

1
curl http://localhost:6800/schedule.json -d project=[项目名称] -d spider=[爬虫名称]

成功启动后,会返回包含 jobid 的响应,如:

1
{"node_name": "YNRBYA8RP4AT92A", "status": "ok", "jobid": "98785578cce211eab46598fa9b72ce54"}

关闭爬虫

使用 curl 命令关闭爬虫:

1
curl http://localhost:6800/cancel.json -d project=[项目名称] -d job=[jobid]

列出项目

列出Scrapyd服务器上的所有项目:

1
curl http://localhost:6800/listprojects.json

删除项目

从Scrapyd服务器上删除指定项目:

1
curl http://localhost:6800/delproject.json -d project=[项目名称]
  • 标题: 分布式爬取
  • 作者: Yiuhang Chan
  • 创建于 : 2021-12-18 08:24:32
  • 更新于 : 2024-02-28 18:50:38
  • 链接: https://www.yiuhangblog.com/2021/12/18/20211218分布式爬取/
  • 版权声明: 本文章采用 CC BY-NC-SA 4.0 进行许可。
评论