用梯度提升决策树做股票走势预测

上一篇写到如何用Python计算指标,但是仅仅用这些常见指标,来对股票走势进行判断,胜率并不高,原因很简单,这些指标所有人都在看,就容易被庄家操控,另外,所有指标都只是人类对股票价格数据的理解,是不够全面的,毕竟人类在面对海量数据时的能力是远低于机器的。

梯度提升决策树HistGradientBoosting算法,我同时测试了随机森林、xgboost、lightGBM等算法,它给我的印象是又快又好,具体算法可以自行去网上搜,这里就不赘述了。网上看了很多用机器学习对股价进行预测的文章,大部分都是基于对收盘价序列,进行下一天的收盘价的预测,这种方法基本属于痴人说梦,只要写一个下一天收盘价等于上一天收盘价的函数,就能战胜绝大多数模型。

一般情况下,我们对股价趋势进行预测,不是只看收盘价,也不是只看前一天的收盘价,而是会看很多天的开盘价、收盘价、最高价和最低价,同时也要看成交量,成交量可能还更重要。(当然,还有更多的东西要看,比如市场情绪、政策趋向、突发事件、板块走势等等,这些数据很多都是非结构化的文本数据,所以大模型才是正解,当然,我们可以认为,这些数据,已经部分的反应在了股票成交数据中)更进一步,如果要分析tick数据(分笔的成交数据),理论上讲可以获得更多信息,而且可以更实时而精准地找到买卖点,但是,投入有点大,我的笔记本存不下,也跑不动,更重要的是,我只是写着玩,并不是真正去做股票投资,所以用日成交数据足够了。

所以,数据用的是N天的收盘价、开盘价、最高价、最低价、换手率,又加上了5日、10日、20日和100日的均线,用来反映更长期的变化信息。这里N天,到底是用10天数据去预测,还是用30天数据去预测,是个重要的超参数,虽然理论上讲N越大,预测越准,但是同样会带来性能问题。然后把这些数据直接平铺,当做训练的数据集。停牌时的数据,自动用上一日的填充。这里要说明的是HistGradientBoosting是支持缺失值的,但是考虑到更多其他算法使用,还是做了填充。同样,理论上讲HistGradientBoosting是不需要对数据做归一化的,但是不太股票股价差异太大,不归一化的话,模型泛化能力会非常差,没有用sklearn的minmaxscaler,因为发现它归一化出来的数字会损失掉非常多。这里,直接用每个股票价格的平均值当分母,去除股价,来做归一化,每只股票都是用自己的归一化算法,保证出来的数值,各个股票之间不会差太多,换手率不用做调整,

第二个问题是,为什么是预测趋势,而不是预测股价。这里考虑的其实是,到底用分类,还是用回归。预测股价的问题是,股价的影响因素太多,不够稳定,经常一涨一跌,对模型的稳定性不利,趋势就稳定很多,所以这个算法里面,预测的是5日均线在5天后比现在高10%。当然, 到底用5日还是7日均线,到底比较的是10天还是15天,到底高10%还是20%,这些都可以调,都是超参数。不过,理论上讲,这些参数选得越值越大,预测的准确性会越低,所以这个算法最好还是做短线。

下面上代码:

def check_buy_sell(a, b):
    #print('a='+str(a)+' b='+str(b))
    if b > a * 1.10:
        return 1
    elif b < a * 0.90:
        return -1
    return 0
    
