分享

spark分类模型实践

本帖最后由 pig2 于 2018-3-21 14:59 编辑
问题导读:
1、如何理解分offset构建模型?
2、数据处理使用那些核心包?
3、应用哪些模型算法?
4、如何进行数据清洗?




1 数据探索

sparkSQL支持用sql对数据集进行分析,数据探索工作仍然大部分放在spark中来完成

1.1 发现的一些相关性
对应的数值越接近1表示正相关性越大,越接近-1表示负相关性越大,越接近0表示相关性越小

  • 销售额的相关度往往好于销量
    毛利率、销量以及库存周转率的权衡在销售额上综合体现了?
    销售任务的导向作用?

  • 排除极low款与爆款的前提下
    新货前30天预测后30天相关性较大

  • 新货前30天预测整个商品季相关性较大
    放开爆款,反而销量的相关度上去了
    销售额的相关度有所下降
15060597682343.jpg
15060602769621.jpg
15060632132189.jpg
  • 冬装数据太奇葩了,基本依托于两个大活动走货
    考虑要把冬装单独拆分出模型来搞
    其它季节货品使用一个预测模型
    只保留冬季的情况
15063072795952.jpg

1.2 决定尝试分offset构建模型
1.2.1 预测商品级销量分类段划分:offset_total_quantity
2018-03-21_124556.png

1.2.2 参考周期划分:
  •     重点调优放在前三个档,因为参考周期太长,预测的意义也就小了
  •     前3天:offset3_quantity
  •     前7天:offset7_quantity
  •     前15天:offset15_quantity
  •     前30天:offset30_quantity

2 开撸
代码的注释基本都用的英文,不是为了装逼,是怕有字符集兼容问题。。。

2.1 包引入
    大致分为三类: 数据操作类、sklearn相关、可视化相关。
[mw_shl_code=python,true]# package import
from math import log
import pandas as pd
from pandas import DataFrame
import numpy as np
from string import Template
from sklearn import preprocessing
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score,f1_score
from sklearn.externals import joblib
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
import plotly.plotly as py
import plotly.graph_objs as go[/mw_shl_code]

核心包简介
  •     pandas: 数据集读取操作查询转换输出库。
  •     sklearn: scikit-learn提供的ML相关方法实现库。
        preprocessing: 特征预处理相关。
        model_selection: model所需的数据集选取生成。
        metrics: 模型效果评估相关方法。
        externals: 模型持久化相关。
  •     plotly: 发现的一个第三方可视化库,比matplotlib操作起来简单,生成图形可以交互分享,但是间歇性被墙。。

3. 辅助函数声明
3.1 生成对应offset的类标
类标生成辅助方法,方法会塞入到pandas dataframe的apply方法中,默认会传入row
[mw_shl_code=python,true]def gen_hot_product_label(row, offset, column_index):
    """Classification label generator.
    Args:
        row: pandas dataframe row.
        offset: Classification offset, such as 1000, 1600, 10000, 50000.
        column_index: Dataframe row[column_index], such as row[12].
    Return:
        Result of row_column value above offset. For example:
        
        1: row_column >= offset.
        0: row_column < offset.
   
    """
    if (row[column_index] >= offset):
        return 1
    else:
        return 0[/mw_shl_code]

3.2 特征变换
    减少特征之间或者特征与类标之间取值差距,blablabla
方法
    log辅助方法
[mw_shl_code=python,true]def log_quantity(row, column_index):
    """Log the value."""
    return log(row[column_index])[/mw_shl_code]

标准化转换
[mw_shl_code=python,true]from sklearn.preprocessing import StandardScaler
def standardScalerTransform(X_train, X_test):
    """StandardScaler transform."""
    sc = StandardScaler()
    sc.fit(X_train)
    return (sc.transform(X_train), sc.transform(X_test))[/mw_shl_code]

min-Max转换
[mw_shl_code=python,true]from sklearn.preprocessing import MinMaxScaler
def minMaxTransform(X_train, X_test):
    """MinMaxScaler transform."""
    sc = MinMaxScaler()
    sc.fit(X_train)
    return (sc.transform(X_train), sc.transform(X_test))[/mw_shl_code]

3.3 样本均匀化
前情回顾
正负样本分布不均匀,需要均匀化处理,使得正负样本数基本一致。
隆重介绍imblearn库,提供各种样本均匀化算法的实现。
2018-03-21_124833.png

under-sampling

