Skip to content

Titan Format

Dan LaRocque edited this page Sep 5, 2014 · 62 revisions
This is the documentation for Faunus 0.4.
Faunus was merged into Titan and renamed Titan-Hadoop in version 0.5.
Documentation for the latest Titan version is available at http://s3.thinkaurelius.com/docs/titan/current.

  • InputFormat: com.thinkaurelius.faunus.formats.titan.cassandra.TitanCassandraInputFormat
  • InputFormat: com.thinkaurelius.faunus.formats.titan.hbase.TitanHBaseInputFormat
  • OutputFormat: com.thinkaurelius.faunus.formats.titan.cassandra.TitanCassandraOutputFormat
  • OutputFormat: com.thinkaurelius.faunus.formats.titan.hbase.TitanHBaseOutputFormat

Titan is a distributed graph database developed by Aurelius and provided under the Apache 2 license. Titan is backend agnostic and is currently deployed with support for Apache Cassandra and Apache HBase (see The Benefits of Titan and Storage Backend Overview). Faunus can be used to bulk load, incremental load, and read data to and from a Titan cluster.

Titan InputFormat Support

An InputFormat specifies how to turn a data source into a stream of Hadoop <KEY,VALUE> pairs (see blog post). For Faunus, this means turning the source data into a stream of <NullWritable, FaunusVertex> pairs. The following TitanXXXInputFormat classes stream Titan encoded data contained within Cassandra and HBase into Faunus/Hadoop.

TitanCassandraInputFormat

In order to read graph data from Titan/Cassandra, a graph needs to exist. For the sake of an example, The Graph of the Gods dataset deployed with Titan can be loaded using Gremlin (see diagram at Getting Started).

gremlin> g = TitanFactory.open('bin/cassandra.local')
==>titangraph[cassandra:127.0.0.1]
gremlin> g.loadGraphML('data/graph-of-the-gods.xml')
==>null
gremlin> g.commit()

In Faunus, a bin/titan-cassandra-input.properties file is provided with the following properties which tell Faunus the location and features of the Titan/Cassandra cluster.

faunus.graph.input.format=com.thinkaurelius.faunus.formats.titan.cassandra.TitanCassandraInputFormat
faunus.graph.input.titan.storage.backend=cassandra
faunus.graph.input.titan.storage.hostname=localhost
faunus.graph.input.titan.storage.port=9160
faunus.graph.input.titan.storage.keyspace=titan
cassandra.input.partitioner.class=org.apache.cassandra.dht.RandomPartitioner
# cassandra.input.split.size=512
gremlin> g = FaunusFactory.open('bin/titan-cassandra-input.properties')      
==>faunusgraph[titancassandrainputformat]
gremlin> g.V.count()
13/01/04 12:53:24 INFO mapreduce.FaunusCompiler: Compiled to 1 MapReduce job(s)
13/01/04 12:53:24 INFO mapreduce.FaunusCompiler: Executing job 1 out of 1: MapSequence[com.thinkaurelius.faunus.mapreduce.transform.VerticesMap.Map, com.thinkaurelius.faunus.mapreduce.util.CountMapReduce.Map, com.thinkaurelius.faunus.mapreduce.util.CountMapReduce.Reduce]
...
==>12

NOTE: When using Titan/Cassandra as a data source, and if there are vertices with a large number of edges (i.e. a very wide row in Cassandra), an inoculous exception may occur warning that the thrift frame size has been exceeded. While the cassandra.yaml can be updated and the following job properties added cassandra.thrift.framed.size_mb/cassandra.thrift.message.max_size_mb, typically, the easiest way to solve this is to add the following property to the FaunusGraph being worked with: cassandra.input.split.size=512 (see bin/titan-cassandra-input.properties). The value 512 is how many kilobytes to make the input split size and this value can be adjusted higher or lower to ensure performant, non-excepting behavior.

TitanHBaseInputFormat

The Graph of the Gods dataset deployed with Titan can be loaded into Titan/HBase using Gremlin (see diagram at Getting Started).

