Spark - Returning Multiple Columns from a DataFrame UDF

  • Using an Array (all elements must share one type; a self-contained sketch of both variants follows this list)

    def addClusteringCols(featureVecSepDf: DataFrame, model: KMeansModel): DataFrame = {
      val clusteringUdf = udf((featureVec: Vector) => {
        // (label, distance)
        val rtv = KpiClusteringThreshold.calDist2Center(featureVec, model.clusterCenters)
        Array(rtv._1, rtv._2) // same type
      })
      featureVecSepDf.withColumn("udfResult", clusteringUdf(col($(featureVecCol)))).cache
        .withColumn("label", col("udfResult")(0))
        .withColumn("distance", col("udfResult")(1))
        .drop("udfResult")
    }
  • Using a Tuple (mixed types allowed)

    val calUdf = udf((r: Row) => {
      val chi2 = KpiChiSquareThreshold.calChiSquareValue(baseline, r.toSeq.toList.asInstanceOf[List[Long]])
      if (chi2 > kpiChiSquareThr) {
        val absDev = Math.abs(chi2 - kpiChiSquareThr)
        val relDev = Math.min(absDev / kpiChiSquareThr, 1.0)
        (1, absDev, relDev)
      } else
        (0, 0.0, 0.0)
    })
    df = df.withColumn("detect", calUdf(struct(featureList.map(f => col(f)): _*)))
      .withColumn($(outputCol), col("detect._1"))
      .withColumn($(absDevCol), col("detect._2"))
      .withColumn($(relDevCol), col("detect._3"))
      .drop("detect")

Spark - Filtering a DataFrame

Using when

def filterDataframe(df: DataFrame, featureCols: Array[String],
                    diskrwInfo: (String, String, Double),
                    timestampInfo: (String, Int)): DataFrame = {
  var newDf = df.withColumn("invalidFlag", lit(true))

  // The features must not all be close to 0
  for (c <- featureCols) {
    newDf = newDf.withColumn("invalidFlag", when(col(c) > 0.001, false)
      .otherwise(col("invalidFlag")))
  }

  // The sum of disk reads and writes must not be below the threshold
  newDf = newDf.withColumn("invalidFlag", when(col(diskrwInfo._1) + col(diskrwInfo._2) < diskrwInfo._3, true)
    .otherwise(col("invalidFlag")))

  // The time of day (timestamp mod 24h) must not be earlier than the threshold hour
  newDf = newDf.withColumn("invalidFlag", when(col(timestampInfo._1) % (24 * 60 * 60 * 1000) < timestampInfo._2 * 60 * 60 * 1000, true)
    .otherwise(col("invalidFlag")))

  newDf = newDf.filter("invalidFlag = false").drop(col("invalidFlag"))
  newDf
}
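
If the columns contain no nulls, the same rule can be written as a single boolean expression and applied in one filter call, avoiding the mutable invalidFlag column; a sketch using the same featureCols, diskrwInfo and timestampInfo parameters:

val keepCondition =
  featureCols.map(c => col(c) > 0.001).reduce(_ or _) and
    (col(diskrwInfo._1) + col(diskrwInfo._2) >= diskrwInfo._3) and
    (col(timestampInfo._1) % (24 * 60 * 60 * 1000) >= timestampInfo._2 * 60 * 60 * 1000)

val filteredDf = df.filter(keepCondition)

Note that when(...).otherwise(...) treats a null condition as false, so the two versions can differ on rows that contain nulls.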

Using a UDF

def filterOutliers(featureVecDf: DataFrame, quantileThrArr: Array[Double]): DataFrame = {
  val filterUdf = udf((featureVec: Vector) => {
    // true if any feature exceeds its quantile threshold
    quantileThrArr.indices.exists(i => featureVec(i) > quantileThrArr(i))
  })
  val newDf = featureVecDf.withColumn("flag", filterUdf(col($(featureVecCol))))
    .filter("flag = false")
    .drop(col("flag"))

  newDf
}

Using filter with multiple conditions

featureDf = featureDf.filter(featureList.map(f => col(f) >= 0.001).reduce(_ or _))
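
The same reduce pattern with _ and _ keeps only the rows where every feature passes:

featureDf = featureDf.filter(featureList.map(f => col(f) >= 0.001).reduce(_ and _))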

Spark - Building a UDF Dynamically

def getPreprocessFunc(meanList: List[Double], stdList: List[Double]): Vector => Vector = (featureVec: Vector) => {
  val featureListBuffer = new ListBuffer[Double]()
  for (i <- 0 until featureVec.size) {
    if (stdList(i) > 0)
      featureListBuffer += (featureVec(i) - meanList(i)) / stdList(i)
    else
      featureListBuffer += featureVec(i)
  }
  Vectors.dense(featureListBuffer.toArray)
}

val preprocessUdf = udf(getPreprocessFunc(meanList, stdList))
df.withColumn("featureVecCol", preprocessUdf(col("featureVecCol")))

Spark - Splitting a Vector Column of a DataFrame

def splitVectorCol(df: DataFrame, append: Boolean = false): DataFrame = {
  lazy val first = df.first()(0)
  val numAttrs = first.asInstanceOf[Vector].size
  val attrs = Array.tabulate(numAttrs)(n => "feature_" + n)

  val vecToArray = udf((featureVec: Vector) => featureVec.toArray)
  val dfArr = df.withColumn("featureArr", vecToArray(col(df.schema.head.name)))

  // Create a SQL-like expression using the array
  val sqlExpr = attrs.zipWithIndex.map { case (alias, idx) => col("featureArr").getItem(idx).as(alias) }

  // Extract elements from dfArr
  if (append)
    dfArr.select(col("*") +: sqlExpr: _*)
  else
    dfArr.select(sqlExpr: _*)
}
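
On Spark 3.0 and later, the hand-written vecToArray UDF can be replaced by the built-in org.apache.spark.ml.functions.vector_to_array; only the conversion line changes and the getItem selection stays the same:

import org.apache.spark.ml.functions.vector_to_array

val dfArr = df.withColumn("featureArr", vector_to_array(col(df.schema.head.name)))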

Chi-Square Test for Whether Two Sequences Come from the Same Distribution

The chi-square test is commonly used to check whether a sequence follows a given distribution.
Compute the (overall) chi-square statistic; if it is large, the sequence is judged not to follow that distribution.
$$\begin{aligned} \chi^{2} = \sum \frac{(A-E)^{2}}{E} = \sum_{i=1}^{k}\frac{(A_{i}-E_{i})^{2}}{E_{i}} \end{aligned}$$
where $A_{i}$ is the observed frequency of each point (metric) and $E_{i}$ is its expected frequency.

A common setup for testing whether two sequences come from the same distribution: one sequence is a baseline accumulated over the training set (since only the shape of the distribution matters, summing is enough and no averaging is needed), and the other is the test sequence to be checked.
The expected frequencies come from pooling the two sequences: each point's combined count is split according to each sequence's overall share.
Compute the chi-square value of each point (summed over the two sequences); points with large values are the anomalous metrics.
Alternatively, accumulate the chi-square values over all points and compare the totals: if the test sequence's total is large, its overall distribution is judged to differ from the training template.
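
A small worked example with made-up counts: baseline sequence A = (10, 20, 30) and test sequence B = (20, 10, 30) over three metric points.
$$\begin{aligned} S &= 120,\quad S_{A} = S_{B} = 60,\quad P_{A} = P_{B} = 0.5,\quad S_{i} = (30, 30, 60) \\ E_{A,i} &= E_{B,i} = (15, 15, 30) \\ \chi^{2}_{1} &= \frac{(10-15)^{2}}{15} + \frac{(20-15)^{2}}{15} \approx 3.33,\quad \chi^{2}_{2} \approx 3.33,\quad \chi^{2}_{3} = 0,\quad \sum_{i}\chi^{2}_{i} \approx 6.67 \end{aligned}$$
Points 1 and 2 carry all of the deviation while point 3 matches the baseline, so the per-point values flag the anomalous metrics and the total measures the overall mismatch; this is exactly what the implementation below computes per error code.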

# -*- coding: UTF-8 -*-
import numpy as np
import pandas as pd


def calc_chi2_value(data):
    S = data.values.sum()
    # SA = data.iloc[:, 0].sum()
    # SB = data.iloc[:, 1].sum()
    # Total error-code counts of sequences A and B
    SA, SB = data.sum()

    # Share of the total counts held by A and by B
    PA = SA / (S * 1.0)
    PB = SB / (S * 1.0)

    # Total count of each error code (row sums)
    Si = data.sum(axis=1)

    # Expected counts of each error code for A and B
    EAi = Si * PA
    EBi = Si * PB

    E = np.hstack((EAi.values.reshape(EAi.shape[0], 1), EBi.values.reshape(EBi.shape[0], 1)))

    # Per-cell chi-square contributions for A and B
    chis = (data - E) * (data - E) / E

    # Chi-square value of each error code (summed over A and B)
    CHIi = np.nansum(chis, axis=1)

    chi_values = pd.DataFrame(CHIi, columns=["value"], index=data.index)

    return chi_values


class Chi2:

    def __init__(self, topk=10):
        self.topk = topk
        self.base_value = None

    def train(self, data):
        """
        Store the baseline matrix.

        data: one-column DataFrame
        """
        self.base_value = data

    def predict(self, data):
        """
        data: one-column DataFrame
        return: the chi2 value for each sample
        """
        ch_matrix = pd.concat([self.base_value, data], axis=1)
        return calc_chi2_value(ch_matrix)

    def predict_topk(self, data):
        """
        data: one-column DataFrame
        return: the chi2 values for each sample and a boolean index of the top-k values
        """
        chi_values = self.predict(data)
        ranks = chi_values['value'].rank(ascending=False)
        topk_index = ranks <= self.topk
        return chi_values, topk_index


if __name__ == "__main__":
    df = pd.DataFrame(np.arange(20).reshape((10, 2)))

    chi2 = Chi2(2)
    chi2.train(pd.DataFrame([1, 2, 3, 4, 5]))

    chi_values, ranks = chi2.predict_topk(pd.DataFrame([12, 4, 6, 8, 10]))
    print(chi_values)
    print(ranks)