使用 AI 和 Python 进行交易预测

IMG_256

我们探索了预测股票价格走势的各种方法,包括利用Facebook 的 Prophet等预测工具、季节性自回归综合移动平均 (SARIMA) 模型等统计方法、多项式回归等机器学习策略,以及最终的预测方法。 ,一种基于人工智能的循环神经网络(RNN)。

在众多的人工智能模型和技术中,我们发现长短期记忆 (LSTM) 模型能产生最有利的结果。 LSTM 模型是循环神经网络架构的一种变体,擅长处理序列预测挑战。与传统的前馈神经网络相反,LSTM 拥有类似记忆的结构,使其能够跨广泛的序列保存上下文数据。此功能使其特别适合时间序列预测、自然语言处理和其他依赖于序列数据的任务。它通过减轻梯度消失和爆炸问题解决了标准 RNN 的基本缺点,从而促进模型识别数据集中的长期依赖性的能力。因此,LSTM 已成为需要长时间深入理解数据的复杂任务的首选。

为了证明其功效,我们开发了概念验证。

准备步骤:

  • 安装最新的 Python 和 PIP。
  • 使用“ main.py ”文件创建一个Python项目。
  • 在项目中添加一个“ data ”目录。
  • 设置并激活虚拟环境。
1
2
python3 -m venv venv  
source venv/.bin/activate
  • 创建一个“requirements.txt”文件。
1
2
3
4
5
6
7
8
pandas  
numpy
scikit-learn
scipy
matplotlib
tensorflow
eodhd
python-dotenv
  • 确保您已在虚拟环境中升级 PIP 并安装依赖项。
1
2
pip install --upgrade pip  
python3 -m pip install -r requirements.txt

我们已将 EODHD API 的 API 密钥包含到“.env”文件中。

API_TOKEN=<YOUR_API_KEY_GOES_HERE>

一切应该都准备好了。如果您正在使用VSCode并希望使用与我们相同的“ .vscode/settings.json ”文件,就在这里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"python.formatting.provider": "none",
"python.formatting.blackArgs": ["--line-length", "160"],
"python.linting.flake8Args": [
"--max-line-length=160",
"--ignore=E203,E266,E501,W503,F403,F401,C901"
],
"python.analysis.diagnosticSeverityOverrides": {
"reportUnusedImport": "information",
"reportMissingImports": "none"
},
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter"
}
}

这是该项目的GitHub 存储库,以供您需要指导时使用。

构建代码

第一步是您需要导入必要的库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import os
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "1"

import pickle
import pandas as pd
import numpy as np
from dotenv import load_dotenv
from sklearn.metrics import mean_squared_error, mean_absolute_error
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.models import load_model
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
from eodhd import APIClient

默认情况下,TensorFlow 通常会生成大量警告和调试详细信息。我们更喜欢更干净、更有序的输出,因此我们抑制这些通知。这是通过导入“os”模块后使用 os.environ 的特定行来实现的。

训练机器学习和人工智能模型的过程需要大量的微调,主要通过所谓的超参数进行管理。这个主题很复杂,掌握它在某种程度上是一种艺术形式。最佳超参数的选择受到多种因素的影响。根据我们通过EODHD API获取的每日 S&P 500 数据,我们开始采用一些广泛认可的设置。我们鼓励您修改这些内容以增强结果。目前,建议将序列长度保持在 20。

1
2
3
4
5
# Configurable hyperparameters
seq_length = 20
batch_size = 64
lstm_units = 50
epochs = 100

下一步涉及从我们的“.env”文件中检索EODHD API 的API_TOKEN。

1
2
3
4
5
6
7
8
9
10
# Load environment variables from the .env file
load_dotenv()

# Retrieve the API key
API_TOKEN = os.getenv("API_TOKEN")

if API_TOKEN is not None:
print(f"API key loaded: {API_TOKEN[:4]}********")
else:
raise LookupError("Failed to load API key.")

确保您拥有有效的 EODHD API 的 API_TOKEN 才能成功访问数据。

我们已经建立了几个可重用的函数,并将在下面进一步使用它们时详细介绍它们的函数。这些函数中包含注释以阐明它们的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def get_ohlc_data(use_cache: bool = False) -> pd.DataFrame:
ohlcv_file = "data/ohlcv.csv"

if use_cache:
if os.path.exists(ohlcv_file):
return pd.read_csv(ohlcv_file, index_col=None)
else:
api = APIClient(API_TOKEN)
df = api.get_historical_data(
symbol="HSPX.LSE",
interval="d",
iso8601_start="2010-05-17",
iso8601_end="2023-10-04",
)
df.to_csv(ohlcv_file, index=False)
return df
else:
api = APIClient(API_TOKEN)
return api.get_historical_data(
symbol="HSPX.LSE",
interval="d",
iso8601_start="2010-05-17",
iso8601_end="2023-10-04",
)

