This repository has been archived by the owner on Oct 8, 2020. It is now read-only.

Feature/flink jena #14

Merged · 28 commits merged on Jun 28, 2019
Commits (28) · Changes shown from 1 commit
44067c4
Flink reasoning with Jena datastructures.
LorenzBuehmann Jun 26, 2018
24d641b
Merge branch 'develop' into feature/flink-jena
LorenzBuehmann Jun 27, 2018
2df2c0c
Cont. Flink RDF
LorenzBuehmann Jun 27, 2018
7d231ec
Minor Pom changes
LorenzBuehmann Dec 9, 2016
272f0e7
Flink reasoning with Jena datastructures.
LorenzBuehmann Jun 26, 2018
69624cf
Cont. Flink RDF
LorenzBuehmann Jun 27, 2018
8719a31
Use RDF layer for loading/writing
LorenzBuehmann Jun 16, 2019
ec65954
Merge branch 'feature/flink-jena' of github.com:SANSA-Stack/SANSA-Inf…
LorenzBuehmann Jun 16, 2019
4d99e4f
Key type wrapper.
LorenzBuehmann Jun 17, 2019
fc07298
Flink needs either key type or key selector function for join()
LorenzBuehmann Jun 17, 2019
be1bf6f
Minor changes in main entry class, e.g. always write to disk and bump…
LorenzBuehmann Jun 17, 2019
3a34c4a
Merge branch 'develop' into feature/flink-jena
LorenzBuehmann Jun 18, 2019
095b5a0
boolean function utils.
LorenzBuehmann Jun 27, 2019
04effc9
POM cleanup
LorenzBuehmann Jun 27, 2019
3dd33b9
Merge remote-tracking branch 'origin/feature/0.6.0-SNAPSHOT' into fea…
LorenzBuehmann Jun 27, 2019
d5e6b6c
Utils simplified
LorenzBuehmann Jun 27, 2019
7cf311c
Extended test output if test fails.
LorenzBuehmann Jun 27, 2019
f66a9e0
Simplified conversion from RDD[Triple] to Jena Model
LorenzBuehmann Jun 27, 2019
79e2e3a
minor changes in I/O
LorenzBuehmann Jun 27, 2019
9da7479
subtraction operation for Flink DataSet with Jena Triple
LorenzBuehmann Jun 27, 2019
2e84e5b
Improved Flink conformance test setup
LorenzBuehmann Jun 27, 2019
89394b0
log output of test base class
LorenzBuehmann Jun 27, 2019
af1ff94
Flink conformance test clean up
LorenzBuehmann Jun 28, 2019
8a4d6d3
Flink reasoning on Triple DataSet
LorenzBuehmann Jun 28, 2019
634099a
Added missing deps for dist package
LorenzBuehmann Jun 28, 2019
2ddb2f9
Minor
LorenzBuehmann Jun 28, 2019
b5e03b2
Scalastyle cleanup
LorenzBuehmann Jun 28, 2019
50260fe
Merge branch 'develop' into feature/flink-jena
GezimSejdiu Jun 28, 2019
Key type wrapper.
LorenzBuehmann committed Jun 17, 2019
commit 4d99e4f8bb94dc1b4d1e474a2938885e6f403e7e
@@ -0,0 +1,35 @@
package net.sansa_stack.inference.flink.utils

import org.apache.jena.graph.Node
import org.apache.jena.sparql.util.NodeComparator

/**
 * Key type wrapper for Jena `Node` objects.
 * It makes `Node` comparable, which is required when a node is used as a key in Flink.
 *
 * @author Lorenz Buehmann
 */
class NodeKey(val node: Node) extends Comparable[NodeKey] with Equals {

  override def compareTo(o: NodeKey): Int = {
    val other = o.node
    if (node == null) {
      if (other == null) 0 else -1
    } else {
      if (other == null) 1 else new NodeComparator().compare(node, other)
    }
  }

  override def canEqual(that: Any): Boolean = that.isInstanceOf[NodeKey]

  override def hashCode(): Int = 31 * node.##

  override def equals(that: Any): Boolean =
    that match {
      // compare the wrapped nodes themselves rather than their hash codes,
      // otherwise hash collisions would make distinct nodes "equal"
      case key: NodeKey => (this eq key) || (key.canEqual(this) && node == key.node)
      case _ => false
    }
}

object NodeKey {
  def apply(node: Node): NodeKey = new NodeKey(node)
}
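
For context, a minimal usage sketch follows; it is not part of this diff, and the function name and exact Flink API usage are assumptions. The idea is that wrapping a triple's subject in a NodeKey lets a key selector function hand Flink's join() a Comparable key type (see the later commit "Flink needs either key type or key selector function for join()").

// Sketch only: keying a DataSet[Triple] by its subject via NodeKey,
// so the key selector returns a Comparable key type that Flink can use for join().
import org.apache.flink.api.scala._
import org.apache.jena.graph.Triple
import net.sansa_stack.inference.flink.utils.NodeKey

def joinOnSubject(left: DataSet[Triple], right: DataSet[Triple]): DataSet[(Triple, Triple)] =
  left.join(right)
    .where(t => NodeKey(t.getSubject))
    .equalTo(t => NodeKey(t.getSubject))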
@@ -0,0 +1,18 @@
package net.sansa_stack.inference.flink.utils.key

/**
 * Base of a tuple-like generic key.
 *
 * @tparam T The type of the concrete key type.
 */
abstract class Key[T <: Key[T]] extends Comparable[T] {

  /**
   * Gets the i-th element of the tuple-like key.
   *
   * @param pos The position.
   * @return The element at that key position.
   */
  def get(pos: Int): Any
}


@@ -0,0 +1,36 @@
package net.sansa_stack.inference.flink.utils.key

/**
 * A key with one key field.
 *
 * @tparam T1 The type of the field.
 */
class Key1[T1 <: Comparable[T1]](val value1: T1) extends Key[Key1[T1]] with Equals {

  def get(pos: Int): Any = pos match {
    case 0 => value1
    case _ => throw new IndexOutOfBoundsException
  }

  override def hashCode: Int = if (value1 == null) 0 else value1.hashCode

  override def canEqual(that: Any): Boolean = that.isInstanceOf[Key1[T1]]

  override def equals(obj: Any): Boolean =
    obj match {
      case that: Key1[T1] => (this eq that) || (this.canEqual(that) && (value1 == that.value1))
      case _ => false
    }

  override def toString: String = s"Key1 ($value1)"

  def compareTo(o: Key1[T1]): Int = {
    val other = o.value1
    if (value1 == null) {
      if (other == null) 0 else -1
    } else {
      if (other == null) 1 else value1.compareTo(other)
    }
  }
}
@@ -0,0 +1,53 @@
package net.sansa_stack.inference.flink.utils.key

/**
 * A key with two key fields.
 *
 * @tparam T1 The type of the first field.
 * @tparam T2 The type of the second field.
 */
class Key2[T1 <: Comparable[T1], T2 <: Comparable[T2]](val value1: T1, val value2: T2)
  extends Key[Key2[T1, T2]]
  with Equals {

  def get(pos: Int): Any = pos match {
    case 0 => value1
    case 1 => value2
    case _ => throw new IndexOutOfBoundsException
  }

  override def hashCode: Int = {
    val c1: Int = if (value1 == null) 0 else value1.hashCode
    val c2: Int = if (value2 == null) 0 else value2.hashCode
    c1 * 17 + c2 * 31
  }

  override def canEqual(that: Any): Boolean = that.isInstanceOf[Key2[T1, T2]]

  override def equals(obj: Any): Boolean =
    obj match {
      case that: Key2[T1, T2] =>
        (this eq that) || (this.canEqual(that) && (value1 == that.value1) && (value2 == that.value2))
      case _ => false
    }

  override def toString: String = s"Key2 ($value1, $value2)"

  def compareTo(o: Key2[T1, T2]): Int = {
    val other1 = o.value1
    val other2 = o.value2

    // compare field by field, nulls first
    val c1 =
      if (value1 == null) { if (other1 == null) 0 else -1 }
      else { if (other1 == null) 1 else value1.compareTo(other1) }

    if (c1 != 0) c1
    else if (value2 == null) { if (other2 == null) 0 else -1 }
    else { if (other2 == null) 1 else value2.compareTo(other2) }
  }
}
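
Also purely illustrative (not from the PR; the helper name is made up): Key2 combined with NodeKey yields a composite, Comparable key, e.g. for keying triples by subject and object.

// Sketch only: a composite (subject, object) key for a Jena Triple,
// built from NodeKey so both fields satisfy the Comparable bound of Key2.
import org.apache.jena.graph.Triple
import net.sansa_stack.inference.flink.utils.NodeKey
import net.sansa_stack.inference.flink.utils.key.Key2

def subjectObjectKey(t: Triple): Key2[NodeKey, NodeKey] =
  new Key2(NodeKey(t.getSubject), NodeKey(t.getObject))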
@@ -0,0 +1,69 @@
package net.sansa_stack.inference.flink.utils.key

/**
 * A key with three key fields.
 *
 * @tparam T1 The type of the first field.
 * @tparam T2 The type of the second field.
 * @tparam T3 The type of the third field.
 */
class Key3[T1 <: Comparable[T1], T2 <: Comparable[T2], T3 <: Comparable[T3]](val value1: T1, val value2: T2, val value3: T3)
  extends Key[Key3[T1, T2, T3]]
  with Equals {

  def get(pos: Int): Any = pos match {
    case 0 => value1
    case 1 => value2
    case 2 => value3
    case _ => throw new IndexOutOfBoundsException
  }

  override def hashCode: Int = {
    val c1: Int = if (value1 == null) 0 else value1.hashCode
    val c2: Int = if (value2 == null) 0 else value2.hashCode
    val c3: Int = if (value3 == null) 0 else value3.hashCode
    c1 * 17 + c2 * 31 + c3 * 47
  }

  override def canEqual(that: Any): Boolean = that.isInstanceOf[Key3[T1, T2, T3]]

  override def equals(obj: Any): Boolean =
    obj match {
      case that: Key3[T1, T2, T3] =>
        (this eq that) || (this.canEqual(that) && (value1 == that.value1) && (value2 == that.value2) && (value3 == that.value3))
      case _ => false
    }

  override def toString: String = s"Key3 ($value1, $value2, $value3)"

  def compareTo(o: Key3[T1, T2, T3]): Int = {
    val other1 = o.value1
    val other2 = o.value2
    val other3 = o.value3

    // compare field by field, nulls first
    val c1 =
      if (value1 == null) { if (other1 == null) 0 else -1 }
      else { if (other1 == null) 1 else value1.compareTo(other1) }

    if (c1 != 0) c1
    else {
      val c2 =
        if (value2 == null) { if (other2 == null) 0 else -1 }
        else { if (other2 == null) 1 else value2.compareTo(other2) }

      if (c2 != 0) c2
      else if (value3 == null) { if (other3 == null) 0 else -1 }
      else { if (other3 == null) 1 else value3.compareTo(other3) }
    }
  }
}
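
In the same hedged spirit (an assumption, not shown in this commit), a whole triple can be wrapped as a Key3, e.g. as a key for duplicate elimination or for the DataSet subtraction added in a later commit of this PR.

// Sketch only: a full (subject, predicate, object) key for a Jena Triple.
import org.apache.jena.graph.Triple
import net.sansa_stack.inference.flink.utils.NodeKey
import net.sansa_stack.inference.flink.utils.key.Key3

def tripleKey(t: Triple): Key3[NodeKey, NodeKey, NodeKey] =
  new Key3(NodeKey(t.getSubject), NodeKey(t.getPredicate), NodeKey(t.getObject))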