Spark - 动态获取UDF

/**
 * Builds a per-feature standardization function that can be wrapped in a Spark UDF.
 *
 * For each index `i` of the input vector, the returned function emits
 * `(x(i) - meanList(i)) / stdList(i)` when `stdList(i) > 0`. Otherwise it passes `x(i)`
 * through unchanged, which covers zero, negative or NaN deviations and avoids dividing by zero.
 *
 * @param meanList per-feature means, positionally aligned with the input vector
 * @param stdList  per-feature standard deviations, positionally aligned with the input vector
 * @return a function mapping a feature vector to a new dense vector of the same size. The
 *         function may throw an IndexOutOfBoundsException if the vector has more elements
 *         than the statistics lists.
 */
def getPreprocessFunc(meanList: List[Double], stdList: List[Double]): Vector => Vector = {
  // List.apply is O(i), so indexing the Lists per element made every call O(n^2).
  // Convert once here; the closure below captures these arrays and gets O(1) lookups.
  val means = meanList.toArray
  val stds = stdList.toArray

  (featureVec: Vector) => {
    val standardized = Array.tabulate(featureVec.size) { i =>
      val value = featureVec(i)
      if (stds(i) > 0) (value - means(i)) / stds(i) else value
    }
    Vectors.dense(standardized)
  }
}

// Wrap the standardization function (built from meanList/stdList) as a Spark UDF and
// apply it to "featureVecCol", writing the result under the same column name.
// NOTE(review): withColumn returns a new DataFrame rather than modifying df. If this
// expression is not the enclosing block's result value, it should be assigned to a val.
val preprocessUdf = udf(getPreprocessFunc(meanList = meanList, stdList = stdList))
df.withColumn("featureVecCol", preprocessUdf.apply(col("featureVecCol")))