爬虫合规性
1. 遵守robots.txt规则
2. 合理设置请求延迟
3. 尊重网站服务条款
4. 不要爬取个人信息
5. 注意数据版权问题
6. 使用User-Agent标识自己
7. 避免对服务器造成压力
数据采集实战
网络爬虫是自动获取互联网数据的程序，也是量化交易数据采集的重要工具。本节将系统介绍网络爬虫的基础知识和实现方法。
import requests
# Walk-through of the basic `requests` API: HTTP verbs, headers, query
# parameters, cookies, and inspecting the response.

# Timeout (in seconds) passed to every request below; without one, requests
# may wait indefinitely on an unresponsive server.
REQUEST_TIMEOUT = 10

# GET request - retrieve data
response = requests.get('https://example.com/api/data', timeout=REQUEST_TIMEOUT)
print(response.status_code)  # HTTP status code
print(response.text)  # response body decoded as text
print(response.json())  # response body parsed as JSON

# POST request - submit data (`data=` sends the dict form-encoded)
data = {'symbol': 'AAPL', 'period': '1d'}
response = requests.post(
    'https://example.com/api/quotes', data=data, timeout=REQUEST_TIMEOUT
)

# PUT request - update data
response = requests.put(
    'https://example.com/api/quotes/1', data=data, timeout=REQUEST_TIMEOUT
)

# DELETE request - remove data
response = requests.delete(
    'https://example.com/api/quotes/1', timeout=REQUEST_TIMEOUT
)

# Request headers
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
    'Accept': 'application/json',
    'Accept-Encoding': 'gzip, deflate',
    'Connection': 'keep-alive',
    'Referer': 'https://example.com',
}

# URL query parameters
params = {
    'symbol': 'AAPL',
    'start': '2024-01-01',
    'end': '2024-12-31',
}

# GET request with headers and query parameters.
# `url` is bound here because the cookie example below needs it.
url = 'https://example.com/api/stocks'
response = requests.get(
    url,
    headers=headers,
    params=params,
    timeout=REQUEST_TIMEOUT,
)

# Cookie handling
cookies = {'session_id': 'abc123'}
response = requests.get(url, cookies=cookies, timeout=REQUEST_TIMEOUT)

# Status code check
if response.status_code == 200:
    print("请求成功")
elif response.status_code == 404:
    print("资源未找到")
elif response.status_code == 500:
    print("服务器错误")

# Response headers.
# .get() is used because a server is not required to send either header;
# indexing would raise KeyError when one is absent.
print(response.headers.get('Content-Type'))
print(response.headers.get('Content-Length'))

# Encoding: force the codec used when decoding `response.text`
response.encoding = 'utf-8'

# Binary data: write the raw bytes to disk
content = response.content
with open('data.json', 'wb') as f:
    f.write(content)
from bs4 import BeautifulSoup
import requests
# Walk-through of the basic BeautifulSoup lookup API on a single page.

# Fetch the page (with a timeout so an unresponsive server cannot hang the script)
url = 'https://example.com/stock/AAPL'
response = requests.get(url, timeout=10)

# Build the BeautifulSoup object
soup = BeautifulSoup(response.text, 'html.parser')

# --- Finding elements ---
# By id
element = soup.find(id='price')

# By class (`class_` because `class` is a Python keyword)
price = soup.find(class_='current-price')
prices = soup.find_all(class_='price')

# By tag name
title = soup.find('h1')
all_links = soup.find_all('a')

# Tag name and class combined
element = soup.find('div', class_='quote-info')

# CSS selectors
price = soup.select_one('.current-price')
all_prices = soup.select('.price')

