Merge branch 'master' into col-computability
Conflicts:
	sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
rxin committed Feb 3, 2015
2 parents 5afe1ff + 8aa3cff commit fd92bc7
Showing 136 changed files with 7,701 additions and 446 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -26,7 +26,7 @@ To build Spark and its example programs, run:

(You do not need to do this if you downloaded a pre-built package.)
More detailed documentation is available from the project site, at
["Building Spark with Maven"](http://spark.apache.org/docs/latest/building-spark.html).
["Building Spark"](http://spark.apache.org/docs/latest/building-spark.html).

## Interactive Scala Shell

4 changes: 3 additions & 1 deletion bin/compute-classpath.sh
@@ -50,8 +50,8 @@ fi
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
echo "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark"\
"classes ahead of assembly." >&2
# Spark classes
CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SPARK_SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/core/target/jars/*"
CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SPARK_SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SPARK_SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SPARK_SCALA_VERSION/classes"
@@ -63,6 +63,8 @@ if [ -n "$SPARK_PREPEND_CLASSES" ]; then
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SPARK_SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive-thriftserver/target/scala-$SPARK_SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SPARK_SCALA_VERSION/classes"
# Jars for shaded deps in their original form (copied here during build)
CLASSPATH="$CLASSPATH:$FWDIR/core/target/jars/*"
fi

# Use spark-assembly jar from either RELEASE or assembly directory
22 changes: 20 additions & 2 deletions core/pom.xml
@@ -94,22 +94,35 @@
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
</dependency>

<!-- Jetty dependencies promoted to compile here so they are shaded
and inlined into spark-core jar -->
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-plus</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-security</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@@ -348,19 +361,24 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<!-- When using SPARK_PREPEND_CLASSES Spark classes compiled locally don't use
shaded deps. So here we store jars in their original form which are added
when the classpath is computed. -->
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<useSubDirectoryPerType>true</useSubDirectoryPerType>
<includeArtifactIds>guava</includeArtifactIds>
<includeArtifactIds>
guava,jetty-io,jetty-http,jetty-plus,jetty-util,jetty-server
</includeArtifactIds>
<silent>true</silent>
</configuration>
</execution>
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -49,6 +49,7 @@ import org.apache.spark.scheduler._
* spark.dynamicAllocation.enabled - Whether this feature is enabled
* spark.dynamicAllocation.minExecutors - Lower bound on the number of executors
* spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
* spark.dynamicAllocation.initialExecutors - Number of executors to start with
*
* spark.dynamicAllocation.schedulerBacklogTimeout (M) -
* If there are backlogged tasks for this duration, add new executors
@@ -70,9 +71,10 @@ private[spark] class ExecutorAllocationManager(

import ExecutorAllocationManager._

// Lower and upper bounds on the number of executors. These are required.
private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
// Lower and upper bounds on the number of executors.
private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
Integer.MAX_VALUE)

// How long there must be backlogged tasks for before an addition is triggered
private val schedulerBacklogTimeout = conf.getLong(
@@ -132,10 +134,10 @@
*/
private def validateSettings(): Unit = {
if (minNumExecutors < 0 || maxNumExecutors < 0) {
throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be positive!")
}
if (minNumExecutors == 0 || maxNumExecutors == 0) {
throw new SparkException("spark.dynamicAllocation.{min/max}Executors cannot be 0!")
if (maxNumExecutors == 0) {
throw new SparkException("spark.dynamicAllocation.maxExecutors cannot be 0!")
}
if (minNumExecutors > maxNumExecutors) {
throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
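Note on the change above: with the new defaults, dynamic allocation no longer requires `spark.dynamicAllocation.{min/max}Executors` to be set explicitly; the bounds default to 0 and Integer.MAX_VALUE, and validation now rejects only negative values, a zero maximum, or a minimum above the maximum. A minimal configuration sketch under the new defaults (illustrative only, not part of this diff; the app name and numbers are made up):

```scala
import org.apache.spark.SparkConf

// All three bounds are now optional; override only what you need.
val conf = new SparkConf()
  .setAppName("dynamic-allocation-example")              // hypothetical
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")      // defaults to 0
  .set("spark.dynamicAllocation.maxExecutors", "20")     // defaults to Integer.MAX_VALUE
  .set("spark.dynamicAllocation.initialExecutors", "5")  // documented above
```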
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -36,7 +36,7 @@ private[spark] class HttpFileServer(
var serverUri : String = null

def initialize() {
baseDir = Utils.createTempDir()
baseDir = Utils.createTempDir(Utils.getLocalDir(conf), "httpd")
fileDir = new File(baseDir, "files")
jarDir = new File(baseDir, "jars")
fileDir.mkdir()
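The change above (and the matching ones in SparkEnv.scala and HttpBroadcast.scala below) roots each temp directory in the configured local directory and gives it a descriptive prefix, so leftovers in `spark.local.dir` are easy to attribute. `Utils.createTempDir` is `private[spark]`; a rough sketch of the pattern, assuming it behaves roughly like `Files.createTempDirectory` plus recursive cleanup:

```scala
import java.io.File
import java.nio.file.Files

// Create a uniquely named directory with a descriptive prefix under a
// configurable root directory.
def createTempDir(root: String, namePrefix: String): File = {
  val dir = Files.createTempDirectory(new File(root).toPath, namePrefix).toFile
  // Best-effort cleanup; the real implementation registers a shutdown hook
  // that deletes recursively (deleteOnExit only removes empty directories).
  dir.deleteOnExit()
  dir
}
```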
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.LinkedHashSet

import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.Utils

/**
* Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
@@ -53,8 +54,8 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {

if (loadDefaults) {
// Load any spark.* system properties
for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) {
set(k, v)
for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
set(key, value)
}
}

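A note on switching from `System.getProperties.asScala` to `Utils.getSystemProperties`: iterating the live `Properties` object can fail if another thread mutates it mid-iteration, and it can expose non-string keys. `Utils.getSystemProperties` is `private[spark]`; a sketch of the snapshot-then-filter behavior presumably behind it (an approximation, not the actual implementation):

```scala
import scala.collection.JavaConverters._

// Snapshot the system properties, then keep only spark.* keys.
def sparkSystemProperties(): Map[String, String] = {
  val props = System.getProperties
  // stringPropertyNames() returns a copied Set, so concurrent mutation of
  // the underlying Properties cannot disturb this iteration.
  props.stringPropertyNames().asScala
    .filter(_.startsWith("spark."))
    .map(key => key -> props.getProperty(key))
    .toMap
}
```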
69 changes: 39 additions & 30 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -687,9 +687,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @param minPartitions Minimum number of Hadoop Splits to generate.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def hadoopRDD[K, V](
conf: JobConf,
@@ -705,12 +706,13 @@
}

/** Get an RDD for a Hadoop file with an arbitrary InputFormat
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
* */
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -741,9 +743,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* }}}
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def hadoopFile[K, V, F <: InputFormat[K, V]]
(path: String, minPartitions: Int)
@@ -764,9 +767,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* }}}
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
@@ -788,9 +792,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* and extra configuration options to pass to the input format.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
path: String,
@@ -810,9 +815,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* and extra configuration options to pass to the input format.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
conf: Configuration = hadoopConfiguration,
@@ -826,9 +832,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/** Get an RDD for a Hadoop SequenceFile with given key and value types.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def sequenceFile[K, V](path: String,
keyClass: Class[K],
@@ -843,9 +850,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
/** Get an RDD for a Hadoop SequenceFile with given key and value types.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
* */
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = {
assertNotStopped()
@@ -869,9 +877,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* allow it to figure out the Writable class to use in the subclass case.
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD will create many references to the same object.
* If you plan to directly cache Hadoop writable objects, you should first copy them using
* a `map` function.
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def sequenceFile[K, V]
(path: String, minPartitions: Int = defaultMinPartitions)
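The doc change repeated through this file deserves a concrete illustration (not part of this diff): Hadoop hands back the same Writable instance for every record, so caching, sorting, or aggregating the RDD as-is stores many references to one mutable object. Copy each record into fresh immutable values first. The path and key/value types below are hypothetical, and a live `SparkContext` named `sc` is assumed:

```scala
import org.apache.hadoop.io.{LongWritable, Text}

val raw = sc.sequenceFile[LongWritable, Text]("hdfs:///data/events")

// Hazard: every cached record would alias the same reused Writable pair.
// raw.cache()

// Safe: materialize a fresh copy of each record before caching or shuffling.
val copied = raw.map { case (k, v) => (k.get(), v.toString) }
copied.cache()
```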
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -339,7 +339,7 @@ object SparkEnv extends Logging {
// this is a temporary directory; in distributed mode, this is the executor's current working
// directory.
val sparkFilesDir: String = if (isDriver) {
Utils.createTempDir().getAbsolutePath
Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath
} else {
"."
}
core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -151,7 +151,7 @@ private[broadcast] object HttpBroadcast extends Logging {
}

private def createServer(conf: SparkConf) {
broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf), "broadcast")
val broadcastPort = conf.getInt("spark.broadcast.port", 0)
server =
new HttpServer(conf, broadcastDir, securityManager, broadcastPort, "HTTP broadcast server")
core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -26,7 +26,7 @@ import org.apache.spark.api.python.PythonUtils
import org.apache.spark.util.{RedirectThread, Utils}

/**
* A main class used by spark-submit to launch Python applications. It executes python as a
* A main class used to launch Python applications. It executes python as a
* subprocess and then has it connect back to the JVM to access system properties, etc.
*/
object PythonRunner {
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -141,7 +141,7 @@ class SparkHadoopUtil extends Logging {
val baselineBytesRead = f()
Some(() => f() - baselineBytesRead)
} catch {
case e: NoSuchMethodException => {
case e @ (_: NoSuchMethodException | _: ClassNotFoundException) => {
logDebug("Couldn't find method for retrieving thread-level FileSystem input data", e)
None
}
@@ -163,7 +163,7 @@ class SparkHadoopUtil extends Logging {
val baselineBytesWritten = f()
Some(() => f() - baselineBytesWritten)
} catch {
case e: NoSuchMethodException => {
case e @ (_: NoSuchMethodException | _: ClassNotFoundException) => {
logDebug("Couldn't find method for retrieving thread-level FileSystem output data", e)
None
}
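Both hunks above widen a single-type catch into Scala's pattern-alternative form, which handles several exception types in one case clause. In isolation (an illustrative sketch mirroring the shape of the code above):

```scala
def maybeBytesRead(f: () => Long): Option[() => Long] = {
  try {
    val baseline = f()
    Some(() => f() - baseline)
  } catch {
    // `e @ (_: A | _: B)` matches either exception type and binds it to e.
    case e @ (_: NoSuchMethodException | _: ClassNotFoundException) =>
      // Older Hadoop releases lack the thread-level FileSystem statistics
      // API, so the reflective lookup can fail either way; degrade gracefully.
      None
  }
}
```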
[Diff truncated: the remaining changed files are not shown.]