gremlin> g = TitanFactory.open('bin/hbase.local')
==>titangraph[hbase:127.0.0.1]
gremlin> g.loadGraphML('data/graph-of-the-gods.xml')
==>null
gremlin> g.commit()

In Faunus, a bin/titan-hbase-input.properties file is provided with the following properties. This creates a FaunusGraph that is fed from Titan/HBase. Note, for multi-machines environments, the titan.graph.input.storage.hostname should use the cluster-internal IP address of the machine with Zookeeper even if that machine is in fact localhost.

faunus.graph.input.format=com.thinkaurelius.faunus.formats.titan.hbase.TitanHBaseInputFormat
faunus.graph.input.titan.storage.backend=hbase
faunus.graph.input.titan.storage.hostname=localhost
faunus.graph.input.titan.storage.port=2181
faunus.graph.input.titan.storage.tablename=titan
# hbase.mapreduce.scan.cachedrows=1000
gremlin> g = FaunusFactory.open('bin/titan-hbase-input.properties') 
==>faunusgraph[titanhbaseinputformat]
gremlin> g.V.count()
13/01/04 15:40:56 INFO mapreduce.FaunusCompiler: Compiled to 1 MapReduce job(s)
13/01/04 15:40:56 INFO mapreduce.FaunusCompiler: Executing job 1 out of 1: MapSequence[com.thinkaurelius.faunus.mapreduce.transform.VerticesMap.Map, com.thinkaurelius.faunus.mapreduce.util.CountMapReduce.Map, com.thinkaurelius.faunus.mapreduce.util.CountMapReduce.Reduce]
...
==>12

Please follow the links below for more information on streaming data out of HBase.

Titan OutputFormat Support

Faunus can be used to bulk load data into Titan. Thus, given a stream of <NullWritable, FaunusVertex> pairs, with a TitanXXXOutputFormat the stream is faithfully written to Titan. For all the examples to follow, it is assumed that data/graph-of-the-gods.json is in HDFS. Finally, note that is is typically a good idea to have the graph (keyspace/table) already initialized (e.g. g = TitanFactory.open(...)) before doing bulk writing as the creation process takes time and a heavy write load during the graph creation process can yield exceptions.

When writing a Faunus graph to Titan, there are two MapReduce phases. The first Map-phase writes all the vertices of the graph and computes the Titan generated ID and maintains a map between the Faunus vertex ID and the Titan vertex ID. The reduce phase distributes that Faunus-Titan ID map to all edge pairs and thus, a read-only distributed hash map is created. In the second MapReduce phase, the Map writes all the edges and there is no reducer.

During the map phases described above, numerous transactions are being committed. The size of the transaction is the size of the full task. The mechanism to control the size of a task for both the mappers and reducers is using mapred.max.split.size. Be sure that the size of the split is not so large that OEMs occur and so small that very little data is being written. Furthermore, some backends prefer larger TX sizes (Cassandra:20-30MB) and others prefer smaller TX sizes (HBase:5-10MB). It is best to play with these TX sizes/split-sizes as performance can be greatly effected and is highly dependent on the backend storage system, the topology of the graph, and the backend configuration (e.g. replication factor, durable writes, etc.). Finally, it is best to have mapred.job.reuse.jvm.num.tasks=-1 (or a large number like 1000) as transactions are typically small and JVM start up time may be costly in the long run.

NOTE: For those importing into Titan using an external index such as Elasticsearch, be sure to provide the appropriate properties referencing the Elasticsearch cluster. The example properties below are provided commented out in the example TitanOutputFormat properties files distributed with Faunus.

faunus.graph.output.titan.storage.index.search.backend=elasticsearch
faunus.graph.output.titan.storage.index.search.hostname=127.0.0.1
faunus.graph.output.titan.storage.index.search.client-only=true

TitanCassandraOutputFormat