# --- Extracting attributes and text ---
link = soup.find('a')
# find() returns None when nothing matches, so guard before dereferencing.
href = link.get('href') if link is not None else None
text = link.text.strip() if link is not None else None
# Extract stock information
def extract_stock_info(url):
    """Scrape a quote page and return its fields as a dict.

    The page is expected to contain elements matching the CSS selectors
    ``.symbol``, ``.price``, ``.change``, ``.volume`` and ``.timestamp``
    (the last one carrying a ``data-time`` attribute).

    Args:
        url: Address of the quote page to download.

    Returns:
        A dict with keys ``symbol`` (str), ``price`` (float), ``change``
        (float), ``volume`` (int) and ``timestamp`` (the raw ``data-time``
        attribute value).

    Raises:
        requests.RequestException: The download failed, timed out, or the
            server answered with an HTTP error status.
        ValueError: A required element is missing, or a numeric field
            cannot be parsed.
        KeyError: The ``.timestamp`` element has no ``data-time`` attribute.
    """
    response = requests.get(url, timeout=10)
    # Fail fast on 4xx/5xx instead of trying to parse an error page.
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')

    def _required(selector):
        # select_one() returns None when nothing matches; turn that into an
        # error that names the selector rather than an opaque AttributeError.
        node = soup.select_one(selector)
        if node is None:
            raise ValueError(f"element {selector!r} not found at {url}")
        return node

    def _numeric_text(selector):
        # Trim surrounding whitespace and drop thousands separators so that
        # values such as " 1,234.50 " are accepted by float()/int().
        return _required(selector).text.strip().replace(',', '')

    return {
        'symbol': _required('.symbol').text.strip(),
        # Whitespace is trimmed first so a leading "$" is actually reachable.
        'price': float(_numeric_text('.price').strip('$').strip()),
        'change': float(_numeric_text('.change')),
        'volume': int(_numeric_text('.volume')),
        'timestamp': _required('.timestamp')['data-time'],
    }
# Extract tabular data
def extract_table_data(url, table_class='data-table'):
    """Download a page and convert one HTML table into a list of dicts.

    The ``<th>`` cells of the table's first ``<tr>`` are used as keys; each
    later ``<tr>`` that contains ``<td>`` cells becomes one dict mapping
    header text to cell text (both whitespace-trimmed).

    Args:
        url: Address of the page containing the table.
        table_class: CSS class of the ``<table>`` element to read.

    Returns:
        A list with one dict per data row. Empty when the table has no rows.
        Rows without any ``<td>`` cell are skipped. If a row has more cells
        than there are headers, the surplus cells are ignored.

    Raises:
        requests.RequestException: The download failed, timed out, or the
            server answered with an HTTP error status.
        ValueError: No ``<table>`` with class ``table_class`` exists.
    """
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    soup = BeautifulSoup(response.text, 'html.parser')

    table = soup.find('table', class_=table_class)
    if table is None:
        raise ValueError(f"table with class {table_class!r} not found at {url}")

    rows = table.find_all('tr')
    if not rows:
        return []

    headers = [th.text.strip() for th in rows[0].find_all('th')]
    data = []
    for row in rows[1:]:
        cells = row.find_all('td')
        if not cells:
            continue
        # zip() pairs headers with cells and stops at the shorter of the two,
        # so a malformed row cannot index past the header list.
        data.append(
            {header: cell.text.strip() for header, cell in zip(headers, cells)}
        )
    return data
