Skip to content

Commit

Permalink
Added visualization/plotting of input/output data
Browse files Browse the repository at this point in the history
  • Loading branch information
sboeschhuawei committed Jan 25, 2015
1 parent e5df2b8 commit 9294263
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class KMeans private (
}

// Compute squared norms and cache them.
val collected = data.collect()
val norms = data.map(Vectors.norm(_, 2.0))
norms.persist()
val zippedData = data.zip(norms).map { case (v, norm) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

package org.apache.spark.mllib.clustering

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.mllib.clustering.PICLinalg.DMatrix
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.FunSuite

import scala.util.Random
Expand All @@ -29,7 +29,7 @@ import scala.util.Random
* Provides a method to run tests against a {@link SparkContext} variable that is correctly stopped
* after each test.
* TODO: import this from the graphx test cases package i.e. may need update to pom.xml
*/
*/
trait LocalSparkContext {
/** Runs `f` on a new SparkContext and ensures that it is stopped afterwards. */
def withSpark[T](f: SparkContext => T) = {
Expand All @@ -50,16 +50,81 @@ trait LocalSparkContext {
*/
class PIClusteringSuite extends FunSuite with LocalSparkContext {

import org.apache.spark.mllib.clustering.PIClusteringSuite._

val PIC = PIClustering
val LA = PICLinalg
val RDDLA = RDDLinalg
val A = Array

// Registers the concentric-circles scenario with the FunSuite runner.
test("concentricCirclesTest") {
  concentricCirclesTest()
}

/**
 * Runs PIClustering over synthetic points generated from a set of concentric circle
 * specifications and prints the returned cluster centers and estimates.
 *
 * One cluster is requested per circle spec. The points are shuffled with an unseeded
 * `Random`, so the order in which they are handed to `PIC.run` differs between runs.
 * The method only prints; it makes no assertions about the clustering result.
 */
def concentricCirclesTest(): Unit = {
  val nIterations = 20

  val circleSpecs = Seq(
    // Best results for 30 points
    CircleSpec(Point(0.0, 0.0), 0.03, .1, 3),
    CircleSpec(Point(0.0, 0.0), 0.3, .03, 12),
    CircleSpec(Point(0.0, 0.0), 1.0, .01, 15),
    // Add following to get 100 points
    CircleSpec(Point(0.0, 0.0), 1.5, .005, 30),
    CircleSpec(Point(0.0, 0.0), 2.0, .002, 40)
  )
  val nClusters = circleSpecs.size
  val cdata = createConcentricCirclesData(circleSpecs)
  withSpark { sc =>
    // Each vertex is (point label, [x, y]); feed them to the algorithm in random order.
    val vertices = new Random().shuffle(cdata.map { p =>
      (p.label, Array(p.x, p.y))
    })

    val (ccenters, estCollected) = PIC.run(sc, vertices, nClusters, nIterations)
    println(s"Cluster centers: ${ccenters.mkString(",")} " +
      s"Estimates: ${estCollected.mkString(",")}")
  }
}

// Registers the iris-data scenario with the FunSuite runner.
test("irisData") {
  irisData()
}

/**
 * Loads the comma-separated iris data set, clusters the numeric columns with KMeans and
 * returns the collected (predicted cluster, original label) pairs.
 *
 * Every non-blank line is split on ','; all fields but the last are parsed as doubles
 * and the last field is kept as the label. The file is read eagerly and the underlying
 * source is always closed, even when parsing fails.
 *
 * @return the collected result of zipping the model's predictions with the labels
 */
def irisData() = {
  import org.apache.spark.mllib.linalg._
  import scala.io.Source

  val source = Source.fromFile("data/mllib/iris.data")
  val labeledRows: List[(Array[Double], String)] =
    try {
      source.getLines()
        // Blank lines would otherwise turn into zero-length feature vectors.
        .filter(_.trim.nonEmpty)
        .map { line =>
          val toks = line.split(",")
          (toks.dropRight(1).map(_.toDouble), toks.last)
        }
        // Materialize before the source is closed in `finally`.
        .toList
    } finally {
      source.close()
    }

  withSpark { sc =>
    val irisRdd = sc.parallelize(labeledRows.map { case (features, label) =>
      (Vectors.dense(features), label)
    })
    // Cached because the vectors are used for both training and prediction.
    val irisVectorsRdd = irisRdd.map(_._1).cache()
    val irisLabelsRdd = irisRdd.map(_._2)
    // NOTE(review): arguments are presumably (k = 3, maxIterations = 10, runs = 1) -- confirm
    // against the KMeans.train overload in use.
    val model = KMeans.train(irisVectorsRdd, 3, 10, 1)
    val pred = model.predict(irisVectorsRdd).zip(irisLabelsRdd)
    val predColl = pred.collect()
    irisVectorsRdd.unpersist()
    predColl
  }

}

/**
 * Placeholder for exporting a matrix (with optional legend and labels) for plotting.
 *
 * The body currently contains nothing but a Breeze plotting import, so calling this
 * method has no effect.
 */
def saveToMatplotLib(
    dmat: DMatrix,
    optLegend: Option[Array[String]],
    optLabels: Option[Array[Array[String]]]) = {
  import breeze.plot._

}

// Registers the graphxSingleEigen scenario with the FunSuite runner.
test("graphxSingleEigen") {
  graphxSingleEigen()
}

import org.apache.spark.mllib.clustering.PIClusteringSuite._
def graphxSingleEigen() = {

val (aMat, dxDat1) = createAffinityMatrix()
Expand All @@ -78,9 +143,13 @@ class PIClusteringSuite extends FunSuite with LocalSparkContext {
val edgesRddCollected = edgesRdd.collect()
println(s"edges=${edgesRddCollected.mkString(",")}")
val rowSums = aMat.map { vect =>
vect.foldLeft(0.0){ _ + _ }
vect.foldLeft(0.0) {
_ + _
}
}
val initialVt = PIC.createInitialVector(sc, affinityRdd.map{_._1}.collect, rowSums )
val initialVt = PIC.createInitialVector(sc, affinityRdd.map {
_._1
}.collect, rowSums)
val G = PIC.createGraphFromEdges(sc, edgesRdd, nVertices, Some(initialVt))
val printVtVectors: Boolean = true
if (printVtVectors) {
Expand All @@ -95,7 +164,7 @@ class PIClusteringSuite extends FunSuite with LocalSparkContext {
}.toArray
println(s"graphInitialVt=${graphInitialVt.mkString(",")}\n"
+ s"initialVt=${initialVt.mkString(",")}")
// assert(LA.compareVectors(graphInitialVt, initialVtVect))
// assert(LA.compareVectors(graphInitialVt, initialVtVect))
}
val (g2, norm, eigvect) = PIC.getPrincipalEigen(sc, G, nIterations)
println(s"lambda=$norm eigvect=${eigvect.collect.mkString(",")}")
Expand All @@ -105,6 +174,7 @@ class PIClusteringSuite extends FunSuite with LocalSparkContext {
// Registers the fifteenVerticesTest scenario with the FunSuite runner; the work itself
// lives in the method of the same name so it can also be invoked directly.
test("fifteenVerticesTest") {
fifteenVerticesTest()
}

def fifteenVerticesTest() = {
val vertFile = "../data/graphx/new_lr_data.15.txt"
val sigma = 1.0
Expand All @@ -115,64 +185,21 @@ class PIClusteringSuite extends FunSuite with LocalSparkContext {
val nVertices = vertices.length
val (ccenters, estimates) = PIC.run(sc, vertices, nClusters,
nIterations, sigma)
// val collectedRdd = eigen.collect.map{_._2}
// println(s"DegreeMatrix:\n${LA.printMatrix(collectedRdd, nVertices, nVertices)}")
// println(s"Eigenvalue = $lambda EigenVectors:\n${LA.printMatrix(collectedRdd, nClusters, nVertices)}")
// val collectedRdd = eigen.collect.map{_._2}
// println(s"DegreeMatrix:\n${LA.printMatrix(collectedRdd, nVertices, nVertices)}")
// println(s"Eigenvalue = $lambda EigenVectors:\n${LA.printMatrix(collectedRdd, nClusters, nVertices)}")
}
}

test("concentricCirclesTest") {
concentricCirclesTest()
}
def concentricCirclesTest() = {
val sigma = 1.0
val nIterations = 20
val circleSpecs = Seq(
// Best results for 30 points
CircleSpec(Point(0.0,0.0), 0.03, .1, 3),
CircleSpec(Point(0.0,0.0), 0.3, .03, 12),
CircleSpec(Point(0.0,0.0), 1.0, .01, 15),
CircleSpec(Point(0.0,0.0), 1.5, .005, 25),
CircleSpec(Point(0.0,0.0), 2.0, .002, 40)

// DECENT
// CircleSpec(Point(0.0,0.0), 0.1, .1, 5),
// CircleSpec(Point(0.0,0.0), 1.0, .1, 15),
// CircleSpec(Point(0.0,0.0), 2.5, .1, 30)

// GOOD but big (90 points)
// CircleSpec(Point(0.0,0.0), 0.1, .1, 5),
// CircleSpec(Point(0.0,0.0), 1.0, .03, 25),
// CircleSpec(Point(0.0,0.0), 2.5, .01, 60)
)
val nClusters = circleSpecs.size
val cdata = createConcentricCirclesData(circleSpecs)
withSpark { sc =>
val vertices = new Random().shuffle(cdata.map { p =>
(p.label, Array(p.x, p.y))
})
// test("testLinearFnGenerator") {
// val PS = PolySpec
// val dr = new DRange(0.0, 5.0)
// val polyInfo = A(PS(3.0, 2.0, -1.0)
// val noiseRatio = 0.1
// val l = List(1,2,3)
// l.scanLeft(
// }

val nVertices = vertices.length
val (ccenters, estCollected) = PIC.run(sc, vertices, nClusters, nIterations)
println(s"Cluster centers: ${ccenters.mkString(",")} " +
s"Estimates: ${estCollected.mkString(",")}")
}
}

// test("testLinearFnGenerator") {
// val PS = PolySpec
// val dr = new DRange(0.0, 5.0)
// val polyInfo = A(PS(3.0, 2.0, -1.0)
// val noiseRatio = 0.1
// val l = List(1,2,3)
// l.scanLeft(
// }

/** Delegates to `LA.printMatrix` for a matrix supplied as a single flat array. */
def printMatrix(darr: Array[Double], numRows: Int, numCols: Int): String = {
  LA.printMatrix(darr, numRows, numCols)
}

/** Delegates to `LA.printMatrix` for a matrix supplied as an array of arrays. */
def printMatrix(darr: Array[Array[Double]], numRows: Int, numCols: Int): String = {
  LA.printMatrix(darr, numRows, numCols)
}
}

object PIClusteringSuite {
Expand All @@ -183,14 +210,17 @@ object PIClusteringSuite {

/**
 * A labelled point in the plane.
 *
 * @param label numeric id of the point; -1 when built without an explicit label
 * @param x     horizontal coordinate
 * @param y     vertical coordinate
 */
case class Point(label: Long, x: Double, y: Double) {
  /** Auxiliary constructor for a point without an id; the label is set to -1. */
  def this(x: Double, y: Double) = this(-1L, x, y)

  /** Renders as `(label, (x,y))`, with each coordinate passed through `pdoub`. */
  override def toString(): String = {
    val coords = s"(${pdoub(x)},${pdoub(y)})"
    s"($label, $coords)"
  }
}

/** Companion adding a two-argument factory alongside the generated case-class `apply`. */
object Point {
  /** Builds a point at (x, y) whose label is -1. */
  def apply(x: Double, y: Double): Point = {
    new Point(-1L, x, y)
  }
}

/**
 * Describes one circle of synthetic points.
 *
 * @param center              centre of the circle
 * @param radius              radius of the circle
 * @param noiseToRadiusRatio  scale applied to the random noise relative to the radius
 * @param nPoints             number of points to generate for this circle
 * @param uniformDistOnCircle defaults to true; NOTE(review): not referenced by the
 *                            generator code visible here -- confirm whether it is used
 */
case class CircleSpec(
    center: Point,
    radius: Double,
    noiseToRadiusRatio: Double,
    nPoints: Int,
    uniformDistOnCircle: Boolean = true)

def createConcentricCirclesData(circleSpecs: Seq[CircleSpec]) = {
import org.apache.spark.mllib.random.StandardNormalGenerator
val normalGen = new StandardNormalGenerator
Expand All @@ -199,9 +229,9 @@ object PIClusteringSuite {
idStart += 1000
val circlePoints = for (thetax <- 0 until csp.nPoints) yield {
val theta = thetax * 2 * Math.PI / csp.nPoints
val (x,y) = ( csp.radius * Math.cos(theta) * (1 + normalGen.nextValue * csp.noiseToRadiusRatio),
val (x, y) = (csp.radius * Math.cos(theta) * (1 + normalGen.nextValue * csp.noiseToRadiusRatio),
csp.radius * Math.sin(theta) * (1 + normalGen.nextValue * csp.noiseToRadiusRatio))
(Point(idStart+thetax, x,y))
(Point(idStart + thetax, x, y))
}
circlePoints
}
Expand All @@ -211,15 +241,15 @@ object PIClusteringSuite {
}

/**
 * Renders the points, in the order given, as one string of the form `[p1 , p2 , ...]`
 * using each point's `toString`.
 *
 * @param points the points to render
 * @return the bracketed, " , "-separated rendering; "[]" for an empty sequence
 */
def printPoints(points: Seq[Point]): String = {
  // Disabled alternative that sorted by descending y, then ascending x, before printing:
  //    val sorted = points.sortWith { case (p1, p2) =>
  //      if (LA.withinTol(p1.y-p2.y)) {
  //        p1.x <= p2.x
  //      } else {
  //        p1.y >= p2.y
  //      }
  //    }
  //    sorted.mkString("["," , ","]")
  points.mkString("[", " , ", "]")
}

def createAffinityMatrix() = {
Expand All @@ -229,19 +259,7 @@ object PIClusteringSuite {
A(0.8, 0.7, 0, 0.75),
A(0.9, .5, .75, 0)
)
// val asize=10
// val dat1 = toMat(for (ix <- 0 until asize;
// for cx<- 0 until asize)
// yield {
// cx match {
// case _ if cx < 2 => 10
// case _ if cx <= 7 => 0.5
// case _ => cx * 0.2
// }
// }, asize, asize)


println(s"Input mat: ${LA.printMatrix(dat1, 4,4)}")
println(s"Input mat: ${LA.printMatrix(dat1, 4, 4)}")
val D = /*LA.transpose(dat1)*/ dat1.zipWithIndex.map { case (dvect, ix) =>
val sum = dvect.foldLeft(0.0) {
_ + _
Expand All @@ -263,8 +281,6 @@ object PIClusteringSuite {

/**
 * Stand-alone entry point: instantiates the suite and runs the concentric-circles
 * scenario directly, without going through the ScalaTest runner.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  val pictest = new PIClusteringSuite
  // Other scenarios that can be run the same way:
  //    pictest.graphxSingleEigen()
  //    pictest.fifteenVerticesTest()
  pictest.concentricCirclesTest()
}
80 changes: 80 additions & 0 deletions python/pyspark/mllib/matplot.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import numpy as np
import matplotlib.pyplot as plt

class Plot:
def plotCircles(self):
    """Plot the labelled input points and write the figure to 'inputs.png'.

    Each entry from ``getCirclePoints`` is ``(label, (x, y))``. A point is drawn as
    an 'x' marker and annotated with its label; its colour is chosen from the
    thousands digit of the label (1000-1999 -> first colour, 2000-2999 -> second,
    and so on).
    """
    # Same palette as plotOutputs. The fifth entry is cyan rather than white so the
    # fifth group stays visible on a white background.
    colors = ['b', 'g', 'r', 'y', 'c', 'm']

    for (label, (xx, yy)) in self.getCirclePoints():
        plabel = int(label)
        px = float(xx)
        py = float(yy)
        xcolor = colors[-1 + int(plabel / 1000)]
        plt.plot(px, py, 'x', markersize=3, color=xcolor)
        plt.annotate(str(plabel), xy=(px, py), xytext=(-5, 5),
                     textcoords='offset points',
                     arrowprops=dict(facecolor=xcolor, shrink=0.01),
                     horizontalalignment='right', verticalalignment='bottom',
                     fontsize=6)

    # Save before show(): once the interactive window is closed the current figure
    # is gone and savefig would write an empty image.
    plt.savefig('inputs.png')
    plt.show()

def plotOutputs(self):
    """Plot the clustering output and write the figure to 'outputs.png'.

    ``getOutputPoints`` returns ``(centers_sorted, out_points)`` where every output
    point is ``((label, loc), cluster)``. Points are drawn against their position in
    the list; colour comes from the thousands digit of the label and the marker
    shape from the cluster assignment.
    """
    (centers_sorted, out_points) = self.getOutputPoints()
    # NOTE(review): centers_sorted is enumerate(sorted centers), so this maps the
    # sorted position to the original centre index, yet it is looked up below with
    # a cluster (centre) index. Both ranges coincide so every cluster still gets a
    # distinct marker, but confirm the intended direction of the mapping.
    center_index_to_magnitude_map = {cx: cm for (cx, (cm, cloc)) in centers_sorted}

    colors = ['b', 'g', 'r', 'y', 'c', 'm']
    markers = ['s', '+', 'x', '8', 'D', '*']

    # Loop-invariant, so set once instead of once per point.
    plt.yscale('linear')
    for (ix, ((plabel, loc), cluster)) in enumerate(out_points):
        xcolor = colors[-1 + int(plabel / 1000)]
        clusterx = center_index_to_magnitude_map[cluster]
        xmarker = markers[clusterx]
        plt.plot(ix, loc, xmarker, markersize=3, color=xcolor)
        plt.annotate(str(plabel), xy=(ix, loc), xytext=(-5, 5),
                     textcoords='offset points',
                     arrowprops=dict(facecolor=xcolor, shrink=0.01),
                     horizontalalignment='right', verticalalignment='bottom',
                     fontsize=6)

    # Save before show(): once the interactive window is closed the current figure
    # is gone and savefig would write an empty image.
    plt.savefig('outputs.png')
    plt.show()

def getCirclePoints(self):
fillInLabelsHere=[(1000, (0.026055,0.000000)) , (1001, (-0.015593,0.023425)) , (1002, (-0.012923,-0.024027)) , (2000, (0.300923,0.000000)) , (2001, (0.270132,0.148135)) , (2002, (0.157354,0.261961)) , (2003, (0.000000,0.287528)) , (2004, (-0.163370,0.250285)) , (2005, (-0.257946,0.153550)) , (2006, (-0.290993,0.000000)) , (2007, (-0.241891,-0.148736)) , (2008, (-0.152099,-0.273338)) , (2009, (-0.000000,-0.313627)) , (2010, (0.140641,-0.269290)) , (2011, (0.267766,-0.145981)) , (3000, (1.000656,0.000000)) , (3001, (0.917339,0.413769)) , (3002, (0.664959,0.738677)) , (3003, (0.305250,0.931993)) , (3004, (-0.103926,0.988312)) , (3005, (-0.495637,0.866338)) , (3006, (-0.813345,0.578460)) , (3007, (-0.994044,0.205409)) , (3008, (-0.970741,-0.209993)) , (3009, (-0.812726,-0.587868)) , (3010, (-0.502153,-0.867390)) , (3011, (-0.105587,-1.000996)) , (3012, (0.309040,-0.939486)) , (3013, (0.676942,-0.740258)) , (3014, (0.922195,-0.403860)) , (4000, (1.500370,0.000000)) , (4001, (1.455509,0.372520)) , (4002, (1.313174,0.724842)) , (4003, (1.103429,1.023959)) , (4004, (0.808768,1.264479)) , (4005, (0.463024,1.435827)) , (4006, (0.093931,1.489670)) , (4007, (-0.282205,1.468860)) , (4008, (-0.641969,1.357055)) , (4009, (-0.960632,1.153854)) , (4010, (-1.208709,0.881284)) , (4011, (-1.398515,0.558539)) , (4012, (-1.480977,0.186267)) , (4013, (-1.466877,-0.188276)) , (4014, (-1.384225,-0.551507)) , (4015, (-1.210292,-0.878939)) , (4016, (-0.954331,-1.158034)) , (4017, (-0.637888,-1.365072)) , (4018, (-0.281502,-1.485258)) , (4019, (0.093529,-1.517232)) , (4020, (0.462337,-1.435428)) , (4021, (0.802200,-1.273476)) , (4022, (1.088346,-1.022671)) , (4023, (1.303522,-0.726381)) , (4024, (1.441486,-0.376954)) , (5000, (2.006090,0.000000)) , (5001, (1.975017,0.312498)) , (5002, (1.903007,0.616796)) , (5003, (1.779725,0.906926)) , (5004, (1.615806,1.176478)) , (5005, (1.418842,1.412193)) , (5006, (1.176238,1.618327)) , (5007, (0.907240,1.778774)) , (5008, (0.616737,1.907389)) , (5009, 
(0.312334,1.972686)) , (5010, (0.000000,2.003958)) , (5011, (-0.312175,1.978076)) , (5012, (-0.617802,1.906119)) , (5013, (-0.910539,1.783043)) , (5014, (-1.176869,1.610774)) , (5015, (-1.413218,1.413486)) , (5016, (-1.612863,1.172439)) , (5017, (-1.777011,0.909389)) , (5018, (-1.897253,0.618963)) , (5019, (-1.973228,0.312265)) , (5020, (-1.997124,0.000000)) , (5021, (-1.972173,-0.312524)) , (5022, (-1.904784,-0.617654)) , (5023, (-1.784558,-0.908745)) , (5024, (-1.618230,-1.172448)) , (5025, (-1.417072,-1.417323)) , (5026, (-1.173193,-1.611568)) , (5027, (-0.906398,-1.779951)) , (5028, (-0.619644,-1.903364)) , (5029, (-0.314077,-1.973233)) , (5030, (-0.000000,-2.001369)) , (5031, (0.312326,-1.985733)) , (5032, (0.617108,-1.897669)) , (5033, (0.905477,-1.784064)) , (5034, (1.178242,-1.623806)) , (5035, (1.410244,-1.418794)) , (5036, (1.625406,-1.176519)) , (5037, (1.774561,-0.906173)) , (5038, (1.897196,-0.618358)) , (5039, (1.969360,-0.311903))]
return fillInLabelsHere

def getOutputPoints(self):
centers = [(0,[2.9860909099353196E-4]),(1,[0.10256078249751793]),(2,[0.03543295315856783]),(3,[0.005922820396093594]),(4,[0.1126278976630788])]
# centers = [(0,2.9860909099353196E-4),(1,0.10256078249751793),(2,0.03543295315856783),(3,0.005922820396093594),(4,0.1126278976630788)]
centers_sorted = sorted(centers,key = lambda c: c[1])
centers_sorted_ix = list(enumerate(centers_sorted))
opoints = [((1000,[0.11255703573044015]),4),((1001,[0.11258575254117632]),4),((1002,[0.11257078842887869]),4),((2000,[0.10171707839616172]),1),((2001,[0.10290373550340648]),1),((2002,[0.10213543105489717]),1),((2003,[0.10392631471722016]),1),((2004,[0.10232694874744065]),1),((2005,[0.10251591585124924]),1),((2006,[0.10127088683120573]),1),((2007,[0.10253071346571023]),1),((2008,[0.1033873501355303]),1),((2009,[0.10252830585275843]),1),((2010,[0.10227294667030125]),1),((2011,[0.1025608138347488]),1),((3000,[0.035906883751883376]),2),((3001,[0.03504061712471225]),2),((3002,[0.03578644371373917]),2),((3003,[0.03652384259635414]),2),((3004,[0.03460755093576597]),2),((3005,[0.035531248714107226]),2),((3006,[0.034675624652207575]),2),((3007,[0.035952072191425356]),2),((3008,[0.03632290316860608]),2),((3009,[0.03719884210586558]),2),((3010,[0.034909360099880986]),2),((3011,[0.03451626245224591]),2),((3012,[0.036276532353686705]),2),((3013,[0.034499068006321326]),2),((3014,[0.03444715655157359]),2),((4000,[0.0063409978898199965]),0),((4001,[0.005641897862833211]),0),((4002,[0.0061989986546385325]),0),((4003,[0.005732189674461753]),0),((4004,[0.00583169713457494]),0),((4005,[0.006124006606283006]),0),((4006,[0.006096865495117659]),0),((4007,[0.005843993649928398]),0),((4008,[0.0058995813542313145]),0),((4009,[0.006066471732454353]),0),((4010,[0.00597453407106392]),0),((4011,[0.00569094398306935]),0),((4012,[0.006135281376077952]),0),((4013,[0.005764302127595832]),0),((4014,[0.006021919856338952]),0),((4015,[0.005668462670801292]),0),((4016,[0.006069813665268231]),0),((4017,[0.005834224015447166]),0),((4018,[0.005884951063841428]),0),((4019,[0.006079709703702092]),0),((4020,[0.005742608529812731]),0),((4021,[0.0059101428982749745]),0),((4022,[0.005868093983272937]),0),((4023,[0.005871154799370013]),0),((4024,[0.005934592542335712]),0),((5000,[2.812367638627021E-4]),3),((5001,[2.992710669415067E-4]),3),((5002,[3.1228235094580767E-4]),3),((5003,[2.907886025099482E-4]),3),((5004
,[3.0010232120431863E-4]),3),((5005,[2.979376705248636E-4]),3),((5006,[2.9701136951793667E-4]),3),((5007,[2.9191731183498057E-4]),3),((5008,[2.9511436042104405E-4]),3),((5009,[2.999732644713502E-4]),3),((5010,[2.982000382551668E-4]),3),((5011,[2.84875034573973E-4]),3),((5012,[2.928298888404219E-4]),3),((5013,[3.067900632285769E-4]),3),((5014,[2.900538609854514E-4]),3),((5015,[2.9372966822917296E-4]),3),((5016,[3.045205684767979E-4]),3),((5017,[3.0299172395976915E-4]),3),((5018,[2.8902170153107074E-4]),3),((5019,[3.0518145810340207E-4]),3),((5020,[2.854725344557008E-4]),3),((5021,[2.9452123441530835E-4]),3),((5022,[2.989311736005669E-4]),3),((5023,[3.1641592934541227E-4]),3),((5024,[2.997541391147682E-4]),3),((5025,[2.985016516572594E-4]),3),((5026,[3.0227813918851565E-4]),3),((5027,[2.9387535815135084E-4]),3),((5028,[2.995694521972803E-4]),3),((5029,[2.9486477413906566E-4]),3),((5030,[2.938900550102159E-4]),3),((5031,[2.969700562695488E-4]),3),((5032,[3.11130411239986E-4]),3),((5033,[2.9783807673530586E-4]),3),((5034,[3.019034936161776E-4]),3),((5035,[2.8495706412754086E-4]),3),((5036,[3.081362782208011E-4]),3),((5037,[2.9365158669359705E-4]),3),((5038,[3.0311814421690064E-4]),3),((5039,[3.010157660759845E-4]),3)]
return (centers_sorted_ix, opoints)

if __name__ == '__main__':
    # Script entry point: draw the input points first, then the clustering output.
    for render in (Plot().plotCircles, Plot().plotOutputs):
        render()

0 comments on commit 9294263

Please sign in to comment.