This repository has been archived by the owner on Oct 8, 2020. It is now read-only.

Commit
Cont. Flink RDF
LorenzBuehmann committed Jun 14, 2019
1 parent 272f0e7 commit 69624cf
Showing 10 changed files with 111 additions and 90 deletions.
6 changes: 6 additions & 0 deletions pom.xml
@@ -110,6 +110,12 @@
<artifactId>sansa-rdf-partition-core</artifactId>
<version>${sansa.rdf.version}</version>
</dependency>
<dependency>
<groupId>net.sansa-stack</groupId>
<artifactId>sansa-rdf-flink_${scala.binary.version}</artifactId>
<version>${sansa.rdf.version}</version>
</dependency>

<!-- SANSA OWL -->
<dependency>
<groupId>net.sansa-stack</groupId>
6 changes: 6 additions & 0 deletions sansa-inference-flink/pom.xml
@@ -47,6 +47,12 @@ under the License.
<scope>test</scope>
</dependency>

<!-- RDF Layer -->
<dependency>
<groupId>net.sansa-stack</groupId>
<artifactId>sansa-rdf-flink_${scala.binary.version}</artifactId>
</dependency>

<!-- Apache Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
@@ -74,7 +74,6 @@ object RDFGraphMaterializer {

// load triples from disk
val graph = RDFGraphLoader.loadFromDisk(input, env)
// println(s"|G| = ${graph.size()}")

// create reasoner
val reasoner = profile match {
@@ -90,7 +89,7 @@

// compute inferred graph
val inferredGraph = reasoner.apply(graph)
println(s"|G_inf| = ${inferredGraph.size()}")
println(s"|G_inf| = ${inferredGraph.size}")

// write triples to disk
// RDFGraphWriter.writeToDisk(inferredGraph, output, writeToSingleFile, sortedOutput)
@@ -119,7 +118,7 @@ object RDFGraphMaterializer {

// the CLI parser
val parser = new scopt.OptionParser[Config]("RDFGraphMaterializer") {
head("RDFGraphMaterializer", "0.1.0")
head("RDFGraphMaterializer", "0.4.0")

// opt[Seq[File]]('i', "input").required().valueName("<path1>,<path2>,...").
// action((x, c) => c.copy(in = x)).
@@ -128,7 +127,7 @@
.required()
.valueName("<path>")
.action((x, c) => c.copy(in = x))
.text("path to file or directory that contains the input files (in N-Triple format)")
.text("path to file or directory that contains the input files (in N-Triples format)")

opt[URI]('o', "out")
.required()
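For context, here is a minimal sketch of how a scopt parser of this shape is wired and invoked. The program name, version string, and the Config fields in and out follow the diff above; everything else is an assumption, not the project's actual code:

import java.net.URI

// Hypothetical minimal Config -- the real one in this project has more fields.
case class Config(in: URI = new URI(""), out: URI = new URI(""))

object MaterializerCliSketch {
  def main(args: Array[String]): Unit = {
    val parser = new scopt.OptionParser[Config]("RDFGraphMaterializer") {
      head("RDFGraphMaterializer", "0.4.0")

      opt[URI]('i', "input").required().valueName("<path>")
        .action((x, c) => c.copy(in = x))
        .text("path to file or directory that contains the input files (in N-Triples format)")

      opt[URI]('o', "out").required().valueName("<directory>")
        .action((x, c) => c.copy(out = x))
        .text("the output directory")
    }

    parser.parse(args, Config()) match {
      case Some(config) => println(s"in=${config.in}, out=${config.out}") // run the pipeline here
      case None => // invalid arguments; scopt has already printed the usage text
    }
  }
}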
@@ -67,7 +67,5 @@ case class RDFGraph(triples: DataSet[Triple]) {
*
* @return the number of triples
*/
def size(): Long = {
triples.count()
}
lazy val size: Long = triples.count()
}
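The size member changes from a method into a memoized lazy val. In the Flink DataSet API, count() triggers execution of a job, so a def would re-run that job on every access, while a lazy val runs it once and caches the result. A plain-Scala sketch of the difference (hypothetical names, not project code):

object LazySizeSketch extends App {
  class CountOnce(compute: () => Long) {
    def sizeAsDef(): Long = compute()        // re-runs the computation on every call
    lazy val sizeAsLazyVal: Long = compute() // runs it once, then returns the cached value
  }

  val c = new CountOnce(() => { println("counting..."); 42L })
  c.sizeAsLazyVal // prints "counting..." and yields 42
  c.sizeAsLazyVal // yields the cached 42; prints nothing
}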
@@ -1,17 +1,15 @@
package net.sansa_stack.inference.flink.data

import java.io.File
import java.net.URI

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

import net.sansa_stack.inference.data.RDFTriple
import org.apache.flink.configuration.Configuration
import scala.collection.JavaConverters._
import scala.language.implicitConversions

import org.apache.jena.rdf.model.impl.NTripleReader
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.jena.riot.{Lang, RDFDataMgr}

import net.sansa_stack.rdf.benchmark.io.ReadableByteChannelFromIterator

import net.sansa_stack.inference.utils.NTriplesStringToRDFTriple

/**
* @author Lorenz Buehmann
@@ -20,41 +18,50 @@ object RDFGraphLoader {

implicit def pathURIsConverter(uris: Seq[URI]): String = uris.map(p => p.toString).mkString(",")

def loadFromFile(path: String, env: ExecutionEnvironment): RDFGraph = {
val triples = env.readTextFile(path)
.map(line => line.replace(">", "").replace("<", "").split("\\s+")) // line to tokens
.map(tokens => RDFTriple(tokens(0), tokens(1), tokens(2))) // tokens to triple

RDFGraph(triples)
def loadFromDisk(path: String, env: ExecutionEnvironment): RDFGraph = {
loadFromDisk(URI.create(path), env)
}

def loadFromDisk(path: URI, env: ExecutionEnvironment): RDFGraph = {
// create a configuration object
val parameters = new Configuration
loadFromDisk(Seq(path), env)
}

def loadFromDisk(paths: Seq[URI], env: ExecutionEnvironment): RDFGraph = {
// // create a configuration object
// val parameters = new Configuration
//
// // set the recursive enumeration parameter
// parameters.setBoolean("recursive.file.enumeration", true)
// env.readTextFile(f).withParameters(parameters)

// set the recursive enumeration parameter
parameters.setBoolean("recursive.file.enumeration", true)
val tmp: List[String] = paths.map(path => path.toString).toList

val triples = tmp
.map(f => env.readTextFile(f)) // no support to read from multiple paths at once, thus, map + union here
.reduce(_ union _) // TODO Flink 1.5.0 supports multiple paths via FileInputFormat
.mapPartition(p => {
// convert iterator to input stream
val is = ReadableByteChannelFromIterator.toInputStream(p.asJava)

// pass the configuration to the data source
val triples = env.readTextFile(path.toString).withParameters(parameters)
.map(line => line.replace(">", "").replace("<", "").split("\\s+")) // line to tokens
.map(tokens => RDFTriple(tokens(0), tokens(1), tokens(2)))
.name("triples") // tokens to triple
RDFDataMgr.createIteratorTriples(is, Lang.NTRIPLES, null).asScala
})
.name("triples")

RDFGraph(triples)
}

def loadFromDisk(paths: Seq[URI], env: ExecutionEnvironment): RDFGraph = {
def main(args: Array[String]): Unit = {
if (args.length == 0) println("Usage: RDFGraphLoader <PATH_TO_FILE>")

val tmp: List[String] = paths.map(path => path.toString).toList
val path = args(0)

val converter = new NTriplesStringToRDFTriple()
val env = ExecutionEnvironment.getExecutionEnvironment

val triples = tmp
.map(f => env.readTextFile(f).flatMap(line => converter.apply(line))).reduce(_ union _).name("triples")
val ds = RDFGraphLoader.loadFromDisk(path, env).triples

RDFGraph(triples)
println(s"size:${ds.count}")
println("sample data:\n" + ds.first(10).map { _.toString.replaceAll("[\\x00-\\x1f]","???")}.collect().mkString("\n"))
}

}
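The rewritten loader reads each path as a text file, unions the resulting data sets (Flink only gained multi-path FileInputFormat support in 1.5.0, per the TODO above), and then hands each whole partition to Jena's streaming N-Triples parser through an InputStream. ReadableByteChannelFromIterator presumably does that wrapping; a minimal stand-in sketch of the idea (an assumption, not the actual SANSA helper):

import java.io.{ByteArrayInputStream, InputStream, SequenceInputStream}
import java.nio.charset.StandardCharsets

import scala.collection.JavaConverters._

// Stand-in (assumption, not the actual SANSA helper): concatenate the lines of
// one partition into a single InputStream so Jena can parse them in one pass.
object LinesAsStream {
  def apply(lines: Iterator[String]): InputStream =
    new SequenceInputStream(
      lines
        .map(line => new ByteArrayInputStream((line + "\n").getBytes(StandardCharsets.UTF_8)))
        .asJavaEnumeration
    )
}

A partition wrapped this way can be consumed exactly like the diff's mapPartition body does: RDFDataMgr.createIteratorTriples(LinesAsStream(p), Lang.NTRIPLES, null).asScala.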
@@ -1,17 +1,18 @@
package net.sansa_stack.inference.flink.data

import java.io.{ByteArrayInputStream, File}
import java.io.ByteArrayInputStream
import java.net.URI
import java.nio.charset.StandardCharsets

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem
import org.apache.jena.rdf.model.{Model, ModelFactory}

import net.sansa_stack.inference.utils.{RDFTripleOrdering, RDFTripleToNTripleString}
import org.apache.jena.sparql.util.TripleComparator
import org.slf4j.LoggerFactory

import net.sansa_stack.inference.utils.{JenaTripleToNTripleString, RDFTripleOrdering}

/**
* Writes an RDF graph to disk.
*
@@ -26,10 +27,10 @@ object RDFGraphWriter {
logger.info("writing triples to disk...")
val startTime = System.currentTimeMillis()

implicit val ordering = RDFTripleOrdering
implicit val ordering = new TripleComparator()

graph.triples.map(t => (t, t)).sortPartition(1, Order.DESCENDING).map(_._1)
.map(new RDFTripleToNTripleString()) // to N-TRIPLES string
.map(new JenaTripleToNTripleString()) // to N-Triples string
.writeAsText(path, writeMode = FileSystem.WriteMode.OVERWRITE)

logger.info("finished writing triples to disk in " + (System.currentTimeMillis()-startTime) + "ms.")
@@ -61,14 +62,14 @@ object RDFGraphWriter {
}

tmp
.map(new RDFTripleToNTripleString()) // to N-TRIPLES string
.map(new JenaTripleToNTripleString()) // to N-TRIPLES string
.writeAsText(path.toString, writeMode = FileSystem.WriteMode.OVERWRITE)

logger.info("finished writing triples to disk in " + (System.currentTimeMillis()-startTime) + "ms.")
}

def convertToModel(graph: RDFGraph) : Model = {
val modelString = graph.triples.map(new RDFTripleToNTripleString())
val modelString = graph.triples.map(new JenaTripleToNTripleString())
.collect().mkString("\n")

val model = ModelFactory.createDefaultModel()
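The writer now maps Jena Triples through JenaTripleToNTripleString, whose implementation is not part of this diff. A hypothetical stand-in that renders one triple per line (a sketch only; strict N-Triples serialization would rather go through Jena's RDFDataMgr/StreamRDF writers):

import org.apache.jena.graph.Triple
import org.apache.jena.sparql.util.FmtUtils

// Hypothetical stand-in, not the SANSA class: one triple per N-Triples-style line.
object TripleToLine extends (Triple => String) with Serializable {
  override def apply(t: Triple): String = FmtUtils.stringForTriple(t) + " ."
}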
@@ -3,17 +3,20 @@ package net.sansa_stack.inference.flink
import java.util
import java.util.Comparator

import scala.collection.JavaConverters._

import com.google.common.collect.ComparisonChain
import net.sansa_stack.inference.flink.data.RDFGraph
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.apache.jena.graph.{NodeFactory, Triple}
import org.apache.jena.sparql.util.TripleComparator
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import net.sansa_stack.inference.data.RDFTriple

import scala.collection.JavaConverters._
import net.sansa_stack.inference.data.RDFTriple
import net.sansa_stack.inference.flink.data.RDFGraph

/**
* A test case for the computation of the transitive closure (TC).
@@ -26,19 +29,24 @@ class RDFGraphTestCase(mode: TestExecutionMode) extends MultipleProgramsTestBase
def testSubtract(): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment

val s1 = NodeFactory.createURI("s1")
val p1 = NodeFactory.createURI("p1")
val o1 = NodeFactory.createURI("o1")
val o2 = NodeFactory.createURI("o2")
val o3 = NodeFactory.createURI("o3")

// generate dataset
val g1 = RDFGraph(env.fromCollection(
Seq(
RDFTriple("s1", "p1", "o1"),
RDFTriple("s1", "p1", "o2"),
RDFTriple("s1", "p1", "o3")
Triple.create(s1, p1, o1),
Triple.create(s1, p1, o2),
Triple.create(s1, p1, o3)
)
))
val g2 = RDFGraph(env.fromCollection(
Seq(
RDFTriple("s1", "p1", "o1"),
RDFTriple("s1", "p1", "o2")
Triple.create(s1, p1, o1),
Triple.create(s1, p1, o2)
)
))

@@ -47,17 +55,12 @@

val result = g_diff.triples.collect()
val expected = Seq(
RDFTriple("s1", "p1", "o3")
Triple.create(s1, p1, o3)
)

TestBaseUtils.compareResultCollections(new util.ArrayList(result.asJava), new util.ArrayList(expected.asJava), new Comparator[RDFTriple] {
override def compare(t1: RDFTriple, t2: RDFTriple): Int =
ComparisonChain.start()
.compare(t1.s, t2.s)
.compare(t1.p, t2.p)
.compare(t1.o, t2.o)
.result()
})
TestBaseUtils.compareResultCollections(
new util.ArrayList(result.asJava),
new util.ArrayList(expected.asJava),
new TripleComparator())
}

}
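The test exercises RDFGraph.subtract, whose implementation is not shown in this diff. One plausible way to express set difference over DataSets of Jena Triples -- an assumption, not necessarily SANSA's code -- is a coGroup keyed on the triple's string form, keeping the left-side triples of every group that has no right-side match:

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
import org.apache.jena.graph.Triple

// Hypothetical set difference on DataSets (an assumption, not SANSA's actual
// subtract): group both sides by a string key and emit left-side triples only
// when the right side of the group is empty.
object SubtractSketch {
  def subtract(left: DataSet[Triple], right: DataSet[Triple]): DataSet[Triple] =
    left.coGroup(right)
      .where(_.toString)
      .equalTo(_.toString)
      .apply { (l: Iterator[Triple], r: Iterator[Triple], out: Collector[Triple]) =>
        if (r.isEmpty) l.foreach(out.collect)
      }
}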
@@ -1,25 +1,26 @@
package net.sansa_stack.inference.flink.conformance

import net.sansa_stack.inference.flink.data.RDFGraphWriter
import net.sansa_stack.test.conformance.{IntegrationTestSuite, OWLHorstConformanceTestBase}
import scala.collection.mutable

import org.apache.flink.api.scala._
import org.apache.jena.graph.Triple
import org.apache.jena.rdf.model.Model
import net.sansa_stack.inference.data.{RDFTriple, SimpleRDFOps}
import net.sansa_stack.inference.flink.data.{RDFGraph, RDFGraphWriter}
import org.scalatest.Ignore

import scala.collection.mutable
import net.sansa_stack.inference.data.{Jena, JenaOps}
import net.sansa_stack.inference.flink.data.{RDFGraph, RDFGraphWriter}
import net.sansa_stack.test.conformance.OWLHorstConformanceTestBase

/**
* The class is to test the conformance of each materialization rule of OWL Horst entailment.
*
* @author Lorenz Buehmann
*
*/
@IntegrationTestSuite
class OWLHorstConformanceTest extends OWLHorstConformanceTestBase(rdfOps = new SimpleRDFOps) with SharedOWLHorstReasonerContext{
class OWLHorstConformanceTest
extends OWLHorstConformanceTestBase[Jena](rdfOps = new JenaOps)
with SharedOWLHorstReasonerContext{

override def computeInferredModel(triples: mutable.HashSet[RDFTriple]): Model = {
override def computeInferredModel(triples: mutable.HashSet[Triple]): Model = {
// distribute triples
val triplesRDD = env.fromCollection(triples)

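The method body is cut off here; the remainder presumably materializes the inferred graph and converts it to a Jena Model (cf. RDFGraphWriter.convertToModel above, which round-trips through N-Triples strings). For reference, a more direct conversion from Triples to a Model, as a sketch rather than the project's code:

import org.apache.jena.graph.Triple
import org.apache.jena.rdf.model.{Model, ModelFactory}

// Sketch, not project code: build a Jena Model straight from a collection of
// Triples by adding them to the model's underlying Graph.
object TriplesToModel {
  def apply(triples: Iterable[Triple]): Model = {
    val model = ModelFactory.createDefaultModel()
    triples.foreach(t => model.getGraph.add(t))
    model
  }
}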
@@ -1,25 +1,26 @@
package net.sansa_stack.inference.flink.conformance

import net.sansa_stack.inference.flink.data.RDFGraphWriter
import net.sansa_stack.test.conformance.{IntegrationTestSuite, RDFSConformanceTestBase}
import org.apache.jena.rdf.model.Model
import net.sansa_stack.inference.data.{RDFTriple, SimpleRDFOps}
import net.sansa_stack.inference.flink.data.RDFGraph
import scala.collection.mutable

import org.apache.flink.api.scala._
import org.scalatest.Ignore
import org.apache.jena.graph.Triple
import org.apache.jena.rdf.model.Model

import scala.collection.mutable
import net.sansa_stack.inference.data.{Jena, JenaOps}
import net.sansa_stack.inference.flink.data.{RDFGraph, RDFGraphWriter}
import net.sansa_stack.test.conformance.RDFSConformanceTestBase

/**
* The class is to test the conformance of each materialization rule of RDFS(simple) entailment.
*
* @author Lorenz Buehmann
*
*/
@IntegrationTestSuite
class RDFSConformanceTest extends RDFSConformanceTestBase(rdfOps = new SimpleRDFOps) with SharedRDFSReasonerContext{
class RDFSConformanceTest
extends RDFSConformanceTestBase[Jena](rdfOps = new JenaOps)
with SharedRDFSReasonerContext{

override def computeInferredModel(triples: mutable.HashSet[RDFTriple]): Model = {
override def computeInferredModel(triples: mutable.HashSet[Triple]): Model = {
// distribute triples
val triplesRDD = env.fromCollection(triples)