import requests
from bs4 import BeautifulSoup
import time
import random
class BasicCrawler:
    """Sequential crawler that fetches pages through one shared HTTP session.

    Requests are sent one at a time with a random pause between them. The
    instance can be used as a context manager so the session is closed when
    the crawl is finished.

    Attributes are set in ``__init__``:
        base_url: Value supplied by the caller; stored but not used by the
            methods of this class.
        delay_range: ``(low, high)`` bounds, in seconds, of the random pause.
        timeout: Per-request timeout in seconds.
        session: The ``requests.Session`` used for every download.
    """

    # Candidate User-Agent strings; one is picked at random per request.
    USER_AGENTS = (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)',
        'Mozilla/5.0 (X11; Linux x86_64)',
    )

    def __init__(self, base_url, delay_range=(1, 3), timeout=10):
        """Store the configuration and open the HTTP session.

        Args:
            base_url: Base address associated with this crawler.
            delay_range: ``(low, high)`` bounds in seconds for the random
                pause between consecutive requests.
            timeout: Seconds to wait for a response before giving up.
        """
        self.base_url = base_url
        self.delay_range = delay_range
        self.timeout = timeout
        self.session = requests.Session()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the underlying HTTP session."""
        self.session.close()

    def get_page(self, url):
        """Download ``url`` and return the body text, or None on failure.

        Network errors, timeouts and HTTP error statuses are reported on
        stdout and turned into a ``None`` return so a crawl can continue.
        """
        headers = {'User-Agent': self._get_random_user_agent()}
        try:
            response = self.session.get(
                url, headers=headers, timeout=self.timeout
            )
            response.raise_for_status()
        except requests.RequestException as e:
            print(f"获取页面失败: {e}")
            return None
        return response.text

    def parse_page(self, html):
        """Parse ``html`` and return the resulting BeautifulSoup object.

        Subclasses are expected to override this with real extraction logic.
        """
        return BeautifulSoup(html, 'html.parser')

    def save_data(self, data, filename):
        """Write ``data`` to ``filename`` as indented UTF-8 JSON."""
        import json

        with open(filename, 'w', encoding='utf-8') as f:
            # ensure_ascii=False keeps non-ASCII text readable in the file.
            json.dump(data, f, ensure_ascii=False, indent=2)

    def _get_random_user_agent(self):
        """Return a randomly chosen entry of ``USER_AGENTS``."""
        return random.choice(self.USER_AGENTS)

    def crawl(self, urls):
        """Fetch and parse every URL in ``urls``.

        Args:
            urls: Iterable of page addresses.

        Returns:
            A list with the ``parse_page`` result of every page that was
            downloaded with a non-empty body, in input order.
        """
        results = []
        for index, url in enumerate(urls):
            # Pause between requests only; there is nothing to wait for
            # before the first one or after the last one.
            if index:
                time.sleep(random.uniform(*self.delay_range))
            print(f"正在爬取: {url}")
            html = self.get_page(url)
            if html:
                results.append(self.parse_page(html))
        return results
class IncrementalCrawler(BasicCrawler):
    """Crawler that remembers fetched URLs in SQLite and skips them later.

    Every successfully downloaded URL is recorded in the ``crawled_urls``
    table of the database at ``db_path``, so a later run with the same
    database file does not fetch it again. The instance can be used as a
    context manager to close the database and HTTP session when done.
    """

    def __init__(self, base_url, db_path='crawled.db', delay_range=(1, 3)):
        """Set up the crawler and open (or create) the tracking database.

        Args:
            base_url: Base address, forwarded to ``BasicCrawler``.
            db_path: Path of the SQLite file that stores crawled URLs.
            delay_range: ``(low, high)`` bounds in seconds for the random
                pause between requests, forwarded to ``BasicCrawler``.
        """
        super().__init__(base_url, delay_range)
        self.db_path = db_path
        self._init_database()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the SQLite connection and the HTTP session."""
        self.conn.close()
        self.session.close()

    def _init_database(self):
        """Open the database and create the tracking table if it is missing."""
        import sqlite3

        self.conn = sqlite3.connect(self.db_path)
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS crawled_urls (
                url TEXT PRIMARY KEY,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        self.conn.commit()

    def is_crawled(self, url):
        """Return True if ``url`` is already recorded as crawled."""
        cursor = self.conn.execute(
            'SELECT 1 FROM crawled_urls WHERE url = ?', (url,)
        )
        return cursor.fetchone() is not None

    def mark_crawled(self, url):
        """Record ``url`` as crawled; recording it again is a no-op."""
        # OR IGNORE: `url` is the primary key, so a plain INSERT would raise
        # sqlite3.IntegrityError when the same URL is marked twice.
        self.conn.execute(
            'INSERT OR IGNORE INTO crawled_urls (url) VALUES (?)', (url,)
        )
        self.conn.commit()

    def crawl(self, urls):
        """Fetch and parse only the URLs that have not been crawled before.

        Args:
            urls: Iterable of page addresses.

        Returns:
            A list with the ``parse_page`` result of every newly downloaded
            page, in input order. Already-recorded URLs are skipped, and
            failed downloads are not recorded so they are retried next time.
        """
        results = []
        fetched_before = False
        for url in urls:
            if self.is_crawled(url):
                print(f"已爬取,跳过: {url}")
                continue
            # Pause only between actual downloads: skipped URLs cost the
            # server nothing, and no pause is needed after the last one.
            if fetched_before:
                time.sleep(random.uniform(*self.delay_range))
            fetched_before = True
            html = self.get_page(url)
            if html:
                results.append(self.parse_page(html))
                self.mark_crawled(url)
        return results
def fetch_stock_price(symbol):
    """Fetch the current price and percent change for ``symbol``.

    The quote page is expected to expose the values in ``<fin-streamer>``
    elements whose ``data-field`` attribute is ``regularMarketPrice`` and
    ``regularMarketChangePercent``.

    Args:
        symbol: Ticker symbol, inserted into the quote URL.

    Returns:
        A dict with keys ``symbol``, ``price`` (float or None),
        ``change_percent`` (float or None) and ``timestamp`` (local time in
        ISO 8601 format). A field is None when its element is not on the
        page. Returns None, after printing the reason, when the request
        fails or a value cannot be parsed as a number.
    """
    # Imported here because the module's top-level imports do not provide
    # `datetime`, which is needed for the timestamp below.
    from datetime import datetime

    def _to_float(node):
        # Accept formatted values such as "1,234.56", "+1.23%" or "(-0.45%)".
        if node is None:
            return None
        return float(node.text.strip().strip('()%').replace(',', ''))

    url = f"https://finance.example.com/quote/{symbol}"
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'
    }
    try:
        response = requests.get(url, headers=headers, timeout=10)
        # Treat 4xx/5xx as a failure instead of parsing an error page.
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        price = soup.find('fin-streamer', {'data-field': 'regularMarketPrice'})
        change = soup.find('fin-streamer', {'data-field': 'regularMarketChangePercent'})
        return {
            'symbol': symbol,
            'price': _to_float(price),
            'change_percent': _to_float(change),
            'timestamp': datetime.now().isoformat()
        }
    except (requests.RequestException, ValueError) as e:
        print(f"获取{symbol}价格失败: {e}")
        return None
