Skip to content

Commit

Permalink
Merge pull request apache#48 in BDP/apache-spark from netlix/2.1.1-un…
Browse files Browse the repository at this point in the history
…stable/cl to netflix/2.1.1-unstable

Squashed commit of the following:

commit 8053563b1d7f0cd21ea34263ebdf017ba309be90
Merge: b8acef1 1035c8d
Author: Abhay <aamin@netflix.com>
Date:   Wed Mar 20 23:21:19 2019 -0700

    Merge remote-tracking branch 'origin/netflix/2.1.1-unstable' into netlix/2.1.1-unstable/cl

commit b8acef18a6421bafbe7e7080706493cc143dc124
Merge: a1842a6 93934ec
Author: Abhay <aamin@netflix.com>
Date:   Wed Mar 20 23:09:30 2019 -0700

    Merge branch 'netlix/2.1.1-unstable/cl' of ssh://stash.corp.netflix.com:7999/bdp/apache-spark into netlix/2.1.1-unstable/cl

commit a1842a69ff5bf706319acc7cf7d4182eb8112bd9
Author: Abhay <aamin@netflix.com>
Date:   Wed Mar 20 22:49:48 2019 -0700

    fix typo for test class name

commit 401024f6e9a41e885d1ead816db66e6c870c07b7
Author: Abhay <aamin@netflix.com>
Date:   Mon Mar 18 12:00:12 2019 -0700

    add cl_snapshot_extract udf for spark native

commit 93934ecc9be6eafeebd7b0bd85124725eb77d831
Author: Abhay <aamin@netflix.com>
Date:   Wed Mar 20 22:49:48 2019 -0700

    fix typo for test class name

commit 45d51c59bbe5250feff325c3540bfab899b83174
Author: Abhay <aamin@netflix.com>
Date:   Mon Mar 18 12:00:12 2019 -0700

    add cl_snapshot_extract udf for spark native
  • Loading branch information
abhay-a committed Mar 21, 2019
1 parent 1035c8d commit 349c726
Show file tree
Hide file tree
Showing 7 changed files with 384 additions and 0 deletions.
8 changes: 8 additions & 0 deletions python/pyspark/sql/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1464,6 +1464,14 @@ def nf_quarter(input):
sc = SparkContext._active_spark_context
return Column(sc._jvm.functions.nf_quarter(_to_java_column(input)))

@since(2.1)
def cl_snapshot_extract(input1, input2, input3, input4):
    """
    Extracts desired values from a cl snapshot based on clType, extractCriteria and
    filterCriteria. Refer to http://go/cludfs

    ``input1`` is converted to a JVM column; ``input2``, ``input3`` and ``input4`` are
    handed to the JVM function unchanged.
    NOTE(review): presumably they are clType, extractCriteria and filterCriteria in that
    order -- confirm against the JVM-side ``functions.cl_snapshot_extract`` signature.
    """
    active_context = SparkContext._active_spark_context
    jvm_extract = active_context._jvm.functions.cl_snapshot_extract
    return Column(jvm_extract(_to_java_column(input1), input2, input3, input4))

# ---------------------------- misc functions ----------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,9 @@ object FunctionRegistry {
expression[NfJsonExtract]("nf_json_extract"),
expression[NfJsonExtractArray]("nf_json_extract_array"),

// Cl functions
expression[ClSnapshotExtract]("cl_snapshot_extract"),

