Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/master' into streaming-web-ui
Browse files Browse the repository at this point in the history
Conflicts:
	streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
	streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
	streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
	streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
	streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala
  • Loading branch information
tdas committed Apr 10, 2014
2 parents 61358e3 + bde9cc1 commit 3e986f8
Show file tree
Hide file tree
Showing 485 changed files with 11,230 additions and 3,658 deletions.
1 change: 1 addition & 0 deletions .rat-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ work
.*\.q
golden
test.out/*
.*iml
12 changes: 11 additions & 1 deletion assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>hive</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-ganglia-lgpl</id>
<dependencies>
Expand Down Expand Up @@ -208,7 +218,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>buildnumber-maven-plugin</artifactId>
<version>1.1</version>
<version>1.2</version>
<executions>
<execution>
<phase>validate</phase>
Expand Down
20 changes: 12 additions & 8 deletions bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -220,27 +220,31 @@ object Bagel extends Logging {
*/
private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
sc: SparkContext,
grouped: RDD[(K, (Seq[C], Seq[V]))],
grouped: RDD[(K, (Iterable[C], Iterable[V]))],
compute: (V, Option[C]) => (V, Array[M]),
storageLevel: StorageLevel
): (RDD[(K, (V, Array[M]))], Int, Int) = {
var numMsgs = sc.accumulator(0)
var numActiveVerts = sc.accumulator(0)
val processed = grouped.flatMapValues {
case (_, vs) if vs.size == 0 => None
case (c, vs) =>
val processed = grouped.mapValues(x => (x._1.iterator, x._2.iterator))
.flatMapValues {
case (_, vs) if !vs.hasNext => None
case (c, vs) => {
val (newVert, newMsgs) =
compute(vs(0), c match {
case Seq(comb) => Some(comb)
case Seq() => None
})
compute(vs.next,
c.hasNext match {
case true => Some(c.next)
case false => None
}
)

numMsgs += newMsgs.size
if (newVert.active) {
numActiveVerts += 1
}

Some((newVert, newMsgs))
}
}.persist(storageLevel)

// Force evaluation of processed RDD for accurate performance measurements
Expand Down
35 changes: 19 additions & 16 deletions bin/compute-classpath.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,7 @@ FWDIR="$(cd `dirname $0`/..; pwd)"
# Build up classpath
CLASSPATH="$SPARK_CLASSPATH:$FWDIR/conf"

# Support for interacting with Hive. Since hive pulls in a lot of dependencies that might break
# existing Spark applications, it is not included in the standard spark assembly. Instead, we only
# include it in the classpath if the user has explicitly requested it by running "sbt hive/assembly"
# Hopefully we will find a way to avoid uber-jars entirely and deploy only the needed packages in
# the future.
if [ -f "$FWDIR"/sql/hive/target/scala-$SCALA_VERSION/spark-hive-assembly-*.jar ]; then

# Datanucleus jars do not work if only included in the uberjar as plugin.xml metadata is lost.
DATANUCLEUSJARS=$(JARS=("$FWDIR/lib_managed/jars"/datanucleus-*.jar); IFS=:; echo "${JARS[*]}")
CLASSPATH=$CLASSPATH:$DATANUCLEUSJARS

ASSEMBLY_DIR="$FWDIR/sql/hive/target/scala-$SCALA_VERSION/"
else
ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION/"
fi
ASSEMBLY_DIR="$FWDIR/assembly/target/scala-$SCALA_VERSION"

# First check if we have a dependencies jar. If so, include binary classes with the deps jar
if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then
Expand All @@ -59,7 +45,7 @@ if [ -f "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar ]; then
CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/classes"

DEPS_ASSEMBLY_JAR=`ls "$ASSEMBLY_DIR"/spark*-assembly*hadoop*-deps.jar`
DEPS_ASSEMBLY_JAR=`ls "$ASSEMBLY_DIR"/spark-assembly*hadoop*-deps.jar`
CLASSPATH="$CLASSPATH:$DEPS_ASSEMBLY_JAR"
else
# Else use spark-assembly jar from either RELEASE or assembly directory
Expand All @@ -71,6 +57,23 @@ else
CLASSPATH="$CLASSPATH:$ASSEMBLY_JAR"
fi

# When Hive support is needed, Datanucleus jars must be included on the classpath.
# Datanucleus jars do not work if only included in the uber jar as plugin.xml metadata is lost.
# Both sbt and maven will populate "lib_managed/jars/" with the datanucleus jars when Spark is
# built with Hive, so first check if the datanucleus jars exist, and then ensure the current Spark
# assembly is built for Hive, before actually populating the CLASSPATH with the jars.
# Note that this check order is faster (by up to half a second) in the case where Hive is not used.
num_datanucleus_jars=$(ls "$FWDIR"/lib_managed/jars/ 2>/dev/null | grep "datanucleus-.*\\.jar" | wc -l)
if [ $num_datanucleus_jars -gt 0 ]; then
AN_ASSEMBLY_JAR=${ASSEMBLY_JAR:-$DEPS_ASSEMBLY_JAR}
num_hive_files=$(jar tvf "$AN_ASSEMBLY_JAR" org/apache/hadoop/hive/ql/exec 2>/dev/null | wc -l)
if [ $num_hive_files -gt 0 ]; then
echo "Spark assembly has been built with Hive, including Datanucleus jars on classpath" 1>&2
DATANUCLEUSJARS=$(echo "$FWDIR/lib_managed/jars"/datanucleus-*.jar | tr " " :)
CLASSPATH=$CLASSPATH:$DATANUCLEUSJARS
fi
fi

# Add test classes if we're running from SBT or Maven with SPARK_TESTING set to 1
if [[ $SPARK_TESTING == 1 ]]; then
CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/test-classes"
Expand Down
3 changes: 3 additions & 0 deletions bin/load-spark-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ if [ -z "$SPARK_ENV_LOADED" ]; then
use_conf_dir=${SPARK_CONF_DIR:-"$parent_dir/conf"}

if [ -f "${use_conf_dir}/spark-env.sh" ]; then
# Promote all variable declarations to environment (exported) variables
set -a
. "${use_conf_dir}/spark-env.sh"
set +a
fi
fi
3 changes: 2 additions & 1 deletion bin/pyspark
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ if [ -n "$IPYTHON_OPTS" ]; then
IPYTHON=1
fi

if [[ "$IPYTHON" = "1" ]] ; then
# Only use ipython if no command line arguments were provided [SPARK-1134]
if [[ "$IPYTHON" = "1" && $# = 0 ]] ; then
exec ipython $IPYTHON_OPTS
else
exec "$PYSPARK_PYTHON" "$@"
Expand Down
2 changes: 0 additions & 2 deletions bin/spark-class
Original file line number Diff line number Diff line change
Expand Up @@ -154,5 +154,3 @@ if [ "$SPARK_PRINT_LAUNCH_COMMAND" == "1" ]; then
fi

exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"


8 changes: 4 additions & 4 deletions bin/spark-shell
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ set -o posix
FWDIR="$(cd `dirname $0`/..; pwd)"

SPARK_REPL_OPTS="${SPARK_REPL_OPTS:-""}"
DEFAULT_MASTER="local"
DEFAULT_MASTER="local[*]"
MASTER=${MASTER:-""}

info_log=0
Expand Down Expand Up @@ -64,7 +64,7 @@ ${txtbld}OPTIONS${txtrst}:
is followed by m for megabytes or g for gigabytes, e.g. "1g".
-dm --driver-memory : The memory used by the Spark Shell, the number is followed
by m for megabytes or g for gigabytes, e.g. "1g".
-m --master : A full string that describes the Spark Master, defaults to "local"
-m --master : A full string that describes the Spark Master, defaults to "local[*]"
e.g. "spark://localhost:7077".
--log-conf : Enables logging of the supplied SparkConf as INFO at start of the
Spark Context.
Expand Down Expand Up @@ -127,7 +127,7 @@ function set_spark_log_conf(){

function set_spark_master(){
if ! [[ "$1" =~ $ARG_FLAG_PATTERN ]]; then
MASTER="$1"
export MASTER="$1"
else
out_error "wrong format for $2"
fi
Expand All @@ -145,7 +145,7 @@ function resolve_spark_master(){
fi

if [ -z "$MASTER" ]; then
MASTER="$DEFAULT_MASTER"
export MASTER="$DEFAULT_MASTER"
fi

}
Expand Down
53 changes: 51 additions & 2 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down Expand Up @@ -113,12 +117,10 @@
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill_${scala.binary.version}</artifactId>
<version>0.3.1</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill-java</artifactId>
<version>0.3.1</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
Expand Down Expand Up @@ -196,6 +198,53 @@
<artifactId>derby</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon</artifactId>
<version>0.4.1-thrift</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-jsp</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
</exclusion>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
<exclusion>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
</exclusion>
<exclusion>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
Expand Down
46 changes: 33 additions & 13 deletions core/src/main/java/org/apache/spark/api/java/StorageLevels.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,18 @@
* Expose some commonly useful storage level constants.
*/
public class StorageLevels {
public static final StorageLevel NONE = create(false, false, false, 1);
public static final StorageLevel DISK_ONLY = create(true, false, false, 1);
public static final StorageLevel DISK_ONLY_2 = create(true, false, false, 2);
public static final StorageLevel MEMORY_ONLY = create(false, true, true, 1);
public static final StorageLevel MEMORY_ONLY_2 = create(false, true, true, 2);
public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, 1);
public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, 2);
public static final StorageLevel MEMORY_AND_DISK = create(true, true, true, 1);
public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, true, 2);
public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, 1);
public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, 2);
public static final StorageLevel NONE = create(false, false, false, false, 1);
public static final StorageLevel DISK_ONLY = create(true, false, false, false, 1);
public static final StorageLevel DISK_ONLY_2 = create(true, false, false, false, 2);
public static final StorageLevel MEMORY_ONLY = create(false, true, false, true, 1);
public static final StorageLevel MEMORY_ONLY_2 = create(false, true, false, true, 2);
public static final StorageLevel MEMORY_ONLY_SER = create(false, true, false, false, 1);
public static final StorageLevel MEMORY_ONLY_SER_2 = create(false, true, false, false, 2);
public static final StorageLevel MEMORY_AND_DISK = create(true, true, false, true, 1);
public static final StorageLevel MEMORY_AND_DISK_2 = create(true, true, false, true, 2);
public static final StorageLevel MEMORY_AND_DISK_SER = create(true, true, false, false, 1);
public static final StorageLevel MEMORY_AND_DISK_SER_2 = create(true, true, false, false, 2);
public static final StorageLevel OFF_HEAP = create(false, false, true, false, 1);

/**
* Create a new StorageLevel object.
Expand All @@ -42,7 +43,26 @@ public class StorageLevels {
* @param deserialized saved as deserialized objects, if true
* @param replication replication factor
*/
public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized, int replication) {
return StorageLevel.apply(useDisk, useMemory, deserialized, replication);
@Deprecated
public static StorageLevel create(boolean useDisk, boolean useMemory, boolean deserialized,
int replication) {
return StorageLevel.apply(useDisk, useMemory, false, deserialized, replication);
}

/**
* Create a new StorageLevel object.
* @param useDisk saved to disk, if true
* @param useMemory saved to memory, if true
* @param useOffHeap saved to Tachyon, if true
* @param deserialized saved as deserialized objects, if true
* @param replication replication factor
*/
public static StorageLevel create(
boolean useDisk,
boolean useMemory,
boolean useOffHeap,
boolean deserialized,
int replication) {
return StorageLevel.apply(useDisk, useMemory, useOffHeap, deserialized, replication);
}
}
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@

package org.apache.spark

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}

/**
* :: DeveloperApi ::
* A set of functions used to aggregate data.
*
* @param createCombiner function to create the initial value of the aggregation.
* @param mergeValue function to merge a new value into the aggregation result.
* @param mergeCombiners function to merge outputs from multiple mergeValue function.
*/
@DeveloperApi
case class Aggregator[K, V, C] (
createCombiner: V => C,
mergeValue: (C, V) => C,
Expand Down
Loading

0 comments on commit 3e986f8

Please sign in to comment.