[FLINK-26553][build] Format code with Spotless/scalafmt
Rufus Refactor authored and twalthr committed Apr 12, 2022
1 parent 3ea3fee commit 91d81c4
Showing 1,297 changed files with 60,656 additions and 54,415 deletions.
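Only a handful of those 1,297 files is reproduced below. The excerpted hunks show three recurring, purely mechanical changes: Scaladoc continuation lines move from Scala-style alignment (asterisk under the second star of "/**") to JavaDoc style (asterisk under the first star), imports are ASCII-sorted and regrouped (Flink imports first, then a blank line, then Hadoop and other third-party imports), and long signatures and call chains are rewrapped. The commit's actual scalafmt configuration is not part of this excerpt, so the following .scalafmt.conf is only a sketch of scalafmt 3 options that would produce output of this shape; the option names are real scalafmt settings, but the concrete values are assumptions:

version = 3.4.3                 # hypothetical; pin whichever scalafmt 3.x the build uses
maxColumn = 100
docstrings.style = Asterisk     # JavaDoc-style " * " continuation lines
rewrite.rules = [Imports]
rewrite.imports.sort = ascii    # also ASCII-sorts selectors inside braces
rewrite.imports.groups = [
  ["org\\.apache\\.flink\\..*"],
  [".*"]
]

With Spotless driving scalafmt over every Scala source file during the build, a pure formatting change of this kind touches many files in a single commit.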
@@ -19,6 +19,7 @@ package org.apache.flink.api.scala.hadoop.mapred
 
 import org.apache.flink.annotation.Public
 import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormatBase
+
 import org.apache.hadoop.mapred.{InputFormat, JobConf}
 
 @Public
@@ -19,6 +19,7 @@ package org.apache.flink.api.scala.hadoop.mapred
 
 import org.apache.flink.annotation.Public
 import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormatBase
+
 import org.apache.hadoop.mapred.{JobConf, OutputCommitter, OutputFormat}
 
 @Public
@@ -15,11 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.api.scala.hadoop.mapreduce
 
 import org.apache.flink.annotation.Public
 import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase
+
 import org.apache.hadoop.mapreduce.{InputFormat, Job}
 
 @Public
@@ -15,11 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.api.scala.hadoop.mapreduce
 
 import org.apache.flink.annotation.Public
 import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase
+
 import org.apache.hadoop.mapreduce.{Job, OutputFormat}
 
 @Public
@@ -15,32 +15,31 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.hadoopcompatibility.scala
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.hadoop.{mapred, mapreduce}
+
 import org.apache.hadoop.fs.{Path => HadoopPath}
-import org.apache.hadoop.mapred.{JobConf, FileInputFormat => MapredFileInputFormat, InputFormat => MapredInputFormat}
+import org.apache.hadoop.mapred.{FileInputFormat => MapredFileInputFormat, InputFormat => MapredInputFormat, JobConf}
-import org.apache.hadoop.mapreduce.{InputFormat => MapreduceInputFormat, Job}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => MapreduceFileInputFormat}
+import org.apache.hadoop.mapreduce.{Job, InputFormat => MapreduceInputFormat}
 
 /**
-  * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink.
-  *
-  * It provides methods to create Flink InputFormat wrappers for Hadoop
-  * [[org.apache.hadoop.mapred.InputFormat]] and [[org.apache.hadoop.mapreduce.InputFormat]].
-  *
-  * Key value pairs produced by the Hadoop InputFormats are converted into [[Tuple2]] where
-  * the first field is the key and the second field is the value.
-  *
-  */
+ * HadoopInputs is a utility class to use Apache Hadoop InputFormats with Apache Flink.
+ *
+ * It provides methods to create Flink InputFormat wrappers for Hadoop
+ * [[org.apache.hadoop.mapred.InputFormat]] and [[org.apache.hadoop.mapreduce.InputFormat]].
+ *
+ * Key value pairs produced by the Hadoop InputFormats are converted into [[Tuple2]] where the first
+ * field is the key and the second field is the value.
+ */
 object HadoopInputs {
 
   /**
-    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
-    * [[org.apache.hadoop.mapred.FileInputFormat]].
-    */
+   * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+   * [[org.apache.hadoop.mapred.FileInputFormat]].
+   */
   def readHadoopFile[K, V](
       mapredInputFormat: MapredFileInputFormat[K, V],
       key: Class[K],

@@ -55,9 +54,9 @@ object HadoopInputs {
   }
 
   /**
-    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
-    * [[org.apache.hadoop.mapred.FileInputFormat]].
-    */
+   * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+   * [[org.apache.hadoop.mapred.FileInputFormat]].
+   */
   def readHadoopFile[K, V](
       mapredInputFormat: MapredFileInputFormat[K, V],
       key: Class[K],

@@ -68,26 +67,24 @@ object HadoopInputs {
   }
 
   /**
-    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that reads a Hadoop sequence
-    * file with the given key and value classes.
-    */
-  def readSequenceFile[K, V](
-      key: Class[K],
-      value: Class[V],
-      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
+   * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that reads a Hadoop sequence
+   * file with the given key and value classes.
+   */
+  def readSequenceFile[K, V](key: Class[K], value: Class[V], inputPath: String)(implicit
+      tpe: TypeInformation[(K, V)]): mapred.HadoopInputFormat[K, V] = {
 
     readHadoopFile(
       new org.apache.hadoop.mapred.SequenceFileInputFormat[K, V],
       key,
       value,
       inputPath
-      )
+    )
   }
 
   /**
-    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
-    * [[org.apache.hadoop.mapred.InputFormat]].
-    */
+   * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+   * [[org.apache.hadoop.mapred.InputFormat]].
+   */
   def createHadoopInput[K, V](
       mapredInputFormat: MapredInputFormat[K, V],
       key: Class[K],

@@ -98,9 +95,9 @@ object HadoopInputs {
   }
 
   /**
-    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
-    * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
-    */
+   * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+   * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
+   */
   def readHadoopFile[K, V](
       mapreduceInputFormat: MapreduceFileInputFormat[K, V],
       key: Class[K],

@@ -115,22 +112,22 @@ object HadoopInputs {
   }
 
   /**
-    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
-    * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
-    */
+   * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+   * [[org.apache.hadoop.mapreduce.lib.input.FileInputFormat]].
+   */
   def readHadoopFile[K, V](
       mapreduceInputFormat: MapreduceFileInputFormat[K, V],
       key: Class[K],
       value: Class[V],
-      inputPath: String)(implicit tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] =
-  {
+      inputPath: String)(implicit
+      tpe: TypeInformation[(K, V)]): mapreduce.HadoopInputFormat[K, V] = {
     readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance)
   }
 
   /**
-    * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
-    * [[org.apache.hadoop.mapreduce.InputFormat]].
-    */
+   * Creates a Flink [[org.apache.flink.api.common.io.InputFormat]] that wraps the given Hadoop
+   * [[org.apache.hadoop.mapreduce.InputFormat]].
+   */
   def createHadoopInput[K, V](
       mapreduceInputFormat: MapreduceInputFormat[K, V],
       key: Class[K],

@@ -140,4 +137,3 @@ object HadoopInputs {
     new mapreduce.HadoopInputFormat[K, V](mapreduceInputFormat, key, value, job)
   }
 }
-
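For context, the HadoopInputs wrappers being reformatted above are used with the DataSet API. A minimal sketch, assuming a hypothetical input path; the four-argument readHadoopFile overload and the classOf arguments are exactly those exercised by the WordCount tests further down:

import org.apache.flink.api.scala._
import org.apache.flink.hadoopcompatibility.scala.HadoopInputs

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

object ReadHadoopFileExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Wrap a mapred TextInputFormat; keys are byte offsets, values are lines,
    // surfaced as Tuple2 elements per the Scaladoc above.
    val lines: DataSet[(LongWritable, Text)] = env.createInput(
      HadoopInputs.readHadoopFile(
        new TextInputFormat,
        classOf[LongWritable],
        classOf[Text],
        "hdfs:///tmp/input" // hypothetical path
      ))

    lines.map(_._2.toString).print()
  }
}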
@@ -20,6 +20,7 @@ package org.apache.flink.api.hadoopcompatibility.scala
 import org.apache.flink.api.java.typeutils.WritableTypeInfo
 import org.apache.flink.api.scala._
 import org.apache.flink.util.TestLogger
+
 import org.apache.hadoop.io.Text
 import org.junit.Assert._
 import org.junit.Test
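Only the import block of the file above is regrouped; it is a test around WritableTypeInfo, the TypeInformation that flink-hadoop-compatibility provides for Hadoop Writable types. A hypothetical sketch of the kind of check such a test performs, since the real test body is outside this excerpt:

import org.apache.flink.api.java.typeutils.WritableTypeInfo
import org.apache.flink.api.scala._

import org.apache.hadoop.io.Text
import org.junit.Assert._
import org.junit.Test

class WritableTypeInfoSketch {

  @Test
  def testTypeInfoForHadoopWritable(): Unit = {
    // For a Hadoop Writable such as Text, the Scala type analysis should
    // derive a WritableTypeInfo rather than a generic fallback.
    val tpe = createTypeInformation[Text]
    assertTrue(tpe.isInstanceOf[WritableTypeInfo[_]])
  }
}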
@@ -23,6 +23,7 @@ import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
 import org.apache.flink.test.testdata.WordCountData
 import org.apache.flink.test.util.{JavaProgramTestBase, TestBaseUtils}
 import org.apache.flink.util.OperatingSystem
+
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextInputFormat, TextOutputFormat}

@@ -38,31 +39,34 @@ class WordCountMapredITCase extends JavaProgramTestBase {
     Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows)
   }
 
-  protected override def preSubmit() {
+  override protected def preSubmit() {
     textPath = createTempFile("text.txt", WordCountData.TEXT)
     resultPath = getTempDirPath("result")
   }
 
-  protected override def postSubmit() {
-    TestBaseUtils.compareResultsByLinesInMemory(WordCountData.COUNTS,
-      resultPath, Array[String](".", "_"))
+  override protected def postSubmit() {
+    TestBaseUtils.compareResultsByLinesInMemory(
+      WordCountData.COUNTS,
+      resultPath,
+      Array[String](".", "_"))
   }
 
-  private def internalRun (): Unit = {
+  private def internalRun(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
 
     val input =
-      env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat, classOf[LongWritable],
-        classOf[Text], textPath))
+      env.createInput(
+        HadoopInputs
+          .readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
 
     val counts = input
       .map(_._2.toString)
-      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty).map( (_, 1)))
+      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty).map((_, 1)))
       .groupBy(0)
       .sum(1)
 
     val words = counts
-      .map( t => (new Text(t._1), new LongWritable(t._2)) )
+      .map(t => (new Text(t._1), new LongWritable(t._2)))
 
     val hadoopOutputFormat = new HadoopOutputFormat[Text, LongWritable](
       new TextOutputFormat[Text, LongWritable],

@@ -81,4 +85,3 @@ class WordCountMapredITCase extends JavaProgramTestBase {
     postSubmit()
   }
 }
-
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.flink.api.hadoopcompatibility.scala
 
 import org.apache.flink.api.scala._

@@ -24,6 +23,7 @@ import org.apache.flink.hadoopcompatibility.scala.HadoopInputs
 import org.apache.flink.test.testdata.WordCountData
 import org.apache.flink.test.util.{JavaProgramTestBase, TestBaseUtils}
 import org.apache.flink.util.OperatingSystem
+
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapreduce.Job

@@ -41,41 +41,43 @@ class WordCountMapreduceITCase extends JavaProgramTestBase {
     Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows)
   }
 
-  protected override def preSubmit() {
+  override protected def preSubmit() {
     textPath = createTempFile("text.txt", WordCountData.TEXT)
     resultPath = getTempDirPath("result")
   }
 
-  protected override def postSubmit() {
-    TestBaseUtils.compareResultsByLinesInMemory(WordCountData.COUNTS,
-      resultPath, Array[String](".", "_"))
+  override protected def postSubmit() {
+    TestBaseUtils.compareResultsByLinesInMemory(
+      WordCountData.COUNTS,
+      resultPath,
+      Array[String](".", "_"))
   }
 
   protected def testProgram() {
     internalRun()
     postSubmit()
   }
 
-  private def internalRun (): Unit = {
+  private def internalRun(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
 
     val input =
-      env.createInput(HadoopInputs.readHadoopFile(new TextInputFormat, classOf[LongWritable],
-        classOf[Text], textPath))
+      env.createInput(
+        HadoopInputs
+          .readHadoopFile(new TextInputFormat, classOf[LongWritable], classOf[Text], textPath))
 
     val counts = input
       .map(_._2.toString)
-      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty).map( (_, 1)))
+      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty).map((_, 1)))
       .groupBy(0)
      .sum(1)
 
     val words = counts
-      .map( t => (new Text(t._1), new LongWritable(t._2)) )
+      .map(t => (new Text(t._1), new LongWritable(t._2)))
 
     val job = Job.getInstance()
-    val hadoopOutputFormat = new HadoopOutputFormat[Text, LongWritable](
-      new TextOutputFormat[Text, LongWritable],
-      job)
+    val hadoopOutputFormat =
+      new HadoopOutputFormat[Text, LongWritable](new TextOutputFormat[Text, LongWritable], job)
     hadoopOutputFormat.getConfiguration.set("mapred.textoutputformat.separator", " ")
 
     FileOutputFormat.setOutputPath(job, new Path(resultPath))

@@ -85,4 +87,3 @@ class WordCountMapreduceITCase extends JavaProgramTestBase {
     env.execute("Hadoop Compat WordCount")
   }
 }
-
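A sweep like this is normally kept stable by failing the build when formatting drifts. Assuming scalafmt is wired in through the Spotless Maven plugin, as the commit title indicates, the two standard plugin goals apply; shown as a hypothetical invocation, since the exact module or profile flags used in this repository are not part of this excerpt:

mvn spotless:check   # verify formatting; fails the build on violations (CI)
mvn spotless:apply   # rewrite sources in place, as this commit did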