
SVD dimensionality reduction and KMeans clustering with Spark
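The listing below walks a tokenized corpus through three stages with PySpark: TF-IDF vectorization (CountVectorizer followed by IDF), reduction to 60 dimensions via a RowMatrix SVD, and KMeans clustering of the reduced document vectors.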

import jieba  # used upstream to segment raw documents into token lists

import pandas as pd
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import CountVectorizer, IDF, VectorAssembler
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.util import MLUtils
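
The original listing never shows where my_df comes from; it must be a pandas DataFrame whose 'words' column holds each document as a list of tokens. A minimal, hypothetical sketch of producing it with jieba (the docs corpus is a placeholder):

# Hypothetical upstream step: segment raw text with jieba so that each
# row of 'words' is a list of tokens, as CountVectorizer expects
docs = ["自然语言处理很有趣", "spark 适合大规模文本聚类"]  # placeholder corpus
my_df = pd.DataFrame({"words": [list(jieba.cut(d)) for d in docs]})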

conf = SparkConf().setAppName("cluster")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# Register the tokenized pandas DataFrame as a Spark DataFrame
spark_df = sqlContext.createDataFrame(my_df)

# Compute TF-IDF: raw term counts via CountVectorizer, then reweight with IDF
cv = CountVectorizer(inputCol="words", outputCol="rawFeatures")
cvmodel = cv.fit(spark_df)
cvResult = cvmodel.transform(spark_df)
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(cvResult)
cvResult = idfModel.transform(cvResult)
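
A quick way to sanity-check the vectorization is to inspect the learned vocabulary; index i of rawFeatures/features corresponds to cvmodel.vocabulary[i]. A small sketch:

# Terms in the fitted vocabulary, ordered by corpus term frequency
print(cvmodel.vocabulary[:20])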


# RowMatrix belongs to the RDD-based mllib API, so the ml vectors in
# 'features' must first be converted to mllib vectors
ddf = MLUtils.convertVectorColumnsFromML(cvResult, 'features')
rows = ddf.select('features').rdd.map(lambda row: row[0])

mat = RowMatrix(rows)
# Singular value decomposition, keeping the top 60 components;
# U (n x 60) is the reduced representation of the documents
svd = mat.computeSVD(k=60, computeU=True)
# Convert U's rows back into a DataFrame (columns _1 .. _60)
svd_u = svd.U.rows.map(lambda row: row.toArray().tolist())
svd_df = sqlContext.createDataFrame(svd_u)
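
The choice of k=60 can be checked against the spectrum: svd.s holds the top-k singular values in descending order. A sketch:

# A sharp drop in the singular values suggests a smaller k would
# already retain most of the structure
print(svd.s.toArray()[:10])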
# KMeans clustering on the SVD-reduced vectors
kmeans = KMeans().setK(60).setSeed(1)
# Assemble the 60 SVD columns into a single 'features' vector column
vecAssembler = VectorAssembler(inputCols=svd_df.schema.names, outputCol='features')
svd_df = vecAssembler.transform(svd_df)
# Fit on the feature vectors, then attach each document's cluster id
# in a 'prediction' column
model = kmeans.fit(svd_df.select('features'))
results = model.transform(svd_df)
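
From here the assignments can be inspected directly, for example cluster sizes and centers. A sketch:

# Number of documents assigned to each cluster id
results.groupBy('prediction').count().show()
# Coordinates of the 60 centers in the SVD space
for center in model.clusterCenters():
    print(center)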