def create_sequences(data, seq_length):
x, y = [], []
for i in range(len(data) - seq_length):
x.append(data[i : i + seq_length])
y.append(data[i + seq_length, 3]) # The prediction target "close" is the 4th column (index 3)
return np.array(x), np.array(y)

def get_features(df: pd.DataFrame = None, feature_columns: list = ["open", "high", "low", "close", "volume"]) -> list:
return df[feature_columns].values

def get_target(df: pd.DataFrame = None, target_column: str = "close") -> list:
return df[target_column].values

def get_scaler(use_cache: bool = True) -> MinMaxScaler:
scaler_file = "data/scaler.pkl"

if use_cache:
if os.path.exists(scaler_file):
# Load the scaler
with open(scaler_file, "rb") as f:
return pickle.load(f)
else:
scaler = MinMaxScaler(feature_range=(0, 1))
with open(scaler_file, "wb") as f:
pickle.dump(scaler, f)
return scaler
else:
return MinMaxScaler(feature_range=(0, 1))

def scale_features(scaler: MinMaxScaler = None, features: list = []):
return scaler.fit_transform(features)

def get_lstm_model(use_cache: bool = False) -> Sequential:
model_file = "data/lstm_model.h5"

if use_cache:
if os.path.exists(model_file):
# Load the model
return load_model(model_file)
else:
# Train the LSTM model and save it
model = Sequential()
model.add(LSTM(units=lstm_units, activation='tanh', input_shape=(seq_length, 5)))
model.add(Dropout(0.2))
model.add(Dense(units=1))

model.compile(optimizer="adam", loss="mean_squared_error")
model.fit(x_train, y_train, epochs=epochs, batch_size=batch_size, validation_data=(x_test, y_test))

# Save the entire model to a HDF5 file
model.save(model_file)

return model

else:
# Train the LSTM model
model = Sequential()
model.add(LSTM(units=lstm_units, activation='tanh', input_shape=(seq_length, 5)))
model.add(Dropout(0.2))
model.add(Dense(units=1))

model.compile(optimizer="adam", loss="mean_squared_error")
model.fit(x_train, y_train, epochs=epochs, batch_size=batch_size, validation_data=(x_test, y_test))

return model

def get_predicted_x_test_prices(x_test: np.ndarray = None):
predicted = model.predict(x_test)

# Create a zero-filled matrix to aid in inverse transformation
zero_filled_matrix = np.zeros((predicted.shape[0], 5))

# Replace the 'close' column of zero_filled_matrix with the predicted values
zero_filled_matrix[:, 3] = np.squeeze(predicted)

# Perform inverse transformation
return scaler.inverse_transform(zero_filled_matrix)[:, 3]

def plot_x_test_actual_vs_predicted(actual_close_prices: list = [], predicted_x_test_close_prices = []) -> None:
# Plotting the actual and predicted close prices
plt.figure(figsize=(14, 7))
plt.plot(actual_close_prices, label="Actual Close Prices", color="blue")
plt.plot(predicted_x_test_close_prices, label="Predicted Close Prices", color="red")
plt.title("Actual vs Predicted Close Prices")
plt.xlabel("Time")
plt.ylabel("Price")
plt.legend()
plt.show()

def predict_next_close(df: pd.DataFrame = None, scaler: MinMaxScaler = None) -> float:
# Take the last X days of data and scale it
last_x_days = df.iloc[-seq_length:][["open", "high", "low", "close", "volume"]].values
last_x_days_scaled = scaler.transform(last_x_days)

# Reshape this data to be a single sequence and make the prediction
last_x_days_scaled = np.reshape(last_x_days_scaled, (1, seq_length, 5))

# Predict the future close price
future_close_price = model.predict(last_x_days_scaled)

# Create a zero-filled matrix for the inverse transformation
zero_filled_matrix = np.zeros((1, 5))

# Put the predicted value in the 'close' column (index 3)
zero_filled_matrix[0, 3] = np.squeeze(future_close_price)

# Perform the inverse transformation to get the future price on the original scale
return scaler.inverse_transform(zero_filled_matrix)[0, 3]

def evaluate_model(x_test: list = []) -> None:
# Evaluate the model
y_pred = model.predict(x_test)
mse = mean_squared_error(y_test, y_pred)
mae = mean_absolute_error(y_test, y_pred)
rmse = np.sqrt(mse)

print(f"Mean Squared Error: {mse}")
print(f"Mean Absolute Error: {mae}")
print(f"Root Mean Squared Error: {rmse}")

我们要强调的一个方面是在各种函数中包含“ use_cache ”变量。该策略旨在减少对EODHD API的不必要的 API 调用,并避免使用相同的日常数据对模型进行冗余的重新训练。激活“ use_cache ”变量可以将数据保存到“ data/ ”目录中的文件中。如果数据不存在,则会生成数据;如果已经存在,它将被加载。当脚本多次执行时,这种方法可以显着提高效率。要在每次运行时获取新数据,只需在调用函数时停用“ use_cache ”选项或清除“ data/ ”目录中的文件,即可达到相同的结果。