def fetch_multiple_stocks(symbols, delay=1):
    """Fetch quotes for several symbols, one request at a time.

    Args:
        symbols: Iterable of ticker symbols.
        delay: Seconds to pause between consecutive requests.

    Returns:
        A list of the dicts returned by ``fetch_stock_price``, in input
        order. Symbols whose fetch returned nothing are left out.
    """
    results = []
    for index, symbol in enumerate(symbols):
        # Polite delay between requests only; sleeping after the final
        # symbol would just postpone the return.
        if index:
            time.sleep(delay)
        data = fetch_stock_price(symbol)
        if data:
            results.append(data)
    return results
# Save to CSV: collect quotes for a few tickers and write them without the
# DataFrame index column.
import pandas as pd

quotes = fetch_multiple_stocks(['AAPL', 'MSFT', 'GOOG'])
df = pd.DataFrame(quotes)
df.to_csv('stock_prices.csv', index=False)
爬虫合规性
1. 遵守robots.txt规则
2. 合理设置请求延迟
3. 尊重网站服务条款
4. 不要爬取个人信息
5. 注意数据版权问题
6. 使用User-Agent标识自己
7. 避免对服务器造成压力
爬虫开发最佳实践
1. 使用Session保持连接
2. 合理设置超时时间
3. 实现错误重试机制
4. 记录爬取日志
5. 使用代理IP池
6. 处理异常情况
7. 定期检查网站结构变化