MovieLens/matrix_factorization.py
2025-05-05 04:46:22 +08:00

709 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
MovieLens评分预测系统
===================
使用矩阵分解方法SVD预测MovieLens数据集中缺失的评分数据。
数据集特征:
- 1,000,209条评分
- 6,040位用户
- 3,952部电影
- 评分范围1-5分
"""
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.sparse.linalg import svds
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
import time
import matplotlib
# Configure a CJK-capable font so the Chinese plot labels render correctly.
# Candidates cover Windows (Microsoft YaHei), macOS (Heiti SC) and SimHei.
_CJK_FONT_CANDIDATES = ['Microsoft YaHei', 'Heiti SC', 'SimHei']


def _find_available_font(candidates):
    """Return the first font family in *candidates* that matplotlib can resolve.

    ``findfont`` silently substitutes the default font unless
    ``fallback_to_default=False`` is passed, in which case it raises
    ``ValueError`` for a family that is not installed.

    Returns:
        str | None: the first resolvable family name, or None if none is found.
    """
    for family in candidates:
        try:
            matplotlib.font_manager.findfont(family, fallback_to_default=False)
        except ValueError:
            continue
        return family
    return None


if _find_available_font(_CJK_FONT_CANDIDATES) is not None:
    plt.rcParams['font.sans-serif'] = ['Microsoft YaHei', 'Heiti SC', 'SimHei', 'DejaVu Sans']
    plt.rcParams['axes.unicode_minus'] = False  # render the minus sign correctly with CJK fonts
    use_chinese = True
else:
    print("警告: 系统中没有找到支持中文的字体,将使用英文标签")
    use_chinese = False

# Fix the random seed so results are reproducible.
np.random.seed(42)
class MovieLensMatrixFactorization:
    """Truncated-SVD matrix factorization model for the MovieLens dataset.

    The pipeline loads ``ratings.dat`` / ``movies.dat`` / ``users.dat``,
    builds a dense user x movie rating matrix in which 0 marks a missing
    rating, fits a truncated SVD on a training split, evaluates it on a
    held-out split and finally fills the missing ratings with predictions.
    """

    def __init__(self, data_path='./dataset', output_path='./result'):
        """Initialise the model and make sure the output directory exists.

        Args:
            data_path (str): directory containing the MovieLens ``.dat`` files.
            output_path (str): directory where plots and CSV results are written.
        """
        self.data_path = data_path
        self.output_path = output_path
        self.ratings_df = None
        self.users_df = None
        self.movies_df = None
        self.user_movie_matrix = None
        self.user_movie_matrix_mean = None
        self.user_movie_predictions = None
        self.filled_ratings_matrix = None
        self.use_chinese = use_chinese
        # exist_ok avoids the check-then-create race of a separate exists() test.
        os.makedirs(output_path, exist_ok=True)

    def _label(self, english, chinese):
        """Return *chinese* when Chinese labels are enabled, otherwise *english*."""
        return chinese if self.use_chinese else english

    def _read_dat(self, filename, names, **read_kwargs):
        """Read one '::'-separated ``.dat`` file located under ``data_path``."""
        return pd.read_csv(
            os.path.join(self.data_path, filename),
            sep='::',
            engine='python',  # multi-character separators require the Python engine
            names=names,
            **read_kwargs
        )

    def _matrix_positions(self, ratings):
        """Map the userId/movieId columns of *ratings* to matrix row/column positions.

        Raises:
            KeyError: if an id in *ratings* is absent from the rating matrix.
        """
        user_pos = self.user_movie_matrix.index.get_indexer(ratings['userId'])
        movie_pos = self.user_movie_matrix.columns.get_indexer(ratings['movieId'])
        # get_indexer reports unknown labels as -1 instead of raising.
        if (user_pos < 0).any() or (movie_pos < 0).any():
            raise KeyError("ratings contain a userId or movieId that is not in the rating matrix")
        return user_pos, movie_pos

    def _observed_ratings(self):
        """Return the non-missing ratings as a (userId, movieId, rating) DataFrame.

        Rows are emitted in row-major matrix order (by user, then by movie).
        """
        matrix = self.user_movie_matrix
        values = matrix.to_numpy()
        user_pos, movie_pos = np.nonzero(values > 0)
        return pd.DataFrame({
            'userId': matrix.index.to_numpy()[user_pos],
            'movieId': matrix.columns.to_numpy()[movie_pos],
            'rating': values[user_pos, movie_pos],
        })

    def load_data(self):
        """Load the ratings, movies and users files and print summary statistics.

        Returns:
            self: the instance, to allow method chaining.
        """
        print("\n加载MovieLens数据集...")
        self.ratings_df = self._read_dat(
            'ratings.dat', ['userId', 'movieId', 'rating', 'timestamp']
        )
        self.movies_df = self._read_dat(
            'movies.dat', ['movieId', 'title', 'genres'], encoding='ISO-8859-1'
        )
        self.users_df = self._read_dat(
            'users.dat', ['userId', 'gender', 'age', 'occupation', 'zipcode']
        )
        print(f"加载完成: {len(self.ratings_df)} 条评分, {len(self.movies_df)} 部电影, {len(self.users_df)} 位用户")
        print(f"评分范围: {self.ratings_df['rating'].min()}-{self.ratings_df['rating'].max()}")
        # Sparsity is measured against every possible (user, movie) pair.
        total_possible_ratings = len(self.users_df) * len(self.movies_df)
        sparsity = 100 * (1 - len(self.ratings_df) / total_possible_ratings)
        print(f"评分矩阵稀疏度: {sparsity:.2f}%")
        return self

    def preprocess_data(self):
        """Pivot the ratings into a user x movie matrix, using 0 for missing ratings.

        Returns:
            self: the instance, to allow method chaining.
        """
        print("\n创建用户-电影评分矩阵...")
        self.user_movie_matrix = self.ratings_df.pivot(
            index='userId',
            columns='movieId',
            values='rating'
        ).fillna(0)
        print(f"评分矩阵形状: {self.user_movie_matrix.shape}")
        return self

    def train_test_split_matrix(self, test_size=0.2):
        """Split the observed ratings into training and test sets.

        Args:
            test_size (float): fraction of observed ratings put in the test set.

        Returns:
            tuple: ``(train_data, test_data)`` DataFrames with columns
            ``userId``, ``movieId`` and ``rating``.
        """
        print(f"\n将数据分为训练集({100 - test_size * 100:.0f}%)和测试集({test_size * 100:.0f}%)...")
        # Vectorised extraction; a Python loop over every cell is prohibitively slow.
        ratings_list_df = self._observed_ratings()
        train_data, test_data = train_test_split(
            ratings_list_df,
            test_size=test_size,
            random_state=42
        )
        print(f"训练集: {len(train_data)} 条评分, 测试集: {len(test_data)} 条评分")
        return train_data, test_data

    def create_training_matrix(self, train_data):
        """Build a rating matrix that contains only the training ratings.

        Args:
            train_data (DataFrame): rows with ``userId``, ``movieId``, ``rating``.

        Returns:
            ndarray: float matrix shaped like ``user_movie_matrix``; cells that
            are not in *train_data* are 0.

        Raises:
            KeyError: if *train_data* references an unknown user or movie.
        """
        user_pos, movie_pos = self._matrix_positions(train_data)
        train_matrix = np.zeros(self.user_movie_matrix.shape, dtype=float)
        train_matrix[user_pos, movie_pos] = train_data['rating'].to_numpy()
        return train_matrix

    def matrix_factorization_svd(self, train_matrix, n_factors=100):
        """Factorise the training matrix with a truncated SVD and predict all ratings.

        Each user's mean is computed over that user's observed (non-zero)
        ratings only, and only observed cells are centred; missing cells are
        left at 0 after centring, i.e. they are imputed with the user mean.
        Averaging over every cell would count missing ratings as zeros and
        drag the means and predictions toward 0. Users with no training
        ratings fall back to the global mean of observed ratings.

        Args:
            train_matrix (ndarray): user x movie matrix, 0 meaning "missing".
            n_factors (int): number of latent factors (singular values) to keep.

        Returns:
            ndarray: predicted rating matrix with the shape of *train_matrix*.
            The same values are stored in ``self.user_movie_predictions`` and
            the user means in ``self.user_movie_matrix_mean``.
        """
        print(f"\n使用SVD进行矩阵分解 (潜在因子数量: {n_factors})...")
        train_matrix = np.asarray(train_matrix, dtype=float)
        observed = train_matrix > 0
        rating_counts = observed.sum(axis=1)
        rating_sums = np.where(observed, train_matrix, 0.0).sum(axis=1)

        total_count = rating_counts.sum()
        global_mean = rating_sums.sum() / total_count if total_count else 0.0
        user_ratings_mean = np.full(train_matrix.shape[0], global_mean, dtype=float)
        has_ratings = rating_counts > 0
        user_ratings_mean[has_ratings] = rating_sums[has_ratings] / rating_counts[has_ratings]
        user_ratings_mean = user_ratings_mean.reshape(-1, 1)

        # Centre observed cells only; unobserved cells stay at 0 (= user mean).
        ratings_centered = np.where(observed, train_matrix - user_ratings_mean, 0.0)

        U, sigma, Vt = svds(ratings_centered, k=n_factors)
        # (U * sigma) scales the columns of U, equivalent to U @ diag(sigma).
        all_user_predicted_ratings = user_ratings_mean + (U * sigma) @ Vt

        self.user_movie_matrix_mean = user_ratings_mean
        self.user_movie_predictions = pd.DataFrame(
            all_user_predicted_ratings,
            index=self.user_movie_matrix.index,
            columns=self.user_movie_matrix.columns
        )
        return all_user_predicted_ratings

    def evaluate_model(self, test_data, predicted_matrix):
        """Compute the test RMSE and save diagnostic plots.

        Predictions are compared as-is (they are not clipped here). Two
        figures are written to ``output_path``:
        ``rating_prediction_scatter.png`` and ``prediction_analysis.png``.

        Args:
            test_data (DataFrame): rows with ``userId``, ``movieId``, ``rating``.
            predicted_matrix (ndarray): predicted user x movie rating matrix.

        Returns:
            float: root-mean-squared error on *test_data*.
        """
        print("\n评估模型性能...")
        user_pos, movie_pos = self._matrix_positions(test_data)
        actual_ratings = test_data['rating'].to_numpy(dtype=float)
        predicted_ratings = np.asarray(predicted_matrix)[user_pos, movie_pos]

        rmse = np.sqrt(mean_squared_error(actual_ratings, predicted_ratings))
        print(f"测试集RMSE: {rmse:.4f}")

        # Scatter of actual vs. predicted ratings with the identity line.
        plt.figure(figsize=(10, 6))
        plt.scatter(actual_ratings, predicted_ratings, alpha=0.3)
        lowest, highest = actual_ratings.min(), actual_ratings.max()
        plt.plot([lowest, highest], [lowest, highest], 'r--', lw=2)
        plt.xlabel(self._label('Actual Ratings', '实际评分'))
        plt.ylabel(self._label('Predicted Ratings', '预测评分'))
        plt.title(self._label('Actual vs Predicted Ratings', '实际评分 vs 预测评分'))
        plt.tight_layout()
        plt.savefig(os.path.join(self.output_path, 'rating_prediction_scatter.png'))
        plt.close()

        # Left: rating distributions; right: distribution of prediction errors.
        plt.figure(figsize=(12, 5))
        plt.subplot(1, 2, 1)
        plt.hist(actual_ratings, bins=20, alpha=0.7, label=self._label('Actual', '实际评分'))
        plt.hist(predicted_ratings, bins=20, alpha=0.7, label=self._label('Predicted', '预测评分'))
        plt.xlabel(self._label('Rating', '评分'))
        plt.ylabel(self._label('Frequency', '频率'))
        plt.title(self._label('Rating Distribution Comparison', '评分分布对比'))
        plt.legend()
        plt.subplot(1, 2, 2)
        prediction_errors = actual_ratings - predicted_ratings
        plt.hist(prediction_errors, bins=20)
        plt.xlabel(self._label('Prediction Error (Actual-Predicted)', '预测误差(实际-预测)'))
        plt.ylabel(self._label('Frequency', '频率'))
        plt.title(self._label('Prediction Error Distribution', '预测误差分布'))
        plt.tight_layout()
        plt.savefig(os.path.join(self.output_path, 'prediction_analysis.png'))
        plt.close()
        return rmse

    def fill_missing_ratings(self):
        """Fill missing (0) cells of the rating matrix with predicted ratings.

        Original ratings are kept; the result is clipped to the 1-5 range.

        Returns:
            DataFrame: the completed rating matrix, also stored in
            ``self.filled_ratings_matrix``.
        """
        print("\n填补缺失评分...")
        original_ratings = self.user_movie_matrix.to_numpy()
        predicted_ratings = self.user_movie_predictions.to_numpy()
        is_missing = original_ratings == 0
        filled_ratings = np.where(is_missing, predicted_ratings, original_ratings)
        filled_ratings = np.clip(filled_ratings, 1, 5)
        self.filled_ratings_matrix = pd.DataFrame(
            filled_ratings,
            index=self.user_movie_matrix.index,
            columns=self.user_movie_matrix.columns
        )
        missing_count = np.sum(is_missing)
        total_cells = original_ratings.size
        print(f"总评分单元格数: {total_cells}")
        print(f"原始缺失评分数: {missing_count} ({missing_count / total_cells * 100:.2f}%)")
        print(f"已预测填补: {missing_count} 个评分")
        return self.filled_ratings_matrix

    def save_results(self):
        """Write the filled matrix and its long-format list to CSV files.

        ``filled_ratings_matrix.csv`` holds the matrix;
        ``filled_ratings_list.csv`` holds one row per cell with columns
        ``userId``, ``movieId``, ``rating`` and ``isOriginal`` (True when the
        rating was present in the original matrix).

        Returns:
            self: the instance, to allow method chaining.
        """
        print("\n保存结果...")
        ratings_file = os.path.join(self.output_path, 'filled_ratings_matrix.csv')
        self.filled_ratings_matrix.to_csv(ratings_file)

        # Flatten in row-major order (user by user) without a per-cell Python loop.
        filled = self.filled_ratings_matrix
        filled_values = filled.to_numpy()
        n_users, n_movies = filled_values.shape
        original_values = self.user_movie_matrix.loc[filled.index, filled.columns].to_numpy()
        ratings_df = pd.DataFrame({
            'userId': np.repeat(filled.index.to_numpy(), n_movies),
            'movieId': np.tile(filled.columns.to_numpy(), n_users),
            'rating': filled_values.ravel(),
            'isOriginal': (original_values > 0).ravel(),
        })
        # Drop any cell that still holds a 0 rating.
        ratings_df = ratings_df[ratings_df['rating'] > 0]

        ratings_list_file = os.path.join(self.output_path, 'filled_ratings_list.csv')
        ratings_df.to_csv(ratings_list_file, index=False)
        print(f"已保存填补后的评分矩阵: {ratings_file}")
        print(f"已保存填补后的评分列表: {ratings_list_file}")
        return self

    def compare_factor_performance(self, train_matrix, test_data, factors_list=(50, 100, 150, 200)):
        """Train and evaluate the model for several latent-factor counts.

        Saves ``factors_performance.png`` (RMSE and training time per factor
        count) to ``output_path``.

        Args:
            train_matrix (ndarray): training rating matrix.
            test_data (DataFrame): held-out ratings used for evaluation.
            factors_list (sequence of int): latent-factor counts to try.

        Returns:
            tuple: ``(results, best_factor)`` where ``results`` maps each factor
            count to ``{'rmse': ..., 'training_time': ...}`` and ``best_factor``
            is the count with the lowest RMSE.

        Raises:
            ValueError: if *factors_list* is empty.
        """
        if len(factors_list) == 0:
            raise ValueError("factors_list must contain at least one factor count")
        print("\n比较不同潜在因子数量的性能...")
        results = {}
        for n_factors in factors_list:
            print(f"测试潜在因子数量: {n_factors}")
            start_time = time.time()
            predicted_matrix = self.matrix_factorization_svd(train_matrix, n_factors=n_factors)
            training_time = time.time() - start_time
            rmse = self.evaluate_model(test_data, predicted_matrix)
            results[n_factors] = {
                'rmse': rmse,
                'training_time': training_time
            }
            print(f"训练时间: {training_time:.2f}")

        factors = list(results)
        rmse_values = [results[f]['rmse'] for f in factors]
        training_times = [results[f]['training_time'] for f in factors]

        # RMSE on the left axis, training time on a twin right axis.
        fig, ax1 = plt.subplots(figsize=(10, 6))
        color1 = 'tab:blue'
        ax1.set_xlabel(self._label('Latent Factors', '潜在因子数量'))
        ax1.set_ylabel('RMSE', color=color1)
        ax1.plot(factors, rmse_values, 'o-', color=color1, label='RMSE')
        ax1.tick_params(axis='y', labelcolor=color1)
        ax2 = ax1.twinx()
        color2 = 'tab:red'
        ax2.set_ylabel(self._label('Training Time (s)', '训练时间(秒)'), color=color2)
        ax2.plot(factors, training_times, 's-', color=color2,
                 label=self._label('Training Time', '训练时间'))
        ax2.tick_params(axis='y', labelcolor=color2)
        plt.title(self._label('Impact of Latent Factors on Performance', '潜在因子数量对性能的影响'))
        fig.tight_layout()
        plt.savefig(os.path.join(self.output_path, 'factors_performance.png'))
        plt.close()

        best_factor = min(factors, key=lambda f: results[f]['rmse'])
        print(f"最佳潜在因子数量: {best_factor}, RMSE: {results[best_factor]['rmse']:.4f}")
        return results, best_factor

    def get_sample_predictions(self, n_samples=10):
        """Print and return predictions for randomly chosen missing ratings.

        Args:
            n_samples (int): maximum number of samples to draw.

        Returns:
            DataFrame | None: one row per sample (user, movie and predicted
            rating rounded to 2 decimals), or None if nothing is missing.
        """
        print(f"\n获取{n_samples}个预测样例...")
        missing_users, missing_movies = np.nonzero(self.user_movie_matrix.to_numpy() == 0)
        n_missing = len(missing_users)
        if n_missing == 0:
            print("没有缺失评分需要预测")
            return None

        chosen = np.random.choice(n_missing, min(n_samples, n_missing), replace=False)
        samples = []
        for pick in chosen:
            user_idx = missing_users[pick]
            movie_idx = missing_movies[pick]
            user_id = self.user_movie_matrix.index[user_idx]
            movie_id = self.user_movie_matrix.columns[movie_idx]

            # Movie metadata, with placeholders when the id is not in movies_df.
            movie_info = self.movies_df[self.movies_df['movieId'] == movie_id]
            if movie_info.empty:
                movie_title = f"Unknown ({movie_id})"
                movie_genres = "Unknown"
            else:
                movie_title = movie_info.iloc[0]['title']
                movie_genres = movie_info.iloc[0]['genres']

            # User metadata, with placeholders when the id is not in users_df.
            user_info = self.users_df[self.users_df['userId'] == user_id]
            if user_info.empty:
                user_gender = "Unknown"
                user_age = "Unknown"
            else:
                user_gender = user_info.iloc[0]['gender']
                user_age = user_info.iloc[0]['age']

            predicted_rating = self.user_movie_predictions.iloc[user_idx, movie_idx]
            samples.append({
                'userId': user_id,
                'userGender': user_gender,
                'userAge': user_age,
                'movieId': movie_id,
                'movieTitle': movie_title,
                'movieGenres': movie_genres,
                'predictedRating': round(predicted_rating, 2)
            })

        samples_df = pd.DataFrame(samples)
        print("\n预测样例:")
        for _, row in samples_df.iterrows():
            print(f"用户 {row['userId']} ({row['userGender']}, 年龄组 {row['userAge']}) -> "
                  f"电影 '{row['movieTitle']}' ({row['movieGenres']}) -> "
                  f"预测评分: {row['predictedRating']}")
        return samples_df

    def analyze_user_movie_patterns(self):
        """Plot rating-activity, popularity and per-genre average-rating charts.

        Writes ``user_movie_patterns.png`` and ``genre_average_ratings.png``
        to ``output_path``.

        Returns:
            self: the instance, to allow method chaining.
        """
        print("\n分析用户-电影评分模式...")
        # Number of ratings given by each user.
        user_rating_counts = self.ratings_df['userId'].value_counts()
        plt.figure(figsize=(12, 5))
        plt.subplot(1, 2, 1)
        sns.histplot(user_rating_counts, bins=30, kde=True)
        plt.xlabel(self._label('Number of Ratings per User', '用户评分数量'))
        plt.ylabel(self._label('Number of Users', '用户数'))
        plt.title(self._label('User Rating Activity Distribution', '用户评分活跃度分布'))

        # Number of ratings received by each movie.
        movie_rating_counts = self.ratings_df['movieId'].value_counts()
        plt.subplot(1, 2, 2)
        sns.histplot(movie_rating_counts, bins=30, kde=True)
        plt.xlabel(self._label('Number of Ratings per Movie', '电影评分数量'))
        plt.ylabel(self._label('Number of Movies', '电影数'))
        plt.title(self._label('Movie Popularity Distribution', '电影受欢迎程度分布'))
        plt.tight_layout()
        plt.savefig(os.path.join(self.output_path, 'user_movie_patterns.png'))
        plt.close()

        # Average rating per genre. Membership is tested on the split genre
        # list so one genre name can never match as a substring (or regex)
        # of another.
        genre_lists = self.movies_df['genres'].str.split('|')
        all_genres = sorted({genre for genres in genre_lists for genre in genres})
        genre_avg_ratings = {}
        for genre in all_genres:
            in_genre = genre_lists.apply(lambda genres, g=genre: g in genres)
            genre_movies = self.movies_df.loc[in_genre, 'movieId'].values
            genre_ratings = self.ratings_df[self.ratings_df['movieId'].isin(genre_movies)]['rating']
            if len(genre_ratings) > 0:
                genre_avg_ratings[genre] = genre_ratings.mean()

        # Horizontal bars sorted by ascending average rating.
        ranked = sorted(genre_avg_ratings.items(), key=lambda item: item[1])
        sorted_genres = [genre for genre, _ in ranked]
        sorted_ratings = [rating for _, rating in ranked]
        plt.figure(figsize=(12, 6))
        plt.barh(sorted_genres, sorted_ratings, color='skyblue')
        plt.xlabel(self._label('Average Rating', '平均评分'))
        plt.ylabel(self._label('Movie Genre', '电影类型'))
        plt.title(self._label('Average Rating by Movie Genre', '各类型电影平均评分'))
        plt.grid(axis='x', linestyle='--', alpha=0.7)
        plt.tight_layout()
        plt.savefig(os.path.join(self.output_path, 'genre_average_ratings.png'))
        plt.close()
        return self

    def run_pipeline(self, n_factors=100, test_size=0.2, tune_factors=True):
        """Run the full load / analyse / train / evaluate / fill / save pipeline.

        Args:
            n_factors (int): latent-factor count used when tuning is disabled.
            test_size (float): fraction of observed ratings held out for testing.
            tune_factors (bool): if True, try several factor counts and use the
                one with the lowest test RMSE.

        Returns:
            self: the instance, to allow method chaining.
        """
        start_time = time.time()
        self.load_data()
        self.preprocess_data()
        self.analyze_user_movie_patterns()

        train_data, test_data = self.train_test_split_matrix(test_size=test_size)
        train_matrix = self.create_training_matrix(train_data)

        if tune_factors:
            # NOTE: the factor count is selected on the same test split that is
            # evaluated below, so the final RMSE is an optimistic estimate.
            _, best_factor = self.compare_factor_performance(
                train_matrix,
                test_data,
                factors_list=[50, 100, 150, 200]
            )
            n_factors = best_factor

        # Refit with the chosen factor count so the stored predictions match it.
        self.matrix_factorization_svd(train_matrix, n_factors=n_factors)
        self.evaluate_model(test_data, self.user_movie_predictions.values)
        self.get_sample_predictions(n_samples=10)
        self.fill_missing_ratings()
        self.save_results()

        total_time = time.time() - start_time
        print(f"\n总运行时间: {total_time:.2f}")
        return self
if __name__ == "__main__":
    # Script entry point: build the recommender with its default paths and
    # run the whole pipeline, tuning the latent-factor count along the way.
    recommender = MovieLensMatrixFactorization()
    recommender.run_pipeline(
        n_factors=100,
        test_size=0.2,
        tune_factors=True,
    )
    for closing_message in (
        "\n矩阵分解评分预测系统执行完成!",
        "已生成填补后的完整评分矩阵。",
    ):
        print(closing_message)