Java 1.8

Maven 3

Docker (the Docker host machine should have 16 GB of memory or more)

Postman for verification

Download MarkLogic

Download the MarkLogic Server x64 (AMD64, Intel EM64T) 64-bit Linux RPM and save it into the marklogic folder.

In marklogic/Dockerfile, line 24, make sure the RPM file name matches the downloaded file. It is currently MarkLogic-9.0-6.x86_64.rpm.
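
One way to catch a file name mismatch before building is to search the Dockerfile for the name of the downloaded RPM. The sketch below assumes the Dockerfile mentions the file name literally; the function name rpm_name_matches is made up for illustration and is not part of this project.

```shell
# Hypothetical helper: succeeds when the Dockerfile mentions the RPM's file name.
# $1 = path to the Dockerfile, $2 = path to the downloaded RPM
rpm_name_matches() {
  dockerfile=$1
  rpm_name=$(basename "$2")
  # -F: treat the name as a fixed string, -q: no output, exit status only
  grep -qF -- "$rpm_name" "$dockerfile"
}
```

For example: rpm_name_matches marklogic/Dockerfile marklogic/MarkLogic-9.0-6.x86_64.rpm || echo "update the RPM name in marklogic/Dockerfile"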


Build kafka sinker:

cd kafka-sinker
mvn clean package -Dmaven.test.skip=true

Build Docker containers:

cd ..
docker-compose build

Start Docker Containers


docker-compose up -d

# Wait about 2 minutes
# Then check that the Tez session of "hive.local" is initialized
docker exec hive.local cat /tmp/root/hive.log | grep TezSession

When you see output like the following, showing that a Tez session was created, all the containers have started:

2018-07-02T06:45:41,950  INFO [main] tez.TezSessionPoolManager: Created new tez session for queue: default with session id: 704d121b-70a3-4ddc-a024-fdb7fcf6e3e1
2018-07-02T06:45:45,217  INFO [main] tez.TezSessionState: User of session id 704d121b-70a3-4ddc-a024-fdb7fcf6e3e1 is root
2018-07-02T06:45:47,102  INFO [main] tez.TezSessionState: Opening new Tez Session (id: 704d121b-70a3-4ddc-a024-fdb7fcf6e3e1, scratch dir: hdfs://hive.local:9000/tmp/hive/root/_tez_session_dir/704d121b-70a3-4ddc-a024-fdb7fcf6e3e1)
2018-07-02T06:45:48,791  INFO [main] tez.TezSessionState: Prewarming 1 containers  (id: 704d121b-70a3-4ddc-a024-fdb7fcf6e3e1, scratch dir: hdfs://hive.local:9000/tmp/hive/root/_tez_session_dir/704d121b-70a3-4ddc-a024-fdb7fcf6e3e1)
2018-07-02T06:45:56,684  INFO [main] client.TezClient: Submitting dag to TezSession, sessionName=HIVE-704d121b-70a3-4ddc-a024-fdb7fcf6e3e1, applicationId=application_1530513907233_0001, dagName=TezPreWarmDAG_0
2018-07-02T06:45:56,941  INFO [main] client.TezClient: Submitted dag to TezSession, sessionName=HIVE-704d121b-70a3-4ddc-a024-fdb7fcf6e3e1, applicationId=application_1530513907233_0001, dagId=dag_1530513907233_0001_1, dagName=TezPreWarmDAG_0
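
Instead of waiting a fixed two minutes, the log check can be repeated until the expected line shows up. This is a minimal sketch, not part of the project: wait_for_output is a hypothetical helper, and the attempt count and delay in the usage line are arbitrary choices.

```shell
# Hypothetical helper: rerun a command until its output contains a pattern.
# $1 = pattern, $2 = max attempts, $3 = seconds between attempts, rest = command
wait_for_output() {
  pattern=$1
  attempts=$2
  delay=$3
  shift 3
  i=0
  while [ "$i" -lt "$attempts" ]; do
    # Errors from the command (e.g. the log file does not exist yet) are ignored.
    if "$@" 2>/dev/null | grep -q -- "$pattern"; then
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  return 1
}
```

For example: wait_for_output "Created new tez session" 30 10 docker exec hive.local cat /tmp/root/hive.log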

Setup MarkLogic

The configuration is in ./marklogic/. You can configure the following:

  • USER/PASS: The admin username/password, which will be automatically created
  • DB_NAME: The name of database, which will be automatically created
  • BOOTSTRAP_HOST: Bootstrap host in cluster
  • JOIN_HOSTS: Hosts to join to cluster
  • TIER1_HOSTS/TIER2_HOSTS/TIER3_HOSTS: The hosts on which each tier is stored
  • HDFS_PATH: The HDFS path for tier3 storage
  • MANAGE_API: Manage API path
  • SEARCH_API: Search API path
  • admin_api: Admin API path
  • LICENSE: The MarkLogic license



# The setup takes about 1 minute. It initializes the MarkLogic Server, creates a new database and SQL views.

You will see output like the following. The "curl: (52) Empty reply from server" message is expected while the server is initializing:

Bootstrap host: ml9node1.local
Use http://localhost:8001/admin/v1 to admin ml9node1.local
Initializing host: ml9node1.local...
curl: (52) Empty reply from server
Successfully initialized ml9node1.local
Configuring ml9node1.local security...
curl: (52) Empty reply from server
Successfully configured ml9node1.local security
Bootstrap host done: ml9node1.local
Use http://localhost:8002/manage/v2 to manage database
Creating database: tiers-poc...
Successfully created database: tiers-poc
Creating tier1 partition on hosts: [ "ml9node1.local" ] ...
Successfully created tier1 partition
Successfully created SQL views
Setup done

Create Hive Tables


# Create Hive tables
docker exec hive.local beeline -u jdbc:hive2://hive.local:10000/default -f /tmp/tables.sql

# Verify that the instrument/position/transaction tables were created
docker exec hive.local beeline -u jdbc:hive2://hive.local:10000/default -e "show tables;"
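
The table check can also be scripted rather than read by eye. The sketch below is an illustration only: tables_present is a hypothetical helper that looks for each table name as a whole word in whatever text it receives, so it makes no assumption about the exact layout of the beeline output.

```shell
# Hypothetical helper: reads `show tables;` output on stdin and succeeds
# only if every table name given as an argument appears as a whole word.
tables_present() {
  listing=$(cat)
  for table in "$@"; do
    printf '%s\n' "$listing" | grep -qiw -- "$table" || {
      echo "missing table: $table" >&2
      return 1
    }
  done
}
```

For example: docker exec hive.local beeline -u jdbc:hive2://hive.local:10000/default -e "show tables;" | tables_present instrument position transaction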

Load CSV Records into Kafka

Download from

Unzip it to sample-data folder.

# With project dir, unzip <>, eg:
unzip ~/Downloads/

# Logstash will then parse the sample CSV files and insert the records into Kafka.
# The load will take about 5 minutes.

To verify that the CSV files are loaded, go to http://localhost:9600/_node/stats/pipelines?pretty. When it shows 1430312 events output to "kafka", this load step is done.
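
The same check can be done from the command line. This is a rough sketch under stated assumptions: events_out_reached is a hypothetical helper, and it assumes the stats JSON prints counters in the form "out" : N. It accepts any "out" counter with the expected value and does not confirm that the counter belongs to the kafka output, so a manual look at the page is still the more precise check.

```shell
# Hypothetical helper: succeeds when the stats JSON on stdin contains an
# "out" counter equal to the expected event count.
# $1 = expected number of events
events_out_reached() {
  expected=$1
  # Matches e.g.  "out" : 1430312,  but not a longer number with the same prefix.
  grep -Eq "\"out\"[[:space:]]*:[[:space:]]*${expected}([^0-9]|\$)"
}
```

For example: curl -s "http://localhost:9600/_node/stats/pipelines?pretty" | events_out_reached 1430312 && echo "CSV load finished"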

TIP: After this step, the logstash container is no longer needed. You can stop it to save CPU and memory:

docker stop logstash.local

Load Kafka Records into MarkLogic/Hive


docker exec kafka.local /opt/kafka_2.11- /config/ /config/

# The connector will consume the Kafka records and insert them into MarkLogic/Hive
# The load will take about 10 minutes.

The load is finished when the log shows that the position topic has reached offset 51441, the instrument topic offset 138872, and the transaction topic offset 1239999:

[2018-07-02 11:00:51,449] INFO Flush - Topic position, Partition 0, Offset 51441, Metadata  (kafka.connect.marklogic.sink.MarkLogicSinkTask:112)
[2018-07-02 11:00:51,449] INFO Flush - Topic instrument, Partition 0, Offset 138872, Metadata  (kafka.connect.marklogic.sink.MarkLogicSinkTask:112)
[2018-07-02 11:00:51,449] INFO Flush - Topic transaction, Partition 0, Offset 1239999, Metadata  (kafka.connect.marklogic.sink.MarkLogicSinkTask:112)
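
Checking the three offsets can be automated by parsing the Flush lines. The sketch below is an assumption-laden illustration: offsets_reached is a hypothetical helper, it relies on the log format shown above, and it tracks one offset per topic, which fits the single partition (Partition 0) seen in the sample.

```shell
# Hypothetical helper: reads connector log lines on stdin and succeeds when the
# highest flushed offset of every "topic=offset" argument has been reached.
offsets_reached() {
  awk -v want="$*" '
    BEGIN {
      n = split(want, pairs, " ")
      for (i = 1; i <= n; i++) {
        split(pairs[i], kv, "=")
        target[kv[1]] = kv[2] + 0
      }
    }
    # Example: ... INFO Flush - Topic position, Partition 0, Offset 51441, Metadata ...
    /Flush - Topic / {
      for (i = 1; i <= NF; i++) {
        if ($i == "Topic")  { topic = $(i + 1); sub(/,$/, "", topic) }
        if ($i == "Offset") { offset = $(i + 1) + 0 }
      }
      if (offset > seen[topic]) seen[topic] = offset
    }
    END {
      for (t in target) if (seen[t] < target[t]) exit 1
      exit 0
    }'
}
```

Feed it the connector's log output, for example: offsets_reached position=51441 instrument=138872 transaction=1239999 < connector.log (connector.log is a placeholder for wherever the log was captured).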

TIP: After this step, the kafka container is no longer needed. You can stop it to save CPU and memory:

docker stop kafka.local

Start Query Service


cd query-service
mvn compile exec:java

The query service listens on port 8080.


Query performance:

# Query all transaction
time curl -X GET "http://localhost:8080/query?sql=select%20%2A%20from%20transaction&format=json" > /dev/null

# Query all position
time curl -X GET "http://localhost:8080/query?sql=select%20%2A%20from%20position&format=json" > /dev/null

# Query all instrument
time curl -X GET "http://localhost:8080/query?sql=select%20%2A%20from%20instrument&format=json" > /dev/null

# SELECT * FROM POSITION WHERE BusinessDate > '2017-01-01' AND BusinessDate < '2018-12-31'
time curl -X GET "http://localhost:8080/query?sql=SELECT%20*%20FROM%20POSITION%20WHERE%20BusinessDate%20%3E%20'2017-01-01'%20AND%20BusinessDate%20%3C%20'2018-12-31'" > /dev/null

# SELECT * FROM TRANSACTION WHERE TradeDate > '2017-01-01' AND TradeDate < '2018-12-31'
time curl -X GET "http://localhost:8080/query?sql=SELECT%20*%20FROM%20TRANSACTION%20WHERE%20TradeDate%20%3E%20'2017-01-01'%20AND%20TradeDate%20%3C%20'2018-12-31'" > /dev/null

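# SELECT * FROM POSITION INNER JOIN INSTRUMENT ON POSITION.InstrumentId=INSTRUMENT.InstrumentId WHERE INSTRUMENT.AddedDate > '2001-01-01' AND INSTRUMENT.AddedDate < '2018-12-31'
time curl -X GET "http://localhost:8080/query?sql=SELECT%20*%20FROM%20POSITION%20INNER%20JOIN%20INSTRUMENT%20ON%20POSITION.InstrumentId%3DINSTRUMENT.InstrumentId%20WHERE%20INSTRUMENT.AddedDate%20%3E%20'2001-01-01'%20AND%20INSTRUMENT.AddedDate%20%3C%20'2018-12-31'" > /dev/null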
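
The sql parameter in the commands above is percent-encoded by hand. For trying other queries, a small function can do the encoding. This is a sketch only: urlencode is a hypothetical helper written in POSIX shell, it handles ASCII input, and it encodes more characters (for example the single quote) than the hand-written URLs do, which should be equivalent for the server.

```shell
# Hypothetical helper: percent-encode an ASCII string for use in a URL query.
# Unreserved characters (letters, digits, . _ ~ -) are left unchanged.
urlencode() {
  s=$1
  out=
  while [ -n "$s" ]; do
    rest=${s#?}          # everything after the first character
    c=${s%"$rest"}       # the first character itself
    case $c in
      [A-Za-z0-9._~-]) out=$out$c ;;
      # "'c" makes printf use the character's numeric code
      *) out=$out$(printf '%%%02X' "'$c") ;;
    esac
    s=$rest
  done
  printf '%s\n' "$out"
}
```

For example: time curl -X GET "http://localhost:8080/query?sql=$(urlencode "select * from position")&format=json" > /dev/null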

Import test/tiers-poc.postman_collection.json and test/tiers-poc.postman_environment.json into Postman. You can then do the following:

  • Count/Search transaction records
  • Count/Search instrument records
  • Count/Search position records