// collection functions
expression[CreateArray]("array"),
expression[ArrayContains]("array_contains"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions
import java.util.ArrayList

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.catalyst.util.NetflixClUtils._
import org.apache.spark.sql.catalyst.util.NetflixJsonUtils._
import org.apache.spark.sql.types.{
AbstractDataType,
ArrayType,
DataType,
StringType
}
import org.apache.spark.unsafe.types.UTF8String

/**
 * Extracts values from a cl snapshot based on clType, extractCriteria, and filterCriteria
 * and returns them as an array of strings.
 *
 * Evaluates to null when the extraction yields null (`NetflixClUtils.snapshotExtract` does so
 * for a null snapshot). A null `clType` is not mapped to null: `snapshotExtract` rejects it
 * with an `IllegalArgumentException`.
 */
@ExpressionDescription(
  usage =
    "_FUNC_(json, clType, extractCriteria, filterCriteria) - " +
      "Extracts values from cl snapshot, http://go/cludfs."
)
case class ClSnapshotExtract(snapshot: Expression,
                             clType: Expression,
                             extractCriteria: Expression,
                             filterCriteria: Expression)
    extends ExpectsInputTypes
    with CodegenFallback {
  // Argument order here is the order documented in `usage` above.
  override def children: Seq[Expression] =
    Seq(snapshot, clType, extractCriteria, filterCriteria)

  override def inputTypes: Seq[AbstractDataType] =
    Seq(StringType, StringType, StringType, StringType)
  override def dataType: DataType = ArrayType(StringType)
  override def nullable: Boolean = true
  override def prettyName: String = "cl_snapshot_extract"

  /**
   * Runs the extraction for one row and renders every match as a JSON string.
   *
   * @param input the row the argument expressions are evaluated against
   * @return array data of strings, or null when the extraction produced null
   */
  override def eval(input: InternalRow): Any = {
    snapshotExtract(input, snapshot, clType, extractCriteria, filterCriteria) match {
      // Propagate null instead of wrapping it into a one-element array.
      case null => null
      // A list of matches: one output string per match.
      case matches: ArrayList[_] =>
        val values: Seq[UTF8String] = matches.asScala.map(m => getJsonAsString(m))
        ArrayData.toArrayData(values.toArray)
      // Any other single value becomes a one-element array.
      case single =>
        ArrayData.toArrayData(Array[UTF8String](getJsonAsString(single)))
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.util

import scala.collection.immutable.HashSet

import com.jayway.jsonpath.{JsonPath, JsonPathException}

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.unsafe.types.UTF8String

/**
 * Helpers that turn `cl_snapshot_extract` arguments into a Jayway JsonPath query and run
 * that query against a cl snapshot.
 */
object NetflixClUtils {

  // A token that is immediately followed by one of these operators is treated as a field
  // name and gets the "@." prefix in the generated filter.
  private val relationalOperators = HashSet("==", "!=", ">=", "<=", ">", "<")

  /**
   * Builds the JsonPath used to query a cl snapshot, e.g.
   * `$[?("Focus" in @.type && (@.view=="homeTab"))].trackingInfo`.
   *
   * @param clType value looked up in each element's `type`; null is treated as ""
   * @param extractCriteria path appended after the filter; null or "" appends nothing
   * @param filterCriteria extra filter expression whose field names are written without the
   *                       "@." prefix; null or "" adds no extra filter
   * @return the JsonPath string
   */
  def generateClSnapshotJaywayJsonPath(clType: String,
                                       extractCriteria: String,
                                       filterCriteria: String): String = {
    val extractSuffix =
      Option(extractCriteria).filter(_.nonEmpty).fold("")(path => s".$path")

    val filterClause =
      Option(filterCriteria).filter(_.nonEmpty).fold("") { criteria =>
        // Split around operators and parentheses, keeping them as tokens of their own.
        val tokens = criteria
          .split(
            "((?<===|>|<|>=|<=|!=|&&|\\|\\||\\(|\\)) *|(?===|>|<|>=|<=|!=|&&|\\|\\||\\(|\\))) *")
          .toList
        // Pair every token with its successor. Unlike `sliding(2)`, zipping produces no pair
        // for a single-token filter, so that token is not emitted twice.
        val leading = tokens.zip(tokens.drop(1)).map {
          case (token, next) =>
            if (relationalOperators.contains(next)) s"@.${token.trim}" else token.trim
        }
        " && (" + leading.mkString + tokens.lastOption.fold("")(_.trim) + ")"
      }

    "$[?(\"" + Option(clType).getOrElse("") +
      "\" in @.type" + filterClause + ")]" + extractSuffix
  }

  /**
   * Evaluates the argument expressions against `input` and runs the generated JsonPath on
   * the snapshot.
   *
   * @param input the row the expressions are evaluated against
   * @param json expression producing the snapshot JSON
   * @param clType expression producing the cl type; must not evaluate to null
   * @param extractCriteria expression producing the extract path; may evaluate to null
   * @param filterCriteria expression producing the filter; may evaluate to null
   * @return whatever JsonPath reads for the generated path, or null for a null snapshot
   * @throws IllegalArgumentException if `clType` evaluates to null
   */
  @throws(classOf[JsonPathException])
  def snapshotExtract(input: InternalRow,
                      json: Expression,
                      clType: Expression,
                      extractCriteria: Expression,
                      filterCriteria: Expression): Any = {
    // Null-safe string value of an argument for the current row. Evaluating against `input`
    // (not an empty row) lets the arguments be column references as well as literals.
    def stringValue(expr: Expression): Option[String] =
      Option(expr.eval(input).asInstanceOf[UTF8String]).map(_.toString)

    stringValue(json) match {
      case None => null
      case Some(snapshot) =>
        val clTypeName = stringValue(clType).getOrElse(
          throw new IllegalArgumentException(
            "`cl_snapshot_extract` must be supplied with " +
              "a valid `clType` from http://go/cl, refer http://go/cludfs"))
        // Null criteria are passed through; the path builder treats them as absent.
        val jsonPath = generateClSnapshotJaywayJsonPath(
          clTypeName,
          stringValue(extractCriteria).orNull,
          stringValue(filterCriteria).orNull)
        JsonPath.parse(snapshot).read[Any](jsonPath)
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.SparkFunSuite

/**
 * Evaluation tests for [[ClSnapshotExtract]] against a fixed sample cl snapshot.
 */
class NetflixClExpressionsSuit extends SparkFunSuite with ExpressionEvalHelper {

  // Sample snapshot: a JSON array of objects, each carrying a `type` array. Newlines are
  // removed so the expression receives a single-line JSON document.
  private val snapshot =
    """
      |[{"sequence":1,"id":757294327146741,"source":"iOS","schema":{"name":"App","version":"1.26.0"},"type":["Log","Session"],"time":559440041005},
      |{"type":["ProcessState"],"id":757294397275504,"computation":"none","allocation":"none","interaction":"none"},
      |{"type":["ProcessState"],"id":757294425125683,"computation":"normal","allocation":"normal","interaction":"direct"},
      |{"sequence":3,"id":757294439218544,"type":["ProcessStateTransition","Action","Session"],"time":559440041006},
      |{"nfvdid":"blahblablb","id":757294485523660,"sequence":4,"type":["VisitorDeviceId","AccountIdentity","Session"],"time":559440041008},
      |{"sequence":5,"id":757294513373839,"type":["UserInteraction","Session"],"time":559440041008},
      |{"model":"APPLE_iPhone9-1","type":["Device"],"id":757294562698854},
      |{"type":["OsVersion"],"osVersion":"11.4.1","id":757294575785082},
      |{"appVersion":"11.2.0","type":["AppVersion"],"id":757294615379312},
      |{"uiVersion":"ios11.2.0 (2265)","type":["UiVersion"],"id":757294643565035},
      |{"esn":"","type":["Esn"],"id":757294732484280},
      |{"type":["NrdLib"],"id":757294743557242,"appVersion":"11.2.0 (2265)","sdkVersion":"2012.4","libVersion":"2012.4"},
      |{"userAgent":" App/11.2.0 ( iOS 11.4.1 )","type":["UserAgent"],"id":757294786171371},
      |{"utcOffset":-18000,"type":["TimeZone"],"id":757294829121044},
      |{"muting":false,"id":757294867037552,"level":0.875,"type":["Volume"]},
      |{"type":["WifiConnection","NetworkConnection"],"id":757294904954060},
      |{"type":["UiLocale"],"uiLocale":"en","id":757294954950164},
      |{"cells":{"7972":1,"10953":4},"type":["TestAllocations"],"id":757294995551027},
      |{"trackingInfo":{"videoId":12345,"trackId":9087,"imageKey":"test1"},"sequence":9,"id":757295011213817,"view":"browseTitles","type":["Presentation","Session"],"time":559440043683},
      |{"trackingInfo":{"surveyResponse":1,"surveyIdentifier":"IO_80203147"},"sequence":11,"id":757295111313817,"view":"homeTab","type":["Focus","Session"],"time":559440043683}]
      |""".stripMargin.replace("\n", "")

  test("function with only `clType`") {
    // Only one element has "Focus" in its `type`.
    val result = List(
      "{\"id\":757295111313817,\"sequence\":11,\"time\":559440043683,\"trackingInfo\":{\"surveyIdentifier\":\"IO_80203147\",\"surveyResponse\":1},\"type\":[\"Focus\",\"Session\"],\"view\":\"homeTab\"}"
    )
    checkEvaluation(ClSnapshotExtract(Literal(snapshot),
                                      Literal("Focus"),
                                      Literal(""),
                                      Literal("")),
                    result)
    // Several elements have "Session" in their `type`; all of them are returned.
    val result1 = List(
      "{\"id\":757294327146741,\"schema\":{\"name\":\"App\",\"version\":\"1.26.0\"},\"sequence\":1,\"source\":\"iOS\",\"time\":559440041005,\"type\":[\"Log\",\"Session\"]}",
      "{\"id\":757294439218544,\"sequence\":3,\"time\":559440041006,\"type\":[\"ProcessStateTransition\",\"Action\",\"Session\"]}",
      "{\"id\":757294485523660,\"nfvdid\":\"blahblablb\",\"sequence\":4,\"time\":559440041008,\"type\":[\"VisitorDeviceId\",\"AccountIdentity\",\"Session\"]}",
      "{\"id\":757294513373839,\"sequence\":5,\"time\":559440041008,\"type\":[\"UserInteraction\",\"Session\"]}",
      "{\"id\":757295011213817,\"sequence\":9,\"time\":559440043683,\"trackingInfo\":{\"imageKey\":\"test1\",\"trackId\":9087,\"videoId\":12345},\"type\":[\"Presentation\",\"Session\"],\"view\":\"browseTitles\"}",
      "{\"id\":757295111313817,\"sequence\":11,\"time\":559440043683,\"trackingInfo\":{\"surveyIdentifier\":\"IO_80203147\",\"surveyResponse\":1},\"type\":[\"Focus\",\"Session\"],\"view\":\"homeTab\"}"
    )
    checkEvaluation(ClSnapshotExtract(Literal(snapshot),
                                      Literal("Session"),
                                      Literal(""),
                                      Literal("")),
                    result1)
  }

  test("function with `clType` and `extractCriteria`") {
    val result = List("\"homeTab\"")
    checkEvaluation(ClSnapshotExtract(Literal(snapshot),
                                      Literal("Focus"),
                                      Literal("view"),
                                      Literal("")),
                    result)
    val result1 = List("1")
    checkEvaluation(ClSnapshotExtract(Literal(snapshot),
                                      Literal("Focus"),
                                      Literal("trackingInfo.surveyResponse"),
                                      Literal("")),
                    result1)
  }

  test("function with all the arguments present") {
    val result =
      List("{\"surveyIdentifier\":\"IO_80203147\",\"surveyResponse\":1}")
    checkEvaluation(ClSnapshotExtract(Literal(snapshot),
                                      Literal("Focus"),
                                      Literal("trackingInfo"),
                                      Literal("view==\"homeTab\"")),
                    result)
    val result1 = List("\"IO_80203147\"")
    checkEvaluation(
      ClSnapshotExtract(
        Literal(snapshot),
        Literal("Focus"),
        Literal("trackingInfo.surveyIdentifier"),
        Literal("view==\"homeTab\" && trackingInfo.surveyResponse==1")),
      result1
    )
    // The filter matches no "Focus" element, so the result is an empty array.
    val result3 = List()
    checkEvaluation(
      ClSnapshotExtract(
        Literal(snapshot),
        Literal("Focus"),
        Literal("trackingInfo.surveyIdentifier"),
        Literal("view==\"browseTitles\" && trackingInfo.surveyResponse==1")),
      result3
    )
    val result4 = List("\"test1\"")
    checkEvaluation(
      ClSnapshotExtract(
        Literal(snapshot),
        Literal("Presentation"),
        Literal("trackingInfo.imageKey"),
        Literal(
          "view==\"browseTitles\" && ( trackingInfo.trackId==9087 || trackingInfo.videoId == 12345 )")
      ),
      result4
    )
  }

  test("function without `clType` parameter") {
    // The null `clType` is only rejected when the expression is evaluated; constructing it
    // never throws. `intercept` also fails the test if no exception is raised.
    intercept[IllegalArgumentException] {
      ClSnapshotExtract(
        Literal(snapshot),
        Literal(null),
        Literal("trackingInfo.imageKey"),
        Literal(
          "view==\"browseTitles\" && ( trackingInfo.trackId==9087 || trackingInfo.videoId == 12345 )")
      ).eval()
    }
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.util

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.expressions.ExpressionEvalHelper

/**
 * Tests for the JsonPath strings produced by
 * `NetflixClUtils.generateClSnapshotJaywayJsonPath`.
 *
 * `assertResult` takes the expected value first and the actual value second, so a failure
 * message reports the two the right way round.
 */
class NetflixClUtilsSuite extends SparkFunSuite with ExpressionEvalHelper {
  test("jsonpath with only `clType`") {
    assertResult("$[?(\"Focus\" in @.type)]")(
      NetflixClUtils.generateClSnapshotJaywayJsonPath("Focus", "", ""))
    assertResult("$[?(\"Session\" in @.type)]")(
      NetflixClUtils.generateClSnapshotJaywayJsonPath("Session", "", ""))
  }

  test("jsonpath with `clType` and `extractCriteria`") {
    // A null filter adds no extra filter clause.
    assertResult("$[?(\"Focus\" in @.type)].view")(
      NetflixClUtils.generateClSnapshotJaywayJsonPath("Focus", "view", null))
    assertResult("$[?(\"Focus\" in @.type)].trackingInfo.surveyResponse")(
      NetflixClUtils.generateClSnapshotJaywayJsonPath(
        "Focus",
        "trackingInfo.surveyResponse",
        null))
  }

  test("jsonpath with all arguments") {
    assertResult(
      "$[?(\"Focus\" in @.type && (@.view==\"homeTab\"))].trackingInfo"
    )(
      NetflixClUtils.generateClSnapshotJaywayJsonPath("Focus",
                                                      "trackingInfo",
                                                      "view==\"homeTab\""))
    assertResult(
      "$[?(\"Focus\" in @.type && (@.view==\"homeTab\"&&@.trackingInfo.surveyResponse==1))].trackingInfo.surveyIdentifier"
    )(
      NetflixClUtils.generateClSnapshotJaywayJsonPath(
        "Focus",
        "trackingInfo.surveyIdentifier",
        "view==\"homeTab\" && trackingInfo.surveyResponse==1"))
    // Whitespace around operators and parentheses is dropped from the generated filter.
    assertResult(
      "$[?(\"Presentation\" in @.type && (@.view==\"browseTitles\"&&(@.trackingInfo.trackId==9087||@.trackingInfo.videoId==12345)))].trackingInfo.imageKey"
    )(
      NetflixClUtils.generateClSnapshotJaywayJsonPath(
        "Presentation",
        "trackingInfo.imageKey",
        "view==\"browseTitles\" && ( trackingInfo.trackId==9087 || trackingInfo.videoId == 12345 )"))
  }
}
Loading

0 comments on commit 349c726

Please sign in to comment.