#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
MovieLens rating prediction system
==================================

Predict the missing ratings of the MovieLens dataset with a truncated-SVD
matrix factorization.

Dataset characteristics:
- 1,000,209 ratings
- 6,040 users
- 3,952 movies
- rating range: 1-5
"""

import os
import time

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from matplotlib import font_manager
from scipy.sparse.linalg import svds
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

# Fonts able to render the Chinese plot labels, in order of preference
# (Windows, macOS, generic).
_CHINESE_FONT_CANDIDATES = ['Microsoft YaHei', 'Heiti SC', 'SimHei']


def _find_available_fonts(candidates):
    """Return the subset of *candidates* that matplotlib can resolve.

    ``findfont`` silently returns the default font for unknown families unless
    ``fallback_to_default=False`` is passed, in which case it raises
    ``ValueError``; that is what makes the probe meaningful.
    """
    available = []
    for name in candidates:
        try:
            font_manager.findfont(name, fallback_to_default=False)
        except ValueError:
            continue
        available.append(name)
    return available


# Configure Chinese font support, falling back to English labels when no
# suitable font is installed.
_available_chinese_fonts = _find_available_fonts(_CHINESE_FONT_CANDIDATES)
use_chinese = bool(_available_chinese_fonts)
if use_chinese:
    plt.rcParams['font.sans-serif'] = _available_chinese_fonts + ['DejaVu Sans']
    # Render the minus sign correctly with non-default fonts.
    plt.rcParams['axes.unicode_minus'] = False
else:
    print("警告: 系统中没有找到支持中文的字体,将使用英文标签")

# Fix the random seed so results are reproducible.
np.random.seed(42)


class MovieLensMatrixFactorization:
    """Matrix-factorization (truncated SVD) rating predictor for MovieLens.

    Missing ratings are represented by ``0`` in the user-movie matrix; real
    ratings are expected to be strictly positive.
    """

    def __init__(self, data_path='./dataset', output_path='./result'):
        """Initialise the model and make sure the output directory exists.

        Args:
            data_path (str): Directory containing ``ratings.dat``,
                ``movies.dat`` and ``users.dat``.
            output_path (str): Directory where plots and CSV files are written.
        """
        self.data_path = data_path
        self.output_path = output_path
        self.ratings_df = None
        self.users_df = None
        self.movies_df = None
        self.user_movie_matrix = None
        self.user_movie_matrix_mean = None
        self.user_movie_predictions = None
        self.filled_ratings_matrix = None
        self.use_chinese = use_chinese

        # exist_ok avoids the check-then-create race of os.path.exists().
        os.makedirs(output_path, exist_ok=True)

    def _text(self, english, chinese):
        """Return the label matching the active plot language."""
        return chinese if self.use_chinese else english

    def _matrix_positions(self, data):
        """Map the userId/movieId columns of *data* to matrix positions.

        Args:
            data (DataFrame): Frame with ``userId`` and ``movieId`` columns.

        Returns:
            tuple: ``(row_positions, column_positions)`` integer arrays.

        Raises:
            KeyError: If an id is not present in the rating matrix.
        """
        user_pos = self.user_movie_matrix.index.get_indexer(data['userId'])
        movie_pos = self.user_movie_matrix.columns.get_indexer(data['movieId'])
        # get_indexer marks unknown labels with -1 instead of raising.
        if (user_pos < 0).any() or (movie_pos < 0).any():
            raise KeyError(
                'data contains a userId or movieId missing from the rating matrix'
            )
        return user_pos, movie_pos

    def load_data(self):
        """Load the ratings, movies and users files of the dataset.

        Returns:
            self: The instance, to allow method chaining.
        """
        print("\n加载MovieLens数据集...")

        # Ratings.
        ratings_file = os.path.join(self.data_path, 'ratings.dat')
        self.ratings_df = pd.read_csv(
            ratings_file,
            sep='::',
            engine='python',
            names=['userId', 'movieId', 'rating', 'timestamp']
        )

        # Movies.
        movies_file = os.path.join(self.data_path, 'movies.dat')
        self.movies_df = pd.read_csv(
            movies_file,
            sep='::',
            engine='python',
            names=['movieId', 'title', 'genres'],
            encoding='ISO-8859-1'
        )

        # Users.
        users_file = os.path.join(self.data_path, 'users.dat')
        self.users_df = pd.read_csv(
            users_file,
            sep='::',
            engine='python',
            names=['userId', 'gender', 'age', 'occupation', 'zipcode']
        )

        # Basic dataset summary.
        print(f"加载完成: {len(self.ratings_df)} 条评分, {len(self.movies_df)} 部电影, {len(self.users_df)} 位用户")
        print(f"评分范围: {self.ratings_df['rating'].min()}-{self.ratings_df['rating'].max()}")

        # Sparsity of the full users x movies grid (skipped for empty files
        # to avoid a division by zero).
        total_possible_ratings = len(self.users_df) * len(self.movies_df)
        if total_possible_ratings:
            sparsity = 100 * (1 - len(self.ratings_df) / total_possible_ratings)
            print(f"评分矩阵稀疏度: {sparsity:.2f}%")

        return self

    def preprocess_data(self):
        """Build the user x movie rating matrix (missing ratings become 0).

        Returns:
            self: The instance, to allow method chaining.
        """
        print("\n创建用户-电影评分矩阵...")

        self.user_movie_matrix = self.ratings_df.pivot(
            index='userId',
            columns='movieId',
            values='rating'
        ).fillna(0)

        print(f"评分矩阵形状: {self.user_movie_matrix.shape}")

        return self

    def train_test_split_matrix(self, test_size=0.2):
        """Split the observed ratings into a training and a test set.

        Args:
            test_size (float): Fraction of the ratings used for testing.

        Returns:
            tuple: ``(train_data, test_data)`` DataFrames with the columns
            ``userId``, ``movieId`` and ``rating``.
        """
        print(f"\n将数据分为训练集({100 - test_size * 100:.0f}%)和测试集({test_size * 100:.0f}%)...")

        # Collect the observed ratings in one vectorised pass. np.nonzero
        # walks the matrix in row-major order, the same order a nested
        # row/column loop would produce.
        values = self.user_movie_matrix.to_numpy()
        user_pos, movie_pos = np.nonzero(values > 0)
        ratings_list_df = pd.DataFrame({
            'userId': self.user_movie_matrix.index.to_numpy()[user_pos],
            'movieId': self.user_movie_matrix.columns.to_numpy()[movie_pos],
            'rating': values[user_pos, movie_pos],
        })

        train_data, test_data = train_test_split(
            ratings_list_df,
            test_size=test_size,
            random_state=42
        )

        print(f"训练集: {len(train_data)} 条评分, 测试集: {len(test_data)} 条评分")

        return train_data, test_data

    def create_training_matrix(self, train_data):
        """Build a rating matrix that contains only the training ratings.

        Args:
            train_data (DataFrame): Training ratings with the columns
                ``userId``, ``movieId`` and ``rating``.

        Returns:
            ndarray: Float matrix shaped like the full rating matrix, with 0
            wherever no training rating exists.

        Raises:
            KeyError: If an id is not present in the rating matrix.
        """
        train_matrix = np.zeros(self.user_movie_matrix.shape, dtype=np.float64)
        user_pos, movie_pos = self._matrix_positions(train_data)
        train_matrix[user_pos, movie_pos] = train_data['rating'].to_numpy()
        return train_matrix

    def matrix_factorization_svd(self, train_matrix, n_factors=100):
        """Factorise the training matrix with a truncated SVD.

        Each user's mean is computed over the movies that user actually
        rated, and only rated cells are centred. Unrated cells stay at 0 after
        centring, i.e. they are treated as "equal to the user's mean" instead
        of as a rating of 0.

        Args:
            train_matrix (ndarray): Training rating matrix (0 = missing).
            n_factors (int): Number of latent factors.

        Returns:
            ndarray: Dense matrix of predicted ratings.

        Raises:
            ValueError: If ``n_factors`` is not in ``[1, min(shape) - 1]``.
        """
        print(f"\n使用SVD进行矩阵分解 (潜在因子数量: {n_factors})...")

        train_matrix = np.asarray(train_matrix, dtype=np.float64)
        max_factors = min(train_matrix.shape) - 1
        if not 1 <= n_factors <= max_factors:
            raise ValueError(
                f"n_factors must be between 1 and {max_factors}, got {n_factors}"
            )

        # Per-user mean over rated movies only.
        rated = train_matrix > 0
        rating_counts = rated.sum(axis=1, keepdims=True)
        rating_sums = np.where(rated, train_matrix, 0.0).sum(axis=1, keepdims=True)

        # Users without any training rating fall back to the global mean.
        total_ratings = rating_counts.sum()
        global_mean = rating_sums.sum() / total_ratings if total_ratings else 0.0
        user_ratings_mean = np.full(rating_sums.shape, global_mean, dtype=np.float64)
        np.divide(rating_sums, rating_counts, out=user_ratings_mean,
                  where=rating_counts > 0)

        # Centre only the observed ratings.
        ratings_centered = np.where(rated, train_matrix - user_ratings_mean, 0.0)

        # Truncated SVD.
        U, sigma, Vt = svds(ratings_centered, k=n_factors)

        # U * sigma scales the columns of U, which equals U @ diag(sigma).
        all_user_predicted_ratings = user_ratings_mean + (U * sigma) @ Vt

        # Keep the means for later use.
        self.user_movie_matrix_mean = user_ratings_mean

        self.user_movie_predictions = pd.DataFrame(
            all_user_predicted_ratings,
            index=self.user_movie_matrix.index,
            columns=self.user_movie_matrix.columns
        )

        return all_user_predicted_ratings

    def evaluate_model(self, test_data, predicted_matrix):
        """Compute the test RMSE and save diagnostic plots.

        Writes ``rating_prediction_scatter.png`` and
        ``prediction_analysis.png`` into the output directory.

        Args:
            test_data (DataFrame): Held-out ratings.
            predicted_matrix (ndarray): Predicted rating matrix.

        Returns:
            float: Root mean squared error on the test ratings.

        Raises:
            ValueError: If ``test_data`` is empty.
            KeyError: If an id is not present in the rating matrix.
        """
        print("\n评估模型性能...")

        if len(test_data) == 0:
            raise ValueError("test_data is empty; nothing to evaluate")

        # Gather actual and predicted ratings with one fancy-index lookup.
        user_pos, movie_pos = self._matrix_positions(test_data)
        actual_ratings = test_data['rating'].to_numpy(dtype=np.float64)
        predicted_ratings = np.asarray(predicted_matrix)[user_pos, movie_pos]

        rmse = np.sqrt(mean_squared_error(actual_ratings, predicted_ratings))
        print(f"测试集RMSE: {rmse:.4f}")

        # Scatter plot of actual versus predicted ratings with the y = x line.
        lowest, highest = actual_ratings.min(), actual_ratings.max()
        plt.figure(figsize=(10, 6))
        plt.scatter(actual_ratings, predicted_ratings, alpha=0.3)
        plt.plot([lowest, highest], [lowest, highest], 'r--', lw=2)
        plt.xlabel(self._text('Actual Ratings', '实际评分'))
        plt.ylabel(self._text('Predicted Ratings', '预测评分'))
        plt.title(self._text('Actual vs Predicted Ratings', '实际评分 vs 预测评分'))
        plt.tight_layout()
        plt.savefig(os.path.join(self.output_path, 'rating_prediction_scatter.png'))
        plt.close()

        # Rating distributions (left) and prediction error distribution (right).
        plt.figure(figsize=(12, 5))

        plt.subplot(1, 2, 1)
        plt.hist(actual_ratings, bins=20, alpha=0.7,
                 label=self._text('Actual', '实际评分'))
        plt.hist(predicted_ratings, bins=20, alpha=0.7,
                 label=self._text('Predicted', '预测评分'))
        plt.xlabel(self._text('Rating', '评分'))
        plt.ylabel(self._text('Frequency', '频率'))
        plt.title(self._text('Rating Distribution Comparison', '评分分布对比'))
        plt.legend()

        plt.subplot(1, 2, 2)
        prediction_errors = actual_ratings - predicted_ratings
        plt.hist(prediction_errors, bins=20)
        plt.xlabel(self._text('Prediction Error (Actual-Predicted)', '预测误差(实际-预测)'))
        plt.ylabel(self._text('Frequency', '频率'))
        plt.title(self._text('Prediction Error Distribution', '预测误差分布'))

        plt.tight_layout()
        plt.savefig(os.path.join(self.output_path, 'prediction_analysis.png'))
        plt.close()

        return rmse

    def fill_missing_ratings(self):
        """Fill the missing ratings with the model's predictions.

        Existing ratings are kept; cells equal to 0 receive the predicted
        value. The result is clipped to the 1-5 range.

        Returns:
            DataFrame: The completed rating matrix.
        """
        print("\n填补缺失评分...")

        original_ratings = self.user_movie_matrix.values
        predicted_ratings = self.user_movie_predictions.values

        # Keep observed ratings, take predictions where the original is 0,
        # then clip out-of-range predictions into the valid interval.
        missing = original_ratings == 0
        filled_ratings = np.clip(
            np.where(missing, predicted_ratings, original_ratings), 1, 5
        )

        self.filled_ratings_matrix = pd.DataFrame(
            filled_ratings,
            index=self.user_movie_matrix.index,
            columns=self.user_movie_matrix.columns
        )

        missing_count = np.sum(missing)
        total_cells = original_ratings.size

        print(f"总评分单元格数: {total_cells}")
        print(f"原始缺失评分数: {missing_count} ({missing_count / total_cells * 100:.2f}%)")
        print(f"已预测填补: {missing_count} 个评分")

        return self.filled_ratings_matrix

    def save_results(self):
        """Write the completed matrix and its long-format list to CSV.

        Produces ``filled_ratings_matrix.csv`` and ``filled_ratings_list.csv``
        in the output directory. The list has the columns ``userId``,
        ``movieId``, ``rating`` and ``isOriginal`` (True when the rating was
        present in the original matrix).

        Returns:
            self: The instance, to allow method chaining.
        """
        print("\n保存结果...")

        # Completed matrix in wide format.
        ratings_file = os.path.join(self.output_path, 'filled_ratings_matrix.csv')
        self.filled_ratings_matrix.to_csv(ratings_file)

        # Long format (easier to post-process), built without per-cell
        # label lookups. Rows are emitted in row-major order.
        filled = self.filled_ratings_matrix
        filled_values = filled.to_numpy()
        # Align the original matrix on the filled matrix's labels.
        original_values = self.user_movie_matrix.reindex(
            index=filled.index, columns=filled.columns
        ).to_numpy()

        # Skip any cell that still holds a 0 rating.
        user_pos, movie_pos = np.nonzero(filled_values > 0)
        ratings_df = pd.DataFrame({
            'userId': filled.index.to_numpy()[user_pos],
            'movieId': filled.columns.to_numpy()[movie_pos],
            'rating': filled_values[user_pos, movie_pos],
            'isOriginal': original_values[user_pos, movie_pos] > 0,
        })

        ratings_list_file = os.path.join(self.output_path, 'filled_ratings_list.csv')
        ratings_df.to_csv(ratings_list_file, index=False)

        print(f"已保存填补后的评分矩阵: {ratings_file}")
        print(f"已保存填补后的评分列表: {ratings_list_file}")

        return self

    def compare_factor_performance(self, train_matrix, test_data,
                                   factors_list=(50, 100, 150, 200)):
        """Train and evaluate the model for several latent-factor counts.

        Saves ``factors_performance.png`` into the output directory. Each
        trial overwrites ``self.user_movie_predictions``.

        Args:
            train_matrix (ndarray): Training rating matrix.
            test_data (DataFrame): Held-out ratings.
            factors_list (sequence of int): Latent-factor counts to try.

        Returns:
            tuple: ``(results, best_factor)`` where ``results`` maps each
            factor count to ``{'rmse': ..., 'training_time': ...}`` and
            ``best_factor`` is the count with the lowest RMSE.

        Raises:
            ValueError: If ``factors_list`` is empty.
        """
        print("\n比较不同潜在因子数量的性能...")

        factors_list = list(factors_list)
        if not factors_list:
            raise ValueError("factors_list must contain at least one value")

        results = {}

        for n_factors in factors_list:
            print(f"测试潜在因子数量: {n_factors}")

            # Train.
            start_time = time.time()
            predicted_matrix = self.matrix_factorization_svd(train_matrix, n_factors=n_factors)
            training_time = time.time() - start_time

            # Evaluate.
            rmse = self.evaluate_model(test_data, predicted_matrix)

            results[n_factors] = {
                'rmse': rmse,
                'training_time': training_time
            }

            print(f"训练时间: {training_time:.2f}秒")

        # Plot RMSE and training time on twin y-axes.
        factors = list(results.keys())
        rmse_values = [results[f]['rmse'] for f in factors]
        training_times = [results[f]['training_time'] for f in factors]

        fig, ax1 = plt.subplots(figsize=(10, 6))

        color1 = 'tab:blue'
        ax1.set_xlabel(self._text('Latent Factors', '潜在因子数量'))
        ax1.set_ylabel('RMSE', color=color1)
        ax1.plot(factors, rmse_values, 'o-', color=color1, label='RMSE')
        ax1.tick_params(axis='y', labelcolor=color1)

        ax2 = ax1.twinx()
        color2 = 'tab:red'
        ax2.set_ylabel(self._text('Training Time (s)', '训练时间(秒)'), color=color2)
        ax2.plot(factors, training_times, 's-', color=color2,
                 label=self._text('Training Time', '训练时间'))
        ax2.tick_params(axis='y', labelcolor=color2)

        plt.title(self._text('Impact of Latent Factors on Performance',
                             '潜在因子数量对性能的影响'))

        fig.tight_layout()
        fig.savefig(os.path.join(self.output_path, 'factors_performance.png'))
        plt.close(fig)

        # Pick the factor count with the lowest RMSE.
        best_factor = min(factors, key=lambda f: results[f]['rmse'])
        print(f"最佳潜在因子数量: {best_factor}, RMSE: {results[best_factor]['rmse']:.4f}")

        return results, best_factor

    def get_sample_predictions(self, n_samples=10):
        """Print and return predictions for randomly chosen missing ratings.

        Args:
            n_samples (int): Maximum number of samples to draw.

        Returns:
            DataFrame or None: One row per sample, or ``None`` when the
            original matrix has no missing rating.
        """
        print(f"\n获取{n_samples}个预测样例...")

        # Positions of the missing ratings in the original matrix.
        missing_rows, missing_cols = np.where(self.user_movie_matrix.values == 0)
        missing_total = len(missing_rows)

        if missing_total == 0:
            print("没有缺失评分需要预测")
            return None

        # Draw distinct positions at random.
        random_indices = np.random.choice(
            missing_total,
            min(n_samples, missing_total),
            replace=False
        )

        samples = []
        for idx in random_indices:
            user_idx = missing_rows[idx]
            movie_idx = missing_cols[idx]

            user_id = self.user_movie_matrix.index[user_idx]
            movie_id = self.user_movie_matrix.columns[movie_idx]

            # Movie title and genres, when the movie is listed.
            movie_info = self.movies_df[self.movies_df['movieId'] == movie_id]
            if movie_info.empty:
                movie_title = f"Unknown ({movie_id})"
                movie_genres = "Unknown"
            else:
                movie_title = movie_info.iloc[0]['title']
                movie_genres = movie_info.iloc[0]['genres']

            # User gender and age group, when the user is listed.
            user_info = self.users_df[self.users_df['userId'] == user_id]
            if user_info.empty:
                user_gender = "Unknown"
                user_age = "Unknown"
            else:
                user_gender = user_info.iloc[0]['gender']
                user_age = user_info.iloc[0]['age']

            predicted_rating = self.user_movie_predictions.iloc[user_idx, movie_idx]

            samples.append({
                'userId': user_id,
                'userGender': user_gender,
                'userAge': user_age,
                'movieId': movie_id,
                'movieTitle': movie_title,
                'movieGenres': movie_genres,
                'predictedRating': round(predicted_rating, 2)
            })

        samples_df = pd.DataFrame(samples)

        print("\n预测样例:")
        for _, row in samples_df.iterrows():
            print(f"用户 {row['userId']} ({row['userGender']}, 年龄组 {row['userAge']}) -> "
                  f"电影 '{row['movieTitle']}' ({row['movieGenres']}) -> "
                  f"预测评分: {row['predictedRating']}")

        return samples_df

    def analyze_user_movie_patterns(self):
        """Plot user activity, movie popularity and per-genre mean ratings.

        Writes ``user_movie_patterns.png`` and ``genre_average_ratings.png``
        into the output directory.

        Returns:
            self: The instance, to allow method chaining.
        """
        print("\n分析用户-电影评分模式...")

        # Number of ratings per user.
        user_rating_counts = self.ratings_df['userId'].value_counts()

        plt.figure(figsize=(12, 5))

        plt.subplot(1, 2, 1)
        sns.histplot(user_rating_counts, bins=30, kde=True)
        plt.xlabel(self._text('Number of Ratings per User', '用户评分数量'))
        plt.ylabel(self._text('Number of Users', '用户数'))
        plt.title(self._text('User Rating Activity Distribution', '用户评分活跃度分布'))

        # Number of ratings per movie.
        movie_rating_counts = self.ratings_df['movieId'].value_counts()

        plt.subplot(1, 2, 2)
        sns.histplot(movie_rating_counts, bins=30, kde=True)
        plt.xlabel(self._text('Number of Ratings per Movie', '电影评分数量'))
        plt.ylabel(self._text('Number of Movies', '电影数'))
        plt.title(self._text('Movie Popularity Distribution', '电影受欢迎程度分布'))

        plt.tight_layout()
        plt.savefig(os.path.join(self.output_path, 'user_movie_patterns.png'))
        plt.close()

        # Mean rating per genre. Genres are split on '|' and matched exactly,
        # so a genre name is never treated as a regex or as a substring of
        # another genre.
        movie_genres = (
            self.movies_df[['movieId', 'genres']]
            .assign(genre=lambda frame: frame['genres'].str.split('|'))
            .explode('genre')[['movieId', 'genre']]
            .dropna(subset=['genre'])
            .drop_duplicates()
        )
        rated_genres = self.ratings_df[['movieId', 'rating']].merge(
            movie_genres, on='movieId'
        )
        genre_avg_ratings = (
            rated_genres.groupby('genre')['rating'].mean().sort_values()
        )

        # Horizontal bars, lowest average first.
        plt.figure(figsize=(12, 6))
        plt.barh(genre_avg_ratings.index.tolist(), genre_avg_ratings.to_numpy(),
                 color='skyblue')
        plt.xlabel(self._text('Average Rating', '平均评分'))
        plt.ylabel(self._text('Movie Genre', '电影类型'))
        plt.title(self._text('Average Rating by Movie Genre', '各类型电影平均评分'))

        plt.grid(axis='x', linestyle='--', alpha=0.7)
        plt.tight_layout()
        plt.savefig(os.path.join(self.output_path, 'genre_average_ratings.png'))
        plt.close()

        return self

    def run_pipeline(self, n_factors=100, test_size=0.2, tune_factors=True):
        """Run the full pipeline: load, analyse, train, evaluate, fill, save.

        Args:
            n_factors (int): Latent-factor count used when tuning is off.
            test_size (float): Fraction of ratings held out for testing.
            tune_factors (bool): Whether to pick the factor count by
                comparing several candidates on the test set.

        Returns:
            self: The instance, to allow method chaining.
        """
        start_time = time.time()

        self.load_data()
        self.preprocess_data()
        self.analyze_user_movie_patterns()

        train_data, test_data = self.train_test_split_matrix(test_size=test_size)
        train_matrix = self.create_training_matrix(train_data)

        if tune_factors:
            # Compare candidate factor counts and keep the best one.
            _, best_factor = self.compare_factor_performance(
                train_matrix,
                test_data,
                factors_list=[50, 100, 150, 200]
            )
            n_factors = best_factor

        # Train the final model with the selected factor count.
        self.matrix_factorization_svd(train_matrix, n_factors=n_factors)

        self.evaluate_model(test_data, self.user_movie_predictions.values)
        self.get_sample_predictions(n_samples=10)
        self.fill_missing_ratings()
        self.save_results()

        total_time = time.time() - start_time
        print(f"\n总运行时间: {total_time:.2f}秒")

        return self


if __name__ == "__main__":
    # Build and run the recommender.
    recommender = MovieLensMatrixFactorization()
    recommender.run_pipeline(n_factors=100, test_size=0.2, tune_factors=True)

    print("\n矩阵分解评分预测系统执行完成!")
    print("已生成填补后的完整评分矩阵。")