爬虫全流程实战
# 第 5 章 爬虫全流程实战
# 目录介绍
# 5.1 场景引入
💬 场景:你是公司的 Python 开发。老板说:"帮我看一下市面上 Python 开发的岗位情况——工资、数量、技术栈要求。不能拍脑门,要数据。"
你不能打开浏览器手动翻 100 页招聘网站——太慢、不准确、不可重复。
你需要写一个爬虫:自动抓取 → 清洗 → 分析 → 出报告。
这就是爬虫的典型落地场景——用程序替代人工,批量获取互联网结构化数据。
本章按"手工作坊 → 工业化 → 数据分析"三步走:
| 阶段 | 工具 | 能力 |
|---|---|---|
| ① 手工作坊 | requests + BeautifulSoup | 理解 HTTP 请求 + HTML 解析 |
| ② 工业化 | Scrapy 框架 | 批量爬取 + Pipeline 清洗 |
| ③ 端到端 | 爬虫 + pandas + matplotlib | 数据采集→分析→报告 |
⚠️ 法律警示:爬虫本身不违法,但爬取数据后的使用方式可能违法。请阅读 §1.6 法律红线,确保你的爬虫合法合规。
# 5.2 requests 入门
# 5.2.1 第一个请求
requests 是 Python 最常用的 HTTP 客户端库——先用 pip install requests 安装:
import requests
# 最简单的 GET 请求
response = requests.get("https://httpbin.org/get")
print(response.status_code) # 200(HTTP 状态码——200 = 成功)
print(response.text[:200]) # 响应的前 200 个字符
print(response.json()) # 如果响应是 JSON,直接解析
2
3
4
5
6
7
🔑 response 对象的常用属性:
resp = requests.get("https://httpbin.org/get?name=张三&age=25") # GET 带参数
print(resp.status_code) # 200(状态码)
print(resp.headers) # 响应头(dict 类型)
print(resp.encoding) # 编码(通常自动检测)
print(resp.text) # 响应体字符串
print(resp.json()) # JSON 响应 → dict
print(resp.content) # 响应体 bytes(图片/文件用这个)
print(resp.url) # 最终请求的 URL(处理了重定向)
print(resp.elapsed) # 请求耗时(timedelta 对象)
2
3
4
5
6
7
8
9
10
POST 请求——提交表单或 JSON:
# POST 表单数据
resp = requests.post("https://httpbin.org/post", data={"key": "value"})
# POST JSON 数据
resp = requests.post("https://httpbin.org/post", json={"name": "张三", "age": 25})
# 查看服务器实际收到的数据
print(resp.json()["json"]) # {'name': '张三', 'age': 25}
2
3
4
5
6
7
8
# 5.2.2 Headers 伪装
很多网站会检查请求头,拒绝"看起来像爬虫"的请求——最常见的反爬手段是检查 User-Agent:
# ❌ 默认 User-Agent 暴露你是 Python 爬虫
resp = requests.get("https://www.baidu.com")
print(resp.request.headers["User-Agent"])
# python-requests/2.31.0 ← 网站一看就知道是爬虫!
# ✅ 伪装成浏览器——设置常见的浏览器 UA
headers = {
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/125.0.0.0 Safari/537.36"
),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
}
resp = requests.get("https://www.baidu.com", headers=headers)
print(f"状态码:{resp.status_code},响应长度:{len(resp.text)} 字符")
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
🔑 常用 Headers 速查:
| Header | 作用 | 示例 |
|---|---|---|
User-Agent | 标识浏览器/爬虫身份 | 浏览器 UA 字符串 |
Referer | 告诉服务器"你从哪个页面来" | https://www.google.com/ |
Cookie | 身份状态信息 | sessionid=abc123 |
Authorization | Bearer Token | Bearer eyJhbG... |
Content-Type | POST 请求数据类型 | application/json |
# 5.2.3 Session 与 Cookie
登录场景的完整流程:
# 创建 Session——自动管理 Cookie(和浏览器一样)
session = requests.Session()
# 第 1 步:GET 登录页面——获取 CSRF Token
login_page = session.get("https://httpbin.org/forms/post")
# 真实场景:用 BeautifulSoup 提取 <input name="csrf_token" value="...">
# 第 2 步:POST 登录
login_resp = session.post(
"https://httpbin.org/post",
data={"username": "admin", "password": "123456"},
headers={"X-CSRF-Token": "fake_token"}, # 实际从 login_page 提取
)
print(f"登录结果:{login_resp.status_code}")
# 第 3 步:访问需要登录的页面——Session 自动带上 Cookie
protected = session.get("https://httpbin.org/cookies/set?token=secret123")
print(protected.json())
# 查看 Session 自动管理的 Cookie
print(session.cookies.get_dict()) # {'token': 'secret123'}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
🔑 Session 的核心价值:它自动保存服务器设置的 Cookie,并在后续请求中自动发送——模拟浏览器的"登录态"。
# 5.2.4 超时、重试与异常处理
生产级爬虫必须防御网络波动——无超时的 requests.get() 可能永远卡住:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# 方式一:简单超时
try:
resp = requests.get("https://httpbin.org/delay/3", timeout=5)
print("成功")
except requests.Timeout:
print("请求超时!")
except requests.ConnectionError:
print("无法连接!")
except requests.RequestException as e:
print(f"其他网络错误:{e}")
# 方式二:自动重试(生产级)
def create_session_with_retry():
"""创建带自动重试的 Session"""
session = requests.Session()
retry_strategy = Retry(
total=3, # 最多重试 3 次
backoff_factor=0.5, # 重试间隔:0.5s, 1s, 2s
status_forcelist=[429, 500, 502, 503, 504], # 这些状态码触发重试
allowed_methods=["HEAD", "GET", "POST"],
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
# 生产级请求函数
def safe_get(url, headers=None, timeout=10):
"""安全的 GET 请求——带重试和超时"""
session = create_session_with_retry()
try:
resp = session.get(url, headers=headers, timeout=timeout)
resp.raise_for_status() # 4xx/5xx 自动抛异常
return resp
except requests.Timeout:
print(f"[超时] {url}")
except requests.ConnectionError:
print(f"[连接失败] {url}")
except requests.HTTPError as e:
print(f"[HTTP 错误] {e}")
except requests.RequestException as e:
print(f"[请求异常] {e}")
return None
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# 5.3 BeautifulSoup
# 5.3.1 HTML 结构入门
爬虫不需要精通前端——但需要能在 HTML 的嵌套结构中定位要找的元素:
<!-- 一段典型的 HTML -->
<html>
<head><title>示例页面</title></head>
<body>
<div class="article-list">
<article class="post" id="post-1">
<h2 class="title"><a href="/post/1">Python 爬虫入门</a></h2>
<p class="excerpt">这是一篇入门教程...</p>
<span class="date">2025-06-08</span>
<span class="author">张三</span>
</article>
<article class="post" id="post-2">
<h2 class="title"><a href="/post/2">Scrapy 实战</a></h2>
<p class="excerpt">掌握 Scrapy 框架...</p>
<span class="date">2025-06-09</span>
<span class="author">李四</span>
</article>
</div>
</body>
</html>
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
🔑 BeautifulSoup 帮你"解析"这种结构——用 pip install beautifulsoup4 lxml 安装:
from bs4 import BeautifulSoup
html = "...上面的 HTML 字符串..."
soup = BeautifulSoup(html, "lxml") # 用 lxml 解析器(更快)
# 整个文档
print(soup.title.text) # 示例页面
# 标签名定位
print(soup.h2) # <h2 class="title">... 第一个 h2
print(soup.h2.text) # Python 爬虫入门(只用文本)
# 属性定位
print(soup.find("article", id="post-2")) # 第二个 article
2
3
4
5
6
7
8
9
10
11
12
13
14
# 5.3.2 find/find_all 核心 API
soup = BeautifulSoup(html, "lxml")
# ===== find_all():返回所有匹配的列表 =====
# 按标签名
articles = soup.find_all("article")
print(f"找到 {len(articles)} 篇文章") # 2
# 按 class(用 class_ 因为 class 是 Python 关键字)
titles = soup.find_all("h2", class_="title")
for t in titles:
print(t.text.strip()) # Python 爬虫入门 \n Scrapy 实战
# 按 id
post1 = soup.find("article", id="post-1") # find() 只返回第一个
# 按属性
span = soup.find("span", attrs={"class": "author"})
print(span.text) # 张三
# 按文本内容
python_articles = soup.find_all("h2", string=lambda s: s and "Python" in s)
print(python_articles[0].text.strip()) # Python 爬虫入门
# 限制数量
first_3 = soup.find_all("p", limit=3) # 只要前 3 个
# ===== 导航 DOM 树 =====
article = soup.find("article", id="post-1")
print(article.h2.a["href"]) # /post/1(取属性)
print(article.find("span", class_="date").text) # 2025-06-08
# 父子兄弟
print(article.parent.name) # div(父标签名)
print(article.find_next_sibling("article").h2.text) # Scrapy 实战(下一篇)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
# 5.3.3 CSS 选择器
用 select() 写 CSS 选择器——前端工程师的习惯写法在 Python 里也能用:
soup = BeautifulSoup(html, "lxml")
# select()——返回匹配的列表
# select_one()——返回第一个匹配
# 按 class:.class-name
titles = soup.select(".title") # 所有 class="title"
for t in titles:
print(t.text.strip())
# 按 id:#id-name
post1 = soup.select_one("#post-1") # 等价于 find(id="post-1")
# 按标签层次:parent > child
links = soup.select("article h2 a") # article 下的 h2 下的 a
# 按属性:[attr=value]
spans = soup.select("span.date") # class 为 date 的 span
authors = soup.select("[class='author']") # 任何带 class="author" 的标签
# 偷懒大招——浏览器里复制 CSS 选择器
# 右键 → 检查 → 右键目标元素 → Copy → Copy selector
# 得到:body > div.article-list > article:nth-child(1) > h2 > a
link = soup.select_one(
"body > div.article-list > article:nth-child(1) > h2 > a"
)
print(link.text) # Python 爬虫入门
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 5.3.4 实战:爬取新闻标题
"""
爬虫实战——用 requests + BeautifulSoup 爬取新闻标题
以新闻网站为例——请替换为合法的目标网址
"""
import requests
from bs4 import BeautifulSoup
import time
from typing import Optional
def fetch_news_articles(url: str, max_pages: int = 3) -> list[dict]:
"""爬取多页新闻——返回文章列表 (标题, 链接, 摘要)。"""
session = requests.Session()
session.headers.update({
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
})
articles = []
for page in range(1, max_pages + 1):
page_url = f"{url}?page={page}" if "?" not in url else f"{url}&page={page}"
print(f"📥 正在爬取第 {page}/{max_pages} 页:{page_url}")
resp = session.get(page_url, timeout=15)
if resp.status_code != 200:
print(f" ⚠️ 状态码 {resp.status_code},跳过")
continue
soup = BeautifulSoup(resp.text, "lxml")
# 提取文章列表(根据实际 HTML 结构调整选择器)
# 示例:假设每篇在 <article class="post"> 里
items = soup.select("article.post")
if not items:
# 换一种选择器试试——很多网站用 div
items = soup.select("div.news-item, div.article")
for item in items:
try:
title_tag = item.select_one("h2 a, h3 a, a.title")
if not title_tag:
continue
title = title_tag.text.strip()
link = title_tag.get("href", "")
excerpt_tag = item.select_one("p.excerpt, p.summary, div.description")
excerpt = excerpt_tag.text.strip() if excerpt_tag else ""
articles.append({
"title": title,
"link": link,
"excerpt": excerpt[:100], # 截断长摘要
})
except Exception as e:
print(f" ⚠️ 解析文章出错:{e}")
continue
print(f" 本页提取 {len(items)} 条,累计 {len(articles)} 条")
# 礼貌爬取——间隔 1~2 秒,不给服务器压力
if page < max_pages:
time.sleep(1.5)
return articles
def print_report(articles: list[dict]):
"""打印爬取结果摘要"""
print("\n" + "=" * 60)
print(f"{'爬取结果摘要':^60}")
print("=" * 60)
print(f"共 {len(articles)} 篇文章\n")
for i, a in enumerate(articles[:10], 1): # 只展示前 10 条
print(f"{i:2d}. {a['title']}")
if a["excerpt"]:
print(f" {a['excerpt'][:60]}...")
print(f" {a['link']}")
if len(articles) > 10:
print(f"\n(共 {len(articles)} 条,此处仅展示前 10 条)")
print("=" * 60)
# ===== 运行 =====
if __name__ == "__main__":
# ⚠️ 请替换成合法的测试目标——此处用 httpbin 模拟
# 真实使用时改为实际的新闻网站 URL
test_url = "https://httpbin.org/html" # 演示用——不是真正的新闻站
articles = fetch_news_articles(test_url, max_pages=1)
print_report(articles)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# 5.4 Scrapy 框架
当爬虫需要面对数百页、数万条数据、自动存储、断点续爬时,手写 requests + BS4 会变得难以维护。这时候就该上 Scrapy 了。
# 5.4.1 Scrapy 五层架构
┌─────────────────────────────────────────┐
│ Scrapy Engine │ ← 发动机:调度一切
│ ┌──────────┐ ┌──────────┐ ┌───────┐ │
│ │ Scheduler│ │Downloader│ │Spider │ │ ← 调度/下载/解析
│ │ 调度器 │ │ 下载器 │ │ 爬虫 │ │
│ └──────────┘ └──────────┘ └───────┘ │
│ │ │
│ ┌───────┴───────┐ │
│ │ Item Pipeline │ │ ← 数据清洗/去重/存储
│ │ 数据管道 │ │
│ └───────────────┘ │
└─────────────────────────────────────────┘
2
3
4
5
6
7
8
9
10
11
12
五层各司其职:
- Engine:总控——协调所有组件
- Scheduler:待爬 URL 队列(去重)
- Downloader:下载网页(封装了 requests)
- Spider:解析 HTML,提取数据 + 发现新 URL(你写代码的主要地方)
- Item Pipeline:清洗→验证→去重→存储
# 5.4.2 第一个 Scrapy 项目
# 安装
pip install scrapy
# 创建项目
scrapy startproject job_crawler
cd job_crawler
# 目录结构
# job_crawler/
# ├── scrapy.cfg # 部署配置
# └── job_crawler/
# ├── __init__.py
# ├── items.py # 定义数据模型
# ├── middlewares.py # 中间件(IP 代理、UA 轮换等)
# ├── pipelines.py # 数据管道(清洗 + 存储)
# ├── settings.py # 全局配置
# └── spiders/ # 爬虫逻辑(你在这里写代码)
# └── __init__.py
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
📁 第一步:定义数据模型 items.py:
# job_crawler/items.py
import scrapy
class JobItem(scrapy.Item):
"""一条招聘信息的数据模型"""
title = scrapy.Field() # 职位名
company = scrapy.Field() # 公司名
salary = scrapy.Field() # 薪资
location = scrapy.Field() # 工作地点
experience = scrapy.Field() # 经验要求
education = scrapy.Field() # 学历要求
tags = scrapy.Field() # 标签(如"五险一金"、"双休")
url = scrapy.Field() # 详情页链接
2
3
4
5
6
7
8
9
10
11
12
13
14
📁 第二步:写 Spider spiders/jobs.py:
# job_crawler/spiders/jobs.py
import scrapy
from job_crawler.items import JobItem
class JobSpider(scrapy.Spider):
name = "jobs" # 爬虫唯一标识——运行时用这个名字
allowed_domains = ["example-job.com"] # 限制域名——防止爬到站外
start_urls = [
"https://example-job.com/list?keyword=python&page=1"
]
def parse(self, response):
"""解析列表页——提取职位信息 + 下一页链接。"""
# ① 提取当前页的职位
for card in response.css("div.job-card"):
item = JobItem()
item["title"] = card.css("h3.title::text").get("").strip()
item["company"] = card.css("span.company::text").get("").strip()
item["salary"] = card.css("span.salary::text").get("").strip()
item["location"] = card.css("span.location::text").get("").strip()
item["experience"] = card.css("span.experience::text").get("").strip()
item["education"] = card.css("span.education::text").get("").strip()
item["tags"] = card.css("div.tags span::text").getall()
item["url"] = response.urljoin(
card.css("a.job-link::attr(href)").get("")
)
yield item
# ② 找到下一页链接——继续爬
next_page = response.css("a.next::attr(href)").get()
if next_page:
yield response.follow(next_page, callback=self.parse)
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
🔑 Scrapy 选择器语法:
| 写法 | 含义 |
|---|---|
css("h3.title::text") | 取 h3.title 的文本 |
css("a::attr(href)") | 取 a 标签的 href 属性 |
css("div.tags span::text").getall() | 取所有匹配的文本 → 列表 |
css("span.salary::text").get() | 取第一个匹配的文本 → 字符串 |
response.follow(url, callback) | 请求新 URL,用 callback 解析 |
# 5.4.3 Item Pipeline 数据清洗与存储
📁 第三步:配置 Pipeline pipelines.py:
# job_crawler/pipelines.py
import json
import re
class CleanSalaryPipeline:
"""清洗薪资字段——统一格式。"""
def process_item(self, item, spider):
salary = item.get("salary", "")
# 提取数字:"15K-25K" → (15000, 25000)
match = re.findall(r"(\d+)\s*[kK]", salary)
if len(match) == 2:
item["salary_min"] = int(match[0]) * 1000
item["salary_max"] = int(match[1]) * 1000
item["salary_avg"] = (item["salary_min"] + item["salary_max"]) // 2
elif len(match) == 1:
item["salary_min"] = item["salary_max"] = int(match[0]) * 1000
item["salary_avg"] = item["salary_min"]
else:
item["salary_min"] = item["salary_max"] = item["salary_avg"] = 0
return item
class DeduplicatePipeline:
"""去重——同一 URL 只保留一条。"""
def __init__(self):
self.seen_urls = set()
def process_item(self, item, spider):
if item["url"] in self.seen_urls:
raise scrapy.exceptions.DropItem(f"重复:{item['url']}")
self.seen_urls.add(item["url"])
return item
class JsonWriterPipeline:
"""存储为 JSON 文件。"""
def open_spider(self, spider):
self.file = open("jobs_output.json", "w", encoding="utf-8")
self.file.write("[\n")
self.first = True
def process_item(self, item, spider):
line = json.dumps(dict(item), ensure_ascii=False)
if not self.first:
self.file.write(",\n")
self.file.write(f" {line}")
self.first = False
return item
def close_spider(self, spider):
self.file.write("\n]\n")
self.file.close()
print(f"\n✅ 数据已保存到 jobs_output.json")
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
📁 第四步:启用 Pipeline settings.py:
# job_crawler/settings.py 中关键配置
ITEM_PIPELINES = {
"job_crawler.pipelines.CleanSalaryPipeline": 300, # 数字越大优先级越低
"job_crawler.pipelines.DeduplicatePipeline": 400,
"job_crawler.pipelines.JsonWriterPipeline": 500,
}
# 反爬配置
USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36"
ROBOTSTXT_OBEY = True # 遵守 robots.txt
DOWNLOAD_DELAY = 1.5 # 下载间隔——礼貌爬取
CONCURRENT_REQUESTS = 4 # 并发数——别太高
AUTOTHROTTLE_ENABLED = True # 自动调速
2
3
4
5
6
7
8
9
10
11
12
13
# 运行爬虫
cd job_crawler
scrapy crawl jobs -o jobs.json # -o 同时导出为 JSON(最简单方式)
# 或输出为 CSV
scrapy crawl jobs -o jobs.csv
2
3
4
5
🔑 Scrapy 三个核心理念:
yield item而不是return——每个页面可能产出多条数据,用生成器逐条交出- Python 回调不是显式循环——
yield response.follow(...)相当于"把这个 URL 加入队列,数据回来后用 callback 处理" - Pipeline 责任链——每个 Pipeline 只做一件事(清洗、去重、存储),按优先级依次执行
# 5.5 综合实战
现在把爬虫 + 数据处理 + 可视化串联成一条端到端管线。
# 5.5.1 爬取模块
"""
模块一:爬取——用 requests + BeautifulSoup(或 Scrapy)
这里用 requests+BS4 演示快速原型
"""
import requests
from bs4 import BeautifulSoup
import time
import json
import re
from typing import Optional
def crawl_job_listings(keyword: str = "python", pages: int = 10) -> list[dict]:
"""
爬取招聘网站数据——返回结构化列表。
⚠️ 注意:本示例使用模拟数据演示流程。
实际使用时请替换为真实网站的 URL 和选择器,
并遵守该网站的 robots.txt 和服务条款。
"""
# 示例:模拟爬取 10 页数据
jobs = []
session = requests.Session()
session.headers.update({
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
})
for page in range(1, pages + 1):
print(f"📥 爬取第 {page}/{pages} 页...")
# 实际 URL 示例(请替换为合法目标):
# url = f"https://example-job.com/list?q={keyword}&page={page}"
# 模拟数据——实际使用时删掉这整段,用 requests.get + BS4 替换
import random
for _ in range(15):
salary_min = random.choice([8, 10, 12, 15, 18, 20, 25, 30, 35, 40])
jobs.append({
"title": random.choice([
"Python 后端开发", "Python 数据分析师", "Python 爬虫工程师",
"Python 测试开发", "Python 全栈工程师", "算法工程师(Python)",
"DevOps 工程师", "Python 架构师",
]),
"company": random.choice([
"某互联网公司", "某科技集团", "某数据服务商",
"某金融科技", "某电商平台", "某 AI 初创公司",
]),
"salary_min": salary_min * 1000,
"salary_max": (salary_min + random.choice([2, 3, 5, 8, 10])) * 1000,
"location": random.choice(["北京", "上海", "深圳", "杭州", "广州", "成都"]),
"experience": random.choice(["应届生", "1-3年", "3-5年", "5-10年", "不限"]),
"education": random.choice(["大专", "本科", "硕士", "不限"]),
"tags": random.sample(
["五险一金", "双休", "年终奖", "弹性工作", "股票期权",
"带薪年假", "定期体检", "餐补", "房补", "团建"],
k=random.randint(2, 5),
),
})
time.sleep(1.0) # 礼貌爬取
print(f"✅ 总计爬取 {len(jobs)} 条招聘信息")
return jobs
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# 5.5.2 清洗与分析
"""
模块二:清洗 + 分析——pandas 数据处理
pip install pandas
"""
import pandas as pd
def analyze_jobs(jobs: list[dict]) -> pd.DataFrame:
"""清洗数据 + 计算分析指标——返回 DataFrame。"""
df = pd.DataFrame(jobs)
# 1. 计算平均薪资
df["salary_avg"] = (df["salary_min"] + df["salary_max"]) // 2
# 2. 薪资分档
bins = [0, 10000, 20000, 30000, 50000, 999999]
labels = ["<10K", "10-20K", "20-30K", "30-50K", "50K+"]
df["salary_range"] = pd.cut(df["salary_avg"], bins=bins, labels=labels)
# 3. 城市薪资排名
city_stats = df.groupby("location", observed=False).agg(
平均薪资=("salary_avg", "mean"),
岗位数量=("title", "count"),
最高薪资=("salary_max", "max"),
).sort_values("平均薪资", ascending=False)
print("\n📊 城市薪资排名:")
print(city_stats.to_string())
# 4. 经验与学历交叉分析
exp_edu = df.groupby(["experience", "education"], observed=False).agg(
平均薪资=("salary_avg", "mean"),
岗位数量=("title", "count"),
).round(0)
print("\n📊 经验 × 学历交叉分析:")
print(exp_edu.to_string())
# 5. 热门技术标签
all_tags = df["tags"].explode() # 展开标签列表
tag_counts = all_tags.value_counts().head(10)
print("\n📊 热门标签 TOP 10:")
print(tag_counts.to_string())
return df
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 5.5.3 可视化与报告
"""
模块三:可视化 + 生成报告
pip install matplotlib
"""
import matplotlib.pyplot as plt
import matplotlib
matplotlib.use("Agg") # 非交互式后端——生成图片
plt.rcParams["font.sans-serif"] = ["Arial Unicode MS", "SimHei"] # 中文字体
plt.rcParams["axes.unicode_minus"] = False
def visualize(df: pd.DataFrame, keyword: str = "Python"):
"""生成 4 张分析图表并保存。"""
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
fig.suptitle(f"📊 {keyword} 岗位市场分析报告", fontsize=16, fontweight="bold")
# 图 1:薪资分布直方图
ax1 = axes[0, 0]
ax1.hist(df["salary_avg"] / 1000, bins=20, color="#4CAF50", edgecolor="white")
ax1.set_title("薪资分布(月薪/千元)")
ax1.set_xlabel("薪资(K)")
ax1.set_ylabel("岗位数量")
ax1.axvline(df["salary_avg"].median() / 1000, color="red",
linestyle="--", label=f"中位数 {df['salary_avg'].median()/1000:.1f}K")
ax1.legend()
# 图 2:薪资按经验要求
ax2 = axes[0, 1]
exp_order = ["应届生", "1-3年", "3-5年", "5-10年", "不限"]
exp_data = df.groupby("experience")["salary_avg"].mean()
exp_data = exp_data.reindex([e for e in exp_order if e in exp_data.index])
ax2.bar(exp_data.index, exp_data.values / 1000, color="#2196F3")
ax2.set_title("不同经验要求的平均薪资")
ax2.set_ylabel("平均薪资(K)")
for i, v in enumerate(exp_data.values):
ax2.text(i, v / 1000 + 0.5, f"{v/1000:.1f}K", ha="center")
# 图 3:城市岗位分布
ax3 = axes[1, 0]
city_count = df["location"].value_counts().head(8)
ax3.pie(city_count.values, labels=city_count.index, autopct="%1.1f%%",
startangle=90, colors=plt.cm.Paired.colors)
ax3.set_title("岗位城市分布")
# 图 4:薪资按学历
ax4 = axes[1, 1]
edu_order = ["大专", "本科", "硕士", "不限"]
edu_data = df.groupby("education")["salary_avg"].mean()
edu_data = edu_data.reindex([e for e in edu_order if e in edu_data.index])
ax4.bar(edu_data.index, edu_data.values / 1000, color="#FF9800")
ax4.set_title("不同学历要求的平均薪资")
ax4.set_ylabel("平均薪资(K)")
plt.tight_layout()
plt.savefig("job_analysis_report.png", dpi=150, bbox_inches="tight")
print("\n✅ 报告已保存:job_analysis_report.png")
def generate_text_report(df: pd.DataFrame, keyword: str = "Python") -> str:
"""生成文字版分析报告。"""
report = []
report.append("=" * 55)
report.append(f"{keyword} 岗位市场分析报告".center(55))
report.append("=" * 55)
report.append(f"\n数据量:{len(df)} 条招聘信息\n")
# 1. 整体
report.append("一、薪资概况")
report.append(f" 平均薪资:¥{df['salary_avg'].mean():,.0f}")
report.append(f" 中位数薪资:¥{df['salary_avg'].median():,.0f}")
report.append(f" 最低/最高:¥{df['salary_min'].min():,} ~ ¥{df['salary_max'].max():,}")
# 2. TOP 薪资城市
report.append("\n二、高薪城市 TOP 5")
city_avg = df.groupby("location")["salary_avg"].mean().sort_values(ascending=False).head(5)
for city, salary in city_avg.items():
report.append(f" {city}:¥{salary:,.0f}")
# 3. 热门标签
report.append("\n三、热门福利标签 TOP 5")
tag_count = df["tags"].explode().value_counts().head(5)
for tag, count in tag_count.items():
report.append(f" {tag}:{count} 次")
report.append(f"\n{'='*55}")
report.append("报告生成完毕")
return "\n".join(report)
# ===== 一键运行 =====
if __name__ == "__main__":
KEYWORD = "Python"
PAGES = 10
print(f"\\n{'🕷️ 开始爬取 ':~^55}")
jobs = crawl_job_listings(KEYWORD, pages=PAGES)
print(f"\\n{'📊 数据分析 ':~^55}")
df = analyze_jobs(jobs)
print(f"\\n{'📈 可视化 ':~^55}")
visualize(df, KEYWORD)
report = generate_text_report(df, KEYWORD)
print("\n" + report)
# 保存原始数据
with open("jobs_raw.json", "w", encoding="utf-8") as f:
json.dump(jobs, f, ensure_ascii=False, indent=2)
print("\n✅ 原始数据已保存:jobs_raw.json")
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
运行输出示例:
~~~~~~~~~~~~~~~~~~~~~~~~🕷️ 开始爬取 ~~~~~~~~~~~~~~~~~~~~~~~~~
📥 爬取第 1/10 页...
📥 爬取第 2/10 页...
...(略)
✅ 总计爬取 150 条招聘信息
~~~~~~~~~~~~~~~~~~~~~~~~📊 数据分析 ~~~~~~~~~~~~~~~~~~~~~~~~~
📊 城市薪资排名:
平均薪资 岗位数量 最高薪资
location
上海 22500.0 28 35000
北京 21800.0 25 32000
深圳 21500.0 24 33000
杭州 20200.0 22 31000
成都 18500.0 20 28000
广州 18000.0 18 27000
📊 经验 × 学历交叉分析:
平均薪资 岗位数量
experience education
1-3年 本科 15800.0 12
硕士 22000.0 5
3-5年 本科 22500.0 15
硕士 28500.0 8
不限 25000.0 3
📊 热门标签 TOP 10:
五险一金 145
年终奖 120
双休 98
弹性工作 87
股票期权 75
...
~~~~~~~~~~~~~~~~~~~~~~~~📈 可视化 ~~~~~~~~~~~~~~~~~~~~~~~~~
✅ 报告已保存:job_analysis_report.png
=======================================================
Python 岗位市场分析报告
=======================================================
数据量:150 条招聘信息
一、薪资概况
平均薪资:¥21,350
中位数薪资:¥20,000
最低/最高:¥8,000 ~ ¥50,000
二、高薪城市 TOP 5
上海:¥22,500
北京:¥21,800
深圳:¥21,500
杭州:¥20,200
成都:¥18,500
三、热门福利标签 TOP 5
五险一金:145 次
年终奖:120 次
双休:98 次
弹性工作:87 次
股票期权:75 次
=======================================================
报告生成完毕
✅ 原始数据已保存:jobs_raw.json
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
# 5.6 法律红线
爬虫≠违法,但滥用=违法。你需要明确五条红线:
| # | 红线 | 说明 |
|---|---|---|
| 1 | 遵守 robots.txt | https://example.com/robots.txt 指定了哪些路径不允许爬。Scrapy 默认遵守。 |
| 2 | 不爬个人信息 | 姓名、电话、身份证号——即使网页公开,批量采集也违法(《个人信息保护法》) |
| 3 | 不爬付费/版权内容 | 付费视频、付费文章、数据库的全部内容——侵犯著作权 |
| 4 | 控制频率 | 每秒几百次请求 = DDoS 攻击——可构成"破坏计算机信息系统罪" |
| 5 | 不绕开反爬措施 | 破解验证码、伪造 Token 绕开认证——可构成"非法获取计算机信息系统数据罪" |
🔑 合法爬虫 Checklist:
# ✅ 合法爬虫的规范做法
# 1. 检查 robots.txt
import urllib.robotparser
rp = urllib.robotparser.RobotFileParser()
rp.set_url("https://example.com/robots.txt")
rp.read()
if not rp.can_fetch("*", "/target_page"):
print("robots.txt 禁止爬取——尊重规则")
exit()
# 2. 添加 User-Agent 标识——让网站知道你是谁
headers = {
"User-Agent": "MyResearchBot/1.0 (contact@example.com)"
}
# 3. 控制频率——每秒不要超过 2~3 次
import time
time.sleep(0.5) # 至少 0.5 秒的间隔
# 4. 只爬公开数据 + 不爬个人信息
# 5. 爬取结果仅用于分析——不公开原始数据
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 5.7 新手陷阱
| # | 陷阱 | 说明 |
|---|---|---|
| 1 | 没设置超时 | requests.get(url) 无 timeout 参数——网络不通时永远卡住。永远加 timeout=10 |
| 2 | 编码乱码 | 响应编码检测失败时中文变乱码——resp.encoding = "utf-8" 或 resp.apparent_encoding |
| 3 | 选择器写死 | soup.find("div", class_="item") 但网站改版 class 变成了 card——用更宽松的选择器 |
| 4 | IP 被封 | 短时间大量请求——加 time.sleep() + User-Agent 轮换 + 代理池 |
| 5 | 没处理 429 响应 | 429 Too Many Requests——服务器让你慢点。尊重 Retry-After 头 |
陷阱 1 详解——超时是救命机制:
# ❌ 无超时——网络波动时永远卡住
resp = requests.get("https://slow-server.com") # 可能永远不返回
# ✅ 生产级封装(见 §1.2.4)——连接超时 5s,读取超时 30s
resp = requests.get("https://slow-server.com", timeout=(5, 30))
2
3
4
5
# 5.8 综合思考题
requests vs Scrapy 的边界:多少页 / 多少条数据时该从 requests 切换到 Scrapy?除了数据量,还有哪些因素影响选择?
动态渲染页面怎么爬?
requests只能拿到 HTML 源码——如果数据是用 JavaScript 动态加载的(React/Vue 渲染),requests.get()拿到的是一堆空<div>。你有哪些解决方案?(提示:Selenium、Playwright、抓包分析 API)反爬的攻防:网站有验证码、IP 频率限制、请求签名——这些反爬手段各对应什么绕过方案?绕过反爬措施的法律风险是什么?
增量爬取:第 1 次爬了 1000 条数据,第 2 次你只想爬新增的那部分——如何设计增量爬取策略?用"时间戳"还是"ID 范围"?各自的适用场景是什么?
分布式爬虫:如果需要每天爬取 100 万个页面——单一机器远远不够。你需要哪些组件来构建分布式爬虫系统?(提示:消息队列、去重中心、下载节点、存储集群)