Python Pandas使用指南详解
目录
-
Pandas为什么是数据分析首选?
- 1.1 核心优势
- 1.2 数据科学工作流中的定位
-
环境安装与数据准备
- 2.1 安装命令
- 2.2 数据准备
-
核心数据结构解析
- 3.1 Series:一维数据容器
- 3.2 DataFrame:二维数据表
-
数据IO全攻略
- 4.1 常用读写方法
- 4.2 实战示例:处理大型CSV
-
数据清洗与预处理
- 5.1 处理缺失值
- 5.2 数据类型转换
- 5.3 处理重复值
- 5.4 处理异常值
- 5.5 字符串处理
-
数据查询与筛选
- 6.1 基于标签/位置的查询
- 6.2 布尔索引
- 6.3 随机抽样与排序
-
数据变形与计算
- 7.1 列操作进阶
- 7.2 行操作进阶
-
分组聚合与透视
- 8.1 分组统计
- 8.2 透视表与交叉表
- 8.3 分组对象操作
- 8.4 分组应用自定义函数
-
时间序列处理
- 9.1 时间类型转换
- 9.2 重采样与频率转换
- 9.3 滑动窗口操作
-
高效性能优化
- 10.1 数据类型优化
- 10.2 并行处理加速
-
实战数据分析
- 11.1 电商销售分析
- 11.2 股票数据分析
- 11.3 用户行为分析
- 11.4 销售漏斗分析
-
避坑指南与FAQ
- 12.1 常见问题解决
- 12.2 高级问题处理
1. Pandas为什么是数据分析首选?
1.1 核心优势
1.2 数据科学工作流中的定位
数据采集 → 数据清洗 → 数据分析 → 可视化
↑ ↑ ↑
Pandas Pandas Pandas
2. 环境安装与数据准备
2.1 安装命令
# 基础安装
pip install pandas
# 完整安装(包含所有依赖)
pip install "pandas[performance, excel, html, parquet]"
2.2 数据准备
import pandas as pd
import numpy as np
# 创建示例数据
data = {
'姓名': ['张三', '李四', '王五', np.nan],
'年龄': [25, 33, pd.NA, 28],
'城市': ['北京', '上海', '广州', '深圳'],
'薪资': [15000, 22000, 18000, 25000]
}
3. 核心数据结构解析
3.1 Series:一维数据容器
# 创建Series
s = pd.Series([10, 20, 30, 40],
index=['a', 'b', 'c', 'd'],
name='示例序列')
# 关键操作
print(s.values) # 值数组 [10 20 30 40]
print(s.index) # 索引 Index(['a','b','c','d'])
print(s.describe()) # 统计摘要
# 向量化运算
s * 2 + 5 # 保持索引对齐
3.2 DataFrame:二维数据表
# 创建DataFrame的5种方式
df = pd.DataFrame(data) # 从字典创建
df_csv = pd.read_csv('data.csv') # 从CSV读取
df_excel = pd.read_excel('data.xlsx') # Excel文件
df_sql = pd.read_sql('SELECT * FROM table', conn) # SQL数据库
df_web = pd.read_html('https://example.com')[0] # 网页表格
# 查看数据
df.head(3) # 前3行
df.tail() # 后5行
df.sample(5) # 随机5行
df.info() # 数据结构信息
df.describe() # 数值列统计
4. 数据IO全攻略
4.1 常用读写方法
| 格式 | 读取方法 | 写入方法 |
|---|---|---|
| CSV | pd.read_csv() | df.to_csv() |
| Excel | pd.read_excel() | df.to_excel() |
| JSON | pd.read_json() | df.to_json() |
| SQL | pd.read_sql() | df.to_sql() |
| Parquet | pd.read_parquet() | df.to_parquet() |
4.2 实战示例:处理大型CSV
# 分块读取
chunk_iter = pd.read_csv('big_data.csv',
chunksize=10000,
encoding='gbk',
parse_dates=['日期'])
for chunk in chunk_iter:
process(chunk) # 自定义处理函数
# 指定列类型优化内存
dtype_dict = {'ID': 'int32', 'Price': 'float32'}
df = pd.read_csv('data.csv', dtype=dtype_dict)
5. 数据清洗与预处理
5.1 处理缺失值
# 识别缺失值
df.isna().sum()
# 处理策略
df.dropna(subset=['年龄']) # 删除缺失行
df.fillna({'城市': '未知'}) # 填充特定列
df['年龄'] = df['年龄'].interpolate() # 插值填充
# 标记特殊缺失值
df.replace(-999, np.nan, inplace=True)
5.2 数据类型转换
# 自动推断类型
df.infer_objects()
# 强制类型转换
df['日期'] = pd.to_datetime(df['日期'],
format='%Y-%m-%d',
errors='coerce')
# 分类数据优化
df['城市'] = df['城市'].astype('category')
5.3 处理重复值
# 标记重复行(基于所有列)
df.duplicated()
# 标记重复行(指定列)
df.duplicated(subset=['姓名', '城市'])
# 删除重复行(保留最后出现的)
df.drop_duplicates(subset=['姓名'],
keep='last',
inplace=True)
# 统计重复值数量
print(f"发现重复行数:{df.duplicated().sum()}")
5.4 处理异常值
# 箱线图识别异常值
import matplotlib.pyplot as plt
df['薪资'].plot(kind='box')
plt.show()
# 标准差法处理异常值
mean = df['薪资'].mean()
std = df['薪资'].std()
upper = mean + 3*std
lower = mean - 3*std
df = df[(df['薪资'] > lower) & (df['薪资'] < upper)]
# 分位数法处理异常值
Q1 = df['薪资'].quantile(0.25)
Q3 = df['薪资'].quantile(0.75)
IQR = Q3 - Q1
df = df[~((df['薪资'] < (Q1 - 1.5*IQR)) | (df['薪资'] > (Q3 + 1.5*IQR))]
5.5 字符串处理
# 大小写转换
df['城市'] = df['城市'].str.upper()
# 字符串分割
df[['姓', '名']] = df['姓名'].str.split(' ', expand=True)
# 正则表达式提取
df['区号'] = df['电话'].str.extract(r'(\d{3})-\d{8}')
# 判断包含关系
df['是否北上广'] = df['城市'].str.contains('北京|上海|广州')
# 常用字符串方法
df['姓名'] = df['姓名'].str.strip() # 去空格
df['地址'] = df['地址'].str.replace('市', '') # 替换字符
df['邮箱域名'] = df['邮箱'].str.split('@').str[1] # 分割提取
6. 数据查询与筛选
6.1 基于标签/位置的查询
# 选择单列
ages = df['年龄']
# 选择多列
subset = df[['姓名', '城市']]
# 使用loc选择行
df.loc[2] # 第三行
df.loc[1:3, '城市'] # 2-4行的城市列
# 使用iloc按位置选择
df.iloc[0, 1] # 第一行第二列
df.iloc[1:4, :2] # 2-4行前两列
6.2 布尔索引
# 基础条件筛选
beijing_high = df[(df['城市'] == '北京') & (df['薪资'] > 20000)]
# 使用query方法
young_workers = df.query('年龄 < 30 & 城市 in ["上海", "广州"]')
# 复杂条件组合
condition = (df['年龄'].between(25, 35)) | (df['城市'].str.startswith('北'))
filtered_df = df[condition]
6.3 随机抽样与排序
# 随机抽样
sample_df = df.sample(n=5,
weights='薪资',
random_state=42)
# 排序数据
sorted_df = df.sort_values(by=['城市', '薪资'],
ascending=[True, False])
# 最大/最小值行
max_salary_row = df.nlargest(1, '薪资')
min_age_row = df.nsmallest(1, '年龄')
7. 数据变形与计算
7.1 列操作进阶
# 使用assign创建新列
df = df.assign(
时薪=lambda x: x['薪资'] / 22 / 8,
年龄分段=pd.cut(df['年龄'],
bins=[20, 30, 40, 50],
labels=['青年', '中年', '中老年'])
)
# 使用transform标准化
df['薪资标准化'] = df.groupby('城市')['薪资'].transform(
lambda x: (x - x.mean()) / x.std()
)
# 使用pipe链式操作
def add_bonus(df, ratio):
return df.assign(总薪资=df['薪资'] * (1 + ratio))
df = df.pipe(add_bonus, ratio=0.2)
7.2 行操作进阶
# 使用apply处理行数据
def calculate_level(row):
if row['年龄'] < 30 and row['薪资'] > 20000:
return 'A'
elif row['年龄'] >= 40 and row['薪资'] < 15000:
return 'C'
return 'B'
df['等级'] = df.apply(calculate_level, axis=1)
# 使用iterrows遍历(慎用!)
for index, row in df.iterrows():
if row['城市'] == '北京':
df.at[index, '补贴'] = 2000
# 使用itertuples高效遍历
for row in df.itertuples():
if row.年龄 > 45:
df.loc[row.Index, '备注'] = '资深员工'
8. 分组聚合与透视
8.1 分组统计
# 基础分组
grouped = df.groupby('城市')
print(grouped['薪资'].agg(['mean', 'max', 'min']))
# 多级分组
df.groupby(['城市', '级别'])['薪资'].mean()
# 自定义聚合
def salary_range(series):
return series.max() - series.min()
df.groupby('城市')['薪资'].agg(salary_range)
8.2 透视表与交叉表
# 透视表
pd.pivot_table(df,
index='城市',
columns='级别',
values='薪资',
aggfunc=['mean', 'count'])
# 交叉表
pd.crosstab(df['城市'], df['级别'],
values=df['薪资'],
aggfunc='mean')
8.3 分组对象操作
# 创建分组对象
grouped = df.groupby('城市')
# 遍历分组
for city, group in grouped:
print(f"城市: {city}")
print(group[['姓名', '薪资']].head(2))
# 获取特定分组
bj_group = grouped.get_group('北京')
# 分组后过滤
high_salary_cities = grouped.filter(lambda x: x['薪资'].mean() > 20000)
8.4 分组应用自定义函数
# 定义分组处理函数
def top_earners(group):
return group.nlargest(3, '薪资')
# 应用函数
top3_per_city = df.groupby('城市').apply(top_earners)
# 分组扩展计算
df['城市平均薪资'] = df.groupby('城市')['薪资'].transform('mean')
9. 时间序列处理
9.1 时间类型转换
# 转换为Datetime类型
df['订单时间'] = pd.to_datetime(df['时间戳'], unit='s')
# 提取时间成分
df['年份'] = df['订单时间'].dt.year
df['季度'] = df['订单时间'].dt.quarter
df['是否周末'] = df['订单时间'].dt.dayofweek > 4
9.2 重采样与频率转换
# 设置时间索引
df_time = df.set_index('订单时间')
# 按周统计销售额
weekly_sales = df_time.resample('W')['销售额'].sum()
# 按小时计算平均值
hourly_avg = df_time.resample('H').mean()
# 向前填充缺失值
df_time = df_time.resample('D').ffill()
9.3 滑动窗口操作
# 计算7天移动平均
df['7D_MA'] = df['收盘价'].rolling(window=7).mean()
# 扩展窗口累计
df['累计销售额'] = df['销售额'].expanding().sum()
# 带偏移的窗口
df['30D_std'] = df['成交量'].rolling('30D').std()
10. 高效性能优化
10.1 数据类型优化
# 查看当前内存使用
df.info(memory_usage='deep')
# 优化数值类型
df['年龄'] = df['年龄'].astype('uint8')
df['薪资'] = pd.to_numeric(df['薪资'], downcast='float')
# 优化字符串类型
df['城市'] = df['城市'].astype('category')
# 内存优化前后对比
原始内存 = df.memory_usage(deep=True).sum()
优化后内存 = df.astype({
'年龄': 'uint8',
'城市': 'category'
}).memory_usage(deep=True).sum()
print(f"内存减少: {原始内存 - 优化后内存:,} 字节")
10.2 并行处理加速
# 使用swifter加速apply
import swifter
df['新列'] = df['大文本列'].swifter.apply(lambda x: len(x.split()))
# 使用modin加速(替换pandas)
import modin.pandas as mpd
mdf = mpd.DataFrame(df)
11. 实战数据分析
11.1 电商销售分析
# 数据加载
sales = pd.read_csv('sales.csv',
parse_dates=['order_date'])
# 月度销售额分析
monthly_sales = sales.resample('M', on='order_date')['amount'].sum()
# 商品销售排行
top_products = sales.groupby('product_id')['quantity'].sum().nlargest(10)
# 用户复购分析
user_orders = sales.groupby('user_id')['order_id'].nunique()
repeat_users = user_orders[user_orders > 1].count()
11.2 股票数据分析
# 计算技术指标
df['MA5'] = df['close'].rolling(5).mean()
df['MA20'] = df['close'].rolling(20).mean()
# 标记金叉/死叉
df['golden_cross'] = (df['MA5'] > df['MA20']) & (df['MA5'].shift(1) <= df['MA20'].shift(1))
df['death_cross'] = (df['MA5'] < df['MA20']) & (df['MA5'].shift(1) >= df['MA20'].shift(1))
# 可视化
df[['close','MA5','MA20']].plot(figsize=(12,6))
11.3 用户行为分析
# 计算用户活跃天数
user_activity = df.groupby('用户ID')['访问日期'].nunique()
# 最近一次访问分析
last_visit = df.groupby('用户ID')['访问时间'].max()
# RFM模型实现
rfm = df.groupby('用户ID').agg({
'访问时间': lambda x: (pd.Timestamp.now() - x.max()).days,
'订单ID': 'nunique',
'消费金额': 'sum'
}).rename(columns={
'访问时间': 'Recency',
'订单ID': 'Frequency',
'消费金额': 'Monetary'
})
11.4 销售漏斗分析
# 定义转化阶段
funnel_steps = ['访问', '加购', '下单', '支付']
# 计算各阶段人数
funnel_data = df.groupby('事件类型')['用户ID'].nunique().reindex(funnel_steps)
# 计算转化率
funnel_data['转化率'] = funnel_data / funnel_data.shift(1)
# 可视化漏斗
import plotly.express as px
fig = px.funnel(funnel_data, y='用户ID', x='转化率')
fig.show()
12. 避坑指南与FAQ
Q:读取CSV文件出现编码错误?
A:尝试指定编码:encoding='gbk' 或 encoding='utf-8-sig'
Q:如何加速Pandas操作?
# 使用高效方法
df['col'] = df['col'].astype('category') # 分类数据
df = df.query('value > 0') # 代替布尔索引
# 使用向量化操作
df['new'] = df['a'] + df['b'] # 代替apply
Q:处理内存不足问题?
# 优化数据类型
dtypes = {
'id': 'int32',
'price': 'float32',
'category': 'category'
}
df = df.astype(dtypes)
# 分块处理
for chunk in pd.read_csv('big.csv', chunksize=1e5):
process(chunk)
Q:遇到SettingWithCopyWarning怎么办?
A:使用.loc明确索引或复制副本:
# 错误方式
df[df.年龄>30]['新列'] = 1 # 触发警告
# 正确方式
df.loc[df.年龄>30, '新列'] = 1
# 或
df = df[df.年龄>30].copy()
Q:如何高效合并多个DataFrame?
# 垂直合并
pd.concat([df1, df2], axis=0)
# 水平合并
pd.merge(df1, df2, on='关键列', how='left')
# 性能优化技巧
pd.merge(df1, df2[['关键列', '必要列']], on='关键列') # 减少合并列数
Q:处理超大文件内存不足?
# 分块处理
chunksize = 10**6 # 每个块100万行
for chunk in pd.read_csv('超大文件.csv', chunksize=chunksize):
process(chunk)
# 使用Dask
import dask.dataframe as dd
ddf = dd.read_csv('超大文件.csv')
result = ddf.groupby('列名').mean().compute()
性能优化秘籍:
- 避免链式赋值:
df = df[df.col > 0]而不是df = df[df.col > 0].copy()- 使用
eval()进行复杂计算:df.eval('new = a + b * c')- 选择合适的数据类型:
category类型可节省90%内存- 利用
pd.read_csv的usecols参数只加载必要列- 对大数据集使用Dask或Modin替代Pandas
学习路线建议:掌握基础操作 → 熟练数据清洗 → 精通分组聚合 → 学习性能优化 → 实战项目应用
作者:劭清
