diff --git a/docs/src/reference/asciidoc/core/cascading.adoc b/docs/src/reference/asciidoc/core/cascading.adoc index 0db0b5874..e8ef7a9ff 100644 --- a/docs/src/reference/asciidoc/core/cascading.adoc +++ b/docs/src/reference/asciidoc/core/cascading.adoc @@ -95,7 +95,8 @@ Simply hook `EsTap` into the Cascading flow: ---- Tap in = new Lfs(new TextDelimited(new Fields("id", "name", "url", "picture")), "/resources/artists.dat"); -Tap out = new EsTap("radio/artists" <1>, new Fields("name", "url", "picture") <2>); +Tap out = new EsTap("radio/artists", <1> + new Fields("name", "url", "picture")); <2> new HadoopFlowConnector().connect(in, out, new Pipe("write-to-Es")).complete(); ---- @@ -140,8 +141,8 @@ One can index the data to a different resource, depending on the tuple being rea [source,java] ---- -Tap out = new EsTap("my-collection-{media.type}/doc" <1>, - new Fields("name", "media.type", "year") <2>); +Tap out = new EsTap("my-collection-{media.type}/doc", <1> + new Fields("name", "media.type", "year")); <2> ---- <1> Resource pattern using field `media.type` @@ -154,7 +155,7 @@ The functionality is available when dealing with raw JSON as well - in this case [source,js] ---- { - "media_type":"book",<1> + "media_type":"book", <1> "title":"Harry Potter", "year":"2010" } @@ -167,7 +168,8 @@ the `Tap` declaration can be as follows: ---- props.setProperty("es.input.json", "true"); Tap in = new Lfs(new TextLine(new Fields("line")),"/archives/collection.json"); -Tap out = new EsTap("my-collection-{media_type}/doc" <1>, new Fields("line") <2>); +Tap out = new EsTap("my-collection-{media_type}/doc", <1> + new Fields("line")); <2> ---- <1> Resource pattern relying on fields _within_ the JSON document and _not_ on the `Tap` schema @@ -180,7 +182,8 @@ Just the same, add `EsTap` on the other end of a pipe, to read (instead of writi [source,java] ---- -Tap in = new EsTap("radio/artists/"<1>,"?q=me*"<2>); +Tap in = new EsTap("radio/artists/", <1> + "?q=me*"); <2> Tap out = new StdOut(new TextLine()); new LocalFlowConnector().connect(in, out, new Pipe("read-from-Es")).complete(); ---- diff --git a/docs/src/reference/asciidoc/core/hive.adoc b/docs/src/reference/asciidoc/core/hive.adoc index cbf1e3f82..1385efd79 100644 --- a/docs/src/reference/asciidoc/core/hive.adoc +++ b/docs/src/reference/asciidoc/core/hive.adoc @@ -59,7 +59,7 @@ When using Hive, one can use `TBLPROPERTIES` to specify the <; + 'es.index.auto.create' = 'false'); <1> ---- <1> {eh} setting @@ -78,12 +78,10 @@ To wit: CREATE EXTERNAL TABLE artists (...) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource' = 'radio/artists', - <1>'es.mapping.names' = 'date:@timestamp <2>, url:url_123 <3>'); + 'es.mapping.names' = 'date:@timestamp, url:url_123'); <1> ---- -<1> name mapping for two fields -<2> Hive column `date` mapped in {es} to `@timestamp` -<3> Hive column `url` mapped in {es} to `url_123` +<1> Hive column `date` mapped in {es} to `@timestamp`; Hive column `url` mapped in {es} to `url_123` TIP: Hive is case **insensitive** while {es} is not. The loss of information can create invalid queries (as the column in Hive might not match the one in {es}). To avoid this, {eh} will always convert Hive column names to lower-case. This being said, it is recommended to use the default Hive style and use upper-case names only for Hive commands and avoid mixed-case names. 
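As a companion to the `es.mapping.names` example above, the same DDL can also be issued programmatically through Hive's JDBC driver. The sketch below is illustrative only: it assumes a HiveServer2 endpoint at `localhost:10000`, a hypothetical `artists` column layout, and the elasticsearch-hadoop-hive jar already registered on Hive's classpath (for instance via `hive.aux.jars.path`).

[source,java]
----
// Illustrative sketch (not part of the reference docs): create the Elasticsearch-backed
// table with renamed fields through Hive JDBC. Assumes HiveServer2 on localhost:10000
// and elasticsearch-hadoop-hive already on Hive's classpath.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateMappedTable {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection con = DriverManager.getConnection(
                 "jdbc:hive2://localhost:10000/default", "hive", "");
             Statement stmt = con.createStatement()) {
            // Hive columns 'date' and 'url' are renamed on the Elasticsearch side
            stmt.execute(
                "CREATE EXTERNAL TABLE artists (`date` STRING, name STRING, url STRING) "
              + "STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' "
              + "TBLPROPERTIES('es.resource' = 'radio/artists', "
              + "              'es.mapping.names' = 'date:@timestamp, url:url_123')");
        }
    }
}
----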
@@ -102,7 +100,7 @@ CREATE EXTERNAL TABLE artists ( name STRING, links STRUCT) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'<1> -TBLPROPERTIES('es.resource' = 'radio/artists'<2>); +TBLPROPERTIES('es.resource' = 'radio/artists'); <2> -- insert data to Elasticsearch from another table called 'source' INSERT OVERWRITE TABLE artists @@ -148,10 +146,10 @@ IMPORTANT: Make sure the data is properly encoded, in `UTF-8`. The field content [source,java] ---- -CREATE EXTERNAL TABLE json (data STRING<1>) +CREATE EXTERNAL TABLE json (data STRING) <1> STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource' = '...', - 'es.input.json` = 'yes'<2>); + 'es.input.json` = 'yes'); <2> ... ---- @@ -170,7 +168,7 @@ CREATE EXTERNAL TABLE media ( type STRING,<1> year STRING, STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' -TBLPROPERTIES('es.resource' = 'my-collection-{type}/doc'<2>); +TBLPROPERTIES('es.resource' = 'my-collection-{type}/doc'); <2> ---- <1> Table field used by the resource pattern. Any of the declared fields can be used. @@ -195,9 +193,9 @@ the table declaration can be as follows: [source,sql] ---- -CREATE EXTERNAL TABLE json (data STRING<1>) +CREATE EXTERNAL TABLE json (data STRING) <1> STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' -TBLPROPERTIES('es.resource' = 'my-collection-{media_type}/doc'<2>, +TBLPROPERTIES('es.resource' = 'my-collection-{media_type}/doc', <2> 'es.input.json` = 'yes'); ---- @@ -216,7 +214,8 @@ CREATE EXTERNAL TABLE artists ( name STRING, links STRUCT) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'<1> -TBLPROPERTIES('es.resource' = 'radio/artists'<2>, 'es.query' = '?q=me*'<3>); +TBLPROPERTIES('es.resource' = 'radio/artists', <2> + 'es.query' = '?q=me*'); <3> -- stream data from Elasticsearch SELECT * FROM artists; diff --git a/docs/src/reference/asciidoc/core/intro/download.adoc b/docs/src/reference/asciidoc/core/intro/download.adoc index 0af934915..c840ebf8b 100644 --- a/docs/src/reference/asciidoc/core/intro/download.adoc +++ b/docs/src/reference/asciidoc/core/intro/download.adoc @@ -28,7 +28,7 @@ These are available under the same `groupId`, using an `artifactId` with the pat ---- org.elasticsearch - elasticsearch-hadoop-mr<1> + elasticsearch-hadoop-mr <1> {ver} ---- @@ -40,7 +40,7 @@ These are available under the same `groupId`, using an `artifactId` with the pat ---- org.elasticsearch - elasticsearch-hadoop-hive<1> + elasticsearch-hadoop-hive <1> {ver} ---- @@ -52,7 +52,7 @@ These are available under the same `groupId`, using an `artifactId` with the pat ---- org.elasticsearch - elasticsearch-hadoop-pig<1> + elasticsearch-hadoop-pig <1> {ver} ---- @@ -64,13 +64,16 @@ These are available under the same `groupId`, using an `artifactId` with the pat ---- org.elasticsearch - elasticsearch-spark-20<1>_2.10<2> + elasticsearch-spark-20_2.10 <1> {ver} ---- -<1> 'spark' artifact. Notice the `-20` part of the suffix which indicates the Spark version compatible with the artifact. Use `20` for Spark 2.0+ and `13` for Spark 1.3-1.6. -<2> Notice the `_2.10` suffix which indicates the Scala version compatible with the artifact. Currently it is the same as the version used by Spark itself. +<1> 'spark' artifact. Notice the `-20` part of the suffix which indicates the +Spark version compatible with the artifact. Use `20` for Spark 2.0+ and `13` for +Spark 1.3-1.6. Notice the `_2.10` suffix which indicates the Scala version +compatible with the artifact. 
Currently it is the same as the version used by +Spark itself. The Spark connector framework is the most sensitive to version incompatibilities. For your convenience, a version compatibility matrix has been provided below: [cols="2,2,10",options="header",] @@ -89,7 +92,7 @@ The Spark connector framework is the most sensitive to version incompatibilities ---- org.elasticsearch - elasticsearch-hadoop-cascading<1> + elasticsearch-hadoop-cascading <1> {ver} ---- @@ -114,7 +117,7 @@ in order for the Cascading dependencies to be properly resolved: ---- org.elasticsearch - elasticsearch-storm<1> + elasticsearch-storm <1> {ver} ---- diff --git a/docs/src/reference/asciidoc/core/pig.adoc b/docs/src/reference/asciidoc/core/pig.adoc index 498d3050b..e46797cfd 100644 --- a/docs/src/reference/asciidoc/core/pig.adoc +++ b/docs/src/reference/asciidoc/core/pig.adoc @@ -44,9 +44,10 @@ With Pig, one can specify the <> properties (as an [source,sql] ---- -STORE B INTO 'radio/artists'<1> USING org.elasticsearch.hadoop.pig.EsStorage - ('es.http.timeout = 5m<2>', - 'es.index.auto.create = false' <3>); +STORE B INTO 'radio/artists' <1> + USING org.elasticsearch.hadoop.pig.EsStorage + ('es.http.timeout = 5m', <2> + 'es.index.auto.create = false'); <3> ---- <1> {eh} configuration (target resource) @@ -163,12 +164,10 @@ For example: [source,sql] ---- STORE B INTO '...' USING org.elasticsearch.hadoop.pig.EsStorage( - '<1>es.mapping.names=date:@timestamp<2>, uRL:url<3>') + 'es.mapping.names=date:@timestamp, uRL:url') <1> ---- -<1> name mapping for two fields -<2> Pig column `date` mapped in {es} to `@timestamp` -<3> Pig column `uRL` mapped in {es} to `url` +<1> Pig column `date` mapped in {es} to `@timestamp`; Pig column `uRL` mapped in {es} to `url` TIP: Since {eh} 2.1, the Pig schema case sensitivity is preserved to {es} and back. @@ -185,11 +184,13 @@ A = LOAD 'src/test/resources/artists.dat' USING PigStorage() -- transform data B = FOREACH A GENERATE name, TOTUPLE(url, picture) AS links; -- save the result to Elasticsearch -STORE B INTO 'radio/artists'<1> USING org.elasticsearch.hadoop.pig.EsStorage(<2>); +STORE B INTO 'radio/artists'<1> + USING org.elasticsearch.hadoop.pig.EsStorage(); <2> ---- <1> {es} resource (index and type) associated with the given storage -<2> additional configuration parameters can be passed here - in this case the defaults are used +<2> additional configuration parameters can be passed inside the `()` - in this +case the defaults are used For cases where the id (or other metadata fields like +ttl+ or +timestamp+) of the document needs to be specified, one can do so by setting the appropriate <>, namely +es.mapping.id+. Following the previous example, to indicate to {es} to use the field +id+ as the document id, update the +Storage+ configuration: @@ -219,9 +220,9 @@ IMPORTANT: Make sure the data is properly encoded, in `UTF-8`. 
The field content [source,sql] ---- -A = LOAD '/resources/artists.json' USING PigStorage() AS (json:chararray<1>);" +A = LOAD '/resources/artists.json' USING PigStorage() AS (json:chararray);" <1> STORE B INTO 'radio/artists' - USING org.elasticsearch.hadoop.pig.EsStorage('es.input.json=true'<2>...); + USING org.elasticsearch.hadoop.pig.EsStorage('es.input.json=true'...); <2> ---- <1> Load the (JSON) data as a single field (`json`) @@ -235,8 +236,9 @@ One can index the data to a different resource, depending on the 'row' being rea [source,sql] ---- A = LOAD 'src/test/resources/media.dat' USING PigStorage() - AS (name:chararray, type:chararray <1>, year: chararray); -STORE B INTO 'my-collection-{type}/doc'<2> USING org.elasticsearch.hadoop.pig.EsStorage(); + AS (name:chararray, type:chararray, year: chararray); <1> +STORE B INTO 'my-collection-{type}/doc' <2> + USING org.elasticsearch.hadoop.pig.EsStorage(); ---- <1> Tuple field used by the resource pattern. Any of the declared fields can be used. @@ -262,8 +264,8 @@ the table declaration can be as follows: [source,sql] ---- -A = LOAD '/resources/media.json' USING PigStorage() AS (json:chararray<1>);" -STORE B INTO 'my-collection-{media_type}/doc'<2> +A = LOAD '/resources/media.json' USING PigStorage() AS (json:chararray);" <1> +STORE B INTO 'my-collection-{media_type}/doc' <2> USING org.elasticsearch.hadoop.pig.EsStorage('es.input.json=true'); ---- @@ -278,8 +280,8 @@ As you would expect, loading the data is straight forward: [source,sql] ---- -- execute Elasticsearch query and load data into Pig -A = LOAD 'radio/artists'<1> - USING org.elasticsearch.hadoop.pig.EsStorage('es.query=?me*'<2>); +A = LOAD 'radio/artists' <1> + USING org.elasticsearch.hadoop.pig.EsStorage('es.query=?me*'); <2> DUMP A; ---- diff --git a/docs/src/reference/asciidoc/core/spark.adoc b/docs/src/reference/asciidoc/core/spark.adoc index 7128ff437..acd140fdf 100644 --- a/docs/src/reference/asciidoc/core/spark.adoc +++ b/docs/src/reference/asciidoc/core/spark.adoc @@ -49,7 +49,7 @@ For those that want to set the properties through the command-line (either direc [source, bash] ---- -$ ./bin/spark-submit --conf spark.es.resource<1>=index/type ... +$ ./bin/spark-submit --conf spark.es.resource=index/type ... <1> ---- <1> Notice the +es.resource+ property which became +spark.es.resource+ @@ -82,7 +82,9 @@ val sc = new SparkContext(conf) <3> val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3) val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran") -sc.makeRDD<4>(Seq(numbers, airports)).saveToEs<5>("spark/docs") +sc.makeRDD( <4> + Seq(numbers, airports) +).saveToEs("spark/docs") <5> ---- <1> Spark Scala imports @@ -233,7 +235,7 @@ String json1 = "{\"reason\" : \"business\",\"airport\" : \"SFO\"}"; <1> String json2 = "{\"participants\" : 5,\"airport\" : \"OTP\"}"; JavaSparkContext jsc = ... 
-JavaRDD<2> stringRDD = jsc.parallelize(ImmutableList.of(json1, json2)); +JavaRDD stringRDD = jsc.parallelize(ImmutableList.of(json1, json2)); <2> JavaEsSpark.saveJsonToEs(stringRDD, "spark/json-trips"); <3> ---- @@ -253,7 +255,10 @@ For cases when the data being written to {es} needs to be indexed under differen [source,scala] ---- -val game = Map("media_type"<1>->"game","title" -> "FF VI","year" -> "1994") +val game = Map( + "media_type"->"game", <1> + "title" -> "FF VI", + "year" -> "1994") val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010") val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien") @@ -312,8 +317,9 @@ val sfo = Map("iata" -> "SFO", "name" -> "San Fran") // instance of SparkContext val sc = ... -val airportsRDD<1> = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo))) <2> -airportsRDD.saveToEsWithMeta<3>("airports/2015") +val airportsRDD = <1> + sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo))) <2> +airportsRDD.saveToEsWithMeta("airports/2015") <3> ---- <1> +airportsRDD+ is a __key-value__ pair +RDD+; it is created from a +Seq+ of ++tuple++s @@ -339,7 +345,8 @@ val sfoMeta = Map(ID -> 3) <4> // instance of SparkContext val sc = ... -val airportsRDD = sc.makeRDD<5>(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo))) +val airportsRDD = sc.makeRDD( <5> + Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo))) airportsRDD.saveToEsWithMeta("airports/2015") <6> ---- @@ -367,7 +374,7 @@ Map jfk = ImmutableMap.of("iata", "JFK", "name", "JFK NYC"); JavaSparkContext jsc = ... // create a pair RDD between the id and the docs -JavaPairRDD pairRdd = jsc.parallelizePairs<1>(ImmutableList.of( +JavaPairRDD pairRdd = jsc.parallelizePairs(ImmutableList.of( <1> new Tuple2(1, otp), <2> new Tuple2(2, jfk))); <3> JavaEsSpark.saveToEsWithMeta(pairRDD, target); <4> @@ -393,26 +400,25 @@ Map sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran"); // metadata for each document // note it's not required for them to have the same structure -Map otpMeta<3> = ImmutableMap.<4> of(ID, 1, TTL, "1d"); -Map sfoMeta<5> = ImmutableMap. of(ID, "2", VERSION, "23"); +Map otpMeta = ImmutableMap.of(ID, 1, TTL, "1d"); <3> +Map sfoMeta = ImmutableMap. of(ID, "2", VERSION, "23"); <4> JavaSparkContext jsc = ... // create a pair RDD between the id and the docs JavaPairRDD pairRdd = jsc.parallelizePairs<(ImmutableList.of( - new Tuple2(otpMeta, otp), <6> - new Tuple2(sfoMeta, sfo))); <7> -JavaEsSpark.saveToEsWithMeta(pairRDD, target); <8> + new Tuple2(otpMeta, otp), <5> + new Tuple2(sfoMeta, sfo))); <6> +JavaEsSpark.saveToEsWithMeta(pairRDD, target); <7> ---- <1> +Metadata+ +enum+ describing the document metadata that can be declared <2> static import for the +enum+ to refer to its values in short format (+ID+, +TTL+, etc...) 
<3> Metadata for +otp+ document -<4> Boiler-plate construct for forcing the +of+ method generic signature -<5> Metadata for +sfo+ document -<6> Tuple between +otp+ (as the value) and its metadata (as the key) -<7> Tuple associating +sfo+ and its metadata -<8> +saveToEsWithMeta+ invoked over the +JavaPairRDD+ containing documents and their respective metadata +<4> Metadata for +sfo+ document +<5> Tuple between +otp+ (as the value) and its metadata (as the key) +<6> Tuple associating +sfo+ and its metadata +<7> +saveToEsWithMeta+ invoked over the +JavaPairRDD+ containing documents and their respective metadata [[spark-read]] [float] @@ -496,8 +502,9 @@ Let us see how this looks, but this time around using http://docs.oracle.com/jav import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark.*; <1> ... -JavaRDD> esRDD = - esRDD(jsc, "radio/artists", "?q=me*"<2>).values()<3>; +JavaRDD> rdd = + esRDD(jsc, "radio/artists", "?q=me*") <2> + .values(); <3> ---- <1> statically import `JavaEsSpark` class @@ -653,7 +660,7 @@ val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran") val rdd = sc.makeRDD(Seq(numbers, airports)) val microbatches = mutable.Queue(rdd) <5> -ssc.queueStream(microbatches).saveToEs<6>("spark/docs") +ssc.queueStream(microbatches).saveToEs("spark/docs") <6> ssc.start() ssc.awaitTermination() <7> @@ -843,7 +850,7 @@ JavaStreamingContext jssc = ... JavaRDD stringRDD = jsc.parallelize(ImmutableList.of(json1, json2)); Queue> microbatches = new LinkedList>(); <2> microbatches.add(stringRDD); -JavaDStream<3> stringDStream = jssc.queueStream(microbatches); +JavaDStream stringDStream = jssc.queueStream(microbatches); <3> JavaEsSparkStreaming.saveJsonToEs(stringRDD, "spark/json-trips"); <4> @@ -868,7 +875,10 @@ For cases when the data being written to {es} needs to be indexed under differen [source,scala] ---- -val game = Map("media_type"<1>->"game","title" -> "FF VI","year" -> "1994") +val game = Map( + "media_type" -> "game", <1> + "title" -> "FF VI", + "year" -> "1994") val book = Map("media_type" -> "book","title" -> "Harry Potter","year" -> "2010") val cd = Map("media_type" -> "music","title" -> "Surfing With The Alien") @@ -938,10 +948,12 @@ val sc = ... // instance of StreamingContext val ssc = ... -val airportsRDD<1> = sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo))) <2> +val airportsRDD = <1> + sc.makeRDD(Seq((1, otp), (2, muc), (3, sfo))) <2> val microbatches = mutable.Queue(airportsRDD) -ssc.queueStream<3>(microbatches).saveToEsWithMeta<4>("airports/2015") +ssc.queueStream(microbatches) <3> + .saveToEsWithMeta("airports/2015") <4> ssc.start() ---- @@ -971,10 +983,12 @@ val sc = ... // instance of StreamingContext val ssc = ... -val airportsRDD = sc.makeRDD<5>(Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo))) +val airportsRDD = sc.makeRDD( <5> + Seq((otpMeta, otp), (mucMeta, muc), (sfoMeta, sfo))) val microbatches = mutable.Queue(airportsRDD) -ssc.queueStream<6>(microbatches).saveToEsWithMeta<7>("airports/2015") +ssc.queueStream(microbatches) <6> + .saveToEsWithMeta("airports/2015") <7> ssc.start() ---- @@ -1019,7 +1033,8 @@ JavaSparkContext jsc = ... JavaStreamingContext jssc = ... 
// create an RDD of between the id and the docs -JavaRDD> rdd = jsc.parallelize<1>(ImmutableList.of( +JavaRDD> rdd = jsc.parallelize( <1> + ImmutableList.of( new Tuple2(1, otp), <2> new Tuple2(2, jfk))); <3> @@ -1054,35 +1069,34 @@ Map sfo = ImmutableMap.of("iata", "SFO", "name", "San Fran"); // metadata for each document // note it's not required for them to have the same structure -Map otpMeta<3> = ImmutableMap.<4> of(ID, 1, TTL, "1d"); -Map sfoMeta<5> = ImmutableMap. of(ID, "2", VERSION, "23"); +Map otpMeta = ImmutableMap.of(ID, 1, TTL, "1d"); <3> +Map sfoMeta = ImmutableMap. of(ID, "2", VERSION, "23"); <4> JavaSparkContext jsc = ... // create a pair RDD between the id and the docs JavaRDD> pairRdd = jsc.parallelize<(ImmutableList.of( - new Tuple2(otpMeta, otp), <6> - new Tuple2(sfoMeta, sfo))); <7> + new Tuple2(otpMeta, otp), <5> + new Tuple2(sfoMeta, sfo))); <6> Queue>> microbatches = ... -JavaDStream> dStream = jssc.queueStream(microbatches); <8> +JavaDStream> dStream = jssc.queueStream(microbatches); <7> -JavaPairDStream pairDStream = dstream.mapToPair(new ExtractTuples()) <9> +JavaPairDStream pairDStream = dstream.mapToPair(new ExtractTuples()) <8> -JavaEsSparkStreaming.saveToEsWithMeta(pairDStream, target); <10> +JavaEsSparkStreaming.saveToEsWithMeta(pairDStream, target); <9> jssc.start(); ---- <1> +Metadata+ +enum+ describing the document metadata that can be declared <2> static import for the +enum+ to refer to its values in short format (+ID+, +TTL+, etc...) <3> Metadata for +otp+ document -<4> Boiler-plate construct for forcing the +of+ method generic signature -<5> Metadata for +sfo+ document -<6> Tuple between +otp+ (as the value) and its metadata (as the key) -<7> Tuple associating +sfo+ and its metadata -<8> Create a +JavaDStream+ out of the +JavaRDD+ -<9> Repack the +JavaDStream+ into a +JavaPairDStream+ by mapping the +Tuple2+ identity function over it. -<10> +saveToEsWithMeta+ invoked over the +JavaPairDStream+ containing documents and their respective metadata +<4> Metadata for +sfo+ document +<5> Tuple between +otp+ (as the value) and its metadata (as the key) +<6> Tuple associating +sfo+ and its metadata +<7> Create a +JavaDStream+ out of the +JavaRDD+ +<8> Repack the +JavaDStream+ into a +JavaPairDStream+ by mapping the +Tuple2+ identity function over it. +<9> +saveToEsWithMeta+ invoked over the +JavaPairDStream+ containing documents and their respective metadata [float] [[spark-streaming-type-conversion]] @@ -1283,7 +1297,9 @@ When using Spark SQL, {eh} allows access to {es} through +SQLContext+ +load+ met ---- val sql = new SQLContext... 
// Spark 1.3 style
-val df = sql.load<1>("spark/index"<2>, "org.elasticsearch.spark.sql"<3>)
+val df = sql.load( <1>
+    "spark/index", <2>
+    "org.elasticsearch.spark.sql") <3>
----

<1> +SQLContext+ _experimental_ +load+ method for arbitrary data sources
@@ -1295,7 +1311,9 @@ In Spark 1.4, one would use the following similar API calls:
[source,scala]
----
// Spark 1.4 style
-val df = sql.read<1>.format("org.elasticsearch.spark.sql"<2>).load("spark/index"<3>)
+val df = sql.read <1>
+    .format("org.elasticsearch.spark.sql") <2>
+    .load("spark/index") <3>
----

<1> +SQLContext+ _experimental_ +read+ method for arbitrary data sources
@@ -1307,7 +1325,8 @@ In Spark 1.5, this can be further simplified to:
[source,scala]
----
// Spark 1.5 style
-val df = sql.read.format("es"<1>).load("spark/index")
+val df = sql.read.format("es") <1>
+    .load("spark/index")
----

<1> Use +es+ as an alias instead of the full package name for the +DataSource+ provider
@@ -1341,18 +1360,22 @@ For example:
 val sql = new SQLContext...
 // options for Spark 1.3 need to include the target path/resource
 val options13 = Map("path" -> "spark/index",
-                    "pushdown"<1> -> "true",
-                    "es.nodes"<2> -> "someNode", "es.port" -> "9200")
+                    "pushdown" -> "true", <1>
+                    "es.nodes" -> "someNode", <2>
+                    "es.port" -> "9200")

 // Spark 1.3 style
-val spark13DF = sql.load("org.elasticsearch.spark.sql", options13<3>)
+val spark13DF = sql.load("org.elasticsearch.spark.sql", options13) <3>

 // options for Spark 1.4 - the path/resource is specified separately
-val options = Map("pushdown"<1> -> "true", "es.nodes"<2> -> "someNode", "es.port" -> "9200")
+val options = Map("pushdown" -> "true", <1>
+                  "es.nodes" -> "someNode", <2>
+                  "es.port" -> "9200")

 // Spark 1.4 style
 val spark14DF = sql.read.format("org.elasticsearch.spark.sql")
-                        .options<3>(options).load("spark/index")
+                        .options(options) <3>
+                        .load("spark/index")
----

<1> `pushdown` option - specific to Spark data sources
@@ -1365,7 +1388,7 @@ val spark14DF = sql.read.format("org.elasticsearch.spark.sql")
 sqlContext.sql(
    "CREATE TEMPORARY TABLE myIndex " + <1>
    "USING org.elasticsearch.spark.sql " + <2>
-   "OPTIONS (<3> resource 'spark/index', nodes 'someNode')" ) "
+   "OPTIONS (resource 'spark/index', nodes 'someNode')" ) <3>
----

<1> Spark's temporary table name
@@ -1478,7 +1501,8 @@ Available since Spark SQL 1.2, one can also access a data source by declaring it
 sqlContext.sql(
    "CREATE TEMPORARY TABLE myIndex " + <1>
    "USING org.elasticsearch.spark.sql " + <2>
-   "OPTIONS (resource 'spark/index'<3>, scroll_size<4> '20')" )
+   "OPTIONS (resource 'spark/index', " + <3>
+   "scroll_size '20')" ) <4>
----

<1> Spark's temporary table name
@@ -1544,7 +1568,7 @@ And just as with the Spark _core_ support, additional parameters can be specifie
[source,scala]
----
// get only the Smiths
-val smiths = sqlContext.esDF("spark/people","?q=Smith" <1>)
+val smiths = sqlContext.esDF("spark/people","?q=Smith") <1>
----

<1> {es} query whose results comprise the +DataFrame+
@@ -1591,7 +1615,7 @@ Better yet, the +DataFrame+ can be backed by a query result:

[source,java]
----
-DataFrame people = JavaEsSparkSQL.esDF(sql, "spark/people", "?q=Smith" <1>);
+DataFrame people = JavaEsSparkSQL.esDF(sql, "spark/people", "?q=Smith"); <1>
----

<1> {es} query backing the {eh} +DataFrame+
diff --git a/docs/src/reference/asciidoc/core/storm.adoc b/docs/src/reference/asciidoc/core/storm.adoc
index 02b60588b..f19def59b 100644
--- a/docs/src/reference/asciidoc/core/storm.adoc
+++ b/docs/src/reference/asciidoc/core/storm.adoc
@@ -99,8 +99,13 @@ import org.elasticsearch.storm.EsBolt; <1>

 TopologyBuilder builder = new TopologyBuilder();
 builder.setSpout("spout", new RandomSentenceSpout(), 10);
-builder.setBolt("es-bolt", new EsBolt<2>("storm/docs"<3>), 5<4>)
-       .shuffleGrouping("spout");
+builder.setBolt(
+    "es-bolt",
+    new EsBolt( <2>
+        "storm/docs" <3>
+    ),
+    5) <4>
+    .shuffleGrouping("spout");
----

<1> {eh} +EsBolt+ package import
@@ -135,8 +140,8 @@ Map conf = new HashMap();
conf.put("es.input.json", "true"); <2>

TopologyBuilder builder = new TopologyBuilder();
-builder.setSpout("json-spout", new StringSpout<3>(Arrays.asList(json1, json2));
-builder.setBolt("es-bolt", new EsBolt("storm/json-trips", conf<4>))
+builder.setSpout("json-spout", new StringSpout(Arrays.asList(json1, json2))); <3>
+builder.setBolt("es-bolt", new EsBolt("storm/json-trips", conf)) <4>
    .shuffleGrouping("json-spout");
----

@@ -154,7 +159,8 @@ In cases where the data needs to be indexed based on its content, one can choose
[source, java]
----
builder.setBolt("es-bolt",
-    new EsBolt("my-collection-{media_type}/doc"<1>)).shuffleGrouping("spout");
+    new EsBolt("my-collection-{media_type}/doc") <1>
+).shuffleGrouping("spout");
----

<1> Resource pattern using field +type+
@@ -182,7 +188,10 @@ Map conf = new HashMap();
conf.put("es.input.json", "true"); <1>

builder.setBolt("es-bolt",
-    new EsBolt("my-collection-{media_type}-{year}/doc"<2>, conf<3>)).shuffleGrouping("spout");
+    new EsBolt(
+        "my-collection-{media_type}-{year}/doc", <2>
+        conf) <3>
+    ).shuffleGrouping("spout");
----

<1> Option indicating the input is in JSON format
@@ -201,7 +210,12 @@ As you can expect, for reading data (typically executing queries) {eh} offers a
import org.elasticsearch.storm.EsSpout; <1>

TopologyBuilder builder = new TopologyBuilder();
-builder.setSpout("es-spout", new EsSpout<2>("storm/docs"<3>, "?q=me*<4>), 5<5>);
+builder.setSpout(
+    "es-spout",
+    new EsSpout( <2>
+        "storm/docs", <3>
+        "?q=me*"), <4>
+    5); <5>
builder.setBolt("bolt", new PrinterBolt()).shuffleGrouping("es-spout");
----
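For context, the reading snippet above can be fleshed out into a minimal, self-contained topology. The sketch below is an illustration, not part of the reference examples: it assumes Storm 1.x package names (`org.apache.storm.*`; older releases use `backtype.storm.*` instead), a locally reachable Elasticsearch node, and a trivial logging bolt standing in for `PrinterBolt`.

[source,java]
----
// Minimal sketch of a topology that queries Elasticsearch through EsSpout and
// prints the resulting tuples. Package names assume Storm 1.x (org.apache.storm).
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.Utils;
import org.elasticsearch.storm.EsSpout;

public class EsReadTopology {

    // terminal bolt that simply prints whatever the spout emits
    public static class LogBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            System.out.println(tuple);
        }
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // nothing to declare - this bolt does not emit further tuples
        }
    }

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // run the query against the 'storm/docs' resource and stream the hits as tuples
        builder.setSpout("es-spout", new EsSpout("storm/docs", "?q=me*"), 5);
        builder.setBolt("log-bolt", new LogBolt()).shuffleGrouping("es-spout");

        Config conf = new Config();
        conf.put("es.nodes", "localhost"); // assumption: local Elasticsearch (also the default)

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("es-read", conf, builder.createTopology());
        Utils.sleep(10000); // let the query results flow through, then stop
        cluster.shutdown();
    }
}
----

Using a `BaseBasicBolt` keeps the sketch short, since acking is handled automatically; the resource and query string are passed to `EsSpout` exactly as in the documentation snippet above.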