Skip to content

Commit

Permalink
Converted custom Linear Algebra datatypes/routines to use Breeze.
Browse files Browse the repository at this point in the history
  • Loading branch information
sboeschhuawei committed Jan 28, 2015
1 parent b29c0db commit bea48ea
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 197 deletions.
193 changes: 92 additions & 101 deletions mllib/src/main/scala/org/apache/spark/mllib/clustering/PICLinalg.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package org.apache.spark.mllib.clustering

import org.apache.spark.mllib.linalg.Vectors

import scala.reflect.ClassTag
import scala.util.Random
import breeze.linalg.{DenseVector => BDV,DenseMatrix => BDM}

/**
* PICLinalg
Expand All @@ -26,53 +30,72 @@ import scala.util.Random

object PICLinalg {

type DVector = Array[Double]
type DMatrix = Array[DVector]
type DMatrix = BDM[Double]

type LabeledVector = (String, DVector)
type LabeledVector = (Long, BDV[Double])

type IndexedVector = (Long, DVector)
type IndexeBDV[Double] = (Long, BDV[Double])

type Vertices = Seq[LabeledVector]

def add(v1: DVector, v2: DVector) =
// implicit def arrayToVect(darr: Array[Double]): BDV[Double] = new BDV(darr)
implicit def bdvToSeq[T](vect: BDV[T])(implicit ct: ClassTag[T]): Seq[T] = vect.toArray.toSeq
implicit def bdvToArray[T](vect: BDV[T])(implicit ct: ClassTag[T]): Array[T] = vect.toArray
// implicit def arrayToSeq(arr: Array[Double]) = Predef.doubleArrayOps(arr)
def add(v1: BDV[Double], v2: BDV[Double]) =
v1.zip(v2).map { x => x._1 + x._2}

def mult(v1: DVector, d: Double) = {
v1.map {
_ * d
}
def norm(darr: Array[Double]): Double = {
Math.sqrt(darr.foldLeft(0.0) { case (sum, dval) => sum + Math.pow(dval, 2)})
}

def norm(darr: BDV[Double]): Double = {
darr.norm(2)
}

def mult(v1: DVector, v2: DVector) = {
v1.zip(v2).map { case (v1v, v2v) => v1v * v2v}
//
//
// // Implicits to convert between Breeze DenseVector's and Arrays
// implicit def arrToSeq[T](arr: Array[T]): Seq[T] = arr.toSeq
//// implicit def arrayToBDV(darr: Array[Double]): BDV[Double]
//// = Vectors.dense(darr).asInstanceOf[BDV[Double]]
//// implicit def bdvToArray[T](vect: BDV[T])(implicit ct: ClassTag[T]): Array[T] = vect.toArray
//// implicit def bdvToSeq[T](vect: BDV[T])(implicit ct: ClassTag[T]): Seq[T] = vect.toArray.toSeq
//
// def add(v1: BDV[Double], v2: BDV[Double]) =
// v1.zip(v2).map { x => x._1 + x._2}

def mult(v1: BDV[Double], d: Double) = {
v1 * d
}

def multColByRow(v1: DVector, v2: DVector) = {
val mat = for (v1v <- v1)
yield mult(v2, v1v)
// println(s"Col by Row:\n${printMatrix(mat,
// v1.length, v1.length)}")
def mult(v1: BDV[Double], v2: BDV[Double]) = {
v1 * v2
}

def multColByRow(v1: BDV[Double], v2: BDV[Double]) = {
val mat = v1 * v2.t
mat
}

def norm(vect: DVector): Double = {
Math.sqrt(vect.foldLeft(0.0) { case (sum, dval) => sum + Math.pow(dval, 2)})
def norm(vect: BDV[Double]): Double = {
vect.norm
}

def manhattanNorm(vect: DVector): Double = {
val n = vect.foldLeft(0.0) { case (sum, dval) => sum + Math.abs(dval)}
n / Math.sqrt(vect.size)
// def norm(darr: Array[Double]): Double = {
// Math.sqrt(darr.foldLeft(0.0) { case (sum, dval) => sum + Math.pow(dval, 2)})
// }

def manhattanNorm(vect: BDV[Double]): Double = {
vect.norm(1)
}

def dot(v1: DVector, v2: DVector) = {
v1.zip(v2).foldLeft(0.0) {
case (sum, (b, p)) => sum + b * p
}
def dot(v1: BDV[Double], v2: BDV[Double]) : Double = {
v1.dot(v2)
}

def onesVector(len: Int): DVector = {
Array.fill(len)(1.0)
def onesVector(len: Int): BDV[Double] = {
BDV.ones(len)
}

val calcEigenDiffs = true
Expand All @@ -90,43 +113,25 @@ object PICLinalg {
}

def transpose(mat: DMatrix) = {
val nCols = mat(0).length
val matT = mat
.flatten
.zipWithIndex
.groupBy {
_._2 % nCols
}
.toSeq.sortBy {
_._1
}
.map(_._2)
// .map(_.toSeq.sortBy(_._1))
.map(_.map(_._1))
.toArray
matT
mat.t
}

def printMatrix(mat: Array[Array[Double]]): String
= printMatrix(mat, mat.length, mat.length)
def printMatrix(mat: BDM[Double]): String
= printMatrix(mat, mat.rows, mat.cols)

def printMatrix(darr: Array[DVector], numRows: Int, numCols: Int): String = {
val flattenedArr = darr.zipWithIndex.foldLeft(new DVector(numRows * numCols)) {
case (flatarr, (row, indx)) =>
System.arraycopy(row, 0, flatarr, indx * numCols, numCols)
flatarr
}
printMatrix(flattenedArr, numRows, numCols)
def printMatrix(mat: BDM[Double], numRows: Int, numCols: Int): String = {
printMatrix(mat.toArray, numRows, numCols)
}

def printMatrix(darr: DVector, numRows: Int, numCols: Int): String = {
val stride = (darr.length / numCols)
def printMatrix(vect: Array[Double], numRows: Int, numCols: Int): String = {
val darr = vect
val stride = darr.length / numCols
val sb = new StringBuilder
def leftJust(s: String, len: Int) = {
" ".substring(0, len - Math.min(len, s.length)) + s
}

assert(darr.size == numRows * numCols,
assert(darr.length == numRows * numCols,
s"Input array is not correct length (${darr.length}) given #rows/cols=$numRows/$numCols")
for (r <- 0 until numRows) {
for (c <- 0 until numCols) {
Expand All @@ -137,57 +142,54 @@ object PICLinalg {
sb.toString
}

def printVector(dvect: DVector) = {
def printVector(dvect: BDV[Double]) = {
dvect.mkString(",")
}

def project(basisVector: DVector, inputVect: DVector) = {
def project(basisVector: BDV[Double], inputVect: BDV[Double]) = {
val pnorm = makeNonZero(norm(basisVector))
val projectedVect = basisVector.map(
_ * dot(basisVector, inputVect) / dot(basisVector, basisVector))
projectedVect
}

def subtract(v1: DVector, v2: DVector) = {
val subvect = v1.zip(v2).map { case (v1val, v2val) => v1val - v2val}
subvect
def subtract(v1: BDV[Double], v2: BDV[Double]) = {
v1 - v2
}

def subtractProjection(vect: DVector, basisVect: DVector): DVector = {
def subtractProjection(vect: BDV[Double], basisVect: BDV[Double]): BDV[Double] = {
val proj = project(basisVect, vect)
val subVect = subtract(vect, proj)
subVect
}

def localPIC(matIn: DMatrix, nClusters: Int, nIterations: Int,
optExpected: Option[(DVector, DMatrix)]) = {
optExpected: Option[(BDV[Double], DMatrix)]) = {

var mat = matIn.map(identity)
val numVects = mat.length
val numVects = mat.cols

val (expLambda, expdat) = optExpected.getOrElse((new DVector(0), new DMatrix(0)))
val (expLambda, expdat) = optExpected.getOrElse((new BDV(Array(0.0)), new BDM(0,0)))
var cnorm = -1.0
for (k <- 0 until nClusters) {
val r = new Random()
var eigen = Array.fill(numVects) {
var eigen = new BDV(Array.fill(numVects) {
// 1.0
r.nextDouble
}
})
val enorm = norm(eigen)
eigen.map { e => e / enorm}
eigen *= 1.0 / enorm

for (iter <- 0 until nIterations) {
eigen = mat.map { dvect =>
dot(dvect, eigen)
}
eigen = mat * eigen
cnorm = makeNonZero(norm(eigen))
eigen = eigen.map(_ / cnorm)
}
val signum = Math.signum(dot(mat(0), eigen))
val lambda = dot(mat(0), eigen) / eigen(0)
eigen = eigen.map(_ * signum)
println(s"lambda=$lambda eigen=${printVector(eigen)}")
if (expLambda.length > 0) {
if (expLambda.toArray.length > 0) {
val compareVect = eigen.zip(expdat(k)).map { case (a, b) => a / b}
println(s"Ratio to expected: lambda=${lambda / expLambda(k)} " +
s"Vect=${compareVect.mkString("[", ",", "]")}")
Expand All @@ -204,26 +206,22 @@ object PICLinalg {
}

def compareMatrices(m1: DMatrix, m2: DMatrix) = {
m1.zip(m2).forall { case (m1v, m2v) =>
m1v.zip(m2v).forall { case (m1vv, m2vv) => withinTol(m1vv - m2vv)}
m1.toArray.zip(m2.toArray).forall { case (m1v, m2v) =>
withinTol(m1v - m2v)
}
}

def subtract(mat1: DMatrix, mat2: DMatrix) = {
mat1.zip(mat2).map { case (m1row, m2row) =>
m1row.zip(m2row).map { case (m1v, m2v) => m1v - m2v}
}
mat1 - mat2
}

def deflate(mat: DMatrix, lambda: Double, eigen: DVector) = {
def deflate(mat: DMatrix, lambda: Double, eigen: BDV[Double]) = {
// mat = mat.map(subtractProjection(_, mult(eigen, lambda)))
val eigT = eigen
val projected = multColByRow(eigen, eigT).map(mult(_, lambda))
val projected = (eigen * eigen.t) * lambda
// println(s"projected matrix:\n${printMatrix(projected,
// eigen.length, eigen.length)}")
val matOut = mat.zip(projected).map { case (mrow, prow) =>
subtract(mrow, prow)
}
val matOut = mat - projected
println(s"Updated matrix:\n${
printMatrix(mat,
eigen.length, eigen.length)
Expand All @@ -232,46 +230,39 @@ object PICLinalg {
}

def mult(mat1: DMatrix, mat2: DMatrix) = {
val mat2T = transpose(mat2)
val outmatT = for {row <- mat1}
yield {
val outRow = mat2T.map { col =>
dot(row, col)
}
outRow
}
outmatT
val outMat = mat1 :* mat2
outMat
}

// def mult(mat: DMatrix, vect: DVector): DMatrix = {
// def mult(mat: DMatrix, vect: BDV[Double]): DMatrix = {
// val outMat = mat.map { m =>
// mult(m, vect)
// }
// outMat
// }
//
// def mult(vect: DVector, mat: DMatrix): DMatrix = {
// def mult(vect: BDV[Double], mat: DMatrix): DMatrix = {
// for {d <- vect.zip(transpose(mat)) }
// yield mult(d._2, d._1)
// }

def scale(mat: DMatrix, d: Double): DMatrix = {
for (row <- mat) yield mult(row, d)
mat * d
}

def transpose(vector: DVector) = {
def transpose(vector: BDV[Double]) = {
vector.map { d => Array(d)}
}

def toMat(dvect: Array[Double], ncols: Int) = {
val m = dvect.toSeq.grouped(ncols).map(_.toArray).toArray
val m = dvect.toSeq.grouped(ncols).map(_.toArray)
m
}

def schurComplement(mat: DMatrix, lambda: Double, eigen: DVector) = {
val eigT = toMat(eigen, eigen.length) // The sense is reversed
val eig = transpose(eigT)
val projected = mult(eig, eigT)
def schurComplement(mat: DMatrix, lambda: Double, eigen: BDV[Double]) = {
val eig = eigen
val eigT = eigen.t
val projected = eig * eigT
println(s"projected matrix:\n${
printMatrix(projected,
eigen.length, eigen.length)
Expand All @@ -282,9 +273,9 @@ object PICLinalg {
printMatrix(numerat2,
eigen.length, eigen.length)
}")
val denom1 = mult(eigT, mat)
val denom2 = mult(denom1, toMat(eigen, 1))
val denom = denom2(0)(0)
val denom1 = eigT * mat
val denom2 = denom1 * eigen
val denom = denom2.toArray(0)
println(s"denom is $denom")
val projMat = scale(numerat2, 1.0 / denom)
println(s"Updated matrix:\n${
Expand Down
Loading

0 comments on commit bea48ea

Please sign in to comment.