把多的砍掉,正样本多就砍正样本,负样本多就砍负样本的,最后就一致了。
至于如何砍就有很多算法了,这里选用了NearMiss算法。
[mw_shl_code=python,true]from imblearn.under_sampling import NearMiss
def under_samplingTransform(X, y):
    """Under-sampling NearMiss mode."""
    return NearMiss(random_state=0, version=1).fit_sample(X, y)[/mw_shl_code]

over-sampling
哪种样本少了,就想办法造一些,最后就一致了。
至于如何造就有很多算法了,这里选用了SMOTE的SVM模式算法。
[mw_shl_code=python,true]from imblearn.over_sampling import SMOTE, ADASYN
def over_samplingTransform(X, y):
    """Over-sampling SMOTE svm mode."""
    return SMOTE(kind='svm').fit_sample(X, y)[/mw_shl_code]

3.4 模型算法
最简单的是感知器算法,因为不能解决线性不可分问题,就忽略掉了。。

逻辑斯蒂回归
    唬人的名字,说是回归,其实是分类算法。。
    分类界用的很多。
[mw_shl_code=python,true]from sklearn.linear_model import LogisticRegression
def logisticRegModelGenerator(train_std, y_train):
    """LogisticRegression model generator."""
    return LogisticRegression(C=1000, random_state=0).fit(train_std, y_train)[/mw_shl_code]

随机森林
    理论上说可以忽略样本分布不均匀的问题(因为属于决策树类的算法)。
[mw_shl_code=python,true]from sklearn.ensemble import RandomForestClassifier  
def random_forest_classifier(train_x, train_y):   
    """Random Forest model generator."""
    return RandomForestClassifier(n_estimators=8).fit(train_x, train_y)[/mw_shl_code]

SVM
    忽然概念名词超多的算法,什么超平面啥的。。
[mw_shl_code=python,true]from sklearn.svm import SVC
def svm_classifier(train_x, train_y):   
    """SVM model generator."""
    return SVC(kernel='rbf', probability=True).fit(train_x, train_y)[/mw_shl_code]

GBDT
    梯度提升算法(实测在这个场景综合效果较好)
[mw_shl_code=python,true]from sklearn.ensemble import GradientBoostingClassifier   
def gradient_boosting_classifier(train_x, train_y):   
    """GBDT model generator."""
    return GradientBoostingClassifier(n_estimators=200).fit(train_x, train_y)[/mw_shl_code]


xgboost
    在kaggle大赛中叱咤风云的神级算法,在这个场景实测效果不如GBDT
    但xg有些好处,比如可以输出每轮学习时的精确度,以及输出目前输入特征的重要性分数,便于优化调参。
[mw_shl_code=python,true]from xgboost import XGBClassifier
from xgboost import plot_importance
from matplotlib import pyplot
def xgboost_classifier(train_x, train_y):
    """xgboost model generator."""
    model = XGBClassifier()
    model.fit(train_x, train_y)
   
    # Feature importance.
    # plot_importance(model)
    # pyplot.show()
   
    return model[/mw_shl_code]

3.5 预测类
    SalesProphet(销售预言家):预测辅助类
    因为各种特征offset、类标、算法的组合,不封装一个类的话,将来会死的。。(已经死过一轮了,改一个东东要累死。。)
    具体方法作用详见注释哈,总之就是传入参数,调用predit完事。
    (吐槽python 断言竟然只能在继承于testcase的类中使用。。)
