This repository has been archived on 2026-04-12. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
PyDataAnalysis/Others/portfolio_analysis.py
2025-12-09 16:01:02 +08:00

1783 lines
72 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import yfinance as yf
import akshare as ak
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib
from datetime import datetime, timedelta
import os
import warnings
warnings.filterwarnings('ignore')
# 设置中文字体
matplotlib.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans']
matplotlib.rcParams['axes.unicode_minus'] = False
def create_output_directory():
"""创建输出目录"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_dir = f"portfolio_analysis_{timestamp}"
# 创建主目录
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# 创建子目录
subdirs = ['tables', 'charts', 'data']
for subdir in subdirs:
subdir_path = os.path.join(output_dir, subdir)
if not os.path.exists(subdir_path):
os.makedirs(subdir_path)
print(f"输出目录已创建: {output_dir}/")
return output_dir
def get_a_stock_data(symbol='515450', years=5):
"""
使用AKShare获取A股515450近五年的数据
"""
print(f"正在通过AKShare获取A股 {symbol}{years}年数据...")
# 计算5年前的日期
end_date = datetime.now()
start_date = end_date - timedelta(days=years * 365)
# 格式化日期
start_str = start_date.strftime('%Y%m%d')
end_str = end_date.strftime('%Y%m%d')
try:
print("尝试使用ETF基金接口获取A股数据...")
# 方法1:尝试使用ETF基金接口
try:
df = ak.fund_etf_hist_em(symbol=symbol, period="daily",
start_date=start_str, end_date=end_str,
adjust="qfq")
print("使用ETF基金接口成功")
except Exception as e1:
print(f"ETF基金接口失败: {e1}")
# 方法2:尝试使用股票接口
print("尝试使用股票接口...")
try:
# 尝试不同的后缀
for suffix in ['.SZ', '.SS', '']:
try:
stock_symbol = f"{symbol}{suffix}" if suffix else symbol
df = ak.stock_zh_a_hist(symbol=stock_symbol, period="daily",
start_date=start_str, end_date=end_str,
adjust="qfq")
print(f"使用股票接口成功: {stock_symbol}")
break
except:
continue
except Exception as e2:
print(f"股票接口失败: {e2}")
raise ValueError(f"无法通过AKShare获取A股{symbol}数据")
if df.empty:
raise ValueError(f"未能获取到A股 {symbol} 的数据")
print(f"获取到 {len(df)} 行数据")
# 重命名列
column_mapping = {
'日期': 'Date',
'时间': 'Date',
'收盘': 'Close',
'收盘价': 'Close',
'price': 'Close',
'最新价': 'Close'
}
for old_col, new_col in column_mapping.items():
if old_col in df.columns:
df = df.rename(columns={old_col: new_col})
# 确保Date列是datetime类型
if 'Date' in df.columns:
df['Date'] = pd.to_datetime(df['Date'])
df = df.set_index('Date')
else:
# 如果没有Date列,使用索引
df.index = pd.to_datetime(df.index)
# 按日期排序
df = df.sort_index()
# 提取收盘价
close_price = None
for col in ['Close', '收盘', '收盘价', 'price', '最新价']:
if col in df.columns:
close_price = df[col]
print(f"使用 '{col}' 列作为收盘价")
break
if close_price is None and len(df.columns) > 0:
print("警告: 未找到标准收盘价列,尝试使用第一列数值列")
for col in df.columns:
if pd.api.types.is_numeric_dtype(df[col]):
close_price = df[col]
print(f"使用 '{col}' 列作为收盘价")
break
if close_price is None:
raise ValueError("无法提取收盘价数据")
# 转换为数值类型并处理缺失值
close_price = pd.to_numeric(close_price, errors='coerce')
close_price = close_price.ffill().bfill()
# 移除时区信息
close_price.index = close_price.index.tz_localize(None)
print(f"A股 {symbol} 数据获取成功!")
print(f"数据时间范围: {close_price.index[0].date()}{close_price.index[-1].date()}")
print(f"总交易日数: {len(close_price)}")
return close_price
except Exception as e:
print(f"通过AKShare获取A股 {symbol} 数据时出错: {e}")
raise
def get_us_stock_data(symbol='QLD', years=5):
"""
使用yfinance获取美股数据(使用QLD
"""
print(f"正在通过yfinance获取美股 {symbol}{years}年数据...")
# 计算5年前的日期
end_date = datetime.now()
start_date = end_date - timedelta(days=years * 365)
# 格式化日期为字符串
start_str = start_date.strftime('%Y-%m-%d')
end_str = end_date.strftime('%Y-%m-%d')
print(f"时间范围: {start_str}{end_str}")
try:
# 创建Ticker对象
ticker = yf.Ticker(symbol)
# 获取历史数据
stock_data = ticker.history(start=start_str, end=end_str)
if stock_data.empty:
print(f"警告: {symbol} 数据为空,尝试获取最大历史数据...")
stock_data = ticker.history(period="max")
if stock_data.empty:
raise ValueError(f"未能获取到美股 {symbol} 的数据")
# 截取最近5年的数据
cutoff_date = end_date - timedelta(days=years * 365)
stock_data = stock_data[stock_data.index >= pd.Timestamp(cutoff_date)]
print("使用最大历史数据截取成功")
print(f"美股 {symbol} 数据下载成功!")
print(
f"数据时间范围: {stock_data.index[0].date()}{stock_data.index[-1].date()}") # 修复这里:使用stock_data而不是stock_prices
print(f"总交易日数: {len(stock_data)}")
# 提取收盘价
close_price = None
# 优先使用调整后收盘价
if 'Adj Close' in stock_data.columns:
close_price = stock_data['Adj Close']
print("使用 'Adj Close' 列作为收盘价")
elif 'Close' in stock_data.columns:
close_price = stock_data['Close']
print("使用 'Close' 列作为收盘价")
else:
# 尝试寻找包含'Close'的列
for col in stock_data.columns:
if 'close' in str(col).lower():
close_price = stock_data[col]
print(f"使用 '{col}' 列作为收盘价")
break
if close_price is None:
print("警告: 未找到标准收盘价列,尝试使用第一列数值列")
for col in stock_data.columns:
if pd.api.types.is_numeric_dtype(stock_data[col]):
close_price = stock_data[col]
print(f"使用 '{col}' 列作为收盘价")
break
if close_price is None:
raise ValueError("无法提取收盘价数据")
# 转换为数值类型并处理缺失值
close_price = pd.to_numeric(close_price, errors='coerce')
close_price = close_price.ffill().bfill()
# 移除时区信息
close_price.index = close_price.index.tz_localize(None)
return close_price
except Exception as e:
print(f"通过yfinance获取美股 {symbol} 数据时出错: {e}")
raise
def get_exchange_rate_data(years=5):
"""
获取人民币兑美元汇率数据(无参数版本)
"""
print("正在获取人民币兑美元汇率数据...")
# 计算日期范围
end_date = datetime.now()
start_date = end_date - timedelta(days=years * 365)
try:
# 直接调用函数,无任何参数
df = ak.currency_boc_safe()
if df.empty:
raise ValueError("未能获取到汇率数据")
print(f"获取到 {len(df)} 条汇率数据")
print(f"数据列名: {df.columns.tolist()}")
print(f"数据示例:\n{df.head()}")
# 检查数据列名,根据实际列名重命名
# 注意:列名可能是中文的
column_mapping = {}
# 查找日期列
date_columns = ['日期', 'Date', 'date', 'time', '交易日', '交易日期']
for col in date_columns:
if col in df.columns:
column_mapping[col] = 'Date'
break
# 查找汇率列(美元兑人民币中间价)
rate_columns = ['美元', 'USD', '中间价', '现汇卖出价', '现汇买入价', 'price', 'rate', '汇率']
for col in rate_columns:
if col in df.columns:
column_mapping[col] = 'Exchange_Rate'
break
# 如果找到了需要重命名的列,执行重命名
if column_mapping:
df = df.rename(columns=column_mapping)
print(f"已将列名映射为: {column_mapping}")
# 确保有Date列
if 'Date' not in df.columns and len(df.columns) > 0:
# 使用第一列作为日期
first_col = df.columns[0]
df = df.rename(columns={first_col: 'Date'})
print(f"将第一列 '{first_col}' 作为Date列")
# 确保有Exchange_Rate列
if 'Exchange_Rate' not in df.columns and len(df.columns) > 1:
# 使用第二列作为汇率
second_col = df.columns[1]
df = df.rename(columns={second_col: 'Exchange_Rate'})
print(f"将第二列 '{second_col}' 作为Exchange_Rate列")
# 转换为datetime类型
df['Date'] = pd.to_datetime(df['Date'], errors='coerce')
df = df.dropna(subset=['Date'])
df = df.set_index('Date')
df = df.sort_index()
# 提取汇率数据
if 'Exchange_Rate' in df.columns:
exchange_rate = df['Exchange_Rate']
else:
# 如果没有Exchange_Rate列,尝试使用第一个数值列
for col in df.columns:
if pd.api.types.is_numeric_dtype(df[col]):
exchange_rate = df[col]
print(f"使用数值列 '{col}' 作为汇率数据")
break
else:
raise ValueError("找不到汇率数据列")
# 转换为数值类型
exchange_rate = pd.to_numeric(exchange_rate, errors='coerce')
exchange_rate = exchange_rate.ffill().bfill()
# 筛选指定日期范围
exchange_rate = exchange_rate[(exchange_rate.index >= start_date) &
(exchange_rate.index <= end_date)]
if len(exchange_rate) == 0:
print("警告: 指定日期范围内无汇率数据,使用全部数据")
exchange_rate = exchange_rate
print(f"汇率数据获取成功!")
print(f"数据时间范围: {exchange_rate.index[0].date()}{exchange_rate.index[-1].date()}")
print(f"可用数据点数: {len(exchange_rate)}")
print(f"最新汇率: 1美元 = {exchange_rate.iloc[-1]:.4f} 人民币")
# 如果需要,可以插值生成每日数据
if len(exchange_rate) < years * 365 * 0.5: # 如果数据点少于预期的一半
print("数据点较少,生成完整日期序列...")
full_index = pd.date_range(start=start_date, end=end_date, freq='D')
exchange_rate = exchange_rate.reindex(full_index)
exchange_rate = exchange_rate.ffill().bfill()
print(f"插值后数据点数: {len(exchange_rate)}")
return exchange_rate
except Exception as e:
print(f"获取汇率数据时出错: {e}")
import traceback
traceback.print_exc()
print("将使用固定汇率7.1进行估算")
# 创建完整的日期序列
all_dates = pd.date_range(start=start_date, end=end_date, freq='D')
exchange_rate = pd.Series(7.1, index=all_dates, name='Exchange_Rate')
return exchange_rate
def get_stock_data():
"""
获取A股515450和美股QLD近五年的收盘价数据,以及汇率数据
"""
print("=" * 80)
print("获取近五年A股515450与美股QLD数据")
print("=" * 80)
all_data = {}
try:
# 获取A股515450数据
print("\n1. 获取A股515450数据...")
a_stock_price = get_a_stock_data('515450', years=5)
a_stock_price.name = 'A股_515450'
all_data['A股_515450'] = a_stock_price
# 获取美股QLD数据
print("\n2. 获取美股QLD数据...")
us_stock_price = get_us_stock_data('QLD', years=5)
us_stock_price.name = '美股_QLD'
all_data['美股_QLD'] = us_stock_price
# 获取汇率数据
print("\n3. 获取汇率数据...")
exchange_rate = get_exchange_rate_data(years=5)
all_data['汇率_USD_CNY'] = exchange_rate
# 创建空的DataFrame
combined_data = pd.DataFrame()
# 逐个添加数据
for name, data in all_data.items():
temp_df = pd.DataFrame({name: data})
if combined_data.empty:
combined_data = temp_df
else:
combined_data = combined_data.merge(temp_df,
left_index=True,
right_index=True,
how='outer')
# 按日期排序
combined_data = combined_data.sort_index()
# 处理缺失值 - 前向填充和后向填充
combined_data = combined_data.ffill().bfill()
combined_data = combined_data.dropna()
except Exception as e:
print(f"\n数据获取失败: {e}")
raise
if combined_data.empty:
raise ValueError("数据合并后为空,请检查数据源")
# 计算实际的时间跨度
start_date = combined_data.index[0]
end_date = combined_data.index[-1]
actual_days = (end_date - start_date).days
actual_years = actual_days / 365.25
print(f"\n数据获取完成!")
print(f"总数据行数: {len(combined_data)}")
print(f"实际时间范围: {start_date.date()}{end_date.date()}")
print(f"实际时间跨度: {actual_days} 天 ({actual_years:.2f} 年)")
print(f"可用数据列: {list(combined_data.columns)}")
return combined_data
def get_user_weights(asset_names):
"""
获取用户输入的投资权重
"""
print("\n" + "=" * 60)
print("投资组合权重配置")
print("=" * 60)
# 过滤掉汇率列
filtered_assets = [asset for asset in asset_names if '汇率' not in asset]
print(f"可用资产: {', '.join(filtered_assets)}")
print("请输入每个资产的权重百分比(总和应为100%")
weights = {}
while True:
try:
total_weight = 0
for i, asset in enumerate(filtered_assets):
if i == 0:
prompt = f"请输入 {asset} 的权重百分比 (0-100): "
else:
remaining = 100 - total_weight
prompt = f"请输入 {asset} 的权重百分比 (剩余 {remaining}%): "
weight_input = input(prompt).strip()
if not weight_input:
raise ValueError("请输入一个有效的数字")
weight = float(weight_input)
if weight < 0 or weight > 100:
raise ValueError("权重必须在 0 到 100 之间")
if total_weight + weight > 100:
raise ValueError(f"权重总和不能超过100%,当前已输入: {total_weight}%")
weights[asset] = weight / 100
total_weight += weight
# 检查权重总和
if abs(total_weight - 100) < 0.01:
break
elif total_weight < 100:
response = input(
f"\n权重总和为 {total_weight}%,未达到100%。是否将剩余的 {100 - total_weight}% 分配给最后一个资产? (y/n): ").lower()
if response == 'y':
last_asset = filtered_assets[-1]
weights[last_asset] += (100 - total_weight) / 100
break
else:
print("请重新配置权重")
weights = {}
continue
else:
print(f"权重总和 {total_weight}% 超过100%,请重新配置")
weights = {}
continue
except ValueError as e:
print(f"输入错误: {e}")
print("请重新输入")
weights = {}
# 显示配置结果
print("\n投资组合配置结果:")
print("-" * 40)
for asset, weight in weights.items():
print(f"{asset}: {weight * 100:.1f}%")
# 转换为列表格式
weight_list = [weights[asset] for asset in filtered_assets]
return weight_list, filtered_assets
def get_investment_plan():
"""
获取用户的投资计划
"""
print("\n" + "=" * 60)
print("投资计划配置")
print("=" * 60)
while True:
try:
# 获取初始本金
principal_input = input("请输入初始本金金额(人民币): ").strip()
if not principal_input:
principal = 10000.0
print(f"使用默认本金: {principal:.2f}")
else:
principal = float(principal_input)
if principal <= 0:
raise ValueError("本金必须大于0")
# 获取每月定投金额
monthly_input = input("请输入每月定投金额(人民币,输入0表示不定投): ").strip()
if not monthly_input:
monthly_investment = 0.0
print(f"使用默认值: 无每月定投")
else:
monthly_investment = float(monthly_input)
if monthly_investment < 0:
raise ValueError("每月投资金额不能为负数")
# 获取投资起始日期
start_date_input = input("请输入投资起始日期(YYYY-MM-DD,留空使用数据起始日期): ").strip()
if not start_date_input:
start_date = None
print("使用数据起始日期")
else:
start_date = pd.to_datetime(start_date_input)
break
except ValueError as e:
print(f"输入错误: {e}")
print("请重新输入")
except Exception as e:
print(f"输入错误: {e}")
print("请重新输入")
return {
'initial_principal': principal,
'monthly_investment': monthly_investment,
'start_date': start_date
}
def calculate_investment_simulation(prices, weights, asset_names, investment_plan):
"""
计算投资模拟结果(完全重构版)- 正确计算份额和收益
"""
print("\n计算投资模拟结果...")
# 提取股票价格和汇率
stock_prices = prices[asset_names].copy()
exchange_rate = prices['汇率_USD_CNY'].copy()
# 确定投资起始日期
if investment_plan['start_date']:
start_date = pd.Timestamp(investment_plan['start_date'])
if start_date < stock_prices.index[0]:
print(f"警告: 指定起始日期 {start_date.date()} 早于数据起始日期 {stock_prices.index[0].date()}")
print(f"将使用数据起始日期: {stock_prices.index[0].date()}")
start_date = stock_prices.index[0]
else:
start_date = stock_prices.index[0]
# 筛选投资期间的数据
investment_prices = stock_prices[stock_prices.index >= start_date].copy()
investment_exchange = exchange_rate[exchange_rate.index >= start_date].copy()
if investment_prices.empty:
raise ValueError("投资起始日期后无有效数据")
print(f"投资起始日期: {start_date.date()}")
print(f"投资结束日期: {investment_prices.index[-1].date()}")
print(f"投资天数: {len(investment_prices)}")
# === 重新设计:简化但正确的投资模拟 ===
# 1. 计算每日投资组合的人民币价值
portfolio_daily_value_cny = pd.Series(0.0, index=investment_prices.index)
for i, asset in enumerate(asset_names):
asset_weight = weights[i]
if '美股' in asset:
# 美股:美元价格 × 汇率 = 人民币价格
usd_price = investment_prices[asset]
# 对齐汇率和价格的索引
aligned_exchange = investment_exchange.reindex(usd_price.index).ffill().bfill()
cny_price = usd_price * aligned_exchange
portfolio_daily_value_cny += cny_price * asset_weight
else:
# A股:直接使用人民币价格
portfolio_daily_value_cny += investment_prices[asset] * asset_weight
print(f"投资组合初始价值(理论): {portfolio_daily_value_cny.iloc[0]:.2f}")
print(f"投资组合最终价值(理论): {portfolio_daily_value_cny.iloc[-1]:.2f}")
print(f"投资组合总增长率: {portfolio_daily_value_cny.iloc[-1] / portfolio_daily_value_cny.iloc[0]:.4f}")
# 2. 计算投资组合的日收益率
portfolio_daily_returns = portfolio_daily_value_cny.pct_change().fillna(0)
# 3. 模拟实际投资过程(更直观的方法)
initial_principal = investment_plan['initial_principal']
monthly_investment = investment_plan['monthly_investment']
# 初始化投资记录
investment_records = pd.DataFrame(index=investment_prices.index)
investment_records['投资组合价值'] = portfolio_daily_value_cny
investment_records['投资组合日收益率'] = portfolio_daily_returns
investment_records['现金投入_累计'] = 0.0
investment_records['持有资产价值'] = 0.0
investment_records['累计收益率'] = 0.0
# 4. 处理初始投资
first_date = investment_records.index[0]
# 假设初始投资时购买1单位投资组合
initial_investment_ratio = initial_principal / portfolio_daily_value_cny.iloc[0]
investment_records.loc[first_date, '现金投入_累计'] = initial_principal
investment_records.loc[first_date, '持有资产价值'] = initial_principal # 初始时资产价值等于投入本金
# 5. 处理每月定投
cash_investments = {first_date: initial_principal} # 记录每笔现金投入
if monthly_investment > 0:
# 找到每月第一个交易日进行定投
all_dates = investment_records.index
# 按月份分组,取每月的第一个交易日
monthly_first_dates = []
current_month = None
for date in all_dates:
month_key = (date.year, date.month)
if month_key != current_month:
monthly_first_dates.append(date)
current_month = month_key
# 排除第一个月(已经处理了初始投资)
if monthly_first_dates and monthly_first_dates[0] == first_date:
monthly_first_dates = monthly_first_dates[1:]
print(f"定投月份数(除首月): {len(monthly_first_dates)}")
for invest_date in monthly_first_dates:
cash_investments[invest_date] = monthly_investment
# 6. 计算每日的资产价值
# 先按时间顺序处理所有投资
sorted_dates = sorted(cash_investments.keys())
# 初始化变量
total_cash_invested = 0
current_asset_value = 0
for i, date in enumerate(investment_records.index):
# 检查今天是否有现金投入
if date in cash_investments:
cash_today = cash_investments[date]
total_cash_invested += cash_today
# 如果有现有资产,先计算到今天的增长
if i > 0 and current_asset_value > 0:
prev_date = investment_records.index[i - 1]
days_return = portfolio_daily_value_cny.iloc[i] / portfolio_daily_value_cny.iloc[i - 1] - 1
current_asset_value *= (1 + days_return)
# 新投入的现金按当前投资组合价值折算成资产
investment_ratio_today = cash_today / portfolio_daily_value_cny.iloc[i]
current_asset_value += cash_today # 简化:直接增加资产价值
# 记录
investment_records.loc[date, '现金投入_累计'] = total_cash_invested
investment_records.loc[date, '持有资产价值'] = current_asset_value
# 如果是当天有投入,需要重新计算收益率
if current_asset_value > 0 and total_cash_invested > 0:
investment_records.loc[date, '累计收益率'] = (current_asset_value / total_cash_invested - 1)
else:
# 没有现金投入的日子,只计算资产价值变化
if i > 0:
# 计算从昨天到今天投资组合的增长
prev_date = investment_records.index[i - 1]
days_return = portfolio_daily_value_cny.iloc[i] / portfolio_daily_value_cny.iloc[i - 1] - 1
# 更新资产价值
if current_asset_value > 0:
current_asset_value *= (1 + days_return)
# 记录
investment_records.loc[date, '现金投入_累计'] = total_cash_invested
investment_records.loc[date, '持有资产价值'] = current_asset_value
# 计算累计收益率
if current_asset_value > 0 and total_cash_invested > 0:
investment_records.loc[date, '累计收益率'] = (current_asset_value / total_cash_invested - 1)
# 7. 前向填充缺失值
investment_records['现金投入_累计'] = investment_records['现金投入_累计'].ffill()
investment_records['持有资产价值'] = investment_records['持有资产价值'].ffill()
investment_records['累计收益率'] = investment_records['累计收益率'].ffill()
# 8. 计算汇总指标
total_invested = investment_records['现金投入_累计'].iloc[-1]
final_value = investment_records['持有资产价值'].iloc[-1]
total_profit = final_value - total_invested
total_return_pct = (final_value / total_invested - 1) * 100 if total_invested > 0 else 0
# 计算年化收益率
days_invested = (investment_records.index[-1] - investment_records.index[0]).days
years_invested = days_invested / 365.25
if years_invested > 0 and total_invested > 0:
annualized_return = ((final_value / total_invested) ** (1 / years_invested) - 1) * 100
else:
annualized_return = 0
# 计算各资产的投资金额和收益
asset_investment = {}
asset_final_value = {}
asset_returns = {}
for i, asset in enumerate(asset_names):
asset_weight = weights[i]
asset_investment[asset] = total_invested * asset_weight
# 计算该资产的最终价值
if '美股' in asset:
# 美股:需要考虑汇率变化和资产本身收益
initial_price = investment_prices[asset].iloc[0]
final_price = investment_prices[asset].iloc[-1]
initial_exchange = investment_exchange.iloc[0]
final_exchange = investment_exchange.iloc[-1]
# 美元计价的收益率
usd_return = final_price / initial_price - 1
# 汇率变化
exchange_return = final_exchange / initial_exchange - 1
# 人民币计价的综合收益率
total_return = (1 + usd_return) * (1 + exchange_return) - 1
asset_final_value[asset] = asset_investment[asset] * (1 + total_return)
asset_returns[asset] = total_return * 100
print(
f" {asset}: 美元收益={usd_return * 100:.1f}%, 汇率收益={exchange_return * 100:.1f}%, 综合收益={total_return * 100:.1f}%")
else:
# A股:直接计算人民币收益
initial_price = investment_prices[asset].iloc[0]
final_price = investment_prices[asset].iloc[-1]
asset_return = final_price / initial_price - 1
asset_final_value[asset] = asset_investment[asset] * (1 + asset_return)
asset_returns[asset] = asset_return * 100
print(f" {asset}: 人民币收益={asset_return * 100:.1f}%")
# 汇总结果
simulation_results = {
'investment_records': investment_records,
'summary': {
'初始本金_元': initial_principal,
'每月定投_元': monthly_investment,
'总投资额_元': total_invested,
'最终资产总值_元': final_value,
'总收益_元': total_profit,
'总收益率_%': total_return_pct,
'投资天数': days_invested,
'投资年数': years_invested,
'年化收益率_%': annualized_return,
'投资起始日期': start_date.date(),
'投资结束日期': investment_records.index[-1].date(),
'资产配置': {asset: f"{weights[i] * 100:.1f}%" for i, asset in enumerate(asset_names)},
'各资产投资金额_元': asset_investment,
'各资产最终价值_元': asset_final_value,
'各资产收益率_%': asset_returns,
'投资组合增长倍数': portfolio_daily_value_cny.iloc[-1] / portfolio_daily_value_cny.iloc[0],
'现金投资笔数': len(cash_investments)
}
}
# 打印详细的投资模拟结果
print(f"\n投资模拟结果详情:")
print(f"- 初始本金: {initial_principal:.2f}")
print(f"- 每月定投: {monthly_investment:.2f}")
print(f"- 总投资额: {total_invested:.2f}")
print(f"- 最终资产: {final_value:.2f}")
print(f"- 总收益: {total_profit:.2f}")
print(f"- 总收益率: {total_return_pct:.1f}%")
print(f"- 年化收益: {annualized_return:.1f}%")
print(f"- 投资天数: {days_invested}")
print(f"- 投资年数: {years_invested:.2f}")
print(f"- 现金投资笔数: {len(cash_investments)}")
print(f"- 投资组合理论增长: {portfolio_daily_value_cny.iloc[-1] / portfolio_daily_value_cny.iloc[0]:.4f}")
return simulation_results
def save_investment_simulation_results(simulation_results, output_dir):
"""
保存投资模拟结果
"""
print("保存投资模拟结果...")
# 1. 保存详细投资记录
records_df = simulation_results['investment_records']
records_file = os.path.join(output_dir, 'tables', 'investment_simulation_details.csv')
records_df.to_csv(records_file, encoding='utf-8-sig')
print(f"✓ 详细投资记录已保存: tables/investment_simulation_details.csv")
# 2. 保存投资汇总
summary = simulation_results['summary']
summary_data = []
summary_data.append({'指标': '初始本金', '数值': f"{summary['初始本金_元']:.2f}"})
summary_data.append({'指标': '每月定投金额', '数值': f"{summary['每月定投_元']:.2f}"})
summary_data.append({'指标': '总投资金额', '数值': f"{summary['总投资额_元']:.2f}"})
summary_data.append({'指标': '最终资产总值', '数值': f"{summary['最终资产总值_元']:.2f}"})
summary_data.append({'指标': '总收益', '数值': f"{summary['总收益_元']:.2f}"})
summary_data.append({'指标': '总收益率', '数值': f"{summary['总收益率_%']:.2f}%"})
summary_data.append({'指标': '投资天数', '数值': f"{summary['投资天数']}"})
summary_data.append({'指标': '投资年数', '数值': f"{summary['投资年数']:.2f}"})
summary_data.append({'指标': '年化收益率', '数值': f"{summary['年化收益率_%']:.2f}%"})
summary_data.append({'指标': '投资起始日期', '数值': str(summary['投资起始日期'])})
summary_data.append({'指标': '投资结束日期', '数值': str(summary['投资结束日期'])})
summary_data.append({'指标': '投资组合增长倍数', '数值': f"{summary['投资组合增长倍数']:.4f}"})
summary_data.append({'指标': '现金投资笔数', '数值': f"{summary['现金投资笔数']}"})
summary_df = pd.DataFrame(summary_data)
summary_file = os.path.join(output_dir, 'tables', 'investment_simulation_summary.csv')
summary_df.to_csv(summary_file, encoding='utf-8-sig', index=False)
print(f"✓ 投资模拟汇总已保存: tables/investment_simulation_summary.csv")
# 3. 保存各资产明细
asset_details = []
for asset, weight in summary['资产配置'].items():
asset_details.append({
'资产名称': asset,
'配置权重': weight,
'投资金额(元)': f"{summary['各资产投资金额_元'][asset]:.2f}",
'最终价值(元)': f"{summary['各资产最终价值_元'][asset]:.2f}",
'资产收益(元)': f"{summary['各资产最终价值_元'][asset] - summary['各资产投资金额_元'][asset]:.2f}",
'资产收益率(%)': f"{summary['各资产收益率_%'][asset]:.2f}"
})
asset_details_df = pd.DataFrame(asset_details)
asset_file = os.path.join(output_dir, 'tables', 'investment_asset_details.csv')
asset_details_df.to_csv(asset_file, encoding='utf-8-sig', index=False)
print(f"✓ 各资产投资明细已保存: tables/investment_asset_details.csv")
return summary_df, asset_details_df
def create_investment_charts(simulation_results, output_dir):
"""
创建投资模拟图表
"""
print("生成投资模拟图表...")
investment_records = simulation_results['investment_records']
summary = simulation_results['summary']
# 图表1: 资产增长曲线
plt.figure(figsize=(14, 8))
plt.plot(investment_records.index, investment_records['持有资产价值'] / 10000,
label='资产总值(万元)', color='#2e8b57', linewidth=2.5)
plt.plot(investment_records.index, investment_records['现金投入_累计'] / 10000,
label='总投资额(万元)', color='#4169e1', linewidth=2.5, linestyle='--')
plt.title(
f'投资资产增长曲线\n最终资产: {summary["最终资产总值_元"] / 10000:.2f}万元 | 总收益: {summary["总收益_元"] / 10000:.2f}万元',
fontsize=14, fontweight='bold')
plt.ylabel('金额 (万元)', fontsize=12)
plt.xlabel('日期', fontsize=12)
plt.legend(fontsize=11)
plt.grid(True, alpha=0.3)
plt.tight_layout()
growth_chart_file = os.path.join(output_dir, 'charts', '08_investment_growth.png')
plt.savefig(growth_chart_file, dpi=300, bbox_inches='tight')
plt.close()
print(f"✓ 资产增长曲线已保存: charts/08_investment_growth.png")
# 图表2: 累计收益率曲线
plt.figure(figsize=(14, 8))
plt.plot(investment_records.index, investment_records['累计收益率'] * 100,
color='#ff7f0e', linewidth=2.5)
plt.axhline(y=0, color='black', linestyle='-', linewidth=0.5)
plt.fill_between(investment_records.index, 0, investment_records['累计收益率'] * 100,
where=investment_records['累计收益率'] * 100 >= 0,
color='#ff7f0e', alpha=0.3)
plt.title(f'投资累计收益率\n总收益率: {summary["总收益率_%"]:.2f}% | 年化收益率: {summary["年化收益率_%"]:.2f}%',
fontsize=14, fontweight='bold')
plt.ylabel('累计收益率 (%)', fontsize=12)
plt.xlabel('日期', fontsize=12)
plt.grid(True, alpha=0.3)
plt.tight_layout()
returns_chart_file = os.path.join(output_dir, 'charts', '09_investment_returns.png')
plt.savefig(returns_chart_file, dpi=300, bbox_inches='tight')
plt.close()
print(f"✓ 累计收益率曲线已保存: charts/09_investment_returns.png")
# 图表3: 投资进度图(如果每月定投)
if summary['每月定投_元'] > 0:
plt.figure(figsize=(14, 8))
# 提取每月第一天的数据
monthly_data = investment_records.resample('MS').first()
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(14, 12))
# 子图1: 每月投资额和资产总值
ax1.bar(monthly_data.index, monthly_data['现金投入_累计'] / 10000,
alpha=0.6, color='#4169e1', label='累计投资额(万元)')
ax1.plot(monthly_data.index, monthly_data['持有资产价值'] / 10000,
color='#2e8b57', linewidth=2.5, label='资产总值(万元)')
ax1.set_title('月度投资进度', fontsize=13, fontweight='bold')
ax1.set_ylabel('金额 (万元)', fontsize=11)
ax1.legend(fontsize=10)
ax1.grid(True, alpha=0.3)
# 子图2: 每月收益率
monthly_returns = monthly_data['累计收益率'].diff() * 100
colors = ['#2ecc71' if x >= 0 else '#e74c3c' for x in monthly_returns]
ax2.bar(monthly_data.index, monthly_returns, color=colors, alpha=0.7)
ax2.set_title('月度收益率', fontsize=13, fontweight='bold')
ax2.set_ylabel('月收益率 (%)', fontsize=11)
ax2.set_xlabel('日期', fontsize=11)
ax2.grid(True, alpha=0.3)
plt.tight_layout()
progress_chart_file = os.path.join(output_dir, 'charts', '10_investment_progress.png')
plt.savefig(progress_chart_file, dpi=300, bbox_inches='tight')
plt.close()
print(f"✓ 投资进度图已保存: charts/10_investment_progress.png")
def calculate_portfolio_performance(prices, weights, asset_names):
"""
计算投资组合收益和指标
"""
print("\n计算投资组合表现...")
# 确保数据按日期排序
stock_prices = prices[asset_names].copy()
stock_prices = stock_prices.sort_index()
# 计算日收益率
returns = stock_prices.pct_change().dropna()
print(f"可用交易日数: {len(returns)}")
if len(returns) < 50:
print(f"注意: 只有 {len(returns)} 个交易日,但会继续分析")
# 计算投资组合收益率
portfolio_returns = pd.Series(0.0, index=returns.index)
for i, asset in enumerate(asset_names):
portfolio_returns += returns[asset] * weights[i]
# 计算累计收益率
cumulative_returns = (1 + portfolio_returns).cumprod() - 1
# 计算最大回撤
cumulative_value = (1 + portfolio_returns).cumprod()
running_max = cumulative_value.expanding().max()
drawdown = (cumulative_value - running_max) / running_max
max_drawdown = drawdown.min()
# 计算时间跨度
start_date = stock_prices.index[0]
end_date = stock_prices.index[-1]
total_days = (end_date - start_date).days
total_years = total_days / 365.25
# 计算年化收益率
if total_years > 0:
final_cumulative_return = cumulative_returns.iloc[-1]
if final_cumulative_return >= -1:
annual_return = (1 + final_cumulative_return) ** (1 / total_years) - 1
else:
annual_return = -1.0
else:
annual_return = 0.0
# 年化波动率
if len(portfolio_returns) >= 252:
annual_volatility = portfolio_returns.std() * np.sqrt(252)
else:
trading_days = len(portfolio_returns)
annual_volatility = portfolio_returns.std() * np.sqrt(252 * (trading_days / 252))
# 夏普比率
risk_free_rate = 0.02
if portfolio_returns.std() > 0:
excess_return = annual_return - risk_free_rate
sharpe_ratio = excess_return / annual_volatility if annual_volatility > 0 else 0
else:
sharpe_ratio = 0.0
# 索提诺比率
downside_returns = portfolio_returns[portfolio_returns < 0]
downside_std = downside_returns.std() if len(downside_returns) > 0 else 0
if downside_std > 0:
annual_downside_risk = downside_std * np.sqrt(252)
sortino_ratio = (annual_return - risk_free_rate) / annual_downside_risk
else:
sortino_ratio = 0.0
# 其他统计指标
positive_days = (portfolio_returns > 0).sum()
negative_days = (portfolio_returns < 0).sum()
win_rate = positive_days / len(portfolio_returns) if len(portfolio_returns) > 0 else 0
# 收集指标
metrics = {
'总年数': total_years,
'交易日数': len(portfolio_returns),
'年化收益率': annual_return,
'年化波动率': annual_volatility,
'夏普比率': sharpe_ratio,
'最大回撤': max_drawdown,
'索提诺比率': sortino_ratio,
'累计收益率': cumulative_returns.iloc[-1],
'起始日期': str(stock_prices.index[0].date()),
'结束日期': str(stock_prices.index[-1].date()),
'正收益天数': positive_days,
'负收益天数': negative_days,
'胜率': win_rate,
'平均日收益': portfolio_returns.mean(),
'收益标准差': portfolio_returns.std(),
'年化下行风险': annual_downside_risk if 'annual_downside_risk' in locals() else 0
}
return portfolio_returns, cumulative_returns, drawdown, metrics, returns
def save_stock_price_table(prices, asset_names, output_dir):
"""
保存股票价格表格到CSV文件
"""
print("生成股票价格表格...")
stock_prices = prices[asset_names].copy()
# 1. 保存原始价格数据
price_file = os.path.join(output_dir, 'tables', 'stock_prices_detailed.csv')
stock_prices.to_csv(price_file, encoding='utf-8-sig')
print(f"✓ 详细价格数据已保存: tables/stock_prices_detailed.csv")
# 2. 创建汇总表格
summary_data = []
for asset in asset_names:
asset_data = stock_prices[asset]
# 基础信息
start_price = asset_data.iloc[0]
end_price = asset_data.iloc[-1]
max_price = asset_data.max()
min_price = asset_data.min()
avg_price = asset_data.mean()
std_price = asset_data.std()
# 收益率计算
total_return = (end_price - start_price) / start_price * 100
days = (stock_prices.index[-1] - stock_prices.index[0]).days
years = days / 365.25
if years > 0:
annual_return = ((1 + total_return / 100) ** (1 / years) - 1) * 100
else:
annual_return = 0
summary_data.append({
'资产名称': asset,
'起始价格': start_price,
'结束价格': end_price,
'总收益率%': total_return,
'年化收益率%': annual_return,
'最高价格': max_price,
'最低价格': min_price,
'平均价格': avg_price,
'价格标准差': std_price,
'数据点数': len(asset_data),
'起始日期': stock_prices.index[0].date(),
'结束日期': stock_prices.index[-1].date()
})
# 创建DataFrame并保存
summary_df = pd.DataFrame(summary_data)
summary_file = os.path.join(output_dir, 'tables', 'stock_prices_summary.csv')
summary_df.to_csv(summary_file, encoding='utf-8-sig', index=False)
print(f"✓ 价格汇总数据已保存: tables/stock_prices_summary.csv")
return summary_df
def save_returns_table(prices, asset_names, returns, portfolio_returns, cumulative_returns, output_dir):
"""
保存收益率表格到CSV文件
"""
print("生成收益率表格...")
# 1. 保存日收益率数据
daily_returns_file = os.path.join(output_dir, 'tables', 'daily_returns.csv')
returns.to_csv(daily_returns_file, encoding='utf-8-sig')
print(f"✓ 日收益率数据已保存: tables/daily_returns.csv")
# 2. 创建各资产收益率统计表格
asset_returns_summary = []
for asset in returns.columns:
asset_ret = returns[asset]
# 计算各项统计
mean_return = asset_ret.mean() * 100
std_return = asset_ret.std() * 100
max_return = asset_ret.max() * 100
min_return = asset_ret.min() * 100
positive_days = (asset_ret > 0).sum()
total_days = len(asset_ret)
win_rate = positive_days / total_days * 100
# 计算累计收益率
cumulative_ret = (1 + asset_ret).cumprod() - 1
total_cumulative = cumulative_ret.iloc[-1] * 100
asset_returns_summary.append({
'资产': asset,
'日均收益率%': mean_return,
'日收益率标准差%': std_return,
'单日最大收益%': max_return,
'单日最大亏损%': min_return,
'累计收益率%': total_cumulative,
'正收益天数': positive_days,
'总天数': total_days,
'胜率%': win_rate
})
# 添加投资组合收益率统计
port_mean_return = portfolio_returns.mean() * 100
port_std_return = portfolio_returns.std() * 100
port_max_return = portfolio_returns.max() * 100
port_min_return = portfolio_returns.min() * 100
port_positive_days = (portfolio_returns > 0).sum()
port_total_days = len(portfolio_returns)
port_win_rate = port_positive_days / port_total_days * 100
port_total_cumulative = cumulative_returns.iloc[-1] * 100
asset_returns_summary.append({
'资产': '投资组合',
'日均收益率%': port_mean_return,
'日收益率标准差%': port_std_return,
'单日最大收益%': port_max_return,
'单日最大亏损%': port_min_return,
'累计收益率%': port_total_cumulative,
'正收益天数': port_positive_days,
'总天数': port_total_days,
'胜率%': port_win_rate
})
# 保存收益率汇总
returns_summary_df = pd.DataFrame(asset_returns_summary)
returns_summary_file = os.path.join(output_dir, 'tables', 'returns_summary.csv')
returns_summary_df.to_csv(returns_summary_file, encoding='utf-8-sig', index=False)
print(f"✓ 收益率汇总数据已保存: tables/returns_summary.csv")
# 3. 保存月度收益率数据
all_assets = list(returns.columns) + ['投资组合']
all_returns = pd.concat([returns, portfolio_returns.rename('投资组合')], axis=1)
# 计算月度收益率
monthly_returns = all_returns.resample('M').apply(lambda x: (1 + x).prod() - 1)
monthly_returns_file = os.path.join(output_dir, 'tables', 'monthly_returns.csv')
monthly_returns.to_csv(monthly_returns_file, encoding='utf-8-sig')
print(f"✓ 月度收益率数据已保存: tables/monthly_returns.csv")
# 4. 保存年度收益率数据
yearly_returns = all_returns.resample('Y').apply(lambda x: (1 + x).prod() - 1)
yearly_returns_file = os.path.join(output_dir, 'tables', 'yearly_returns.csv')
yearly_returns.to_csv(yearly_returns_file, encoding='utf-8-sig')
print(f"✓ 年度收益率数据已保存: tables/yearly_returns.csv")
return returns_summary_df
def save_portfolio_summary_table(prices, asset_names, weights, portfolio_returns, cumulative_returns, metrics, returns,
output_dir):
"""
保存投资组合总结表格到CSV文件
"""
print("生成投资组合总结表格...")
stock_prices = prices[asset_names].copy()
# 1. 投资组合配置表格
config_data = []
total_investment = 10000 # 假设初始投资10000元
for i, asset in enumerate(asset_names):
weight_pct = weights[i] * 100
allocation = total_investment * weights[i]
# 计算该资产的表现
asset_ret = returns[asset]
asset_cumulative = (1 + asset_ret).cumprod() - 1
asset_total_return = asset_cumulative.iloc[-1] * 100
asset_annual_vol = asset_ret.std() * np.sqrt(252) * 100
config_data.append({
'资产': asset,
'权重%': weight_pct,
'配置金额(假设¥10,000)': allocation,
'累计收益%': asset_total_return,
'年化波动率%': asset_annual_vol
})
config_df = pd.DataFrame(config_data)
config_file = os.path.join(output_dir, 'tables', 'portfolio_configuration.csv')
config_df.to_csv(config_file, encoding='utf-8-sig', index=False)
print(f"✓ 投资组合配置表格已保存: tables/portfolio_configuration.csv")
# 2. 投资组合绩效指标表格
performance_data = [
{'指标类别': '时间信息', '指标名称': '分析期间', '数值': f"{metrics['起始日期']}{metrics['结束日期']}"},
{'指标类别': '时间信息', '指标名称': '总交易日数', '数值': f"{metrics['交易日数']}"},
{'指标类别': '时间信息', '指标名称': '分析周期', '数值': f"{metrics['总年数']:.2f}"},
{'指标类别': '收益指标', '指标名称': '累计收益率', '数值': f"{metrics['累计收益率'] * 100:.2f}%"},
{'指标类别': '收益指标', '指标名称': '年化收益率', '数值': f"{metrics['年化收益率'] * 100:.2f}%"},
{'指标类别': '收益指标', '指标名称': '日均收益率', '数值': f"{metrics['平均日收益'] * 100:.4f}%"},
{'指标类别': '收益指标', '指标名称': '正收益天数', '数值': f"{metrics['正收益天数']}"},
{'指标类别': '收益指标', '指标名称': '胜率', '数值': f"{metrics['胜率'] * 100:.1f}%"},
{'指标类别': '风险指标', '指标名称': '年化波动率', '数值': f"{metrics['年化波动率'] * 100:.2f}%"},
{'指标类别': '风险指标', '指标名称': '最大回撤', '数值': f"{metrics['最大回撤'] * 100:.2f}%"},
{'指标类别': '风险指标', '指标名称': '年化下行风险', '数值': f"{metrics['年化下行风险'] * 100:.2f}%"},
{'指标类别': '风险指标', '指标名称': '日收益率标准差', '数值': f"{metrics['收益标准差'] * 100:.4f}%"},
{'指标类别': '风险调整后收益', '指标名称': '夏普比率', '数值': f"{metrics['夏普比率']:.3f}"},
{'指标类别': '风险调整后收益', '指标名称': '索提诺比率', '数值': f"{metrics['索提诺比率']:.3f}"}
]
performance_df = pd.DataFrame(performance_data)
performance_file = os.path.join(output_dir, 'tables', 'portfolio_performance.csv')
performance_df.to_csv(performance_file, encoding='utf-8-sig', index=False)
print(f"✓ 投资组合绩效指标表格已保存: tables/portfolio_performance.csv")
# 3. 回撤分析表格
cumulative_value = (1 + portfolio_returns).cumprod()
running_max = cumulative_value.expanding().max()
drawdown_series = (cumulative_value - running_max) / running_max
# 找到回撤超过5%的事件
drawdown_events = []
in_drawdown = False
current_drawdown = {'start': None, 'trough': None, 'end': None, 'max_dd': 0}
for date, dd in drawdown_series.items():
if dd < -0.05: # 超过5%的回撤
if not in_drawdown:
in_drawdown = True
current_drawdown['start'] = date
current_drawdown['max_dd'] = dd
current_drawdown['trough'] = date
else:
if dd < current_drawdown['max_dd']:
current_drawdown['max_dd'] = dd
current_drawdown['trough'] = date
else:
if in_drawdown:
in_drawdown = False
current_drawdown['end'] = date
# 计算回撤持续天数
duration = (current_drawdown['end'] - current_drawdown['start']).days
drawdown_events.append({
'开始日期': current_drawdown['start'].strftime('%Y-%m-%d'),
'最低点日期': current_drawdown['trough'].strftime('%Y-%m-%d'),
'结束日期': current_drawdown['end'].strftime('%Y-%m-%d'),
'持续天数': duration,
'最大回撤%': current_drawdown['max_dd'] * 100
})
current_drawdown = {'start': None, 'trough': None, 'end': None, 'max_dd': 0}
if drawdown_events:
drawdown_df = pd.DataFrame(drawdown_events)
drawdown_file = os.path.join(output_dir, 'tables', 'drawdown_analysis.csv')
drawdown_df.to_csv(drawdown_file, encoding='utf-8-sig', index=False)
print(f"✓ 回撤分析表格已保存: tables/drawdown_analysis.csv")
else:
print("⚠ 无超过5%的重大回撤事件")
# 4. 相关性分析表格
correlation_matrix = returns.corr()
correlation_file = os.path.join(output_dir, 'tables', 'correlation_matrix.csv')
correlation_matrix.to_csv(correlation_file, encoding='utf-8-sig')
print(f"✓ 相关性矩阵表格已保存: tables/correlation_matrix.csv")
# 5. 投资建议总结
summary_points = []
# 基于收益率评价
if metrics['年化收益率'] > 0.10:
summary_points.append('优秀收益:年化收益率超过10%')
elif metrics['年化收益率'] > 0.05:
summary_points.append('良好收益:年化收益率在5%-10%之间')
elif metrics['年化收益率'] > 0:
summary_points.append('正收益:年化收益率为正')
else:
summary_points.append('负收益:年化收益率为负')
# 基于夏普比率评价
if metrics['夏普比率'] > 1.0:
summary_points.append('优秀风险调整收益:夏普比率超过1.0')
elif metrics['夏普比率'] > 0.5:
summary_points.append('良好风险调整收益:夏普比率在0.5-1.0之间')
elif metrics['夏普比率'] > 0:
summary_points.append('正风险调整收益:夏普比率为正')
else:
summary_points.append('负风险调整收益:夏普比率为负')
# 基于最大回撤评价
if abs(metrics['最大回撤']) < 0.10:
summary_points.append('风险控制优秀:最大回撤小于10%')
elif abs(metrics['最大回撤']) < 0.20:
summary_points.append('风险控制良好:最大回撤在10%-20%之间')
elif abs(metrics['最大回撤']) < 0.30:
summary_points.append('风险较高:最大回撤在20%-30%之间')
else:
summary_points.append('风险很高:最大回撤超过30%')
# 基于胜率评价
if metrics['胜率'] > 0.55:
summary_points.append('高胜率:超过55%的交易日期获得正收益')
elif metrics['胜率'] > 0.45:
summary_points.append('适中胜率:在45%-55%之间')
else:
summary_points.append('低胜率:少于45%的交易日期获得正收益')
# 保存投资建议
summary_df = pd.DataFrame({'投资建议': summary_points})
summary_file = os.path.join(output_dir, 'tables', 'investment_recommendations.csv')
summary_df.to_csv(summary_file, encoding='utf-8-sig', index=False)
print(f"✓ 投资建议表格已保存: tables/investment_recommendations.csv")
return config_df, performance_df
def save_charts(prices, asset_names, portfolio_returns, cumulative_returns, drawdown, metrics, weights, output_dir):
"""
保存分析图表为PNG图片
"""
print("生成分析图表...")
stock_prices = prices[asset_names].copy()
# 图表1: 价格走势对比图
plt.figure(figsize=(14, 8))
colors = ['#1f77b4', '#ff7f0e']
for i, asset in enumerate(asset_names):
normalized = stock_prices[asset] / stock_prices[asset].iloc[0] * 100
plt.plot(stock_prices.index, normalized, label=f"{asset} ({weights[i] * 100:.0f}%)",
color=colors[i], linewidth=2.5)
plt.title(f'归一化价格走势 (基准=100)\n{metrics["起始日期"]}{metrics["结束日期"]}',
fontsize=14, fontweight='bold')
plt.ylabel('价格指数', fontsize=12)
plt.xlabel('日期', fontsize=12)
plt.legend(fontsize=11)
plt.grid(True, alpha=0.3)
plt.tight_layout()
price_chart_file = os.path.join(output_dir, 'charts', '01_price_trend.png')
plt.savefig(price_chart_file, dpi=300, bbox_inches='tight')
plt.close()
print(f"✓ 价格走势图已保存: charts/01_price_trend.png")
# 图表2: 投资组合累计收益
plt.figure(figsize=(14, 8))
plt.plot(cumulative_returns.index, cumulative_returns * 100,
color='#2e8b57', linewidth=3)
plt.fill_between(cumulative_returns.index, 0, cumulative_returns * 100,
where=cumulative_returns >= 0, color='#2e8b57', alpha=0.3)
plt.title(f'投资组合累计收益率\n年化收益率: {metrics["年化收益率"] * 100:.2f}%',
fontsize=14, fontweight='bold')
plt.ylabel('累计收益 (%)', fontsize=12)
plt.xlabel('日期', fontsize=12)
plt.grid(True, alpha=0.3)
plt.tight_layout()
cumulative_chart_file = os.path.join(output_dir, 'charts', '02_cumulative_returns.png')
plt.savefig(cumulative_chart_file, dpi=300, bbox_inches='tight')
plt.close()
print(f"✓ 累计收益图已保存: charts/02_cumulative_returns.png")
# 图表3: 回撤分析
plt.figure(figsize=(14, 8))
plt.fill_between(drawdown.index, drawdown * 100, 0,
where=drawdown < 0, color='#ff6b6b', alpha=0.6)
plt.plot(drawdown.index, drawdown * 100, color='#c44d58', linewidth=2)
plt.title(f'投资组合回撤分析\n最大回撤: {metrics["最大回撤"] * 100:.2f}%',
fontsize=14, fontweight='bold')
plt.ylabel('回撤幅度 (%)', fontsize=12)
plt.xlabel('日期', fontsize=12)
plt.grid(True, alpha=0.3)
plt.tight_layout()
drawdown_chart_file = os.path.join(output_dir, 'charts', '03_drawdown_analysis.png')
plt.savefig(drawdown_chart_file, dpi=300, bbox_inches='tight')
plt.close()
print(f"✓ 回撤分析图已保存: charts/03_drawdown_analysis.png")
# 图表4: 日收益率分布
plt.figure(figsize=(14, 8))
returns_pct = portfolio_returns * 100
n_bins = min(50, len(returns_pct) // 5)
plt.hist(returns_pct, bins=n_bins, alpha=0.7, color='#4ecdc4',
edgecolor='black', density=True)
plt.axvline(x=returns_pct.mean(), color='red', linestyle='--',
linewidth=2, label=f'均值: {returns_pct.mean():.2f}%')
# 添加正态分布曲线
try:
from scipy.stats import norm
mu, std = returns_pct.mean(), returns_pct.std()
x = np.linspace(returns_pct.min(), returns_pct.max(), 100)
p = norm.pdf(x, mu, std)
plt.plot(x, p, 'k', linewidth=2, label='正态分布')
except:
pass
plt.title('日收益率分布', fontsize=14, fontweight='bold')
plt.xlabel('日收益率 (%)', fontsize=12)
plt.ylabel('概率密度', fontsize=12)
plt.legend(fontsize=11)
plt.grid(True, alpha=0.3)
plt.tight_layout()
distribution_chart_file = os.path.join(output_dir, 'charts', '04_returns_distribution.png')
plt.savefig(distribution_chart_file, dpi=300, bbox_inches='tight')
plt.close()
print(f"✓ 收益率分布图已保存: charts/04_returns_distribution.png")
# 图表5: 月度收益率热力图(如果数据足够)
if len(portfolio_returns) >= 60:
plt.figure(figsize=(14, 10))
monthly_returns = portfolio_returns.resample('M').apply(lambda x: (1 + x).prod() - 1)
if len(monthly_returns) > 0:
years = sorted(set(monthly_returns.index.year))
monthly_matrix = pd.DataFrame(index=years, columns=range(1, 13))
for year in years:
for month in range(1, 13):
month_data = monthly_returns[(monthly_returns.index.year == year) &
(monthly_returns.index.month == month)]
if not month_data.empty:
monthly_matrix.loc[year, month] = month_data.iloc[0]
monthly_matrix_numeric = monthly_matrix.astype(float) * 100
plt.imshow(monthly_matrix_numeric.values, cmap='RdYlGn',
aspect='auto', alpha=0.9, vmin=-15, vmax=15)
# 添加数值标签
for i in range(len(years)):
for j in range(12):
value = monthly_matrix_numeric.iloc[i, j]
if not np.isnan(value):
color = 'black' if abs(value) < 8 else 'white'
plt.text(j, i, f'{value:.1f}%', ha='center', va='center',
fontsize=9, color=color, fontweight='bold')
plt.xticks(range(12), ['1月', '2月', '3月', '4月', '5月', '6月',
'7月', '8月', '9月', '10月', '11月', '12月'],
rotation=45)
plt.yticks(range(len(years)), years)
plt.colorbar(label='月收益率 (%)')
plt.title('月度收益率热力图', fontsize=14, fontweight='bold')
plt.xlabel('月份', fontsize=12)
plt.ylabel('年份', fontsize=12)
plt.tight_layout()
heatmap_file = os.path.join(output_dir, 'charts', '05_monthly_returns_heatmap.png')
plt.savefig(heatmap_file, dpi=300, bbox_inches='tight')
plt.close()
print(f"✓ 月度收益率热力图已保存: charts/05_monthly_returns_heatmap.png")
# 图表6: 年度收益率条形图
if len(portfolio_returns) >= 252:
plt.figure(figsize=(14, 8))
yearly_returns = portfolio_returns.resample('Y').apply(lambda x: (1 + x).prod() - 1)
if len(yearly_returns) > 0:
yearly_returns.index = yearly_returns.index.year
colors_bar = ['#2ecc71' if x >= 0 else '#e74c3c' for x in yearly_returns]
bars = plt.bar(yearly_returns.index.astype(str), yearly_returns.values * 100,
color=colors_bar, edgecolor='black', alpha=0.8)
# 添加数值标签
for i, v in enumerate(yearly_returns.values):
plt.text(i, v * 100 + (0.5 if v >= 0 else -1), f'{v * 100:.1f}%',
ha='center', va='bottom' if v >= 0 else 'top',
fontsize=10, fontweight='bold')
plt.title('年度收益率', fontsize=14, fontweight='bold')
plt.ylabel('收益率 (%)', fontsize=12)
plt.xlabel('年份', fontsize=12)
plt.grid(True, alpha=0.3, axis='y')
plt.tight_layout()
yearly_file = os.path.join(output_dir, 'charts', '06_yearly_returns.png')
plt.savefig(yearly_file, dpi=300, bbox_inches='tight')
plt.close()
print(f"✓ 年度收益率图已保存: charts/06_yearly_returns.png")
# 图表7: 滚动收益分析
if len(portfolio_returns) >= 60:
plt.figure(figsize=(14, 8))
window_size = min(60, len(portfolio_returns) // 2)
rolling_return = portfolio_returns.rolling(window=window_size).mean() * 100
plt.plot(rolling_return.index, rolling_return, color='#4169e1', linewidth=2)
plt.axhline(y=portfolio_returns.mean() * 100, color='red',
linestyle='--', linewidth=1.5, label=f'平均: {portfolio_returns.mean() * 100:.2f}%')
plt.title(f'滚动{window_size}日平均收益率', fontsize=14, fontweight='bold')
plt.ylabel('日收益率 (%)', fontsize=12)
plt.xlabel('日期', fontsize=12)
plt.legend(fontsize=11)
plt.grid(True, alpha=0.3)
plt.tight_layout()
rolling_file = os.path.join(output_dir, 'charts', '07_rolling_returns.png')
plt.savefig(rolling_file, dpi=300, bbox_inches='tight')
plt.close()
print(f"✓ 滚动收益图已保存: charts/07_rolling_returns.png")
print(f"✓ 所有图表已保存至 {output_dir}/charts/ 目录")
def save_raw_data(prices, asset_names, returns, portfolio_returns, cumulative_returns, drawdown, output_dir):
"""
保存原始数据文件
"""
print("保存原始数据文件...")
stock_prices = prices[asset_names].copy()
# 保存原始价格数据
raw_prices_file = os.path.join(output_dir, 'data', 'raw_prices.csv')
stock_prices.to_csv(raw_prices_file, encoding='utf-8-sig')
print(f"✓ 原始价格数据已保存: data/raw_prices.csv")
# 保存汇率数据
exchange_rate_file = os.path.join(output_dir, 'data', 'exchange_rate.csv')
prices['汇率_USD_CNY'].to_csv(exchange_rate_file, encoding='utf-8-sig')
print(f"✓ 汇率数据已保存: data/exchange_rate.csv")
# 保存收益率数据
raw_returns_file = os.path.join(output_dir, 'data', 'raw_returns.csv')
returns.to_csv(raw_returns_file, encoding='utf-8-sig')
print(f"✓ 原始收益率数据已保存: data/raw_returns.csv")
# 保存投资组合收益率数据
portfolio_data = pd.DataFrame({
'日收益率': portfolio_returns,
'累计收益率': cumulative_returns,
'回撤': drawdown
})
portfolio_file = os.path.join(output_dir, 'data', 'portfolio_performance.csv')
portfolio_data.to_csv(portfolio_file, encoding='utf-8-sig')
print(f"✓ 投资组合表现数据已保存: data/portfolio_performance.csv")
# 创建README文件
readme_content = f"""# 投资组合分析结果
## 分析概要
- 分析资产: A股515450 + 美股QLD
- 分析期间: {stock_prices.index[0].date()}{stock_prices.index[-1].date()}
- 总交易日数: {len(stock_prices)}
- 生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
## 目录结构
{output_dir}/
├── tables/ # 各种分析表格(CSV格式)
│ ├── stock_prices_detailed.csv # 详细价格数据
│ ├── stock_prices_summary.csv # 价格汇总统计
│ ├── daily_returns.csv # 日收益率数据
│ ├── returns_summary.csv # 收益率汇总统计
│ ├── monthly_returns.csv # 月度收益率
│ ├── yearly_returns.csv # 年度收益率
│ ├── portfolio_configuration.csv # 投资组合配置
│ ├── portfolio_performance.csv # 绩效指标
│ ├── drawdown_analysis.csv # 回撤分析
│ ├── correlation_matrix.csv # 相关性矩阵
│ ├── investment_recommendations.csv # 投资建议
│ ├── investment_simulation_details.csv # 投资模拟详情
│ ├── investment_simulation_summary.csv # 投资模拟汇总
│ └── investment_asset_details.csv # 各资产投资明细
├── charts/ # 分析图表(PNG格式)
│ ├── 01_price_trend.png # 价格走势图
│ ├── 02_cumulative_returns.png # 累计收益图
│ ├── 03_drawdown_analysis.png # 回撤分析图
│ ├── 04_returns_distribution.png # 收益率分布图
│ ├── 05_monthly_returns_heatmap.png # 月度收益率热力图
│ ├── 06_yearly_returns.png # 年度收益率图
│ ├── 07_rolling_returns.png # 滚动收益图
│ ├── 08_investment_growth.png # 资产增长曲线
│ ├── 09_investment_returns.png # 投资累计收益率
│ └── 10_investment_progress.png # 投资进度图
└── data/ # 原始数据文件
├── raw_prices.csv # 原始价格数据
├── raw_returns.csv # 原始收益率数据
├── portfolio_performance.csv # 投资组合表现数据
└── exchange_rate.csv # 汇率数据
## 使用说明
1. 所有表格均为CSV格式,可用Excel、Python pandas等工具打开
2. 所有图表均为PNG格式,分辨率为300dpi
3. 原始数据可用于进一步分析
4. 投资建议基于历史数据分析,仅供参考
## 注意事项
- 本分析基于历史数据,不代表未来表现
- 投资有风险,决策需谨慎
- 建议结合其他分析工具和个人风险偏好进行投资决策
"""
readme_file = os.path.join(output_dir, 'README.txt')
with open(readme_file, 'w', encoding='utf-8') as f:
f.write(readme_content)
print(f"✓ README文件已生成: README.txt")
def main():
"""
主程序
"""
print("=" * 80)
print("A股515450与美股QLD投资组合分析系统")
print("=" * 80)
# 创建输出目录
output_dir = create_output_directory()
try:
# 1. 获取股票数据
print("\n步骤1: 获取股票数据")
prices = get_stock_data()
# 2. 获取用户输入的权重
print("\n步骤2: 配置投资组合权重")
weights, asset_names = get_user_weights(list(prices.columns))
# 3. 获取投资计划
print("\n步骤3: 配置投资计划")
investment_plan = get_investment_plan()
# 4. 计算投资组合表现
print("\n步骤4: 计算投资组合表现")
portfolio_returns, cumulative_returns, drawdown, metrics, returns = calculate_portfolio_performance(
prices, weights, asset_names
)
# 5. 计算投资模拟
print("\n步骤5: 计算投资模拟")
simulation_results = calculate_investment_simulation(prices, weights, asset_names, investment_plan)
# 6. 保存所有表格到CSV文件
print("\n步骤6: 生成分析表格(CSV格式)")
save_stock_price_table(prices, asset_names, output_dir)
save_returns_table(prices, asset_names, returns, portfolio_returns, cumulative_returns, output_dir)
save_portfolio_summary_table(prices, asset_names, weights, portfolio_returns, cumulative_returns, metrics,
returns,
output_dir)
save_investment_simulation_results(simulation_results, output_dir)
# 7. 保存原始数据
save_raw_data(prices, asset_names, returns, portfolio_returns, cumulative_returns, drawdown, output_dir)
# 8. 生成并保存所有图表为PNG文件
print("\n步骤7: 生成分析图表(PNG格式)")
save_charts(prices, asset_names, portfolio_returns, cumulative_returns, drawdown, metrics, weights, output_dir)
create_investment_charts(simulation_results, output_dir)
# 9. 在终端显示关键结果摘要
print("\n" + "=" * 80)
print("投资组合关键结果摘要")
print("=" * 80)
print(f"分析期间: {metrics['起始日期']}{metrics['结束日期']}")
print(f"投资周期: {metrics['总年数']:.2f}")
print(f"年化收益率: {metrics['年化收益率'] * 100:.2f}%")
print(f"累计收益率: {metrics['累计收益率'] * 100:.2f}%")
print(f"年化波动率: {metrics['年化波动率'] * 100:.2f}%")
print(f"最大回撤: {metrics['最大回撤'] * 100:.2f}%")
print(f"夏普比率: {metrics['夏普比率']:.3f}")
print(f"索提诺比率: {metrics['索提诺比率']:.3f}")
print(f"胜率: {metrics['胜率'] * 100:.1f}%")
# 10. 显示投资模拟结果摘要
summary = simulation_results['summary']
print("\n" + "=" * 80)
print("投资模拟结果摘要")
print("=" * 80)
print(f"初始本金: {summary['初始本金_元']:.2f}")
print(f"每月定投: {summary['每月定投_元']:.2f}")
print(f"总投资额: {summary['总投资额_元']:.2f}")
print(f"最终资产总值: {summary['最终资产总值_元']:.2f}")
print(f"总收益: {summary['总收益_元']:.2f}")
print(f"总收益率: {summary['总收益率_%']:.2f}%")
print(f"年化收益率: {summary['年化收益率_%']:.2f}%")
print(f"投资期间: {summary['投资起始日期']}{summary['投资结束日期']}")
print(f"投资天数: {summary['投资天数']} 天 ({summary['投资年数']:.2f} 年)")
print(f"投资笔数: {summary['现金投资笔数']}")
print(f"投资组合理论增长倍数: {summary['投资组合增长倍数']:.4f}")
print("\n" + "=" * 80)
print(f"分析完成! 所有结果已保存至: {output_dir}/")
print("=" * 80)
print("包含以下内容:")
print("✓ 表格文件 (CSV格式): tables/ 目录")
print("✓ 图表文件 (PNG格式): charts/ 目录")
print("✓ 原始数据: data/ 目录")
print("✓ 说明文件: README.txt")
print("=" * 80)
except KeyboardInterrupt:
print("\n\n程序被用户中断")
return 1
except Exception as e:
print(f"\n错误: {str(e)}")
import traceback
traceback.print_exc()
print("程序退出")
return 1
return 0
if __name__ == "__main__":
# 检查必要库是否安装
required_libs = ['akshare', 'yfinance', 'pandas', 'numpy', 'matplotlib']
missing_libs = []
for lib in required_libs:
try:
__import__(lib)
except ImportError:
missing_libs.append(lib)
if missing_libs:
print(f"错误: 缺少必要的库: {', '.join(missing_libs)}")
print("请运行以下命令安装:")
print(f"pip install {' '.join(missing_libs)}")
if 'akshare' in missing_libs:
print("\n注意: AKShare可能需要额外配置,如果安装失败可以尝试:")
print("pip install akshare --upgrade")
exit(1)
# 运行主程序
print("程序启动 - A股515450与美股QLD投资组合分析")
exit_code = main()
exit(exit_code)