GBDT全称Gradient Boosting Decision Trees,即梯度提升回归树。GBDT可以拆分2部分:GB+DT。GB是一种通用思想或者算法,GBDT只是众多GBM(Gradient Boosting Machine)里面的一种。所以先来看GB。
Boosting前面已经介绍过了,就是用多个弱学习器顺序迭代生成一个强学习器,GB里面的重点在于每次迭代的时候是拟合残差。所谓残差(residual)就是真实值和预测值的差值: $residual_i = y_i-f(x_i)$,它表示了数据模型中所不可能刻画的部分。所以GB的思想大致可以描述如下:
N次迭代后得到最终的强学习器。所以可以看到GB其实就是不断的迭代拟合残差。但至此并没有看到任何关于Gradient的影子。接着往下看:假设现在有样本集 $(x_1,y_1),(x_2,y_2),...(x_n,y_n)$ ,然后用一个模型$f(x)$去拟合这些数据。如果是回归模型的话,我们一般使用均方误差作为损失函数,即:$L(\theta) = \frac{1}{2}\sum_{i=0}^n(y_i-f(x_i))^2$
然后最优化算法求解模型的参数$\theta$。常用的最优化求解方法之一就是梯度下降(gradient descent)。对损失函数求梯度得到:$\nabla L(\theta) = \sum_{i=0}^n{(f(x_i)-y_i)}$
至此,可以看到我们前面定义的残差就是这里的负梯度:$residual_i = y_i-f(x_i) = -(f(xi)-y_i)$。也就是说前面对残差的拟合,其实就是对负梯度的拟合,而根据残差来更新集成后的模型实际就是根据负梯度来更新。这样来看,梯度提升方法就变成了广义上的梯度下降。这就是GB中Gradient的部分。
需要注意的是:尽管这里残差和负梯度的值完全一样,但二者代表的含义却是不一样的:负梯度指向的是单个模型参数更新的方向,残差(即梯度提升)则表示了集成模型下一个模型的拟合目标。梯度的不断下降可以让模型的参数逐渐收敛到最优参数上,而残差的不断拟合则让集成之后的模型越来越解决真实数据的生成机制。换言之,
有了从残差转换到梯度的思路以后,又可以再继续改进,不再局限于残差,而是可以从损失函数的负梯度角度去构造更丰富的提升方法,即构造更多的损失函数。因为基于残差的损失函数有一个明显的缺点就是对异常值比较敏感。看下面的例子:
上面的例子中,5*
是一个异常点。很明显,按照Boosting的思路,后续模型会对这个值关注过多,这不是一个好现象。所以一般回归类的损失函数会使用绝对损失(absolute loss)或者huber损失函数(huber loss)来代替平方损失函数:
如果GB中的弱学习器使用决策树,就是GBDT了。GBDT中一般使用CART决策树。
这便是GBDT算法,整体还是Boosting的思路,但具体到细节又和AdaBoost有明显的差别:AdaBoost中每个弱学习器的短板通过权重的加强得以凸显,而梯度提升中则将权重换成了梯度。
在解决分类问题时,GBDT 中的负梯度可以表示成样本归属于每个类别的真实概率和上一轮预测概率的差值,这个差值会被回归树拟合,拟合值与上一轮预测概率求和就是这一轮的输出。
下图来自Wikipedia:https://en.wikipedia.org/wiki/Gradient_boosting
初始化$F_0(x)$,表示第0棵树的预测值。那怎么初始化呢?取决于所选择的损失函数:
假如进行M轮迭代(即M个弱分类器),则对于第m轮($m=1,2,...,M$):
再总结一下关键点:
另外还需要说明一个概念:学习率(Learning Rate),背景是这样的:测试表明每次沿正确的方向前进一小步可以获得更好的预测性能,所以设计了一个超参learning rate,用来控制每轮模型的影响。一般会设置的比较小(比如0.1),以此来让模型使用更多的弱分类器。英文版:
It’s been shown through experimentation that taking small incremental steps towards the solution achieves a comparable bias with a lower overall vatiance (a lower variance leads to better accuracy on samples outside of the training data). Thus, to prevent overfitting, we introduce a hyperparameter called learning rate. When we make a prediction, each residual is multiplied by the learning rate. This forces us to use more decision trees, each taking a small step towards the final solution.
上面的算法流程里面没有给出来,但实际迭代的时候还会引入一个学习率$\nu$,完整的迭代公式是这样的:$F_m(x)=F_{m-1}(x)+\nu*\gamma_mH_m(x)$
一幅图表示就是(来自:How XGBoost Works):
整个介绍起来比较抽象,可参考下面例子中的第一个:Gradient Boosting Decision Tree Algorithm Explained来感受一下整个流程。
XGBoost(eXtreme Gradient Boosting)是对GBDT的优化和工程化的实现。优化可分为算法优化和工程实现方面的优化:
Algorithmic Enhancements:
- Regularization: It penalizes more complex models through both LASSO (L1) and Ridge (L2) regularization to prevent overfitting.
- Sparsity Awareness: XGBoost naturally admits sparse features for inputs by automatically ‘learning’ best missing value depending on training loss and handles different types of sparsity patterns in the data more efficiently.
- Weighted Quantile Sketch: XGBoost employs the distributed weighted Quantile Sketch algorithm to effectively find the optimal split points among weighted datasets.
- Cross-validation: The algorithm comes with built-in cross-validation method at each iteration, taking away the need to explicitly program this search and to specify the exact number of boosting iterations required in a single run.
System Optimization:
- Parallelization: XGBoost approaches the process of sequential tree building using parallelized implementation. This is possible due to the interchangeable nature of loops used for building base learners; the outer loop that enumerates the leaf nodes of a tree, and the second inner loop that calculates the features. This nesting of loops limits parallelization because without completing the inner loop (more computationally demanding of the two), the outer loop cannot be started. Therefore, to improve run time, the order of loops is interchanged using initialization through a global scan of all instances and sorting using parallel threads. This switch improves algorithmic performance by offsetting any parallelization overheads in computation.
- Tree Pruning: The stopping criterion for tree splitting within GBM framework is greedy in nature and depends on the negative loss criterion at the point of split. XGBoost uses ‘max_depth’ parameter as specified instead of criterion first, and starts pruning trees backward. This ‘depth-first’ approach improves computational performance significantly.
Hardware Optimization: This algorithm has been designed to make efficient use of hardware resources. This is accomplished by cache awareness by allocating internal buffers in each thread to store gradient statistics. Further enhancements such as ‘out-of-core’ computing optimize available disk space while handling big data-frames that do not fit into memory.
从算法角度来说,XGBoost主要对GBDT的目标函数进行了优化。GBDT的目标函数:
$Obj_{gbdt} = \sum_{i=1}^N{L(f_m(x_i), y_i)}=\sum_{i=1}^N{L(f_{m-1}(x_i) + h_m(x_i), y_i)}$
而XGBoost对目标函数的优化有两方面:
先看正则项:
$Obj_{xgboost} = \sum_{i=1}^N{L(f_m(x_i), y_i)} + \sum_{j=1}^m{\Omega f(x_j)}=\sum_{i=1}^N{L(f_{m-1}(x_i) + h_m(x_i), y_i) + \sum_{j=1}^m{\Omega f(x_j)}}$
加正则项就是Shrinkage的思想,可以防止过拟合、降低方差,获取更好的泛化效果。这个正则项的具体公式为:
$\Omega f(x_i) = \gamma T + \frac{1}{2}\lambda||\omega||^2$
其中$T$为树$f$的叶节点个数,$\omega$为所有叶节点输出回归值构成的向量,$\gamma,\lambda$为超参数。再看泰勒公式:
更细的部分可参考下面2篇文章:
梯度提升算法(Gradient Boosting Machine,GBM)其实是一类算法,GBDT只是其中一种,而XGBoost则是GBDT的一个优化版本(或者说是GBDT算法的一个具体的实现),这个优化不仅体现在算法层面,也包括很多工程实现方面的优化,比如并行、内存使用量少等。同类的还有微软开源的LightGBM,号称速度更快,内存使用更低,网上有很多对比评测文章,有兴趣的可以看看。
]]>理解随机森林的关键点在于理解“相关度低甚至不相关的多个决策树组合在一起的效果好于其中任何一个决策树”。这里拿一个例子做论证(注:此例来自第一个参考文章),做一个游戏:使用一个均匀分布的随机数产生器产生一个数字,如果这个数字大于等于40,则算你赢,可以获得一些钱;如果小于40,则算你输,你需要给对方同样数额的钱。现在有三种玩法供选择:
你会怎么选哪一个?我们计算一下赢钱的期望值:
三种选择赢钱的期望值是一样的,那到底该如何选?我们做一个模拟:每种情况都模拟10000次,代码如下:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_theme()
# Game 1
simulations = 10000 # number of Monte Carlo Simulations
games = 100 # number of times the game is played
threshold = 40 # threshold where if greater than or equal to you win
bet = 1 # dollar bet for the game
# outer loop is Monte Carlo sims and inner loop is games played
sim_results_1 = []
for sim in range(simulations):
result = []
for g in range(games):
number = int(np.random.uniform()*100) # get a random number to see who wins
if number >= threshold:
result.append(bet)
else:
result.append(-bet)
sim_results_1.append(sum(result)) # sim_results_1 stores results for Game 1
print('Game 1 Mean: ', round(np.mean(sim_results_1), 2))
print('Game 1 Prob Positive: ', round(sum([1 for i in sim_results_1 if i>0])/simulations, 2))
print('\n')
# Game 2 (structure of code is same as above)
simulations = 10000
games = 10
threshold = 40
bet = 10
sim_results_2 = []
for sim in range(simulations):
result = []
for g in range(games):
number = int(np.random.uniform()*100)
if number >= threshold:
result.append(bet)
else:
result.append(-bet)
sim_results_2.append(sum(result))
print('Game 2 Mean: ', round(np.mean(sim_results_2), 2))
print('Game 2 Prob Positive: ', round(sum([1 for i in sim_results_2 if i>0])/simulations, 2))
print('\n')
# Game 3 (structure of code is same as above)
simulations = 10000
games = 1
threshold = 40
bet = 100
sim_results_3 = []
for sim in range(simulations):
result = []
for g in range(games):
number = int(np.random.uniform()*100)
if number >= threshold:
result.append(bet)
else:
result.append(-bet)
sim_results_3.append(sum(result))
print('Game 3 Mean: ', round(np.mean(sim_results_3), 2))
print('Game 3 Prob Positive: ', round(sum([1 for i in sim_results_3 if i>0])/simulations, 2))
# Histogram that shows the distribution of the Monte Carlo Results for 2 spending levels
fig, ax = plt.subplots(figsize=(8,6))
sns.distplot(sim_results_1, kde=False, bins=60, label='Play 100 Times')
sns.distplot(sim_results_2, kde=False, bins=60, label='Play 10 Times', color='orange')
sns.distplot(sim_results_3, kde=False, bins=60, label='Play 1 Time', color='pink')
ax.set_xlabel('Money Won by You', fontsize=16)
ax.set_ylabel('Frequency',fontsize=16)
plt.legend()
plt.tight_layout()
plt.savefig(fname='game_hist', dpi=150)
plt.show()
模拟的输出以分布图如下:
# 代码输出
Game 1 Mean: 20.01
Game 1 Prob Positive: 0.97
Game 2 Mean: 20.2
Game 2 Prob Positive: 0.63
Game 3 Mean: 21.08
Game 3 Prob Positive: 0.61
那可以看到赢钱的均值和即我们先前计算的期望值是一致的,三种玩法都接近20(Game x Mean),但赢钱的概率却相差很大,玩法1是97%,玩法2是63%,玩法3是61%。当然模拟次数再多一些,还有有一些变化。随机森林的思想和这个是一样的,里面包含的决策树的个数就是这里玩的次数。
另外,随机森林还有一个非常关键的限定条件:各个决策树之间不相关或者关联度很低。类比到上面的游戏中,我们的假设是产生随机数的算法是遵从均匀分布的,也就是产生1~100之间的数字的概率是完全相等的。如果不是,那上面第一种玩法1最优的结论就不一定成立了。而随机森林实现各个决策树之间不相关或者关联度很低是通过“两个随机”实现的:
通过两个随机,可以降低最终生成的模型的方差。而且通过这种方式生成的树一般也无需剪枝。Wikipedia上面的描述是这样的:
Each decision tree in the forest considers a random subset of features when forming questions and only has access to a random set of the training data points. This increases diversity in the forest leading to more robust overall predictions and the name ‘random forest.’
随机森林还有一个变种:Extra Trees。也称Extremely Randomized Trees,一般翻译为极限树,是随机森林的一个变种,进行了更彻底的随机,主要有两点:
极限树的两个改动点在大部分情况下会进一步的降低方差,但可能会稍微增大一些偏差。
总结一下,随机森林的思路就是“群众的眼睛是雪亮的”,通过使用多个决策树组成一个“委员会”来进行预测或者回归,这些“群众”就是集成算法中的“若分类器”,他们相互之间没有关联度或者关联度低,而且一般在某一个点表现还可以。这点是通过“两个随机”去实现的,也是随机森林最核心的地方。
References:
]]>AdaBoost是Adaptive Boosting的缩写,即自适应提升法,是最成功的Boosting算法。具体算法如下:
Step1: Initialise the dataset and assign equal weight to each of the data point.
Step2: Provide this as input to the model and identify the wrongly classified data points.
Step3: Increase the weight of the wrongly classified data points.
Step4: if (got required results)
Goto step 5
else
Goto step 2
Step5: End
即初始的时候,赋予每个训练样本相同的权重;然后每次迭代后,增加分类错误样本的权重(数据集还是原来的数据集,只不过各个样本的权重变了),使得下一轮迭代时更加关注这些样本。
AdaBoost一般选用决策树桩(decision stump)作为弱学习器。所谓stump是指由一个决策节点和两个叶子节点组成的二叉树:
下图是一个二分类问题的学习过程示例,包含了三轮迭代:
AdaBoost的算法描述如下:
下面进行解释。
然后开始循环:
如何计算训练误差$\varepsilon_t$?公式如下:
$$ \varepsilon_t = \frac {\sum_{i=1}^N\omega_iI(y_i \neq h_t(x_i))}{\sum_{i=1}^N\omega_i} $$
其中$I(y_i \neq h_j(x_i))$的含义是如果$y_i \neq h_j(x_i)$ 成立,则返回1,否则返回0。下面看个具体的计算例子:
于是有
$$ \varepsilon_t = \frac{0.5*1+0.2*0+0.1*0+0.2*1}{0.5+0.2+0.1+0.2} = 0.7 $$
有时也称$\varepsilon_t$为“total error”表示的是“所有错误分类的样本权重之和”,其实和上面的公式是一致的,因为权重一般是做过归一化的,所以分母里面的所有权重之和其实为1;而且分子中分类正确的样本的$I$值为0,也就是上面的公式可以简化为:
$$ \varepsilon_t = \frac {\sum_{i=1}^N\omega_iI(y_i \neq h_t(x_i))}{\sum_{i=1}^N\omega_i} \\ = \sum_{i=1}^N\omega_iI(y_i \neq h_t(x_i)) = 错误分类样本的权重之和 = total\ error $$
那权重调整系数$\alpha_t$的作用是什么呢?看个具体的例子。假设现在有3个$\varepsilon$值:0.3、0.5、0.7。则对应的$\alpha$的值如下:
$$ \alpha(\varepsilon=0.3) = \frac{1}{2}*ln\frac{1-0.3}{0.3} = 0.42365 \\ \alpha(\varepsilon=0.5) = \frac{1}{2}*ln\frac{1-0.5}{0.5} = 0 \\ \alpha(\varepsilon=0.7) = \frac{1}{2}*ln\frac{1-0.7}{0.7} = -0.42365 \\ $$
可以看到,当弱分类器的准确率为0.5时,其权重为0;准确率大于0.5(即错误率小于0.5)时,权重大于0;准确率小于0.5(即错误率大于0.5)时,权重小于0。所以,错误率$\varepsilon$越小,$\alpha$越大,即当前模型的表现越好,在最终的生成器中占的权重就越大。所以$\alpha$也称为“amount of say”、“Performance”、“Importance”,都指的是当前分类器在最终分类器中的权重。另外注意计算的时候有时使用自然对数($ln=log_e$),有时使用常用对数($log_{10}$)。
再来看最重要的部分:样本权重更新。在Boosting的迭代中,我们每次要找的是错误率比较低的弱分类器。为了方便我们沿用上面$\alpha(\varepsilon=0.3)$的计算结果,看下分类错误和正确时计算出来的权重值:
可以看到当弱分类器在某个样本上分类正确的时候,该样本的权重会降低;否则就会提升,符合理论预期。最后提一下$Z_t$,这是一个规范化系数(normalization factor ),是为了让新计算出来的权重代表一个真正的分布,一般就是归一化。比如上面的例子所有样本的权重更新后加起来后做一下归一化。上面的公式可以简化一下:
最后一个问题,增加了预测错误的样本的权重之后,如何在下一轮迭代中更关注他们呢?实质是通过权重影响每一轮迭代数据集的选择来实现:比如我们有一个包含N个样本的数据集,每个样本都有一个权重(第一轮时权重相同,均为1/N),然后每一轮选择本轮使用的$m(m \le N)$个样本时,是根据权重随机采样的。也就是有些样本可能会被选择多次,有的可能一次也不会被选中。特别是当样本的权重被更新后,权重大的样本就更容易被选中,甚至选中多次了。这样这些样本自然会对后面的弱分类器产生比较多的影响。比如Pandas的sample
方法就可以实现在一个数据集上面按照权重采样。
另外,还有一种“Bucket”的方式,思路如下:原来有N个样本,每次选N个样本进行弱学习器的训练。第一轮大家权重一样,第二轮的时候,根据权重先将原数据集划分“Bucket”,权重高的样本会占据多个bucket。而每个bucket被选中的几率是一样的,所以权重高的样本就可能被多选几次。也就是第二轮的时候虽然还是N个样本,但某些权重低的可能没有被选,某些权重高的样本可能被选了多次。比如下面的例子(N=5,ref:AdaBoost Algorithm – A Complete Guide for Beginners):
第一轮,所有样本权重一样,可以理解为每个样本就是一个bucket:
第一轮结束的时候,各个样本的权重已经发生了变化(假设第4条数据分错了,它的权重提升了):
如上图,按照权重重新划分了数据集的bucket,分错的第2个样本占了比较多的bucket。所以在为第二轮迭代选择样本的时候,第4条被选中的概率就会比较大。比如算法随机的5个bucket是:0.38,0.26,0.98,0.40,0.55。那选出的样本就是下图:
可以看到,样本4在第二轮出现了3次,这样后面的分类器必然会对该样本倾斜。
上面的第一种情况其实是后面bucket这种方式的一种特殊情况。所以,需要注意的是,每轮迭代改变了样本的权重后,对下一轮的影响是体现在挑选数据集的时候。当数据集选好后,这些数据集又会被赋予相同的权重,开始新一轮的迭代。后续不断按此方式迭代,直到错误率降到某个阈值或者达到预设的迭代次数。
下面看两个例子,第一个偏理论,第二个偏实现,都有助于理解整个算法的流程和内在逻辑。
该例引用自:A Mathematical Explanation of AdaBoost in 5 Minutes。
如下图:有一个包含6个样本的数据集,共3个属性:x1,x2,x3,输出为Y。其中T代表True,F代表False。
Step1:初始化,给每个样本赋予相同的权重,即1/6:
Step2:使用上面的数据生成弱学习器stump:分别计算各个属性的基尼不纯度(gini impurity)。
$$ Gini\ Impurity = 1- Pr_{true}^2-Pr_{false}^2 $$
这里以$x_2$属性为例进行计算:
$$ Total \ Impurity(x_2) = 0.375*(\frac{1+3}{6})+0*(\frac{2+0}{6}) = 0.25 $$
同理可以计算出$x_1$和$x_3$的不纯度为:
$$ Total\ Impurity(x_1)=(1-(\frac{2}{2+2})^2-(\frac{2}{2+2})^2)*(\frac{2+2}{6}) \\ + (1-(\frac{1}{1+0})^2-(\frac{0}{1+0})^2)*(\frac{1+0}{6}) \\ = 0.33 \\ \\ Total\ Impurity(x_3)=(1-(\frac{0}{0+1})^2-(\frac{1}{0+1})^2)*(\frac{0+1}{6}) \\ + (1-(\frac{3}{3+2})^2-(\frac{2}{3+2})^2)*(\frac{3+2}{6}) \\ = 0.4 $$
可以看到$x_2$的基尼不纯度最低,所以使用它生成第一个stump。
Step3:计算“amount of say”。使用x2属性生成的stump会将第1个样本分错,其它都正确。这样的话total error就是1/6。所以:
$$ Amount\ of\ say = \frac{1}{2}*log{\frac{1-\frac{1}{6}}{\frac{1}{6}}} = 0.35 $$
Step4:计算下一个弱分类器(stump)的样本权重。
然后归一化得到下一个stump(即下一轮迭代)的样本及权重,如下图:
这样第一轮就结束了。第二轮迭代开始之前,要先根据样本权重选取训练数据,比如选择的样本可能如下(可以看到第一轮分类错误的样本这次被选了3次):
然后,将选中的样本权重初始化为相等的值,继续重复前面的过程:
直到训练误差足够小,或者弱分类器个数达到限制,则迭代终止。对所有弱分类器加权求和得到最终的强分类器。
上面的例子比较偏理论性,这个例子则是具体到代码实现层面的,限于篇幅和格式,就不在文章里面贴了。具体可以参见我的Github: implement-adaboost-from-scratch.ipynb。里面会根据上面的算法实现一个AdaBoost,并且最终和scikit-learn的AdaBoostClassifier
做对比。
该例参考自:Implementing the AdaBoost Algorithm From Scratch
AdaBoost算法的思想还是比较简单的,算法也容易理解,主要需要理解下面几个关键点:
掌握了这些,在做模型的调优以及一些超参设置上也就可以游刃有余了。
]]>决策树是一个非常简单的算法,至少其思想是非常简单的。生活中我们经常会使用,看几个例子。
场景1,母亲给女儿介绍男朋友,下面是二人的对话:
女儿:多大年纪了?
母亲:26。
女儿:长的帅不帅?
母亲:挺帅的。
女儿:收入高不?
母亲:不算很高,中等情况。
女儿:是公务员不?
母亲:是,在税务局上班呢。
女儿:那好,我去见见。
女儿通过年龄、长相、收入、是否是公务员将男人分为两个类别:见和不见。女孩选择见或不见的过程就是决策树决策的过程。假设女孩对男朋友的要求是:30岁以下、长相中等以上、高收入者或者中等收入以上的公务员。我们可以构造如下一个决策树:
其中,绿色节点表示判断条件,蓝色节点(叶子节点)表示决策结果,左右箭头称作分支。过去的专家系统往往就使用决策树。
再看一个例子,平面上有一些点,我们需要找到一个函数(曲线)把它们分开。如果是线性可分的情况,直觉上我们会画一条直线来切分平面,它的方程与x,y两个属性均有关,可以表示为:
而对于决策树,通常每次决策(平面划分)只与一个特征相关(x或y)。也就是说,我们只能画水平或竖直的线:
决策树同样适用于线性不可分的情况(并非最优划分):
接下来,我们使用下面的数据集看下构造决策树的一些关键点。该数据集有2个特征:f1和f2,然后label是是否属于鱼类,共5条样本:
能否在水中生存(f1) | 是否有脚蹼(f2) | 是否属于鱼类 | |
---|---|---|---|
1 | 是 | 是 | 是 |
2 | 是 | 是 | 是 |
3 | 是 | 否 | 否 |
4 | 否 | 是 | 否 |
5 | 否 | 是 | 否 |
如何构造根据f1、f2两个特征判断是否属于鱼类的决策树?下面是2种可能的决策树:
哪个更好?为什么?构造决策树时,需要确定在哪个特征/属性上面划分数据集,我们称该属性为分裂属性。如何确定分裂属性?
大原则:划分后,让无序数据变得更加有序。
那如何评估数据的有序程度呢?有两种:信息增益(Information Gain)和基尼不纯度(Gini Impurity)。
我们平时说“xxx事情包含的信息量很大”,直观感受就是这个事情的不确定性很大。其实有专门一门学科是专门研究信息的:信息论(这个课还是我大学时的专业课,当时觉得太理论,没什么意思,现在...唉)。这个学科的创始人香农(Claude Elwood Shannon)提出了量化一个系统包含信息量多少的概念——熵(Entropy),单位是比特(bit),它衡量的是随机变量的不确定性。其定义如下:
如果有一个系统S内存在多个事件$S = {E_1,...,E_n}$,每个事件的概率分布$P = {p_1, ..., p_n}$,则每个事件本身的信息为(单位是bit):$I_e=-log_2p_i$。
熵是信息的期望值,即整个系统的平均信息量:$H_s=\sum_{i=1}^n{p_iI_e}=-\sum_{i=1}^np_i{log_2p_i}$
举个例子,比如英语有26个字母,如果每个字母在文章中出现的次数均等的话,则在这篇文章中每个字母的信息量为:$I_e=-log_2\frac1{26}=4.7$。整个文章的熵为:$H_s=\sum_{i=1}^{26}\frac{1}{26}*4.7=\frac{1}{26}*4.7*26=4.7$。因为这里假设每个事件发生概率一样,所以单个事件信息量就等于整个系统的平均信息量。所以,熵描述的其实是随机变量的不确定性。对于确定的系统,熵为0。
那什么是信息增益?这就涉及到条件熵的概念:条件熵——在一个条件下,随机变量的不确定性。而信息增益就是“熵 - 条件熵”。表示在一个条件下,信息不确定性减少的程度。放到决策树这里,就是当选用某个特征划分数据集,系统前后信息发生的变化。计算公式为:$Gain_{split}=H(p)-\sum_{i=1}^k\frac{n_i}{n}H(i)$。即使用某个特征(split)划分数据集以后,得到的信息增益为划分前数据集的熵减去划分后的数据集的熵。下面以前面的鱼类为例,看具体如何计算:
划分前整个数据集为:{是,是,否,否,否} ,对应的熵为:$H=-\frac{2}{5}log_2\frac{2}{5}-\frac{3}{5}log_2\frac{3}{5}=0.97$
如果使用特征f1划分数据集,得到两个数据子集:
所以,按f1划分后获得的信息增益为:
$$ Gain_{f1}=H-(\frac{3}{5}H_{f1=是}+\frac{2}{5}H_{f1=否})=0.97-\frac{3}{5}*0.92-\frac{2}{5}*0=0.42 $$
同理,可以计算按照f2划分数据集以后得到的信息增益为:
$$ Gain_{f2}=H-(\frac{4}{5}H_{f1=是}+\frac{1}{5}H_{f1=否})=0.97-\frac{4}{5}*1-\frac{1}{5}*0=0.17 $$
通过对比,使用f1划分数据集,获得的信息增益大于使用f2划分,也就是使用f1划分使得系统的不确定性下降的更多,所以使用f1优于f2.
除了信息增益,还有一种常用的评价标准——基尼不纯度(Gini Impurity):将来自集合中的某种结果随机应用于集合中某一数据项的预期误差率。英文定义是这样的:
Gini impurity (named after Italian mathematician Corrado Gini) is a measure of how often a randomly chosen element from the set would be incorrectly labeled if it was randomly labeled according to the distribution of labels in the subset.
英文的定义其实更好理解一些,就是你随机从一个集合里面选择一个元素,然后根据这个集合的分布情况随机给这个数据选择一个类别,选择错误可能性的一个描述。维基百科给的计算公式如下:
其中$p_i$是选中第$i$个样本的概率。根据公式可以看到,基尼不纯度的取值范围是[0, 1)。当一个集合完全去定,即里面只有一种元素,则基尼不纯度为0,因为你随机选一个样本,再随机猜一个类别,肯定是不会错的,因为集合里面就只有一种样本;如果一个集合里面全是不同的元素(即混乱程度比较高),则基尼不纯度趋于1,也好理解,你随机选一个样本,随便猜一个种类,因为每个样本都不一样,当n趋于无穷大的时候,猜对的概率几乎为0。所以,不管是信息熵,还是基尼不纯度,衡量的都是一个集合的混乱程度。值越大,越混乱,包含的信息量也越大。对于决策树,使用基尼不纯度和熵的差别非常小:
以二分决策为例,此时p1+p2=1,因此:
对应的曲线图如下:
有了树的划分标准以后,就是根据特征进行递归划分数据集,满足下面任一条件,递归结束:
如果遍历完所有属性,类标签仍不唯一,一般采用多数表决的方法决定该叶子节点的分类。
决策树的一个缺点在于很容易过拟合,一般通过剪枝操作来解决改问题,根据剪枝的时机分为两种:
后剪枝(postpruning):在该方法中,初始决策树按照最大规模生长,然后进行剪枝的步骤,按照自底向上的方式修剪完全增长的决策树。修剪有两种方法:
两种方法各有优劣:与先剪枝相比,后剪枝倾向于产生更好的结果,因为不像先剪枝,后剪枝是根据完全增长的决策树做出的剪枝决策,先剪枝则可能过早终止了决策树的生长。然而,对于后剪枝,当子树被剪掉后,生长完全决策树的额外计算就被浪费了。
目前常见的决策树算法有:
ID3 (Iterative Dichotomiser 3) :该算法只能处理标称型数据集。我们之前构造决策树中使用的方法就是ID3算法,该算法使用信息增益作为分裂特征选取的标准。ID3算法可以归纳为以下几点:
C4.5:ID3的优化版本,主要有两个优化点:
其中除了CART使用基尼不纯度外,前面集中都是用的是信息增益作为选择分类属性的标准。下面看scikit-learn中决策树的一个例子:
from sklearn.datasets import load_iris
from sklearn import tree
import graphviz
iris = load_iris()
clf = tree.DecisionTreeClassifier()
clf = clf.fit(iris.data, iris.target)
dot_data = tree.export_graphviz(clf, out_file=None,
feature_names=iris.feature_names,
class_names=iris.target_names,
filled=True, rounded=True,
special_characters=True)
graph = graphviz.Source(dot_data)
graph.render("iris")
上面的代码生成的决策树如下:
目前已经很少有单独使用决策树作为最终算法模型的场景了,一般都会选取基于决策树的更好的集成算法。下面是决策树大致发展过程的一个概括:
后面的文章会介绍这些集成算法。
refs:
]]>最近准备整理一下之前关于集成学习的学习笔记,写一个关于集成学习的系列文章,毕竟目前用的比较多的机器学习算法基本都属于集成学习,整理一下,也算温习一下。有些笔记时间比较久了,里面的一些引用来源找不到了,所以有些引用可能附不全,敬请谅解。目前确定的几篇包括:
后面可能会根据时间补充一下其它一些现在也比较流行的算法,比如LightGBM。
本文是第一篇。
集成学习(ensemble learning)是将多个基学习器(base learners)进行集成,得到比每个单独基学习器更优的强学习器(strong learner)的方法。每个用于集成的基学习器都是弱学习器(weak learner),即性能只比随机猜测好一点点或只在某些方面表现好一点的学习器(classifiers that produce prediction that is slightly better than random guessing)。
那如何保证多个弱学习器集成在一起会变的更好,而不是更差呢?即如何实现“1+1>2”的效果呢?这对弱学习器提出了一些要求:
根据训练数据使用方法的不同,集成学习可以分为三种:
注意:
提升法的是通过改变训练数据的权重(或概率分布)来训练不同的弱分类器,然后组合为强分类器。下面是两个示意图:
Boosting的重点在于取新模型之长补旧模型之短来降低偏差(bias),尽可能获得无偏估计。
Bagging是Bootstrap Aggregation的缩写。Bootstrap也称为自助法,是一种有放回抽样方法。Bagging的的基本思想是对训练数据进行有放回抽样,每次抽样数据就训练一个模型,最终在这些模型上面取平均。具体算法如下:
- Step 1: Multiple subsets are created from the original data set with equal tuples, selecting observations with replacement.
- Step 2: A base model is created on each of these subsets.
- Step 3: Each model is learned in parallel from each training set and independent of each other.
- Step 4: The final predictions are determined by combining the predictions from all the models.
下面是一个示意图:
Bagging可以降低模型算法的方差,但并没有降低偏差的效果,所以也就没法提升预测的准确性,所以在选择弱分类器时要尽量选择偏差小的。
为什么Bagging能降低模型的方差?因为“如果对N个相互独立且方差相同的高斯分布取平均值,新分布的方差就会变成原始方差的 $1/N$”。Bagging采用独立有放回抽样得到N份数据,并训练得到N个模型,预测的时候最终会取这N个结果的平均(分类的话是取多数),这样就可以降低方差。
下面对Boosting和Bagging做了一些对比:
Boosting | Bagging | |
---|---|---|
样本选择 | 每一轮训练集不变,但权重根据上一轮进行调整 | 从原始数据集中通过独立、有放回抽样获得 |
样例权重 | 根据错误率不断调整,错误率越大的样本权重越大 | 使用均匀抽样,各个样本权重相同 |
模型权重 | 每个弱分类器都有相应的权重,分类误差越小的分类器权重越高 | 各个模型(弱分类器)权重相同 |
并行计算 | 各个分类器串行生成,因为后面的分类器需要前一轮的结果 | 各个分类器并行生成 |
目标 | 主要为了减小偏差 | 主要为了减小方差,并且解决过拟合问题 |
适用场景 | 分类器比较稳定(即方差比较小)和简单(即偏差比较大) | 分类器不稳定(即方差比较大),且偏差比较小的 |
代表算法 | AdaBoost、GBDT、XGBoost | 随机森林 |
除了提升法和装袋法之外,另一种知名度较低的集成方法是堆叠法。堆叠法(stacking)也叫堆叠泛化(stacked generalization),是层次化的集成方法,其思想和神经网络类似,只不过神经网络堆叠的对象是神经元和隐藏层,而集成方法堆叠的是同构或者异构的基学习器。
堆叠法先要用自助采样生成不同的数据子集,用数据子集训练第一层中不同的基学习器。第一层基学习器的输出再被送到第二层的元分类器(meta classifier)中作为输入,用来训练元分类器的参数。
堆叠法的思想和前两种方法有所不同。无论是提升法还是装袋法,其重点都落在单个模型的生成方式上,也就是如何训练出合适的基学习器,基学习器的形式一般是统一的。而堆叠法的重点在于如何将不同的基学习器的结果组合起来,研究的对象是让所有基学习器共同发挥出最大效果的组合策略。某种意义上说,堆叠法的训练数据不是原始的训练数据集,而是不同基学习器在训练数据集上的结果,起到的是模型平均(model averaging)的作用,提升法和装袋法都可以看成它的特例。正因如此,堆叠法除了被视为集成方法外,还可以看成是模型选择的一个手段。
以上这段摘自极客时间《机器学习40讲专栏》
最后补充介绍一下偏差和方差这两个重要的概念。
一个机器学习模型的误差可分为两类:
其中泛化误差又可以分为三部分:
用公式可表示为:
$$ generalization\_error = bias^2 + variance + noise $$
一般很难做到同时将偏差和方差都降到很低,只能在二者之间做权衡。下图中,靶心的红色是是预测正确的值,越往外,预测结果越差。四个图分别描述了误差和方差的高低情况:
上图来自:Understanding the Bias-Variance Tradeoff一文,关于偏差和方差的更多细节,也可以参考这篇文章。
]]>a = [['a', '1.2', '4.2'], ['b', '70', '0.03'], ['x', '5', '0']]
df = pd.DataFrame(a)
df.dtypes
# 0 object
# 1 object
# 2 object
# dtype: object
如何修改第2、3列的类型?扩展一下,如果有很多列的时候,如何高效的修改?
Pandas中主要有4种类型转换相关的方法:
to_numeric/to_datetime/to_timedelta
:可以参数转换为合适的对应类型。astype
infer_objects
convert_dtypes
to_numeric
将参数转换为合适的数值类型(float64/int64)。签名如下:
pandas.to_numeric(arg, errors='raise', downcast=None)
先看一些使用例子:
In [2]: s = pd.Series(["8", 6, "7.5", 3, "0.9"])
In [3]: s
Out[3]:
0 8
1 6
2 7.5
3 3
4 0.9
dtype: object
In [4]: pd.to_numeric(s)
Out[4]:
0 8.0
1 6.0
2 7.5
3 3.0
4 0.9
dtype: float64
可以使用apply()
方法批量转换DataFrame里面的列:
In [11]: df = pd.DataFrame([['1', '2', '3'],['4', '5', '6'],['7.1', '8.0', '9']], columns=['a','b', 'c'])
In [12]: df
Out[12]:
a b c
0 1 2 3
1 4 5 6
2 7.1 8.0 9
In [13]: df.dtypes
Out[13]:
a object
b object
c object
dtype: object
In [14]: df_1=df.apply(pd.to_numeric)
In [15]: df_1
Out[15]:
a b c
0 1.0 2.0 3
1 4.0 5.0 6
2 7.1 8.0 9
In [16]: df_1.dtypes
Out[16]:
a float64
b float64
c int64
dtype: object
也可以只对某些列进行转换:
In [18]: df[['a','b']]=df[['a','b']].apply(pd.to_numeric)
In [19]: df
Out[19]:
a b c
0 1.0 2.0 3
1 4.0 5.0 6
2 7.1 8.0 9
In [20]: df.dtypes
Out[20]:
a float64
b float64
c object
dtype: object
类型转换难免会产生错误,比如无法转换等,to_numeric
提供了一个参数errors
来让用户控制发生错误时如何处理,用有三个选项:
看一些例子:
In [21]: df=pd.DataFrame([['1','2'],['3','4'],['5','s']], columns=['a','b'])
In [22]: df
Out[22]:
a b
0 1 2
1 3 4
2 5 s
In [23]: df.dtypes
Out[23]:
a object
b object
dtype: object
In [24]: df.apply(pd.to_numeric)
ValueError: Unable to parse string "s" at position 2
In [25]: df.apply(pd.to_numeric, errors='coerce')
Out[25]:
a b
0 1 2.0
1 3 4.0
2 5 NaN
In [26]: df.apply(pd.to_numeric, errors='ignore')
Out[26]:
a b
0 1 2
1 3 4
2 5 s
to_numeric
默认会转换为float64或者int64,如果你想节省内存转换为小一些的类型的话,可以使用to_numeric提供的downcast
参数,可选值如下:
看一些例子:
In [29]: s = pd.Series(['1','2','-7'])
In [30]: s
Out[30]:
0 1
1 2
2 -7
dtype: object
In [31]: pd.to_numeric(s)
Out[31]:
0 1
1 2
2 -7
dtype: int64
In [32]: pd.to_numeric(s, downcast='integer')
Out[32]:
0 1
1 2
2 -7
dtype: int8
# 注意这里:因为 unsigned无法表示-7,所以这里实际没有发生downcast
In [33]: pd.to_numeric(s, downcast='unsigned')
Out[33]:
0 1
1 2
2 -7
dtype: int64
In [34]: pd.to_numeric(s, downcast='float')
Out[34]:
0 1.0
1 2.0
2 -7.0
dtype: float32
这里有2个注意点:
errors
参数对downcast这里是无效的。如果目标类型无法容纳被转换的值,就不会发生实际的转换。比如上面尝试转换为'unsigned'类型时,因为-7无法转换为unsigned,所以实际没有执行downcast。
to_datetime
把参数转换为datetime类型,相比于to_numeric
,函数原型复杂了一些。
pandas.to_datetime(arg, errors='raise', dayfirst=False, yearfirst=False, utc=None, format=None, exact=True, unit=None, infer_datetime_format=False, origin='unix', cache=True)
看一些使用例子:
# 可以使用这些关键字来构造表示时间日期的字典:[‘year’, ‘month’, ‘day’, ‘minute’, ‘second’, ‘ms’, ‘us’, ‘ns’]),复数也可以
In [35]: df = pd.DataFrame({'year': [2015, 2016], 'month': [2, 3], 'day':[4, 5]})
In [36]: df
Out[36]:
2016 3 5
In [37]: df.dtypes
Out[37]:
year int64
month int64
day int64
dtype: object
In [38]: pd.to_datetime(df)
Out[38]:
2016-03-05
dtype: datetime64[ns]
In [40]: pd.to_datetime(1490195805, unit='s')
Out[40]: Timestamp('2017-03-22 15:16:45')
In [41]: pd.to_datetime(1490195805433502912, unit='ns')
Out[41]: Timestamp('2017-03-22 15:16:45.433502912')
In [42]: pd.to_datetime("10/11/12", dayfirst=True)
Out[42]: Timestamp('2012-11-10 00:00:00')
In [43]: pd.to_datetime("10/11/12", yearfirst=True)
Out[43]: Timestamp('2010-11-12 00:00:00')
In [44]: pd.to_datetime("10/11/12", dayfirst=True, yearfirst=True)
Out[44]: Timestamp('2010-12-11 00:00:00')
`errors`字段含义同`to_numeric`:
看个例子:
In [46]: pd.Timestamp.min
Out[46]: Timestamp('1677-09-21 00:12:43.145225')
In [47]: pd.Timestamp.max
Out[47]: Timestamp('2262-04-11 23:47:16.854775807')
In [48]: pd.to_datetime('13000101', format='%Y%m%d', errors='raise')
OutOfBoundsDatetime: Out of bounds nanosecond timestamp: 1300-01-01 00:00:00
In [49]: pd.to_datetime('13000101', format='%Y%m%d', errors='ignore')
Out[49]: datetime.datetime(1300, 1, 1, 0, 0)
In [50]: pd.to_datetime('13000101', format='%Y%m%d', errors='coerce')
Out[50]: NaT
In [53]: pd.to_datetime('130000101', format='%Y%m%d', errors='ignore')
Out[53]: '130000101'
timedelta类型表示两个时间的绝对差值,to_timedelta
将参数转换为timedelta类型。方法签名如下:
pandas.to_timedelta(arg, unit=None, errors='raise')
unit
用于指定参数的单位,默认为ns
,合法的取值如下:
errors
的取值同to_datetime
。看几个使用例子:
In [55]: pd.to_timedelta('1 days 06:05:01.00003')
Out[55]: Timedelta('1 days 06:05:01.000030')
In [56]: pd.to_timedelta('15.5us')
Out[56]: Timedelta('0 days 00:00:00.000015500')
In [57]: pd.to_timedelta(['1 days 06:05:01.00003', '15.5us', 'nan'])
Out[57]: TimedeltaIndex(['1 days 06:05:01.000030', '0 days 00:00:00.000015500', NaT], dtype='timedelta64[ns]', freq=None)
In [58]: import numpy as np
In [59]: pd.to_timedelta(np.arange(5), unit='s')
Out[59]:
TimedeltaIndex(['0 days 00:00:00', '0 days 00:00:01', '0 days 00:00:02',
'0 days 00:00:03', '0 days 00:00:04'],
dtype='timedelta64[ns]', freq=None)
In [60]: pd.to_timedelta(np.arange(5), unit='d')
Out[60]: TimedeltaIndex(['0 days', '1 days', '2 days', '3 days', '4 days'], dtype='timedelta64[ns]', freq=None)
astype方法可以做任意类型的转换(当然未必能成功)。方法原型如下:
pd.DataFrame.astype(dtype, copy=True, errors='raise')
dtype
就是我们想要转换成的目标类型,可以使用Numpy的类型、Python的部分类型、Pandas特有的类型。copy
表示是否修改原数据。errors
可以取'raise'(失败是抛异常)或者'ignore'(失败时忽略并返回原值)。
看一些例子:
In [62]: df = pd.DataFrame({'col1': [1, 2], 'col2': [3, 4]})
In [63]: df.dtypes
Out[63]:
col1 int64
col2 int64
dtype: object
In [64]: df.astype('int32').dtypes
Out[64]:
col1 int32
col2 int32
dtype: object
In [65]: s = pd.Series([1, 3], dtype='int32')
In [66]: s
Out[66]:
0 1
1 3
dtype: int32
In [67]: s.astype('int64')
Out[67]:
0 1
1 3
dtype: int64
In [68]: s.astype('category')
Out[68]:
0 1
1 3
dtype: category
Categories (2, int64): [1, 3]
In [77]: s_date = pd.Series(['20220101', '20220102', '20220103'])
In [78]: s_date
Out[78]:
0 20220101
1 20220102
2 20220103
dtype: object
In [79]: s_date.astype('datetime64')
Out[79]:
0 2022-01-01
1 2022-01-02
2 2022-01-03
dtype: datetime64[ns]
In [81]: i = pd.Series([1,2,3])
In [82]: i
Out[82]:
0 1
1 2
2 3
dtype: int64
In [83]: i.astype(str)
Out[83]:
0 1
1 2
2 3
dtype: object
In [84]: s = pd.Series([1,2,-7])
In [85]: s.astype(np.int8)
Out[85]:
0 1
1 2
2 -7
dtype: int8
In [86]: s.astype(np.uint8)
Out[86]:
0 1
1 2
2 249
dtype: uint8
使用astype的时候,要注意范围,比如下面的转换不会报错,但不是我们想要的:
In [95]: s = pd.Series([1,2,-7])
# -7被转换为249
In [96]: s.astype(np.uint8)
Out[96]:
0 1
1 2
2 249
dtype: uint8
# 使用to_numeric就不会有问题
In [97]: pd.to_numeric(s, downcast='unsigned')
Out[97]:
0 1
1 2
2 -7
dtype: int64
pandas 0.21.0版本加入,会尝试推测类型。看个例子:
In [103]: df = pd.DataFrame({'a': [7, 1, 5], 'b': ['3','2','1']}, dtype='object')
In [104]: df.dtypes
Out[104]:
a object
b object
dtype: object
In [105]: df.infer_objects().dtypes
Out[105]:
a int64
b object
dtype: object
可以看到,infer_objects
其实还是比较“肤浅”的,如果要将'b'列也转换成数值型,可以使用前面介绍的方法。
convert_dtypes
会尝试将各列转换为最可能的类型,其特点是支持pd.NA
,方法签名如下:
DataFrame.convert_dtypes(infer_objects=True, convert_string=True, convert_integer=True, convert_boolean=True, convert_floating=True)
通过各个类型参数可以控制某些类型是否转换;infer_objects
为True时会尝试转换object类型为更具体的类型。看一些例子:
In [110]: df = pd.DataFrame(
...: {
...: 'a': pd.Series([1, 2, 3], dtype=np.dtype('int32')),
...: 'b': pd.Series(['x', 'y', 'z'], dtype=np.dtype('O')),
...: 'c': pd.Series([True, False, np.nan], dtype=np.dtype('O')),
...: 'd': pd.Series(['h', 'i', np.nan], dtype=np.dtype('O')),
...: 'e': pd.Series([10, np.nan, 20], dtype=np.dtype('float')),
...: 'f': pd.Series([np.nan, 100.5, 200], dtype=np.dtype('float')),
...: }
...: )
In [111]: df
Out[111]:
a b c d e f
0 1 x True h 10.0 NaN
1 2 y False i NaN 100.5
2 3 z NaN NaN 20.0 200.0
In [112]: df.dtypes
Out[112]:
a int32
b object
c object
d object
e float64
f float64
dtype: object
In [113]: dfn = df.convert_dtypes()
In [114]: dfn
Out[114]:
a b c d e f
0 1 x True h 10 NaN
1 2 y False i <NA> 100.5
2 3 z <NA> <NA> 20 200.0
In [115]: dfn.dtypes
Out[115]:
a Int32
b string
c boolean
d string
e Int64
f float64
dtype: object
End, that's all!
]]>本文简单梳理一下整个发展演进的过程。
Data Warehouse就是我们平时说的数据仓库(简称数仓),数仓最典型的代表就是MPP数据库。最原始的时候,数据量还不是很大,传统的单体数据库可以支撑平时的分析、决策、报表等需求。但随着后来应用的不断增多,数据量也激增,单体DB已经无法承载,于是便出现了数仓这种新型的架构。
数仓一般包含以下元素:
数仓的一些特点:
我觉得用一个不太准确但易于理解的描述就是数仓像是一个“分布式的数据库”,因为是分布式的,可以扩展节点,所以可以承载的数据量比以前大了。但它的灵魂依旧还是数据库,所以像传统单体DB中的一些特性在数仓中依旧存在,比如事务、模型(表结构)、隔离性等概念。简言之,数仓主要解决了传统单体DB无法承载越来越多的数据量的问题(当然还有一些其它功能)。
但随着技术的发展和业务需求的不断产生,数仓也开始暴露出一些问题:
于是,便出现了数据湖。
数据湖本质上可以看成是一个接近无限容量,且支持任何格式数据的廉价存储系统。像我们熟知的AWS S3、Azure Data Lake Storage (ADLS)、Google Cloud Storage (GCS)、阿里OSS、腾讯的COS等对象存储系统都可以认为是数据湖。
数据湖的特点是:
本来,数据湖的设计初衷是解决数仓容量和数据格式支持的不足,将所有格式的数据全部存储在数据湖里面,然后使用的时候直接使用湖里面的数据进行分析、查询、计算。但真正使用的时候,大家发现数据湖缺失了一些关键特性,导致湖里的数据无法直接使用。概括来说,主要存在三个方面的问题:
当然,还有其它一些问题,比如不支持事务、原子写、并发等。结果最终数据湖就变成了数据沼泽(“data swamps”):数据都扔进了数据湖,但无法直接使用。当真正需要使用的时候,还是要读出来放到其它地方(比如数仓)进行使用。但鉴于数仓又存在前面提到的问题,所以企业不得不同时维护一个数仓系统和一个数据湖系统,像极了计算领域的Lambda架构。但是同时维护两套系统的成本和复杂性是很高的,于是又出现了Data LakeHouse。
Data LakeHouse一种湖仓一体的新型架构:
可以看到,其实就是把互补的两套架构(数仓和数据湖)融合成了一个架构,这样只用维护一套系统,就可以解决所有问题。概括来说LakeHouse架构的主要特点有:
开放(Openness)
机器学习支持更好
低成本下更好的性能和可靠性
目前可以算得上是LakeHouse的开源系统有:Apache Hudi(Uber开源)、Apache Iceberg(Netflix开源)、Delta Lake(Databricks开源)。其中Delta Lake的这篇论文算是目前对Data LakeHouse架构的一个“标准定义”:Delta Lake: High-Performance ACID Table Storage over Cloud Object Stores。
Data Warehouse、Lake、LakeHouse对比:
Data warehouse | Data lake | Data lakehouse | |
---|---|---|---|
Data format | Closed, proprietary format | Open format | Open format |
Types of data | Structured data, with limited support for semi-structured data | All types: Structured data, semi-structured data, textual data, unstructured (raw) data | All types: Structured data, semi-structured data, textual data, unstructured (raw) data |
Data access | SQL-only, no direct access to file | Open APIs for direct access to files with SQL, R, Python and other languages | Open APIs for direct access to files with SQL, R, Python and other languages |
Reliability | High quality, reliable data with ACID transactions | Low quality, data swamp | High quality, reliable data with ACID transactions |
Governance and security | Fine-grained security and governance for row/columnar level for tables | Poor governance as security needs to be applied to files | Fine-grained security and governance for row/columnar level for tables |
Performance | High | Low | High |
Scalability | Scaling becomes exponentially more expensive | Scales to hold any amount of data at low cost, regardless of type | Scales to hold any amount of data at low cost, regardless of type |
Use case support | Limited to BI, SQL applications and decision support | Limited to machine learning | One data architecture for BI, SQL and machine learning |
不管是数仓,还是数据湖,亦或是现在的融合架构LakeHouse,都是为了解决不断发展和产生的业务需求而迭代产生的新架构和解决方案,特别是随着AI的发展,Machine Learning技术已经越来越成熟,慢慢已经成为数据分析的主要组成部分,所以现在新的架构在与AI生态的结合方面考虑的越来越多。目前LakeHouse正在快速发展,提供解决方案和一体化平台的商业公司也在逐渐增多,对于我们这些技术人,能不断见证和学习这些优秀的技术,也算是一件幸事和乐趣。
更多信息可参考下面引用部分的文章。
References:
Kafka定义了一个消费者组内分区分配的接口ConsumerPartitionAssignor
,该接口里面最核心的是assign
方法:
package org.apache.kafka.clients.consumer;
public interface ConsumerPartitionAssignor {
GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription);
// 省略其它
}
该方法的两个参数分别是当前集群信息metadata
和本组成员订阅的主题信息groupSubscription
,根据这两个信息计算分配方案GroupAssignment
。目前Kafka已经实现了几种具体的策略:
完整的UML图如下:
下面分别介绍。
该分配算法是逐个topic进行分配的(per-topic basis),对于每个topic:将分区按数值排序,将Consumer按member.id
(如果用户指定了group.instance.id
,则使用该id作为member.id
,否则随机生成)进行字典序排列。用分区数除以消费者个数得出每个Consumer应该分配的分区个数N(不能整除时向上取整),然后依次给每个Consumer一次分配N个分区(最后一个可能不足N个)。
比如现在组内有2个Consumer C0和C1,订阅了2个topic t0和t1,每个topic有3个分区,即:t0p0, t0p1, t0p2, t1p0, t1p1, t1p2. 假设消费者排序后顺序为C0、C1,先开始分配topic t0,3个分区/2个Consumer等于1.5,向上取整为2,即每个Consumer分配2个分区,于是t0的分配结果为:
然后再分配topic t1,和上面同理,分配结果如下:
所以最终合并后的分配结果为:
Round Robin策略和Range的不同之处在于它是将所有topic的分区放在一起进行分配的。具体方式为:先将Consumer按member.id
进行排序,将所有分区按数值排序。然后将分区以round robin的方式依次(每次一次)分配给各个Consumer。如果Consumer订阅的topic有差异的话,在分配某个topic的Partition的时候,如果当前Consumer没有订阅该topic,就会跳过该Consumer。举两个例子:
例子1:组内Consumer订阅信息相同。假设现在组内有2个Consumer C0和C1,订阅了2个topic t0和t1,每个topic有3个分区,即:t0p0, t0p1, t0p2, t1p0, t1p1, t1p2。将这6个分区以round-robin的方式分配给C0和C1,分配结果为:
例子2:组内Consumer订阅信息不同。假设有3个Consumer C0、C1、C2;有3个topic t0、t1、t2,3个topic的分区数分别为1、2、3,即所有分区为:t0p0, t1p0, t1p1, t2p0, t2p1, t2p2。其中C0订阅了t0,C1订阅了t0、t1,C2订阅了t0、t1、t2,则分配结果为:
StickyAssignor算法在分配时有2个重要的考虑点:
看2个例子。
例子1:假设有3个Consumer C0、C1、C2;有4个主题t0、t1、t2、t3,每个分区有2个分区,即所有分区为:t0p0, t0p1, t1p0, t1p1, t2p0, t2p1, t3p0, t3p1。现在所有Consumer都订阅了4个主题,则分配结果如下:
这个结果和前面RoundRobinAssignor的分配结果是一样的。但当发生重新分配的时候,就不一样了。假设,现在C1挂掉了,需要重新分配。如果是RoundRobinAssignor,重新分配后的结果如下:
但如果使用StickyAssignor的话,重新分配后的结果如下:
可以看到,StickyAssignor将C1的分区按照Round Robin的方式分配给了C0和C2,在保证均衡的前提下,最大限度的保留了原有分配方案。
例子2:假设有3个Consumer C0、C1、C2;有3个主题t0、t1、t2,分区数依次为1、2、3,即所有分区为:t0p0, t1p0, t1p1, t2p0, t2p1, t2p2。现在C0订阅了t0,C1订阅了t0、t1,C2订阅了t0、t1、t2。如果使用RoundRobin,前面已经展示过一次了,分配结果为:
但如果使用StickyAssignor的话,分配结果为:
此时,如果C0挂掉,RoundRobin重新分配后的结果为:
有3个分区分配没有变化,且分配不均匀。
但StickyAssignor重新分配的结果为:
有5个分区分配没有变化,且分配均匀。
该策略具体的分配方式和前面的StickyAssignor是一样的,但有一个重要的区别是该策略会使用Cooperative Rebalance,而StickyAssignor使用的则是Eager Rebalance,这两种Rebalance的区别参见我之前的文章,这里不再赘述。
就通用场景而言,进行分区分配的时候,一方面我们比较关注分配的均衡性;另一方面也会比较关注当发生Consumer Group Rebalance的时候,能否最大限度的保持原有的分配。从这两个角度来看:
group.instance.id
,这样相当于确定了Consumer的顺序,只要组内Consumer不变、订阅信息不变,就能有一个稳定的分配结果。而StickyAssignor和CooperativeAssignor则考虑了这点,但有一个注意点就是对于StickyAssignor,虽然会尽量保留原有的分配方案,但因为使用的是Eager Rebalance,所以在Rebalance的时候还是会回收所有分区,而CooperativeAssignor使用的是Cooperative Rebalance,所以只会回收有变化的分区。一般而言,建议新的系统(Kafka 2.4及之后版本)使用CooperativeAssignor。当然,我们也可以实现自己的PartitionAssignor。
]]>Kafka Consumer创建的时候都要指定一个组ID(group id),所有组ID一样的Consumer就组成了一个Consumer Group。对于一个Partition同一时刻只会分配给同一个Group内某一个Consumer,这就是大家熟知的Kafka消费模型。通过这个模型,Kafka的消费者(也就是应用/服务)可以很方便的实现Load Balance、HA和水平扩展。简单说这个模型就相当于现在有M个Partition,N个Consumer,然后把这M个Partition平均分配给N个Consumer,而且分配的时候有个限制条件:一个Partition只能分配给一个Consumer,Consumer Group Rebalance就是在需要的时候去做这个分配工作的,而且它的大原则是尽量保证Partition均衡的分配给组内各个Consumer。
那什么时候需要,或者说什么时候会发生Consumer Group Rebalance呢?看前面描述的职责就已经很明确了,当M或者N值发生变化的时候就需要Rebalance了,准确点就是:当Group内的Consumer个数或者Consumer订阅的Partition个数发生变化的时候就需要Rebalance了。下面列举一些常见的场景:
上面这些场景有些是主动的,有些是被动的,有些是无法避免的,有些是可以避免的,有些是正常的,有些是代码bug导致的...总之,当发现资源(Partition)有变更,需要重新分配以消除“贫富差距”的时候,就会发生Consumer Group Rebalance了。但是资源的分配不论是在现实世界,还是在分布式的世界中,都是一个难题。下面介绍Kafka是怎么做的。
实质上,Rebalance是一个抽象、通用的资源分配协议,它不光可以用于Partition这种资源的分配,在Kafka中,有多个地方都有使用:
网上关于这新老协议的细节讲述已经非常多了,这里就概括性的介绍一下。
如下图,Rebalance协议由2部分组成,一部分在Broker端,一部分在Client端:
这里注意一个细节就是一部分协议是在客户端的,而且用户可以按照约定好的协议进行自定义的实现,比如实现一个自己的资源分配方案,后面就会讲到。
下面还是以本文讨论的Consumer Group Rebalance的应用场景(即Partition资源的分配)来描述。对于每一个Consumer Group,都会有一个Coordinator节点(由某个Broker充当)负责这个Group的资源分配,也就是上面的Group Membership协议其实就是由这个Coordinator节点来实际运作的。假设现在新加入了一个Consumer,看下整个Rebalance过程的步骤:
FindCoordinator
请求,找到它所属的Group对应的Coordinator;JoinGroup
请求。该请求会携带客户端(即该Consumer)的一些用户配置(比如session.timeout.ms
、max.poll.interval.ms
)和一些元数据(比如订阅了哪些主题等)。JoinGroup
请求后,Coordinator通过心跳响应(Heartbeat
)响应通知组内其它成员要开始Rebalance了。然后其它Consumer像这个新加入的Consumer一样,也发送JoinGroup
请求给Coordinator。JoinGroup
请求以后,会给所有成员发送一个JoinGroup
响应。其中给Group Leader(加入组的第一个成员)发送的Response里面包含了成员信息、资源分配策略等元数据,其它成员则是一个空的Response。这个Leader拿到这些信息以后,本地计算出分配结果。SyncGroup
请求,Leader的请求中会包含自己计算的分配结果,其它成员则是空请求。SyncGroup
响应发送给各个成员。如果Consumer实现了ConsumerRebalanceListener
接口,则会调用该接口的onPartitionsAssignedMethod
方法。至此,整个Rebalance过程就结束了,这里再补充一些细节:
LeaveGroup
请求给Coordinator;如果是异常停止,Coordinator会通过心跳超时来判断Consumer已经没了。当然实际中,可能Consumer其实正常着,只是因为网络原因心跳超时了,或者Consumer里面没有及时调用poll
方法等。再放一个图(图片来自于引用文章From Eager to Smarter in Apache Kafka Consumer Rebalances,下同):
优化之前肯定要先分析清楚现有的问题,才能有针对性的进行优化。其实从前面的介绍我们已经很清楚,Rebalance要做的事情很简单:将M个资源(Partition/Task/Connector)平均分配给N个成员(Consumer/Instance/Worker),每个资源只能被一个成员拥有。事情本身不难,但难就难在需要在分布式环境中做这个分配工作。分布式环境中在任意时刻,网络可能分区、节点可能故障、还存在竞态条件(race condition),简单说就是分布式环境中无法实现可靠的通信,这让整个问题复杂化了。
前面介绍了现在的Rebalance开始的时候回收(revoke)所有成员的资源,然后大家一起参与Rebalance过程,等拿到新的资源分配方案,又重新开始工作。具体应用到Partition的分配,就是所有Consumer在发送JoinGroup
请求前需要停止从Partition消费,“上交”自己拥有的Partition。这样当Coordinator收到所有Consumer的JoinGroup
请求的时候,所有的Partition就处于未分配状态,此时整个系统达到了一个同步状态(Synchronization barrier):
所以,在重新分配之前,先回收所有资源其实是为了在不可靠的分布式环境中简化分配工作。然而,按现在这种方式,在分区被回收到收到新的分配方案之前,所有成员都无法工作,即“Stop The World”(借鉴了GC里面的概念),这也是Rebalance存在的最大的问题。默认Rebalance流程的超时时间为5分钟,也就是最差情况下,“Stop The World”效果可能持续5分钟。所以需要针对这个问题进行优化,思路也有两种:
社区在2.3版本中同时引入了两个优化方案:KIP-345: Static Membership和KIP-429: Kafka Consumer Incremental Rebalance Protocol分别按照上述两种思路进行优化,下面分别介绍。
Static Membership主要的优化目标是减少“闪断”场景导致的Rebalance,即解决的思路主要是尽量减少Rebalance的发生,我们看下是如何优化的。
在每次Rebalance的时候,Coordinator会随机给每个成员分配一个唯一的ID。然后当有新成员加入的时候,它的ID会是一个空字符串UNKNOWN_MEMBER_ID
,这样Coordinator就知道它是新加入的,需要进行Rebalance了。Static Membership方案是给Consumer增加了group.instance.id
选项,由用户负责设置以及保证唯一性,这个ID会替换原来由Coordinator每次Rebalance随机生成的ID(随机生成称之为“Dynamic Membership”),并且这个ID信息会加到JoinGroup
请求中。那这个ID有什么用呢?
举个例子:某一刻Consumer应用因为内存使用过高,被系统OOM Killer干掉了,然后很快又被守护进程或者人为启动起来的。这个时候,如果是以前的情况,Coordinator会认为是有新的Consumer加入,需要进行一轮Rebalance,但如果是Static Membership的情况下,Coordinator通过ID发现这个Consumer之前就有,就不会重新触发整个Rebalance,而是将缓存的之前分配给该Consumer的Partition直接返回给他,这样就一定程度上避免了因为闪断导致的Rebalance。
当然,这里我用了“闪断”,主要是想表达意外挂掉又很快恢复的情况,更具体点:
LeaveGroup
请求的场景。因为如果主动给Coordinator发送了LeaveGroup
请求的话,Coordinator会马上开始一轮Rebalance。session.timeout.ms
或者max.poll.interval.ms
时间内就恢复了,否则Coordinator会认为Consumer挂了,开始Rebalance。这里简单提一下这两个配置项。在0.10.0及之前的版本中,心跳是和poll在一个线程里面的,只有session.timeout.ms
一个参数。后来进行了优化拆分(KIP-62: Allow consumer to send heartbeats from a background thread),心跳是一个单独的线程,poll是一个线程,session.timeout.ms
仍然是心跳的超时时间,而max.poll.interval.ms
则是poll线程的超时时间。不管哪一个超时,Coordinator都会认为Consumer挂了,需要Rebalance。如果我们要使用Static Membership特性,需要给Consumer增加group.instance.id
设置。同时尽量将上面提到的超时时间设置的长一些。但显然弊端就是Consumer如果真的挂掉且无法恢复的话,Coordinator需要等较长一段时间才能发现,相当于牺牲了一定的可用性。果然没有免费的蛋糕。
不同于Static Membership,Incremental Cooperative Rebalancing的思路是尽量减少Rebalance中“Stop The World”的时间和范围。那怎么做的呢?有这么几个关键点:
JoinGroup
请求,但这次发送的时候资源并不会被回收(即不会停止工作),大家只是将自己目前拥有的资源信息加到元数据里面,发送给Coordinator。然后Coordinator把这些信息发送给Group Leader,Leader根据这些信息计算新的分配方案,计算的时候在保证均衡的情况下尽量对现有状态做最小改动(实际由实现的分配算法决定,默认的StickyAssianor策略就是这种),换句话说最核心的就是看哪些资源变更了成员,那就需要从原拥有者那里剔除这个资源,然后加到新的拥有者那里。SyncGroup
响应发送给各个成员。各个成员收到新的分配方案以后,会和自己的现状做对比,如果没有变化或者只是新增了资源,则不需要额外做什么。但如果发现有资源被回收,则继续Rebalance的流程,接下来的流程和老版本的协议几乎一样,也需要回收资源,并发送JoinGroup
请求,但这里仅回收需要被回收的资源。比如某个ConsumerRebalance之前拥有1、3、5三个分区,Rebalance中重新计算的新方案里面是1、3两个分区,则只回收5。可以看到Incremental Cooperative Rebalancing是将原有的Rebalance流程进行了细化(分成了多轮),延迟了资源回收的时间和范围,改进后的Rebalance流程如下图:
那如何使用Incremental Cooperative Rebalancing呢?通过配置项partition.assignment.strategy
进行配置,可以配置多个,越靠前优先级越高。前面提到了Rebalance协议分两部分,这里配置的其实就是客户端“Client Embedded Protocol”的实现类。2.8版本中已经支持的有:
我们也可以通过实现org.apache.kafka.clients.consumer.ConsumerPartitionAssignor接口来实现自定义的Assignor。如果想使用Incremental Cooperative Rebalancing,就配置最后一个CooperativeStickyAssignor即可。不同Assignor的细节本文就不展开了,另外规划了一篇文章《Kafka的消费者分区分配策略》。更多关于Incremental Cooperative Rebalancing的细节,可以参考本文引用部分的文章:
Kafka中的Rebalance本质上是解决分布式环境中资源分配的一种通用协议,由于分布式环境的复杂性,无法实现一个完美的方案,只能根据具体的场景进行有针对性的优化。比如实际中“闪断”是引起Rebalance的一种很常见且无法避免的原因,所以就有针对性的增加了Static Membership方案。另外Rebalance很严重的一个问题就是会“Stop The World”,然而实际中Rebalance的时候其实往往只需要变更极少量的资源所属权,所以就提出了Incremental Cooperative Rebalance方案,减少了Rebalance过程中“Stop The World”的时间和影响范围。好的架构不是设计出来的,而是进化而来的,Kafka Rebalance优化的脚步仍在继续。
另外,尽管现在已经做了诸多优化,效果也比较明显,但Rebalance仍然算是一个代价比较大的操作,实际应用的时候,我们还是要能避免的就避免。
References:
]]>有时我们会碰到网络是通畅的,但却连不上Kafka,特别是在多网卡环境或者云环境上很容易出现,这个其实和Kafka的监听配置有关系。本文介绍监听相关的配置,目前监听相关的参数主要有下面几个:
listeners
advertised.listeners
listener.security.protocol.map
inter.broker.listener.name
security.inter.broker.protocol
advertised.host.name
advertised.port
host.name
其中最重要的就是listeners
和advertised.listeners
:集群启动时监听listeners配置的地址,并将advertised.listeners
配置的地址写到Zookeeper里面,作为集群元数据的一部分。我们可以将客户端(生产者/消费者)连接Kafka集群进行操作的过程分成2步:
listeners
配置的连接信息(ip/host)连接到某个Broker(broker会定期获取并缓存zk中的元数据信息),获取元数据中advertised.listeners
配置的地址信息。advertised.listeners
连接信息和Kafka集群通信(读/写)。所以在存在内外网隔离的虚拟化环境中(比如Docker、公有云),外部客户端经常会出现可以连接到Kafka(第1步),但发送/消费数据时报连接超时(第2步),就是因为listeners
配置的是外网地址,而advertised.listeners
配置的却是内网地址。那这几个参数该如何配置呢?
先看连接信息的配置格式:{listener名字}://{HOST/IP}:{PORT}
。HOST/IP、PORT很清楚,主要是这个“listener名字”字段。要理解这个得了解listener.security.protocol.map
这个配置项:它的用途是配置listener名字和协议的映射(所以它是一个key-value的map),key是“listener名字”,value是“协议名称”,其默认值是“listener名字”和“协议名称”一样。有点绕,举个例子,比如:PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,冒号前面是key,即协议名字;后面是value,即协议名称。listener名字我们可以随便起,而协议名称则是固定可枚举的一个范围。所以如果我们自定义了listener名字,那就需要显式的设置其对应的协议名。
inter.broker.listener.name
和security.inter.broker.protocol
都是用于配置Broker之间通信的,前者配置名称(即listener.security.protocol.map
中的key),后者配置协议(即listener.security.protocol.map
中的value),默认值是PLAINTEXT
。这两个配置项同时只能配置一个。
为什么一个连接要搞这么复杂呢?主要是为了各种不同的场景需求。下面举一个复杂一点的应用场景进行说明。比如我们在一个公有云上面部署了一个Kafka集群,该环境有一个外网地址external_hostname
和一个内网地址internal_hostname
;且在内部中是无法获取外网地址的(公有云大多都是这样的)。然后想实现内部客户端访问集群时走内部地址,且不需要加密;而外部客户端访问时则走外部地址,且需要加密。要实现这个需求,可以对集群进行如下配置:
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
listeners=INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092
advertised.listeners=INTERNAL://{internal_hostname}:19092,EXTERNAL://{external_hostname}:9092
inter.broker.listener.name=INTERNAL
其实更进一步,我们还可以通过可选的control.plane.listener.name参数单独定制集群Controller节点与其他Broker节点的连接,那配置信息就变为:
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL,CONTROL:SSL
listeners=INTERNAL://0.0.0.0:19092,EXTERNAL://0.0.0.0:9092
advertised.listeners=INTERNAL://{internal_hostname}:19092,EXTERNAL://{external_hostname}:9092,CONTROL://{control_ip}:9094
inter.broker.listener.name=INTERNAL
control.plane.listener.name=CONTROL
最后给出这些配置项的默认值和一些注意事项:
listeners
如果不显式的配置,那会监听所有网卡,相当于配置了0.0.0.0。该配置项里面listeners名字和端口都必须是唯一的,不能重复。advertised.listeners
如果不配置,默认使用listeners
配置的值。如果listeners
也没有显式配置,则使用java.net.InetAddress.getCanonicalHostName()
获取的IP地址。如果listeners
配置的是0.0.0.0,则必须显式的配置advertised.listeners
,因为这个配置项必须是一个具体的地址,不允许是0.0.0.0(因为客户端无法根据这个地址连接到Broker)。另外,advertised.listeners
中的端口允许重复。listeners
和advertised.listeners
,有多个地址的时候,每一个地址都必须按照{listener名字}://{HOST/IP}:{PORT}
格式进行配置,多个地址用英文逗号分隔。java.net.InetAddress.getCanonicalHostName()
,有时使用IP会出现访问不通的情况。总结:listeners
地址是用于首次连接的;advertised.listeners
的地址是会写到zk里面,客户端通过listeners地址建立连接获取该地址信息,然后通过该地址和集群交互。所以对于客户端,这2个地址必须都是可以访问的才可以。