First end to end working PIC
sboeschhuawei committed Jan 24, 2015
1 parent 0700335 commit e5df2b8
Showing 3 changed files with 135 additions and 68 deletions.
@@ -17,13 +17,15 @@

package org.apache.spark.mllib.clustering

import org.apache.log4j.Logger
import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

object PIClustering {

private val logger = Logger.getLogger(getClass.getName())
type DVector = Array[Double]

type DEdge = Edge[Double]
@@ -44,26 +46,73 @@ object PIClustering {
val LA = PICLinalg
val RDDLA = RDDLinalg

def cluster(sc: SparkContext,
points: Points,
nClusters: Int,
nIterations: Int = DefaultIterations,
sigma: Double = DefaultSigma,
minAffinity: Double = DefaultMinAffinity) = {
/**
 * Run power iteration clustering on the given points.
 *
 * @param sc          Spark context
 * @param points      input points as (vertex id, feature array) pairs
 * @param nClusters   number of clusters to find with KMeans on the eigenvector
 * @param nIterations number of power iterations to run
 * @param sigma       scale parameter used when building the affinity matrix
 * @param minAffinity minimum affinity below which edges are not created
 * @return Tuple of (Seq[(Cluster Id, Cluster Center)],
 *         Seq[(VertexId, ClusterID Membership)])
 */
def run(sc: SparkContext,
points: Points,
nClusters: Int,
nIterations: Int = DefaultIterations,
sigma: Double = DefaultSigma,
minAffinity: Double = DefaultMinAffinity)
: (Seq[(Int, Vector)], Seq[(VertexId, Int)]) = {
val vidsRdd = sc.parallelize(points.map(_._1).sorted)
val nVertices = points.length

val (wRdd, rowSums) = createNormalizedAffinityMatrix(sc, points, sigma)
val initialVt = createInitialVector(sc, points.map(_._1), rowSums)
println(s"Vt(0)=${LA.printVector(initialVt.map{_._2}.toArray)}")
if (logger.isDebugEnabled) {
logger.debug(s"Vt(0)=${
LA.printVector(initialVt.map {
_._2
}.toArray)
}")
}
val edgesRdd = createSparseEdgesRdd(sc, wRdd, minAffinity)
val G = createGraphFromEdges(sc, edgesRdd, points.size, Some(initialVt))
println(RDDLA.printMatrixFromEdges(G.edges))
if (logger.isDebugEnabled) {
logger.debug(RDDLA.printMatrixFromEdges(G.edges))
}
val (gUpdated, lambda, vt) = getPrincipalEigen(sc, G, nIterations)
// TODO: avoid local collect and then sc.parallelize.
val localVect = vt.map{Vectors.dense(_)}
val vectRdd = sc.parallelize(localVect)
val model = KMeans.train(vectRdd, 3, 10)
(model, gUpdated, lambda, vt)
val localVt = vt.collect.sortBy(_._1).map(_._2)
val vectRdd = sc.parallelize(localVt.map(Vectors.dense(_)))
// TODO: what to set nRuns
val nRuns = 10
vectRdd.cache()
val model = KMeans.train(vectRdd, nClusters, nRuns)
vectRdd.unpersist()
if (logger.isDebugEnabled) {
logger.debug(s"Eigenvalue = $lambda EigenVector: ${localVt.mkString(",")}")
}
val estimates = vidsRdd.zip(model.predict(sc.parallelize(localVt.map {
Vectors.dense(_)
})))
if (logger.isDebugEnabled) {
logger.debug(s"lambda=$lambda eigen=${localVt.mkString(",")}")
}
val ccs = (0 until model.clusterCenters.length).zip(model.clusterCenters)
if (logger.isDebugEnabled) {
logger.debug(s"Kmeans model cluster centers: ${ccs.mkString(",")}")
}
val pointsMap = Map(points: _*)
val estCollected = estimates.collect.sortBy(_._1)
if (logger.isDebugEnabled) {
// val clusters = estCollected.map(_._2)
// logger.debug(s"Cluster Estimates: ${estCollected.mkString(",")} "
// val counts = Map(estCollected:_*).groupBy(_._1).mapValues(_.size)
// + s" Counts: ${counts.mkString(",")}")
logger.debug(s"Cluster Estimates: ${estCollected.mkString(",")}")
}
(ccs, estCollected)
}
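
For orientation, a minimal usage sketch of the new run entry point. This assumes Points is the Seq[(VertexId, Array[Double])] alias used elsewhere in this file and that a live SparkContext named sc is available; the point values and cluster count below are made up for illustration.

import org.apache.spark.graphx.VertexId
import org.apache.spark.mllib.clustering.PIClustering

// Hypothetical driver snippet; not part of this commit.
val points: Seq[(VertexId, Array[Double])] = Seq(
  (1L, Array(0.0, 0.0)),
  (2L, Array(0.1, 0.1)),
  (3L, Array(5.0, 5.0)),
  (4L, Array(5.1, 4.9)))
// sc is an existing SparkContext.
val (centers, memberships) = PIClustering.run(sc, points, nClusters = 2)
centers.foreach { case (cid, center) => println(s"cluster $cid -> center $center") }
memberships.foreach { case (vid, cid) => println(s"vertex $vid assigned to cluster $cid") }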

def createInitialVector(sc: SparkContext,
@@ -93,13 +142,11 @@ object PIClustering {

}

val printMatrices = true

def getPrincipalEigen(sc: SparkContext,
G: DGraph,
nIterations: Int = DefaultIterations,
optMinNormChange: Option[Double] = None
): (DGraph, Double, DVector) = {
): (DGraph, Double, VertexRDD[Double]) = {

var priorNorm = Double.MaxValue
var norm = Double.MaxValue
@@ -120,36 +167,43 @@
ctx.sendToDst(ctx.attr * ctx.dstAttr)
},
_ + _)
println(s"tmpEigen[$iter]: ${tmpEigen.collect.mkString(",")}\n")
if (logger.isDebugEnabled) {
logger.debug(s"tmpEigen[$iter]: ${tmpEigen.collect.mkString(",")}\n")
}
val vnorm =
prevG.vertices.map{ _._2}.fold(0.0) { case (sum, dval) =>
prevG.vertices.map {
_._2
}.fold(0.0) { case (sum, dval) =>
sum + Math.abs(dval)
}
println(s"vnorm[$iter]=$vnorm")
if (logger.isDebugEnabled) {
logger.debug(s"vnorm[$iter]=$vnorm")
}
outG = prevG.outerJoinVertices(tmpEigen) { case (vid, wval, optTmpEigJ) =>
val normedEig = optTmpEigJ.getOrElse {
println("We got null estimated eigenvector element");
-1.0
} / vnorm
println(s"Updating vertex[$vid] from $wval to $normedEig")
if (logger.isDebugEnabled) {
logger.debug(s"Updating vertex[$vid] from $wval to $normedEig")
}
normedEig
}
prevG = outG

if (printMatrices) {
if (logger.isDebugEnabled) {
val localVertices = outG.vertices.collect
val graphSize = localVertices.size
print(s"Vertices[$iter]: ${localVertices.mkString(",")}\n")
}
normVelocity = vnorm - priorNorm
normAccel = normVelocity - priorNormVelocity
println(s"normAccel[$iter]= $normAccel")
if (logger.isDebugEnabled) {
logger.debug(s"normAccel[$iter]= $normAccel")
}
priorNorm = vnorm
priorNormVelocity = vnorm - priorNorm
}
(outG, vnorm, outG.vertices.collect.map {
_._2
})
(outG, vnorm, outG.vertices)
}
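
Conceptually, the aggregateMessages loop above performs a power iteration: each pass multiplies the current vertex values by the edge weights and rescales by an L1 norm (in this commit the new estimate is divided by the previous iterate's norm). A minimal local sketch of that idea over a dense matrix, for illustration only and independent of GraphX:

// Illustrative local power iteration; not the GraphX implementation above.
def powerIterateLocal(w: Array[Array[Double]],
                      v0: Array[Double],
                      nIter: Int): (Double, Array[Double]) = {
  var v = v0
  var norm = 0.0
  for (_ <- 0 until nIter) {
    // w * v: each entry is a weighted sum of the current vector.
    val wv = w.map(row => row.zip(v).map { case (wij, vj) => wij * vj }.sum)
    // L1 norm, matching the vnorm fold over Math.abs above.
    norm = wv.map(math.abs).sum
    v = wv.map(_ / norm)
  }
  (norm, v)
}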

def scalarDot(d1: DVector, d2: DVector) = {
@@ -179,9 +233,9 @@ object PIClustering {
val arr = toks.slice(1, toks.length).map(_.toDouble)
(toks(0).toLong, arr)
}.toSeq
println(s"Read in ${vertices.length} from $verticesFile")
// println(vertices.map { case (x, arr) => s"($x,${arr.mkString(",")})"}
// .mkString("[", ",\n", "]"))
if (logger.isDebugEnabled) {
logger.debug(s"Read in ${vertices.length} from $verticesFile")
}
vertices
}

@@ -224,18 +278,26 @@ object PIClustering {
(ix, vect)
}
}, nVertices)
println(s"Affinity:\n${LA.printMatrix(affinityRddNotNorm.collect.map(_._2._2),
nVertices, nVertices)}")
if (logger.isDebugEnabled) {
logger.debug(s"Affinity:\n${
LA.printMatrix(affinityRddNotNorm.collect.map(_._2._2),
nVertices, nVertices)
}")
}
val rowSums = affinityRddNotNorm.map { case (ix, (vid, vect)) =>
vect.foldLeft(0.0){ _ + _}
vect.foldLeft(0.0) {
_ + _
}
}
val materializedRowSums = rowSums.collect
val similarityRdd = affinityRddNotNorm.map { case (rowx, (vid, vect)) =>
(vid, vect.map {
_ / materializedRowSums(rowx)
})
}
println(s"W:\n${LA.printMatrix(similarityRdd.collect.map(_._2), nVertices, nVertices)}")
if (logger.isDebugEnabled) {
logger.debug(s"W:\n${LA.printMatrix(similarityRdd.collect.map(_._2), nVertices, nVertices)}")
}
(similarityRdd, materializedRowSums)
}
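
The normalization above divides each affinity row by its row sum; the raw affinities themselves are computed outside this hunk, presumably as a Gaussian kernel of pairwise distance controlled by sigma, which is the usual choice for PIC. A small local sketch under that assumption, for illustration only:

// Assumed affinity W(i)(j) = exp(-||x_i - x_j||^2 / sigma^2); the actual kernel is not shown in this hunk.
def rowNormalizedAffinity(coords: Array[Array[Double]], sigma: Double): Array[Array[Double]] = {
  def sqDist(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum
  val w = coords.map(a => coords.map(b => math.exp(-sqDist(a, b) / (sigma * sigma))))
  // Divide each row by its sum, mirroring the rowSums / similarityRdd step above.
  w.map { row =>
    val s = row.sum
    row.map(_ / s)
  }
}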

@@ -206,7 +206,7 @@ object RDDLinalg {

def printMatrixFromEdges(edgesRdd: EdgeRDD[_]) = {
val edgec = edgesRdd.collect
assert(edgec.size < 1e3,"Let us not print a large graph")
// assert(edgec.size < 1e3,"Let us not print a large graph")
val sorted = edgec.sortWith { case (e1, e2) =>
e1.srcId < e2.srcId || (e1.srcId == e2.srcId && e1.dstId <= e2.dstId)
}
@@ -22,6 +22,8 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.scalatest.FunSuite

import scala.util.Random


/**
* Provides a method to run tests against a {@link SparkContext} variable that is correctly stopped
@@ -96,7 +98,7 @@ class PIClusteringSuite extends FunSuite with LocalSparkContext {
// assert(LA.compareVectors(graphInitialVt, initialVtVect))
}
val (g2, norm, eigvect) = PIC.getPrincipalEigen(sc, G, nIterations)
println(s"lambda=$norm eigvect=${eigvect.mkString(",")}")
println(s"lambda=$norm eigvect=${eigvect.collect.mkString(",")}")
}
}

@@ -111,12 +113,11 @@ class PIClusteringSuite extends FunSuite with LocalSparkContext {
withSpark { sc =>
val vertices = PIC.readVerticesfromFile(vertFile)
val nVertices = vertices.length
val (model, graph, lambda, eigen) = PIC.cluster(sc, vertices, nClusters,
val (ccenters, estimates) = PIC.run(sc, vertices, nClusters,
nIterations, sigma)
val collectedRdd = eigen // .collect
println(s"DegreeMatrix:\n${LA.printMatrix(collectedRdd, nVertices, nVertices)}")
println(s"Eigenvalue = $lambda EigenVectors:\n${LA.printMatrix(collectedRdd, nClusters, nVertices)}")
// println(s"Eigenvalues = ${lambdas.mkString(",")} EigenVectors:\n${printMatrix(collectedEigens, nClusters, nVertices)}")
// val collectedRdd = eigen.collect.map{_._2}
// println(s"DegreeMatrix:\n${LA.printMatrix(collectedRdd, nVertices, nVertices)}")
// println(s"Eigenvalue = $lambda EigenVectors:\n${LA.printMatrix(collectedRdd, nClusters, nVertices)}")
}
}

@@ -125,13 +126,14 @@ class PIClusteringSuite extends FunSuite with LocalSparkContext {
}
def concentricCirclesTest() = {
val sigma = 1.0
val nIterations = 10
val nClusters = 3
val nIterations = 20
val circleSpecs = Seq(
// Best results for 30 points
CircleSpec(Point(0.0,0.0), 0.03, .1, 3),
CircleSpec(Point(0.0,0.0), 0.3, .03, 12),
CircleSpec(Point(0.0,0.0), 1.0, .01, 15)
CircleSpec(Point(0.0,0.0), 1.0, .01, 15),
CircleSpec(Point(0.0,0.0), 1.5, .005, 25),
CircleSpec(Point(0.0,0.0), 2.0, .002, 40)

// DECENT
// CircleSpec(Point(0.0,0.0), 0.1, .1, 5),
@@ -143,22 +145,17 @@ class PIClusteringSuite extends FunSuite with LocalSparkContext {
// CircleSpec(Point(0.0,0.0), 1.0, .03, 25),
// CircleSpec(Point(0.0,0.0), 2.5, .01, 60)
)
val nClusters = circleSpecs.size
val cdata = createConcentricCirclesData(circleSpecs)
withSpark { sc =>
val vertices = createConcentricCirclesData(circleSpecs).zipWithIndex.map { case (p, ix) =>
(ix.toLong, Array(p.x, p.y))
}
val vertices = new Random().shuffle(cdata.map { p =>
(p.label, Array(p.x, p.y))
})

val nVertices = vertices.length
val (model, graph, lambda, eigen) = PIC.cluster(sc, vertices, nClusters,
nIterations, sigma)
val collectedRdd = eigen // .collect
// println(s"DegreeMatrix:\n${LA.printMatrix(collectedRdd, nVertices, nVertices)}")
println(s"Eigenvalue = $lambda EigenVector: ${collectedRdd.mkString(",")}")
val estimates = model.predict(sc.parallelize(eigen.map{Vectors.dense(_)}))
println(s"lambda=$lambda eigen=${eigen.mkString(",")}")
println(s"Kmeans model cluster centers: ${model.clusterCenters.mkString(",")}")
// println(s"Eigenvalues = ${lambdas.mkString(",")} EigenVectors:\n${printMatrix(collectedEigens, nClusters, nVertices)}")
println(s"Cluster Estimates=:${estimates.collect.mkString(",")}")
val (ccenters, estCollected) = PIC.run(sc, vertices, nClusters, nIterations)
println(s"Cluster centers: ${ccenters.mkString(",")} " +
s"Estimates: ${estCollected.mkString(",")}")
}
}

@@ -184,37 +181,45 @@ object PIClusteringSuite {

def pdoub(d: Double) = f"$d%1.6f"

case class Point(x: Double, y: Double) {
override def toString() = s"(${pdoub(x)},${pdoub(y)})"
case class Point(label: Long, x: Double, y: Double) {
def this(x: Double, y: Double) = this(-1L, x, y)
override def toString() = s"($label, (${pdoub(x)},${pdoub(y)}))"
}
object Point {
def apply(x: Double, y: Double) = new Point(-1L, x, y)
}

case class CircleSpec(center: Point, radius: Double, noiseToRadiusRatio: Double,
nPoints: Int, uniformDistOnCircle: Boolean = true)
def createConcentricCirclesData(circleSpecs: Seq[CircleSpec]) = {
import org.apache.spark.mllib.random.StandardNormalGenerator
val normalGen = new StandardNormalGenerator
var idStart = 0
val circles = for (csp <- circleSpecs) yield {
idStart += 1000
val circlePoints = for (thetax <- 0 until csp.nPoints) yield {
val theta = thetax * 2 * Math.PI / csp.nPoints
val (x,y) = ( csp.radius * Math.cos(theta) * (1 + normalGen.nextValue * csp.noiseToRadiusRatio),
csp.radius * Math.sin(theta) * (1 + normalGen.nextValue * csp.noiseToRadiusRatio))
Point(x,y)
(Point(idStart+thetax, x,y))
}
circlePoints
}
val points = circles.flatten
val points = circles.flatten.sortBy(_.label)
println(printPoints(points))
points
}

def printPoints(points: Seq[Point]) = {
val sorted = points.sortWith { case (p1, p2) =>
if (LA.withinTol(p1.y-p2.y)) {
p1.x <= p2.x
} else {
p1.y >= p2.y
}
}
sorted.mkString("["," , ","]")
// val sorted = points.sortWith { case (p1, p2) =>
// if (LA.withinTol(p1.y-p2.y)) {
// p1.x <= p2.x
// } else {
// p1.y >= p2.y
// }
// }
// sorted.mkString("["," , ","]")
points.mkString("["," , ","]")
}

def createAffinityMatrix() = {