数据填补预测完成。

This commit is contained in:
Cat Tom
2025-05-05 04:46:22 +08:00
parent 1afcbb70a4
commit cb8e81e94a
10 changed files with 22390990 additions and 779 deletions

708
matrix_factorization.py Normal file
View File

@ -0,0 +1,708 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
MovieLens评分预测系统
===================
使用矩阵分解方法SVD预测MovieLens数据集中缺失的评分数据。
数据集特征:
- 1,000,209条评分
- 6,040位用户
- 3,952部电影
- 评分范围1-5分
"""
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.sparse.linalg import svds
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
import time
import matplotlib
# Configure matplotlib so that Chinese plot labels can be rendered.
# Preference order: Microsoft YaHei (Windows), Heiti SC (macOS), SimHei;
# DejaVu Sans is appended as the last entry of the font list.
_CJK_FONT_CANDIDATES = ['Microsoft YaHei', 'Heiti SC', 'SimHei']
plt.rcParams['font.sans-serif'] = _CJK_FONT_CANDIDATES + ['DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False  # keep the minus sign renderable

# findfont() falls back to the default font unless fallback_to_default=False
# is passed, in which case a missing font raises ValueError. Probing every
# candidate this way makes `use_chinese` reflect what is actually installed.
use_chinese = False
for _font_name in _CJK_FONT_CANDIDATES:
    try:
        matplotlib.font_manager.findfont(_font_name, fallback_to_default=False)
    except ValueError:
        continue
    use_chinese = True
    break
if not use_chinese:
    print("警告: 系统中没有找到支持中文的字体,将使用英文标签")

# Fix the NumPy random seed so that results are reproducible.
np.random.seed(42)
class MovieLensMatrixFactorization:
    """Predict missing MovieLens ratings with a truncated-SVD factorization.

    The pipeline reads ``ratings.dat`` / ``movies.dat`` / ``users.dat`` from
    ``data_path``, pivots the ratings into a user x movie matrix in which
    ``0`` marks a missing rating, factorizes a mean-centred copy of that
    matrix with ``scipy.sparse.linalg.svds`` and writes the completed matrix
    and several diagnostic plots to ``output_path``.
    """

    def __init__(self, data_path='./dataset', output_path='./result'):
        """Store the paths, reset all state and create the output directory.

        Args:
            data_path (str): Directory containing the ``.dat`` input files.
            output_path (str): Directory that receives CSV and PNG outputs.
        """
        self.data_path = data_path
        self.output_path = output_path
        self.ratings_df = None
        self.users_df = None
        self.movies_df = None
        self.user_movie_matrix = None
        self.user_movie_matrix_mean = None
        self.user_movie_predictions = None
        self.filled_ratings_matrix = None
        # Module-level flag: whether plot labels are rendered in Chinese.
        self.use_chinese = use_chinese
        # exist_ok avoids the check-then-create race of os.path.exists().
        os.makedirs(output_path, exist_ok=True)

    def _text(self, chinese, english):
        """Return the Chinese or English variant of a label, per ``use_chinese``."""
        return chinese if self.use_chinese else english

    def _read_dat(self, filename, columns, **read_kwargs):
        """Read one '::'-separated, header-less ``.dat`` file from ``data_path``."""
        return pd.read_csv(
            os.path.join(self.data_path, filename),
            sep='::',
            engine='python',  # the C parser does not support multi-char separators
            names=columns,
            **read_kwargs
        )

    def _matrix_positions(self, ratings):
        """Map the userId/movieId columns of *ratings* to matrix positions.

        Args:
            ratings (DataFrame): Frame with ``userId`` and ``movieId`` columns.

        Returns:
            tuple: ``(row_positions, column_positions)`` integer arrays into
            ``self.user_movie_matrix``.

        Raises:
            KeyError: If an id is absent from ``self.user_movie_matrix``.
        """
        user_pos = self.user_movie_matrix.index.get_indexer(ratings['userId'])
        movie_pos = self.user_movie_matrix.columns.get_indexer(ratings['movieId'])
        # get_indexer reports unknown labels as -1 instead of raising.
        if (user_pos < 0).any() or (movie_pos < 0).any():
            raise KeyError(
                'ratings contain a userId or movieId missing from the rating matrix'
            )
        return user_pos, movie_pos

    def load_data(self):
        """Load the ratings, movies and users tables and print a summary.

        Returns:
            self: The instance, to allow method chaining.
        """
        print("\n加载MovieLens数据集...")
        self.ratings_df = self._read_dat(
            'ratings.dat', ['userId', 'movieId', 'rating', 'timestamp']
        )
        self.movies_df = self._read_dat(
            'movies.dat', ['movieId', 'title', 'genres'], encoding='ISO-8859-1'
        )
        self.users_df = self._read_dat(
            'users.dat', ['userId', 'gender', 'age', 'occupation', 'zipcode']
        )

        print(f"加载完成: {len(self.ratings_df)} 条评分, {len(self.movies_df)} 部电影, {len(self.users_df)} 位用户")
        print(f"评分范围: {self.ratings_df['rating'].min()}-{self.ratings_df['rating'].max()}")

        # Sparsity = share of (user, movie) pairs without a rating.
        total_possible_ratings = len(self.users_df) * len(self.movies_df)
        if total_possible_ratings:
            sparsity = 100 * (1 - len(self.ratings_df) / total_possible_ratings)
            print(f"评分矩阵稀疏度: {sparsity:.2f}%")
        return self

    def preprocess_data(self):
        """Pivot ``ratings_df`` into the user x movie matrix (missing -> 0).

        Returns:
            self: The instance, to allow method chaining.
        """
        print("\n创建用户-电影评分矩阵...")
        self.user_movie_matrix = self.ratings_df.pivot(
            index='userId',
            columns='movieId',
            values='rating'
        ).fillna(0)
        print(f"评分矩阵形状: {self.user_movie_matrix.shape}")
        return self

    def train_test_split_matrix(self, test_size=0.2):
        """Split the known (non-zero) ratings into training and test sets.

        Args:
            test_size (float): Fraction of the ratings put into the test set.

        Returns:
            tuple: ``(train_data, test_data)`` DataFrames with the columns
            ``userId``, ``movieId`` and ``rating``.
        """
        print(f"\n将数据分为训练集({100 - test_size * 100:.0f}%)和测试集({test_size * 100:.0f}%)...")
        matrix = self.user_movie_matrix
        values = matrix.to_numpy()
        # np.nonzero walks the matrix in row-major order, i.e. the same order
        # a nested user/movie loop would produce, without per-cell lookups.
        user_pos, movie_pos = np.nonzero(values > 0)
        ratings_list_df = pd.DataFrame({
            'userId': matrix.index.to_numpy()[user_pos],
            'movieId': matrix.columns.to_numpy()[movie_pos],
            'rating': values[user_pos, movie_pos],
        })

        train_data, test_data = train_test_split(
            ratings_list_df,
            test_size=test_size,
            random_state=42
        )
        print(f"训练集: {len(train_data)} 条评分, 测试集: {len(test_data)} 条评分")
        return train_data, test_data

    def create_training_matrix(self, train_data):
        """Build a rating matrix that contains only the training ratings.

        Args:
            train_data (DataFrame): Rows with ``userId``, ``movieId``, ``rating``.

        Returns:
            ndarray: Float matrix shaped like ``user_movie_matrix``; every
            cell not listed in *train_data* is 0.

        Raises:
            KeyError: If *train_data* references an unknown user or movie.
        """
        # Always float: svds() rejects integer matrices.
        train_matrix = np.zeros(self.user_movie_matrix.shape, dtype=float)
        user_pos, movie_pos = self._matrix_positions(train_data)
        train_matrix[user_pos, movie_pos] = train_data['rating'].to_numpy()
        return train_matrix

    def matrix_factorization_svd(self, train_matrix, n_factors=100):
        """Factorize the mean-centred training matrix with a truncated SVD.

        Only rated cells (value > 0) contribute to a user's mean, and missing
        cells are placed at the user's mean (0 after centring), so that
        absent ratings are not treated as ratings of 0. Users without any
        training rating use the global mean of all training ratings.

        Side effects: stores the per-user means in ``user_movie_matrix_mean``
        and the predictions (as a DataFrame) in ``user_movie_predictions``.

        Args:
            train_matrix (ndarray): Training rating matrix, 0 = missing.
            n_factors (int): Number of latent factors. Reduced to
                ``min(train_matrix.shape) - 1`` when larger, because
                ``svds`` requires ``k < min(shape)``.

        Returns:
            ndarray: Predicted rating for every (user, movie) cell.

        Raises:
            ValueError: If the matrix is too small for any truncated SVD.
        """
        print(f"\n使用SVD进行矩阵分解 (潜在因子数量: {n_factors})...")
        train_matrix = np.asarray(train_matrix, dtype=float)

        max_factors = min(train_matrix.shape) - 1
        if max_factors < 1:
            raise ValueError(
                f'rating matrix of shape {train_matrix.shape} is too small '
                'for a truncated SVD'
            )
        if n_factors > max_factors:
            print(f"警告: 潜在因子数量 {n_factors} 超出矩阵允许的上限, 已调整为 {max_factors}")
            n_factors = max_factors

        # Per-user mean over rated cells only.
        rated = train_matrix > 0
        rating_counts = rated.sum(axis=1)
        rating_sums = np.where(rated, train_matrix, 0.0).sum(axis=1)
        total_count = rating_counts.sum()
        global_mean = rating_sums.sum() / total_count if total_count else 0.0
        user_ratings_mean = np.where(
            rating_counts > 0,
            rating_sums / np.maximum(rating_counts, 1),  # maximum() avoids 0/0
            global_mean,
        ).reshape(-1, 1)

        # Centre the rated cells; missing cells stay at 0 (= the user mean).
        ratings_centered = np.where(rated, train_matrix - user_ratings_mean, 0.0)

        U, sigma, Vt = svds(ratings_centered, k=n_factors)
        # (U * sigma) scales the columns of U, equivalent to U @ diag(sigma).
        all_user_predicted_ratings = user_ratings_mean + (U * sigma) @ Vt

        self.user_movie_matrix_mean = user_ratings_mean
        self.user_movie_predictions = pd.DataFrame(
            all_user_predicted_ratings,
            index=self.user_movie_matrix.index,
            columns=self.user_movie_matrix.columns
        )
        return all_user_predicted_ratings

    def evaluate_model(self, test_data, predicted_matrix):
        """Compute the test RMSE and save two diagnostic plots.

        Writes ``rating_prediction_scatter.png`` and
        ``prediction_analysis.png`` into ``output_path``.

        Args:
            test_data (DataFrame): Rows with ``userId``, ``movieId``, ``rating``.
            predicted_matrix (ndarray): Predicted rating matrix, aligned with
                ``user_movie_matrix``.

        Returns:
            float: Root mean squared error on the test ratings.

        Raises:
            KeyError: If *test_data* references an unknown user or movie.
        """
        print("\n评估模型性能...")
        user_pos, movie_pos = self._matrix_positions(test_data)
        actual_ratings = test_data['rating'].to_numpy(dtype=float)
        predicted_ratings = np.asarray(predicted_matrix)[user_pos, movie_pos]

        rmse = np.sqrt(mean_squared_error(actual_ratings, predicted_ratings))
        print(f"测试集RMSE: {rmse:.4f}")

        # Scatter of actual vs. predicted ratings with the y = x reference line.
        plt.figure(figsize=(10, 6))
        plt.scatter(actual_ratings, predicted_ratings, alpha=0.3)
        rating_span = [actual_ratings.min(), actual_ratings.max()]
        plt.plot(rating_span, rating_span, 'r--', lw=2)
        plt.xlabel(self._text('实际评分', 'Actual Ratings'))
        plt.ylabel(self._text('预测评分', 'Predicted Ratings'))
        plt.title(self._text('实际评分 vs 预测评分', 'Actual vs Predicted Ratings'))
        plt.tight_layout()
        plt.savefig(os.path.join(self.output_path, 'rating_prediction_scatter.png'))
        plt.close()

        # Left panel: rating distributions; right panel: error distribution.
        plt.figure(figsize=(12, 5))
        plt.subplot(1, 2, 1)
        plt.hist(actual_ratings, bins=20, alpha=0.7,
                 label=self._text('实际评分', 'Actual'))
        plt.hist(predicted_ratings, bins=20, alpha=0.7,
                 label=self._text('预测评分', 'Predicted'))
        plt.xlabel(self._text('评分', 'Rating'))
        plt.ylabel(self._text('频率', 'Frequency'))
        plt.title(self._text('评分分布对比', 'Rating Distribution Comparison'))
        plt.legend()

        plt.subplot(1, 2, 2)
        plt.hist(actual_ratings - predicted_ratings, bins=20)
        plt.xlabel(self._text('预测误差(实际-预测)', 'Prediction Error (Actual-Predicted)'))
        plt.ylabel(self._text('频率', 'Frequency'))
        plt.title(self._text('预测误差分布', 'Prediction Error Distribution'))
        plt.tight_layout()
        plt.savefig(os.path.join(self.output_path, 'prediction_analysis.png'))
        plt.close()
        return rmse

    def fill_missing_ratings(self):
        """Fill every missing (0) cell with its prediction, clipped to [1, 5].

        Known ratings are kept; the result is also stored in
        ``filled_ratings_matrix``.

        Returns:
            DataFrame: The completed rating matrix.
        """
        print("\n填补缺失评分...")
        original_ratings = self.user_movie_matrix.to_numpy()
        predicted_ratings = self.user_movie_predictions.to_numpy()

        missing = original_ratings == 0
        # np.where yields a float result even if the original matrix is integer.
        filled_ratings = np.where(missing, predicted_ratings, original_ratings)
        # Predictions may fall outside the valid rating range.
        filled_ratings = np.clip(filled_ratings, 1, 5)

        self.filled_ratings_matrix = pd.DataFrame(
            filled_ratings,
            index=self.user_movie_matrix.index,
            columns=self.user_movie_matrix.columns
        )

        missing_count = np.sum(missing)
        total_cells = original_ratings.size
        print(f"总评分单元格数: {total_cells}")
        print(f"原始缺失评分数: {missing_count} ({missing_count / total_cells * 100:.2f}%)")
        print(f"已预测填补: {missing_count} 个评分")
        return self.filled_ratings_matrix

    def save_results(self):
        """Write the completed matrix and its long-format list to CSV files.

        ``filled_ratings_matrix.csv`` holds the matrix itself;
        ``filled_ratings_list.csv`` holds one row per cell with the columns
        ``userId``, ``movieId``, ``rating`` and ``isOriginal`` (True when the
        rating was present in the original matrix).

        Returns:
            self: The instance, to allow method chaining.
        """
        print("\n保存结果...")
        filled = self.filled_ratings_matrix
        ratings_file = os.path.join(self.output_path, 'filled_ratings_matrix.csv')
        filled.to_csv(ratings_file)

        # Flatten in row-major order (user outer, movie inner) with NumPy
        # instead of one .loc lookup per cell.
        n_users, n_movies = filled.shape
        original = self.user_movie_matrix.loc[filled.index, filled.columns]
        ratings_df = pd.DataFrame({
            'userId': np.repeat(filled.index.to_numpy(), n_movies),
            'movieId': np.tile(filled.columns.to_numpy(), n_users),
            'rating': filled.to_numpy().ravel(),
            'isOriginal': (original.to_numpy() > 0).ravel(),
        })
        # Drop any remaining 0-valued cells.
        ratings_df = ratings_df[ratings_df['rating'] > 0]

        ratings_list_file = os.path.join(self.output_path, 'filled_ratings_list.csv')
        ratings_df.to_csv(ratings_list_file, index=False)
        print(f"已保存填补后的评分矩阵: {ratings_file}")
        print(f"已保存填补后的评分列表: {ratings_list_file}")
        return self

    def compare_factor_performance(self, train_matrix, test_data,
                                   factors_list=(50, 100, 150, 200)):
        """Train and evaluate one model per latent-factor count.

        Also saves ``factors_performance.png`` (RMSE and training time per
        factor count) into ``output_path``.

        Args:
            train_matrix (ndarray): Training rating matrix.
            test_data (DataFrame): Held-out ratings used for the RMSE.
            factors_list (sequence of int): Factor counts to try.

        Returns:
            tuple: ``(results, best_factor)`` where ``results`` maps each
            factor count to ``{'rmse': ..., 'training_time': ...}`` and
            ``best_factor`` is the count with the lowest RMSE.

        Raises:
            ValueError: If *factors_list* is empty.
        """
        factors_list = list(factors_list)
        if not factors_list:
            raise ValueError('factors_list must contain at least one value')

        print("\n比较不同潜在因子数量的性能...")
        results = {}
        for n_factors in factors_list:
            print(f"测试潜在因子数量: {n_factors}")
            start_time = time.time()
            predicted_matrix = self.matrix_factorization_svd(train_matrix, n_factors=n_factors)
            training_time = time.time() - start_time
            rmse = self.evaluate_model(test_data, predicted_matrix)
            results[n_factors] = {
                'rmse': rmse,
                'training_time': training_time
            }
            print(f"训练时间: {training_time:.2f}")

        factors = list(results)
        rmse_values = [results[f]['rmse'] for f in factors]
        training_times = [results[f]['training_time'] for f in factors]

        # RMSE on the left axis, training time on a twin right axis.
        fig, ax1 = plt.subplots(figsize=(10, 6))
        color1 = 'tab:blue'
        ax1.set_xlabel(self._text('潜在因子数量', 'Latent Factors'))
        ax1.set_ylabel('RMSE', color=color1)
        ax1.plot(factors, rmse_values, 'o-', color=color1, label='RMSE')
        ax1.tick_params(axis='y', labelcolor=color1)

        ax2 = ax1.twinx()
        color2 = 'tab:red'
        ax2.set_ylabel(self._text('训练时间(秒)', 'Training Time (s)'), color=color2)
        ax2.plot(factors, training_times, 's-', color=color2,
                 label=self._text('训练时间', 'Training Time'))
        ax2.tick_params(axis='y', labelcolor=color2)

        plt.title(self._text('潜在因子数量对性能的影响',
                             'Impact of Latent Factors on Performance'))
        fig.tight_layout()
        plt.savefig(os.path.join(self.output_path, 'factors_performance.png'))
        plt.close()

        best_factor = min(factors, key=lambda f: results[f]['rmse'])
        print(f"最佳潜在因子数量: {best_factor}, RMSE: {results[best_factor]['rmse']:.4f}")
        return results, best_factor

    def get_sample_predictions(self, n_samples=10):
        """Print and return predictions for randomly chosen missing cells.

        Args:
            n_samples (int): Maximum number of samples to draw.

        Returns:
            DataFrame or None: One row per sample (user and movie details plus
            ``predictedRating``), or ``None`` when nothing is missing.
        """
        print(f"\n获取{n_samples}个预测样例...")
        missing_rows, missing_cols = np.where(self.user_movie_matrix.to_numpy() == 0)
        if missing_rows.size == 0:
            print("没有缺失评分需要预测")
            return None

        chosen = np.random.choice(
            missing_rows.size,
            min(n_samples, missing_rows.size),
            replace=False
        )

        samples = []
        for idx in chosen:
            user_idx = missing_rows[idx]
            movie_idx = missing_cols[idx]
            user_id = self.user_movie_matrix.index[user_idx]
            movie_id = self.user_movie_matrix.columns[movie_idx]

            # Fall back to placeholders when the id has no metadata row.
            movie_info = self.movies_df[self.movies_df['movieId'] == movie_id]
            if movie_info.empty:
                movie_title = f"Unknown ({movie_id})"
                movie_genres = "Unknown"
            else:
                movie_title = movie_info.iloc[0]['title']
                movie_genres = movie_info.iloc[0]['genres']

            user_info = self.users_df[self.users_df['userId'] == user_id]
            if user_info.empty:
                user_gender = "Unknown"
                user_age = "Unknown"
            else:
                user_gender = user_info.iloc[0]['gender']
                user_age = user_info.iloc[0]['age']

            predicted_rating = self.user_movie_predictions.iloc[user_idx, movie_idx]
            samples.append({
                'userId': user_id,
                'userGender': user_gender,
                'userAge': user_age,
                'movieId': movie_id,
                'movieTitle': movie_title,
                'movieGenres': movie_genres,
                'predictedRating': round(predicted_rating, 2)
            })

        samples_df = pd.DataFrame(samples)
        print("\n预测样例:")
        for _, row in samples_df.iterrows():
            print(f"用户 {row['userId']} ({row['userGender']}, 年龄组 {row['userAge']}) -> "
                  f"电影 '{row['movieTitle']}' ({row['movieGenres']}) -> "
                  f"预测评分: {row['predictedRating']}")
        return samples_df

    def analyze_user_movie_patterns(self):
        """Plot rating-activity distributions and per-genre average ratings.

        Writes ``user_movie_patterns.png`` and ``genre_average_ratings.png``
        into ``output_path``.

        Returns:
            self: The instance, to allow method chaining.
        """
        print("\n分析用户-电影评分模式...")

        # Left panel: ratings per user; right panel: ratings per movie.
        user_rating_counts = self.ratings_df['userId'].value_counts()
        plt.figure(figsize=(12, 5))
        plt.subplot(1, 2, 1)
        sns.histplot(user_rating_counts, bins=30, kde=True)
        plt.xlabel(self._text('用户评分数量', 'Number of Ratings per User'))
        plt.ylabel(self._text('用户数', 'Number of Users'))
        plt.title(self._text('用户评分活跃度分布', 'User Rating Activity Distribution'))

        movie_rating_counts = self.ratings_df['movieId'].value_counts()
        plt.subplot(1, 2, 2)
        sns.histplot(movie_rating_counts, bins=30, kde=True)
        plt.xlabel(self._text('电影评分数量', 'Number of Ratings per Movie'))
        plt.ylabel(self._text('电影数', 'Number of Movies'))
        plt.title(self._text('电影受欢迎程度分布', 'Movie Popularity Distribution'))
        plt.tight_layout()
        plt.savefig(os.path.join(self.output_path, 'user_movie_patterns.png'))
        plt.close()

        # Average rating per genre. A movie belongs to a genre when the genre
        # is one of its '|'-separated entries (exact match, not a substring
        # or regex match).
        genre_lists = self.movies_df['genres'].str.split('|')
        all_genres = sorted({name for names in genre_lists for name in names})
        genre_avg_ratings = {}
        for genre in all_genres:
            in_genre = genre_lists.apply(lambda names: genre in names)
            genre_movie_ids = self.movies_df.loc[in_genre, 'movieId']
            genre_ratings = self.ratings_df.loc[
                self.ratings_df['movieId'].isin(genre_movie_ids), 'rating'
            ]
            if not genre_ratings.empty:
                genre_avg_ratings[genre] = genre_ratings.mean()

        # Ascending by average rating, so the best genre ends up on top.
        sorted_genres = sorted(genre_avg_ratings, key=genre_avg_ratings.get)
        sorted_ratings = [genre_avg_ratings[genre] for genre in sorted_genres]

        plt.figure(figsize=(12, 6))
        plt.barh(sorted_genres, sorted_ratings, color='skyblue')
        plt.xlabel(self._text('平均评分', 'Average Rating'))
        plt.ylabel(self._text('电影类型', 'Movie Genre'))
        plt.title(self._text('各类型电影平均评分', 'Average Rating by Movie Genre'))
        plt.grid(axis='x', linestyle='--', alpha=0.7)
        plt.tight_layout()
        plt.savefig(os.path.join(self.output_path, 'genre_average_ratings.png'))
        plt.close()
        return self

    def run_pipeline(self, n_factors=100, test_size=0.2, tune_factors=True):
        """Run the whole flow: load, analyse, split, train, evaluate, fill, save.

        Args:
            n_factors (int): Latent-factor count used when tuning is off.
            test_size (float): Fraction of the ratings held out for testing.
            tune_factors (bool): If True, try 50/100/150/200 factors and use
                the one with the lowest test RMSE instead of *n_factors*.

        Returns:
            self: The instance, to allow method chaining.
        """
        start_time = time.time()

        self.load_data()
        self.preprocess_data()
        self.analyze_user_movie_patterns()

        train_data, test_data = self.train_test_split_matrix(test_size=test_size)
        train_matrix = self.create_training_matrix(train_data)

        if tune_factors:
            _, best_factor = self.compare_factor_performance(
                train_matrix,
                test_data,
                factors_list=[50, 100, 150, 200]
            )
            n_factors = best_factor

        # Final model with the selected factor count.
        self.matrix_factorization_svd(train_matrix, n_factors=n_factors)
        self.evaluate_model(test_data, self.user_movie_predictions.values)

        self.get_sample_predictions(n_samples=10)
        self.fill_missing_ratings()
        self.save_results()

        total_time = time.time() - start_time
        print(f"\n总运行时间: {total_time:.2f}")
        return self
if __name__ == "__main__":
    # Script entry point: build the recommender with its default paths and
    # run every pipeline stage, tuning the latent-factor count on the way.
    pipeline_options = {'n_factors': 100, 'test_size': 0.2, 'tune_factors': True}
    recommender = MovieLensMatrixFactorization()
    recommender.run_pipeline(**pipeline_options)
    for closing_line in ("\n矩阵分解评分预测系统执行完成!", "已生成填补后的完整评分矩阵。"):
        print(closing_line)