faunus.graph.output.format=com.thinkaurelius.faunus.formats.titan.cassandra.TitanCassandraOutputFormat
faunus.graph.output.titan.storage.backend=cassandra
faunus.graph.output.titan.storage.hostname=localhost
faunus.graph.output.titan.storage.port=9160
faunus.graph.output.titan.storage.keyspace=titan
faunus.graph.output.titan.storage.batch-loading=true
# faunus.graph.output.titan.ids.block-size=100000
faunus.graph.output.titan.infer-schema=true

Here are some notes for the above properties.

  • storage.batch-loading: By setting this to true, certain checks in Titan are circumvented which speeds up the writing process.
  • ids.block-size: When this value is small and the clients are writing lots of data, the clients communicates with Titan repeatedly to get new ids and this can cause exceptions to happen as the id system stalls trying to serve all the clients.
  • infer-schema: When a new edge label or property key is provided to Titan, Titan updates its schema metadata. By inferring the schema prior to writing, exceptions can be circumvented.
gremlin> g = FaunusFactory.open('bin/titan-cassandra-output.properties') 
==>faunusgraph[graphsoninputformat]
gremlin> g.V.sideEffect('{it.roman = true}') 
13/01/04 15:44:42 INFO mapreduce.FaunusCompiler: Compiled to 1 MapReduce job(s)
13/01/04 15:44:42 INFO mapreduce.FaunusCompiler: Executing job 1 out of 1: MapSequence[com.thinkaurelius.faunus.mapreduce.transform.VerticesMap.Map, com.thinkaurelius.faunus.mapreduce.sideeffect.SideEffectMap.Map, com.thinkaurelius.faunus.formats.BlueprintsGraphOutputMapReduce.Map, com.thinkaurelius.faunus.formats.BlueprintsGraphOutputMapReduce.Reduce]
...

In the above job, the Graph of the Gods GraphSON file is streamed from HDFS and each vertex has a new property added (roman=true). The output graph is pushed into Titan/Cassandra. Via the Titan/Gremlin console, the graph is viewable.

titan$ bin/gremlin.sh 

         \,,,/
         (o o)
-----oOOo-(_)-oOOo-----
gremlin> g = TitanFactory.open('bin/cassandra.local')
==>titangraph[cassandrathrift:127.0.0.1]
gremlin> g.v(4).map
==>{name=saturn, type=titan, roman=true}
gremlin>

TitanHBaseOutputFormat

faunus.graph.output.format=com.thinkaurelius.faunus.formats.titan.hbase.TitanHBaseOutputFormat
faunus.graph.output.titan.storage.backend=hbase
faunus.graph.output.titan.storage.hostname=localhost
faunus.graph.output.titan.storage.port=2181
faunus.graph.output.titan.storage.tablename=titan
faunus.graph.output.titan.storage.batch-loading=true
# titan.graph.output.ids.block-size=100000
faunus.graph.output.titan.infer-schema=true

NOTE: Please see the TitanCassandraOutputFormat section for information the meaning of these properties.

In Titan/HBase there are two other parameters that should be considered.

faunus.graph.output.titan.ids.num-partitions=5 // typically the number of region servers
faunus.graph.output.titan.ids.partition=true

Because Titan/HBase does not randomly distribute the data around the cluster it is good to tell Titan to generate random partitions of the ID space so that data is written in (as best as possible) a round robin fashion so no single region server is burdened with data writes.
gremlin> g = FaunusFactory.open('bin/titan-hbase-output.properties')    
==>faunusgraph[graphsoninputformat]
gremlin> g.V.sideEffect('{it.roman = true}')                           
13/01/04 15:48:32 INFO mapreduce.FaunusCompiler: Compiled to 1 MapReduce job(s)
13/01/04 15:48:32 INFO mapreduce.FaunusCompiler: Executing job 1 out of 1: MapSequence[com.thinkaurelius.faunus.mapreduce.transform.VerticesMap.Map, com.thinkaurelius.faunus.mapreduce.sideeffect.SideEffectMap.Map, com.thinkaurelius.faunus.formats.BlueprintsGraphOutputMapReduce.Map, com.thinkaurelius.faunus.formats.BlueprintsGraphOutputMapReduce.Reduce]
...