我们现在继续讨论代码的核心……

1
2
3
4
if __name__ == "__main__":
# Retrieve 3369 days of S&P 500 data
df = get_ohlc_data(use_cache=True)
print(df)

最初,我们从EODHD API获取 OHLCV 数据并将其存入名为“ df ”的 Pandas DataFrame 中。 OHLCV 表示开盘价、最高价、最低价、收盘价和交易量,它们是交易蜡烛数据的标准属性。如前所述,启用缓存可以简化流程。我们还可以选择在屏幕上显示这些数据。

IMG_257

我们将一次性介绍以下代码块……

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
features = get_features(df)
target = get_target(df)

scaler = get_scaler(use_cache=True)
scaled_features = scale_features(scaler, features)

x, y = create_sequences(scaled_features, seq_length)

train_size = int(0.8 * len(x)) # Create a train/test split of 80/20%
x_train, x_test = x[:train_size], x[train_size:]
y_train, y_test = y[:train_size], y[train_size:]

# Re-shape input to fit lstm layer
x_train = np.reshape(x_train, (x_train.shape[0], seq_length, 5)) # 5 features
x_test = np.reshape(x_test, (x_test.shape[0], seq_length, 5)) # 5 features
  • “特征”包含我们将用来预测目标的输入列表,即“接近”。
  • “target ”包含目标值列表,例如“ close ”。
  • “scaler ”代表一种用于标准化数字的方法,使它们具有可比性。例如,我们的数据集可能以接近值 784 开始,以 3538 结束。最后一行中较高的数字本身并不意味着对于预测目的具有更大的意义。标准化确保了可比性。
  • “scaled_features ”是这个缩放过程​​的结果,我们将用它来训练我们的人工智能模型。
  • “x_train ”和“ x_test ”分别表示我们将用于训练和测试 AI 模型的数据集,通常的做法是 80/20 分割。这意味着我们 80% 的交易数据分配用于训练,20% 保留用于测试模型。 “x”表示这些是特征或输入。
  • “y_train ”和“ y_test ”功能类似,但仅包含目标值,例如“ close ”。
  • 最后,必须重新调整数据以满足 LSTM 层的要求。

我们开发了一个函数来重新训练模型或加载之前训练的模型。

1
model = get_lstm_model(use_cache=True)

IMG_258

显示的图像可以让您一睹训练序列。您会发现,最初,“loss ”和“ val_loss ”指标可能并不紧密一致。然而,随着培训的进行,这些数字预计会趋同,表明取得了进展。

  • 损失:这是在训练数据集上计算的均方误差(MSE)。它反映了每个训练时期的预测标签和真实标签之间的“成本”或“误差”。我们的目标是通过连续的时期减少这个数字。
  • Val_loss:这个均方误差是根据验证数据集确定的,用于衡量模型在训练期间未遇到的数据上的性能。它是模型泛化到新的、未见过的数据的能力的指标。

如果您想查看测试集上的预测收盘价列表,可以使用此代码。

1
2
predicted_x_test_close_prices = get_predicted_x_test_prices(x_test)  
print("Predicted close prices:", predicted_x_test_close_prices)

就其本身而言,数据可能并不是特别具有启发性或易于可视化。然而,通过绘制实际收盘价与预测收盘价(请记住,这代表整个数据集的 20%),我们可以得到更清晰的图片,如下所示。

1
2
# Plot the actual and predicted close prices for the test data
plot_x_test_actual_vs_predicted(df["close"].tail(len(predicted_x_test_close_prices)).values, predicted_x_test_close_prices)

IMG_259

结果表明,该模型在测试阶段预测收盘价方面表现出色。

现在,转向最令人期待的方面:我们能否确定明天的预计收盘价?

1
2
3
4
5
 # Predict the next close price
predicted_next_close = predict_next_close(df, scaler)
print("Predicted next close price:", predicted_next_close)

Predicted next close price: 3536.906685638428

这是出于教育目的的基本示例,并且仅仅是一个开始。从这里开始,您可能会考虑合并额外的训练数据、调整超参数或将模型应用于不同的市场和时间间隔。

如果您想评估模型,可以包含此内容。

1
2
# Evaluate the model
evaluate_model(x_test)

在我们的场景中是……

Mean Squared Error: 0.00021641664334765608
Mean Absolute Error: 0.01157513692221611
Root Mean Squared Error: 0.014711106122506767

mean_squared_error ”和“ mean_absolute_error ”函数源自 scikit-learn 的指标模块,分别用于计算均方误差 (MSE) 和平均绝对误差 (MAE)。均方根误差 (RMSE) 是通过求 MSE 的平方根得出的。

这些指标提供了模型准确性的数字评估,而图形表示有助于直观地将预测值与实际数字进行比较。

这些指标提供了对模型性能的定量评估,而绘图有助于直观地将预测值与实际数据进行比较。

© 2024 Omer Wu bar All Rights Reserved. 本站访客数人次 本站总访问量