
Commit 0700335
First end to end working version: but has bad performance issue
sboeschhuawei committed Jan 23, 2015
1 parent 32a90dc commit 0700335
Showing 4 changed files with 23 additions and 34 deletions.
@@ -137,7 +137,7 @@ object PICLinalg {
sb.toString
}

def printVect(dvect: DVector) = {
def printVector(dvect: DVector) = {
dvect.mkString(",")
}

@@ -186,7 +186,7 @@ object PICLinalg {
val signum = Math.signum(dot(mat(0), eigen))
val lambda = dot(mat(0), eigen) / eigen(0)
eigen = eigen.map(_ * signum)
println(s"lambda=$lambda eigen=${printVect(eigen)}")
println(s"lambda=$lambda eigen=${printVector(eigen)}")
if (expLambda.length > 0) {
val compareVect = eigen.zip(expdat(k)).map { case (a, b) => a / b}
println(s"Ratio to expected: lambda=${lambda / expLambda(k)} " +
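For readers following the PICLinalg hunks above (the printVect to printVector rename and the eigenvalue estimate), here is a small self-contained sketch of those helpers. It is illustrative only, not the commit's code: DVector = Array[Double] is assumed from the surrounding file, and PICLinalgSketch is a made-up name.

object PICLinalgSketch {
  type DVector = Array[Double]

  // element-wise products summed up: the dot product used in the lambda estimate
  def dot(v1: DVector, v2: DVector): Double =
    v1.zip(v2).map { case (a, b) => a * b }.sum

  // the renamed helper: render a vector as a comma-separated string
  def printVector(dvect: DVector): String = dvect.mkString(",")

  // mirrors the estimate in the hunk above: the eigenvalue is taken from the first
  // matrix row and the first component of the current eigenvector estimate
  def estimateLambda(row0: DVector, eigen: DVector): Double =
    dot(row0, eigen) / eigen(0)
}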
@@ -22,10 +22,6 @@ import org.apache.spark.graphx._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.rdd.RDD

/**
* SpectralClusteringWithGraphx
*
*/
object PIClustering {

type DVector = Array[Double]
@@ -46,6 +42,7 @@ object PIClustering {
val DefaultMinAffinity = 1e-11

val LA = PICLinalg
val RDDLA = RDDLinalg

def cluster(sc: SparkContext,
points: Points,
@@ -57,26 +54,18 @@

val (wRdd, rowSums) = createNormalizedAffinityMatrix(sc, points, sigma)
val initialVt = createInitialVector(sc, points.map(_._1), rowSums)
println(s"Vt(0)=${LA.printVector(initialVt.map{_._2}.toArray)}")
val edgesRdd = createSparseEdgesRdd(sc, wRdd, minAffinity)

val G = createGraphFromEdges(sc, edgesRdd, points.size, Some(initialVt))
val (gUpdated, lambda, vt) = getPrincipalEigen(sc, G)
println(RDDLA.printMatrixFromEdges(G.edges))
val (gUpdated, lambda, vt) = getPrincipalEigen(sc, G, nIterations)
// TODO: avoid local collect and then sc.parallelize.
val localVect = vt.map{Vectors.dense(_)}
val vectRdd = sc.parallelize(localVect)
val model = KMeans.train(vectRdd, 3, 10)
(model, gUpdated, lambda, vt)
}

/*
vnorm[0]=2.019968019268192
Updating vertex[0] from 0.2592592592592593 to 0.2597973189724011
Updating vertex[1] from 0.19753086419753088 to 0.1695805301675885
Updating vertex[3] from 0.2654320987654321 to 0.27258531045499795
Updating vertex[2] from 0.2777777777777778 to 0.29803684040501227
*/
def createInitialVector(sc: SparkContext,
labels: Seq[VertexId],
rowSums: Seq[Double]) = {
@@ -163,12 +152,6 @@ Updating vertex[2] from 0.2777777777777778 to 0.29803684040501227
})
}

// def printGraph(G: DGraph) = {
// val collectedVerts = G.vertices.collect
// val nVertices = collectedVerts.length
// val msg = s"Graph Vertices:\n${printMatrix(collectedVerts, nVertices, nVertices)}"
// }
//
def scalarDot(d1: DVector, d2: DVector) = {
Math.sqrt(d1.zip(d2).foldLeft(0.0) { case (sum, (d1v, d2v)) =>
sum + d1v * d2v
@@ -224,11 +207,8 @@ Updating vertex[2] from 0.2777777777777778 to 0.29803684040501227

def createNormalizedAffinityMatrix(sc: SparkContext, points: Points, sigma: Double) = {
val nVertices = points.length
val rowSums = for (bcx <- 0 until nVertices)
yield sc.accumulator[Double](bcx, s"ColCounts$bcx")
val affinityRddNotNorm = sc.parallelize({
val ivect = new Array[IndexedVector](nVertices)
var rsum = 0.0
for (i <- 0 until points.size) {
ivect(i) = new IndexedVector(points(i)._1, new DVector(nVertices))
for (j <- 0 until points.size) {
@@ -238,24 +218,25 @@ Updating vertex[2] from 0.2777777777777778 to 0.29803684040501227
0.0
}
ivect(i)._2(j) = dist
rsum += dist
}
rowSums(i) += rsum
}
ivect.zipWithIndex.map { case (vect, ix) =>
(ix, vect)
}
}, nVertices)
println(s"Affinity:\n${LA.printMatrix(affinityRddNotNorm.collect.map(_._2._2),
nVertices, nVertices)}")
val materializedRowSums = rowSums.map{ _.value}
val affinityRdd = affinityRddNotNorm.map { case (rowx, (vid, vect)) =>
val rowSums = affinityRddNotNorm.map { case (ix, (vid, vect)) =>
vect.foldLeft(0.0){ _ + _}
}
val materializedRowSums = rowSums.collect
val similarityRdd = affinityRddNotNorm.map { case (rowx, (vid, vect)) =>
(vid, vect.map {
_ / materializedRowSums(rowx)
})
}
println(s"W:\n${LA.printMatrix(affinityRdd.collect.map(_._2), nVertices, nVertices)}")
(affinityRdd, materializedRowSums)
println(s"W:\n${LA.printMatrix(similarityRdd.collect.map(_._2), nVertices, nVertices)}")
(similarityRdd, materializedRowSums)
}

def norm(vect: DVector): Double = {
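The main change in this file replaces the per-row Spark accumulators with a separate row-sum pass over the un-normalized affinity RDD followed by a collect, and then divides each row by its sum. Below is a minimal local sketch of that same normalization written against plain arrays so the data flow is easy to follow; it is not the commit's code, and the Gaussian kernel is an assumption inferred from the sigma parameter (the diff cuts off the actual distance computation).

def normalizedAffinitySketch(points: Array[Array[Double]], sigma: Double): Array[Array[Double]] = {
  val n = points.length
  val affinity = Array.tabulate(n, n) { (i, j) =>
    if (i == j) {
      0.0 // diagonal zeroed, mirroring the 0.0 branch visible in the hunk
    } else {
      val sqDist = points(i).zip(points(j)).map { case (a, b) => (a - b) * (a - b) }.sum
      math.exp(-sqDist / (2.0 * sigma * sigma)) // assumed Gaussian affinity
    }
  }
  val rowSums = affinity.map(_.sum) // stands in for the removed accumulator-based row sums
  affinity.zip(rowSums).map { case (row, s) => row.map(_ / s) } // row-normalize: W = D^-1 * A
}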
@@ -17,7 +17,7 @@

package org.apache.spark.mllib.clustering

import org.apache.spark.graphx.VertexId
import org.apache.spark.graphx.{EdgeRDD, Edge, VertexId}
import org.apache.spark.mllib.clustering.{PICLinalg => LA}
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkContext}
@@ -204,4 +204,12 @@ object RDDLinalg {
vertices.map { case (vid, dval) => s"($vid,$dval)"}.mkString(" , ")
}

def printMatrixFromEdges(edgesRdd: EdgeRDD[_]) = {
val edgec = edgesRdd.collect
assert(edgec.size < 1e3, "Let us not print a large graph")
val sorted = edgec.sortWith { case (e1, e2) =>
e1.srcId < e2.srcId || (e1.srcId == e2.srcId && e1.dstId <= e2.dstId)
}
// Return the sorted edges as a readable string so callers can println the result
sorted.map(e => s"(${e.srcId},${e.dstId})=${e.attr}").mkString("\n")
}
}
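printMatrixFromEdges above prints the flat, sorted edge list. As a hypothetical companion (not in the commit), the collected edges could also be rendered as a dense matrix, assuming vertex ids are 0-based and contiguous and the edge attribute is a Double:

import org.apache.spark.graphx.Edge

def edgesToDenseString(edges: Array[Edge[Double]], nVertices: Int): String = {
  val mat = Array.fill(nVertices, nVertices)(0.0)
  edges.foreach { e => mat(e.srcId.toInt)(e.dstId.toInt) = e.attr } // fill the adjacency cells
  mat.map(row => row.map(v => f"$v%.4f").mkString(" ")).mkString("\n") // one text row per vertex
}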
@@ -125,7 +125,7 @@ class PIClusteringSuite extends FunSuite with LocalSparkContext {
}
def concentricCirclesTest() = {
val sigma = 1.0
val nIterations = 50
val nIterations = 10
val nClusters = 3
val circleSpecs = Seq(
// Best results for 30 points
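Only sigma, nIterations, and nClusters survive in this hunk; the circleSpecs themselves are cut off. As a hedged illustration of the kind of input concentricCirclesTest clusters, here is a hypothetical point generator; the helper name, radii, and counts are placeholders, not values from the suite.

def concentricPoints(specs: Seq[(Double, Int)]): Seq[(Long, Array[Double])] = {
  val pts = specs.flatMap { case (radius, nPoints) =>
    (0 until nPoints).map { k =>
      val theta = 2.0 * math.Pi * k / nPoints // spread points evenly around the ring
      Array(radius * math.cos(theta), radius * math.sin(theta))
    }
  }
  pts.zipWithIndex.map { case (p, ix) => (ix.toLong, p) } // attach vertex ids
}

// e.g. three rings of 10, 20, and 30 points (illustrative values only):
// val points = concentricPoints(Seq((1.0, 10), (2.0, 20), (3.0, 30)))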