Incremental Data Loading using a TitanOutputFormat

Faunus can be used for incremental loading. That is, loading data into a graph that already exists. The Faunus property to be aware of is faunus.graph.output.blueprints.script-file. If that property does not exist, then it is assumed that the underlying graph is empty. If the property does exist, it points to a file in HDFS that is a Gremlin/Groovy script that has either or both of the following two methods:

  • def Vertex getOrCreateVertex(FaunusVertex vertex, Graph graph, Mapper.Context context)
  • def Edge getOrCreateEdge(FaunusEdge faunusEdge, Vertex blueprintsOutVertex, Vertex blueprintsInVertex, Graph graph, Mapper.Context context)

An example getOrCreateVertex() definition is in the code fragment below. The example determines whether a vertex already exists in the graph by a unique key (e.g. a domain-specific unique address space). If the vertex exists according to a global index query, then return it. Else, create a new vertex and return it. This example is distributed with Faunus in data/BlueprintsScript.groovy (e.g. hadoop fs -copyFromLocal data/BlueprintsScript.groovy BlueprintsScript.groovy). Finally, realize that the definition of a “unique vertex” can be an arbitrary algorithm — unique key/value property, a graph traversal, accessing another dataset, etc.

def Vertex getOrCreateVertex(final FaunusVertex faunusVertex, final Graph graph, final Mapper.Context context) {
  String uniqueKey = "name";
  Object uniqueValue = faunusVertex.getProperty(uniqueKey);
  Vertex blueprintsVertex;
  if (null == uniqueValue)
    throw new RuntimeException("The provided Faunus vertex does not have a property for the unique key: " + faunusVertex);

  Iterator<Vertex> itty = graph.query().has(uniqueKey, uniqueValue).vertices().iterator();
  if (itty.hasNext()) {
    blueprintsVertex = itty.next();
    context.getCounter(VERTICES_RETRIEVED).increment(1l);
    if (itty.hasNext())
      LOGGER.error("The unique key is not unique as more than one vertex with the value: " + uniqueValue);
  } else {
    blueprintsVertex = graph.addVertex(faunusVertex.getIdAsLong());
    context.getCounter(VERTICES_WRITTEN).increment(1l);
  }
  // if vertex existed or not, add all the properties of the faunusVertex to the blueprintsVertex
  for (String property : faunusVertex.getPropertyKeys()) {
    blueprintsVertex.setProperty(property, faunusVertex.getProperty(property));
    context.getCounter(VERTEX_PROPERTIES_WRITTEN).increment(1l);
  }
  return blueprintsVertex;
}

An example of a getOrCreateEdge() is provided below that says if an edge already exists between the two vertices with the same edge label return that edge, else create it.

def Edge getOrCreateEdge(final FaunusEdge faunusEdge, final Vertex blueprintsOutVertex, final Vertex blueprintsInVertex, final Graph graph, final Mapper.Context context) {
    final Edge blueprintsEdge = !blueprintsOutVertex.out(faunusEdge.getLabel()).has("id", blueprintsInVertex.getId()).hasNext() ?
        graph.addEdge(null, blueprintsOutVertex, blueprintsInVertex, faunusEdge.getLabel()) :
        blueprintsOutVertex.outE(faunusEdge.getLabel()).as("here").inV().has("id", blueprintsInVertex.getId()).back("here").next();
    context.getCounter(EDGES_WRITTEN).increment(1l);

    // if edge existed or not, add all the properties of the faunusEdge to the blueprintsEdge
    for (final String key : faunusEdge.getPropertyKeys()) {
        blueprintsEdge.setProperty(key, faunusEdge.getProperty(key));
        context.getCounter(EDGE_PROPERTIES_WRITTEN).increment(1l);
    }
    return blueprintsEdge;
}