[mw_shl_code=python,true]class SalesProphet(object):
    """Sales prediction class.
   
    Args:
        datasource: Sales prediction datasource relied on.
        features_name: Feature names array.
        label_name: Label names String.
        model_type: Algorithm of model training.
        X: Feature data.
        y: Label data.
        X_train: X train data.
        X_test: X test data.
        y_train: y train data.
        y_test: y test data.
        y_pred: y data predicted.
        model: ML model fitted.
        accuracy_score: Model accuracy score.
        f1_score: Model f1 score.
        train_score: Model score in train set.
        test_score: Model score in test set.
    """
   
    def __init__(self, datasource, features_name, label_name, model_type='logistic'):
        """Inits SalesProphet with datasource, features_name, label_name, model_type(default is logistic)"""
        self.datasource = datasource
        self.features_name = features_name
        self.label_name = label_name
        self.model_type = model_type
   
    def feature_engineering(self):
        """Feature engineering about: X y generated, one-hot, sampling blabla..."""
        # assertIsNotNone(self.datasource, 'Guys, forget the datasource!!!')
        # assertNotEqual(len(self.features_name), 0, 'features is empty. WTF...')
        # assertNotEqual(len(self.label_name), 0, 'label is empty. WTF...')
        
        self.X = self.datasource[self.features_name].values
        self.y = self.datasource[self.label_name]
        
        # one-hot
        ohe = OneHotEncoder(categorical_features = [0, 1])
        self.X = ohe.fit_transform(self.X).toarray()
        
        self.X, self.y = over_samplingTransform(self.X, self.y)
        
    def train_test_transform(self):
        """Trainset and testset splitor and standard transform."""
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(self.X, self.y, test_size=0.3, random_state=0)
        self.X_train, self.X_test = standardScalerTransform(self.X_train, self.X_test)
        
    def model_fitting(self):
        """Algorithm of model selector."""
        model_alg_switcher = {
            'logistic': logisticRegModelGenerator,
            'rf': random_forest_classifier,
            'svm': svm_classifier,
            'gdbt': gradient_boosting_classifier,
            'xgboost': xgboost_classifier
        }
        func = model_alg_switcher.get(self.model_type, logisticRegModelGenerator)
        return func(self.X_train, self.y_train)
   
    def genPredictReport(self, printlog=False):
        """Model estimate report generation. Set printlog YES to pring log."""
        self.accuracy_score = accuracy_score(self.y_test, self.y_pred)
        self.f1_score = f1_score(self.y_test, self.y_pred, average='binary')
        self.train_score = self.model.score(self.X_train, self.y_train)
        self.test_score = self.model.score(self.X_test, self.y_test)
        
        if printlog:
            print('/--------------START-------------')
            print('|')
            print('| feature: %s' % self.features_name)
            print('| label: %s' % self.label_name)
            print('| model alg: %s' % self.model_type)
            print('|')
            print('|----Estimate score------')
            print('|')
            print('| accuracy is: %.2f' % self.accuracy_score)
            print('| f1_score is: %.2f' % self.f1_score)
            print('|')
            print('|---Over-fitting check---')
            print('|')
            print('| train-set score: %.2f' % self.train_score)
            print('| test-set score: %.2f' % self.test_score)
            print('|')
            print('|---------------END--------------/\n')
        
    def public_genReportChart(self):
        """y_test and y_pred chart generation."""
        t = np.arange(len(self.y_pred))
        # Create traces
        trace0 = go.Scatter(
            x = t,
            y = self.y_pred,
            mode = 'lines',
            name = 'predict'
        )
        trace1 = go.Scatter(
            x = t,
            y = self.y_test,
            mode = 'lines',
            name = 'real'
        )
        data = [trace0, trace1]
        py.iplot(data, filename='(%self.features_name)_(%self.labels_name)_(%self.model_type.model)')
   
    def public_saveModel(self):
        """Model persistence."""
        joblib.dump(self.model, '%(self.features_name)_%(self.labels_name)_(%self.model_type.model)')
   
   
    def predict(self):
        """Predict main method."""
        self.feature_engineering()
        self.train_test_transform()
        
        self.model = self.model_fitting()
        self.y_pred = self.model.predict(self.X_test)
        self.genPredictReport()[/mw_shl_code]

4. 数据准备
4.1 数据读取
从spark 导出准备好的数据到csv文件,pandas读取该csv中的数据。
[mw_shl_code=python,true]train_data = pd.read_csv("data/product_2016_offset_group.csv")[/mw_shl_code]

获取前5条数据看看情况
[mw_shl_code=python,true]train_data.head()[/mw_shl_code]

describe 可以对df中各列的综合指标进行集中展示。
比如中位数、均值等等,方便进一步分析数据。
[mw_shl_code=python,true]train_data.describe()[/mw_shl_code]

4.2 销量特征log变换
[mw_shl_code=python,true]train_data['log_3_quantity'] = train_data.apply(log_quantity, column_index=8, axis=1)
train_data['log_7_quantity'] = train_data.apply(log_quantity, column_index=9, axis=1)
train_data['log_15_quantity'] = train_data.apply(log_quantity, column_index=10, axis=1)
train_data['log_30_quantity'] = train_data.apply(log_quantity, column_index=11, axis=1)
train_data['log_total_quantity'] = train_data.apply(log_quantity, column_index=12, axis=1)
train_data.head()[/mw_shl_code]