def prepareStockData(stock_id, days, end_date, n_days):

    #多取一些时间,保证均线计算正确
    df = get_stock_hist_data(stock_id, start_date='2018-01-01', end_date=end_date)

    df['SMA_5_5'] = df['SMA_5'].shift(periods=-5)
    df['buysell'] = df.apply(lambda x: check_buy_sell(x['SMA_5'], x['SMA_5_5']), axis=1)

    #再截取要用的数据
    df = df[len(df)-n_days-days-5:len(df)-5]

    #print(df[['SMA_5','SMA_5_5']])

    samplesX = pd.DataFrame()
    samplesY = pd.DataFrame()

    #跳过前面几天数据,从第days天开始取
    for i in range(days, len(df)):
   
        df_sample_X = df.iloc[i-days:i] #取前days天到前一天数据
        df_sample_Y = df.iloc[i:i+1]    #取当天数据
            
        if not samplesX.empty:
            samplesX = pd.concat([samplesX, df_sample_X])
            samplesY = pd.concat([samplesY, df_sample_Y])
        else:
            samplesX = df_sample_X.copy()
            samplesY = df_sample_Y.copy()

    size = len(samplesY)
  

    X = np.array(samplesX[['open','close','high','low','SMA_5','SMA_10','SMA_20','SMA_100']]).reshape(size, days*8)
    exchanges = np.array(samplesX[['exchange']]).reshape(size,days)
    
    Y = np.array(samplesY[['buysell']]).reshape(size,1) #.ravel()
    ScaleNum = np.mean(X.reshape(size*days*8, 1))


    X = X / ScaleNum

    X = np.concatenate((X, exchanges), axis=1)

    return X, Y, ScaleNum

然后开始进行训练,同时训练了一个随机森林的模型,用来比较时间和准确率。这里有两点,在调参时,训练数据要截断在某个时间,以防止模型中包含了测试集的信息。二是训练好的模型,连同归一化参数,都用joblib保存下来,方便预测时调用。

def prepareTrainData(stock_ids, days, end_date='2025-08-30'):

    ALL_X = np.array([])
    ALL_Y = np.array([])
    scalers = {}

    for index, row in stock_ids.iterrows():
        stock_id = row['stock_id']
        print('getting train data for stock_id = '+stock_id)
        
        X, Y, scaleNum = prepareStockData(stock_id, days, end_date=end_date, n_days=1000)
        scalers[stock_id] = scaleNum
        
        if len(ALL_X) > 0:
            ALL_X = np.vstack((ALL_X, X))
            ALL_Y = np.vstack((ALL_Y, Y))
        else:
            ALL_X = X
            ALL_Y = Y

    return ALL_X, ALL_Y, scalers

def trainAndSaveModel(days=60, end_date='2025-08-30'):
    
    stock_ids = read_stock_pool_from_file(POOL_FILE)
    #today = str(get_last_trading_day())

    ALL_X, ALL_Y, scalers = prepareTrainData(stock_ids, days=days, end_date=end_date)

    print(ALL_X.shape)
    print(ALL_Y.shape)

    n_samples, n_length = ALL_X.shape
    n_samples = round(n_samples * 0.75)

    X_train = ALL_X[:n_samples]
    X_test  = ALL_X[n_samples:]
    Y_train = ALL_Y[:n_samples]
    Y_test  = ALL_Y[n_samples:]

    print('start fit model random forest, time=' + str(datetime.datetime.now()))
    
    model_randomForest = RandomForestClassifier().fit(X_train, Y_train.ravel())
    print(model_randomForest.score(X_test, Y_test.ravel()))
    print('start fit model hist gradient boosting, time=' + str(datetime.datetime.now()))
    
    model_histGradientBoosting = HistGradientBoostingClassifier(loss='log_loss',max_iter=100).fit(X_train, Y_train.ravel())
    print(model_histGradientBoosting.score(X_test, Y_test.ravel()))
    print('end fit, time=' + str(datetime.datetime.now()))

    saveDir = 'models_20250924'
    os.mkdir(saveDir)
    
    joblib.dump(model_randomForest, saveDir+'/model_randomForest.joblib')
    joblib.dump(model_histGradientBoosting, saveDir+'/model_histGradientBoosting.joblib')
    joblib.dump(scalers, saveDir+'/scalers.joblib')

    return model_randomForest, model_histGradientBoosting, scalers

这个算法,用自己写的回测系统,有非常好而且稳定的收益率,这个后面再讲,包括怎么写一个自己的回测系统。