4.3 类标生成
[mw_shl_code=python,true]train_data['hot_1000_product'] = train_data.apply(gen_hot_product_label, args=(1000, 12), axis=1)
train_data['hot_1600_product'] = train_data.apply(gen_hot_product_label, args=(1600, 12), axis=1)
train_data['hot_10000_product'] = train_data.apply(gen_hot_product_label, args=(10000, 12), axis=1)
train_data['hot_50000_product'] = train_data.apply(gen_hot_product_label, args=(50000, 12), axis=1)

train_data.head()
train_data[train_data.hot_1000_product == 1].count()

product_code                  1477
category_id                   1477
season                        1477
offset3_amount_actual         1477
offset7_amount_actual         1477
offset15_amount_actual        1477
offset30_amount_actual        1477
offset_total_amount_actual    1477
offset3_quantity              1477
offset7_quantity              1477
offset15_quantity             1477
offset30_quantity             1477
offset_total_quantity         1477
log_3_quantity                1477
log_7_quantity                1477
log_15_quantity               1477
log_30_quantity               1477
log_total_quantity            1477
hot_1000_product              1477
hot_1600_product              1477
hot_10000_product             1477
hot_50000_product             1477
dtype: int64[/mw_shl_code]

4.4 数据清洗
[mw_shl_code=python,true]train_data_normal = train_data[train_data.offset30_quantity <= train_data.offset_total_quantity]

train_data_normal[train_data_normal.offset_total_quantity < 0]

# drop null row
print(train_data_normal.isnull().sum())
train_data_valid = train_data_normal.dropna()

product_code                  0
category_id                   0
season                        0
offset3_amount_actual         0
offset7_amount_actual         0
offset15_amount_actual        0
offset30_amount_actual        0
offset_total_amount_actual    0
offset3_quantity              0
offset7_quantity              0
offset15_quantity             0
offset30_quantity             0
offset_total_quantity         0
log_3_quantity                0
log_7_quantity                0
log_15_quantity               0
log_30_quantity               0
log_total_quantity            0
hot_1000_product              0
hot_1600_product              0
hot_10000_product             0
hot_50000_product             0
dtype: int64[/mw_shl_code]

无序特征做onehot消除次序关系。
整理特征与类标。
循环生成预言家,让它预测,生成报告,然后把他丢到预言家数组里面(salesProphets)便于后面生成分析对比用的DataFrame。

[mw_shl_code=python,true]# category_id, season onehot
feature_disordered = ['category_id', 'season']
feature_cols_3 = feature_disordered + ['offset3_quantity']
feature_cols_7 = feature_disordered + ['offset7_quantity']
feature_cols_15 = feature_disordered + ['offset15_quantity']
feature_cols_30 = feature_disordered + ['offset30_quantity']
feature_offsets = [feature_cols_3, feature_cols_7, feature_cols_15, feature_cols_30]
label_names = ['hot_1000_product', 'hot_1600_product', 'hot_10000_product', 'hot_50000_product']
algs = ['logistic', 'rf', 'svm', 'gdbt','xgboost']
salesProphets = []
for alg in algs:
    for feature in feature_offsets:
        for y in label_names:
            salesProphet = SalesProphet(train_data_valid, feature, y, alg)
            salesProphet.predict()
            salesProphet.genPredictReport()
            salesProphets.append(salesProphet)[/mw_shl_code]

预言家数组生成综合对比DataFrame
# 整理生成报告DataFrame
[mw_shl_code=python,true]feature_column = []
label_column = []
model_alg_column = []
accuracy_column = []
f1_score_column = []
trainset_score_column = []
testset_score_column = []
for salesProphet in salesProphets:
    feature_column.append(salesProphet.features_name)
    label_column.append(salesProphet.label_name)
    model_alg_column.append(salesProphet.model_type)
    accuracy_column.append(salesProphet.accuracy_score)
    f1_score_column.append(salesProphet.f1_score)
    trainset_score_column.append(salesProphet.train_score)
    testset_score_column.append(salesProphet.test_score)
result_data = {'feature': feature_column, 'label': label_column,
               'model_alg': model_alg_column, 'accuracy': accuracy_column,
               'f1_score': f1_score_column, 'trainset_score': trainset_score_column,
               'testset_score': testset_score_column}
result_df = DataFrame(result_data)[/mw_shl_code]

分析报告保存
[mw_shl_code=python,true]result_df.to_csv('data/result_df.csv')[/mw_shl_code]

5结果
特征重要性分析

分数越高越重要
模型特征重要性分析.jpg

综合对比
分数越接近1越好
综合对比报告.png

来源:http://www.alithink.com/2017/11/ ... %E6%9C%AC%EF%BC%89/
作者:alithink

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条