- Java 1.8
- Maven 3
- Docker (the Docker host machine should have 16 GB of memory or more)
- Postman for verification
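Before building, you can check these prerequisites from a shell. The following is a minimal sketch, not part of this project. The function names mem_ok and check_prereqs are hypothetical. It assumes the Docker daemon is running, and it reads the total memory from `docker info --format '{{.MemTotal}}'`, which reports bytes. "16 GB" is treated as 16 × 10^9 bytes, because Docker usually reports a little less than the nominal RAM size.

```shell
#!/bin/sh
# Hypothetical prerequisite check (sketch). Threshold semantics are an assumption.

# mem_ok BYTES MIN_GB: succeed if BYTES >= MIN_GB * 10^9.
mem_ok() {
  [ "$1" -ge $(( $2 * 1000000000 )) ]
}

check_prereqs() {
  status=0
  # docker-compose is used by the build and run steps of this guide.
  for cmd in java mvn docker docker-compose; do
    if ! command -v "$cmd" >/dev/null 2>&1; then
      echo "Missing on PATH: $cmd" >&2
      status=1
    fi
  done
  # Print the Java version line so you can confirm it is 1.8.x.
  command -v java >/dev/null 2>&1 && java -version 2>&1 | head -n 1
  # MemTotal is the memory available to the Docker daemon, in bytes.
  mem=$(docker info --format '{{.MemTotal}}' 2>/dev/null)
  if [ -z "$mem" ]; then
    echo "Could not query Docker memory (is the daemon running?)" >&2
    status=1
  elif ! mem_ok "$mem" 16; then
    echo "Docker reports $mem bytes; 16 GB or more is recommended" >&2
    status=1
  fi
  return "$status"
}
```

Run check_prereqs after sourcing the file. It returns non-zero and prints what is missing if a check fails.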
Download the MarkLogic Server x64 (AMD64, Intel EM64T) 64-bit Linux RPM and save it into the marklogic folder.
In marklogic/Dockerfile, line 24, make sure the RPM file name matches the downloaded file. It is currently MarkLogic-9.0-6.x86_64.rpm.
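To catch a file-name mismatch before docker-compose build fails, you can compare the RPM name referenced in the Dockerfile with the files in the folder. This is a minimal sketch. check_ml_rpm is a hypothetical helper. It assumes the Dockerfile mentions the package as MarkLogic-<version>.rpm and uses grep -o, which is available in GNU and BSD grep.

```shell
#!/bin/sh
# Hypothetical helper (sketch): confirm the RPM named in <dir>/Dockerfile
# exists in <dir>. Defaults to the marklogic folder.
check_ml_rpm() {
  dir=${1:-marklogic}
  # First "MarkLogic-...rpm" token mentioned in the Dockerfile.
  rpm=$(grep -o 'MarkLogic-[A-Za-z0-9._-]*\.rpm' "$dir/Dockerfile" | head -n 1)
  if [ -z "$rpm" ]; then
    echo "No MarkLogic RPM referenced in $dir/Dockerfile" >&2
    return 1
  fi
  if [ -f "$dir/$rpm" ]; then
    echo "OK: $dir/$rpm"
  else
    echo "Dockerfile expects $rpm, but it is not in $dir/" >&2
    return 1
  fi
}
```

From the project directory, run check_ml_rpm marklogic.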
Build the Kafka sinker:
cd kafka-sinker
mvn clean package -Dmaven.test.skip=true
Build the Docker images:
cd ..
docker-compose build
Run:
docker-compose up -d
# Wait about 2 minutes
# Then check that the Tez session on "hive.local" is initialized
docker exec hive.local cat /tmp/root/hive.log | grep TezSession
When you see log lines like the following, reporting that a Tez session was created, all containers have started:
2018-07-02T06:45:41,950 INFO [main] tez.TezSessionPoolManager: Created new tez session for queue: default with session id: 704d121b-70a3-4ddc-a024-fdb7fcf6e3e1
2018-07-02T06:45:45,217 INFO [main] tez.TezSessionState: User of session id 704d121b-70a3-4ddc-a024-fdb7fcf6e3e1 is root
2018-07-02T06:45:47,102 INFO [main] tez.TezSessionState: Opening new Tez Session (id: 704d121b-70a3-4ddc-a024-fdb7fcf6e3e1, scratch dir: hdfs://hive.local:9000/tmp/hive/root/_tez_session_dir/704d121b-70a3-4ddc-a024-fdb7fcf6e3e1)
2018-07-02T06:45:48,791 INFO [main] tez.TezSessionState: Prewarming 1 containers (id: 704d121b-70a3-4ddc-a024-fdb7fcf6e3e1, scratch dir: hdfs://hive.local:9000/tmp/hive/root/_tez_session_dir/704d121b-70a3-4ddc-a024-fdb7fcf6e3e1)
2018-07-02T06:45:56,684 INFO [main] client.TezClient: Submitting dag to TezSession, sessionName=HIVE-704d121b-70a3-4ddc-a024-fdb7fcf6e3e1, applicationId=application_1530513907233_0001, dagName=TezPreWarmDAG_0
2018-07-02T06:45:56,941 INFO [main] client.TezClient: Submitted dag to TezSession, sessionName=HIVE-704d121b-70a3-4ddc-a024-fdb7fcf6e3e1, applicationId=application_1530513907233_0001, dagId=dag_1530513907233_0001_1, dagName=TezPreWarmDAG_0
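Instead of waiting a fixed two minutes, you can poll the Hive log until the Tez session line appears. This is a minimal sketch. wait_for_pattern is a hypothetical helper, and the 300-second timeout and 10-second interval used below are illustrative choices, not project settings.

```shell
#!/bin/sh
# Hypothetical helper (sketch): run COMMAND repeatedly until its output
# contains PATTERN, or give up after TIMEOUT seconds.
# Usage: wait_for_pattern PATTERN TIMEOUT INTERVAL COMMAND [ARGS...]
# Returns 0 on a match, 1 on timeout.
wait_for_pattern() {
  pattern=$1
  timeout_secs=$2
  interval=$3
  shift 3
  elapsed=0
  while :; do
    # Ignore command errors (e.g. the container or log file is not ready yet).
    if "$@" 2>/dev/null | grep -q -- "$pattern"; then
      return 0
    fi
    if [ "$elapsed" -ge "$timeout_secs" ]; then
      return 1
    fi
    sleep "$interval"
    elapsed=$((elapsed + interval))
  done
}
```

For example: wait_for_pattern "Created new tez session" 300 10 docker exec hive.local cat /tmp/root/hive.log && echo "All containers started"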
The MarkLogic setup is configured in ./marklogic/config.sh, where you can set the following:
- USER/PASS: The admin username/password, which will be created automatically
- DB_NAME: The name of the database, which will be created automatically
- BOOTSTRAP_HOST: The bootstrap host of the cluster
- JOIN_HOSTS: The hosts to join to the cluster
- TIER1_HOSTS/TIER2_HOSTS/TIER3_HOSTS: The hosts on which each tier is stored
- HDFS_PATH: The HDFS path for tier3 storage
- MANAGE_API: The Management API path
- SEARCH_API: The Search API path
- admin_api: The Admin API path
- LICENSE: The MarkLogic license
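Before running the setup, it can help to print the settings the script will use. This is a minimal sketch. show_ml_config is a hypothetical helper. It assumes config.sh only assigns shell variables with the names listed above. It sources the file in a subshell so nothing leaks into your shell, and it masks PASS and LICENSE.

```shell
#!/bin/sh
# Hypothetical helper (sketch): print the settings from a config file such as
# ./marklogic/config.sh, masking secrets.
show_ml_config() {
  (
    # Source in a subshell so the settings do not leak into the caller.
    . "$1" || exit 1
    for name in USER PASS DB_NAME BOOTSTRAP_HOST JOIN_HOSTS \
                TIER1_HOSTS TIER2_HOSTS TIER3_HOSTS HDFS_PATH \
                MANAGE_API SEARCH_API admin_api LICENSE; do
      # Indirect lookup of the variable called $name (empty if unset).
      eval "value=\${$name-}"
      case $name in
        PASS|LICENSE) if [ -n "$value" ]; then value='****'; fi ;;
      esac
      printf '%s=%s\n' "$name" "$value"
    done
  )
}
```

For example: show_ml_config ./marklogic/config.sh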
Run:
./marklogic/setup.sh
# The setup takes about 1 minute. It initializes the MarkLogic Server, creates a new database and SQL views.
You will see output like the following. The "curl: (52) Empty reply from server" messages are expected while the server is initializing:
Bootstrap host: ml9node1.local
Use http://localhost:8001/admin/v1 to admin ml9node1.local
Initializing host: ml9node1.local...
curl: (52) Empty reply from server
Successfully initialized ml9node1.local
Configuring ml9node1.local security...
curl: (52) Empty reply from server
Successfully configured ml9node1.local security
Bootstrap host done: ml9node1.local
Use http://localhost:8002/manage/v2 to manage database
Creating database: tiers-poc...
Successfully created database: tiers-poc
Creating tier1 partition on hosts: [ "ml9node1.local" ] ...
Successfully created tier1 partition
Successfully created SQL views
Setup done
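To confirm afterwards that the database exists, you can query the Management API on port 8002, the port shown in the setup output. This is a minimal sketch. ml_db_exists is a hypothetical helper. It assumes the MarkLogic REST Management endpoint GET /manage/v2/databases/{name} and digest authentication, MarkLogic's usual default for this App Server.

```shell
#!/bin/sh
# Hypothetical helper (sketch): succeed if the Management API returns
# HTTP 200 for the given database name.
# Usage: ml_db_exists BASE_URL USER PASS DB_NAME
ml_db_exists() {
  base_url=$1; user=$2; pass=$3; db=$4
  # -w prints only the HTTP status; curl prints 000 if it cannot connect.
  code=$(curl -s -o /dev/null -w '%{http_code}' --digest -u "$user:$pass" \
    "$base_url/manage/v2/databases/$db")
  [ "$code" = "200" ]
}
```

For example, after sourcing ./marklogic/config.sh: ml_db_exists http://localhost:8002 "$USER" "$PASS" "$DB_NAME" && echo "Database $DB_NAME exists"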
Run:
# Create Hive tables
docker exec hive.local beeline -u jdbc:hive2://hive.local:10000/default -f /tmp/tables.sql
# Verify that the instrument, position and transaction tables were created
docker exec hive.local beeline -u jdbc:hive2://hive.local:10000/default -e "show tables;"
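Rather than reading the table list by eye, you can pipe the show tables output into a small check. This is a minimal sketch. check_hive_tables is a hypothetical helper. It reads beeline's output on stdin and looks for each table name as a whole word with grep -w, which is available in GNU and BSD grep.

```shell
#!/bin/sh
# Hypothetical helper (sketch): fail if any expected Hive table name is
# missing from the text on stdin.
check_hive_tables() {
  out=$(cat)
  missing=0
  for t in instrument position transaction; do
    if ! printf '%s\n' "$out" | grep -qw -- "$t"; then
      echo "Missing Hive table: $t" >&2
      missing=1
    fi
  done
  [ "$missing" -eq 0 ]
}
```

For example: docker exec hive.local beeline -u jdbc:hive2://hive.local:10000/default -e "show tables;" | check_hive_tables && echo "All tables created"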
Download sample-data.zip from https://drive.google.com/open?id=1jYBqIDXN8QT4_C-wg4_X62fNXUE4RfBg
Unzip it into the sample-data folder.
# From the project directory, unzip <path-to-downloaded-sample-data.zip>, e.g.:
unzip ~/Downloads/sample-data.zip
# Logstash then parses the sample CSV files and publishes the records to Kafka.
# The load will take about 5 minutes.
To verify that the CSV files are loaded, open http://localhost:9600/_node/stats/pipelines?pretty. When it reports 1430312 events output to "kafka" (like this: http://take.ms/4iEUn), this load step is done.
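You can also read the count from a shell instead of a browser. This is a minimal sketch. kafka_events_out is a hypothetical helper. It assumes jq is installed and that the node stats JSON has the Logstash 6.x layout (pipelines → <id> → plugins → outputs[], each with name and events.out). Check that layout against your Logstash version.

```shell
#!/bin/sh
# Hypothetical helper (sketch): read Logstash node stats JSON on stdin and
# print the total events.out across all outputs named "kafka" (0 if none).
kafka_events_out() {
  jq '[.pipelines[].plugins.outputs[] | select(.name == "kafka") | .events.out // 0] | add // 0'
}
```

For example: curl -s http://localhost:9600/_node/stats/pipelines | kafka_events_out. The load is done when this prints 1430312.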
TIP: After this step the Logstash container is no longer needed. You can stop it to save CPU and memory:
docker stop logstash.local
Run:
docker exec kafka.local /opt/kafka_2.11-0.10.1.0/bin/connect-standalone.sh /config/kafka-connect-standalone.properties /config/kafka-sink.properties
# The connector consumes Kafka records and inserts them into MarkLogic/Hive
# The load will take about 10 minutes.
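The load is finished when you see the following log lines, showing that the position topic has reached offset 51441, the instrument topic offset 138872 and the transaction topic offset 1239999. These add up to 1430312 (51441 + 138872 + 1239999), the same number of events Logstash sent to Kafka: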
[2018-07-02 11:00:51,449] INFO Flush - Topic position, Partition 0, Offset 51441, Metadata (kafka.connect.marklogic.sink.MarkLogicSinkTask:112)
[2018-07-02 11:00:51,449] INFO Flush - Topic instrument, Partition 0, Offset 138872, Metadata (kafka.connect.marklogic.sink.MarkLogicSinkTask:112)
[2018-07-02 11:00:51,449] INFO Flush - Topic transaction, Partition 0, Offset 1239999, Metadata (kafka.connect.marklogic.sink.MarkLogicSinkTask:112)
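If you save the connector output to a file, for example by appending 2>&1 | tee connect.log to the command above, you can check the offsets automatically. This is a minimal sketch. check_sink_offsets and connect.log are hypothetical names. It assumes the flush log format shown above and a single partition 0 per topic. It takes the last flush line for each topic.

```shell
#!/bin/sh
# Hypothetical helper (sketch): read MarkLogicSinkTask flush log lines on
# stdin and report whether each topic has reached its expected offset.
check_sink_offsets() {
  log=$(cat)
  status=0
  for pair in position:51441 instrument:138872 transaction:1239999; do
    topic=${pair%%:*}
    expected=${pair#*:}
    # Offset from the last "Flush - Topic <topic>, Partition 0" line.
    offset=$(printf '%s\n' "$log" \
      | sed -n "s/.*Flush - Topic $topic, Partition 0, Offset \([0-9]*\),.*/\1/p" \
      | tail -n 1)
    if [ -n "$offset" ] && [ "$offset" -ge "$expected" ]; then
      echo "$topic: offset $offset (done)"
    else
      echo "$topic: offset ${offset:-none} (expected $expected)"
      status=1
    fi
  done
  return "$status"
}
```

For example, in a second terminal: check_sink_offsets < connect.log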
TIP: After this step the Kafka container is no longer needed. You can stop it to save CPU and memory:
docker stop kafka.local
Run:
cd query-service
mvn compile exec:java
The query service starts on port 8080.
Measure query performance:
# Query all transactions
time curl -X GET "http://localhost:8080/query?sql=select%20%2A%20from%20transaction&format=json" > /dev/null
# Query all positions
time curl -X GET "http://localhost:8080/query?sql=select%20%2A%20from%20position&format=json" > /dev/null
# Query all instruments
time curl -X GET "http://localhost:8080/query?sql=select%20%2A%20from%20instrument&format=json" > /dev/null
# SELECT * FROM POSITION WHERE BusinessDate > '2017-01-01' AND BusinessDate < '2018-12-31'
time curl -X GET "http://localhost:8080/query?sql=SELECT%20*%20FROM%20POSITION%20WHERE%20BusinessDate%20%3E%20'2017-01-01'%20AND%20BusinessDate%20%3C%20'2018-12-31'" > /dev/null
# SELECT * FROM TRANSACTION WHERE TradeDate > '2017-01-01' AND TradeDate < '2018-12-31'
time curl -X GET "http://localhost:8080/query?sql=SELECT%20*%20FROM%20TRANSACTION%20WHERE%20TradeDate%20%3E%20'2017-01-01'%20AND%20TradeDate%20%3C%20'2018-12-31'" > /dev/null
# SELECT * FROM POSITION INNER JOIN INSTRUMENT ON POSITION.InstrumentId=INSTRUMENT.InstrumentId WHERE INSTRUMENT.AddedDate > '2001-01-01' AND INSTRUMENT.AddedDate < '2018-12-31'
time curl -X GET "http://localhost:8080/query?sql=SELECT%20*%20FROM%20POSITION%20INNER%20JOIN%20INSTRUMENT%20ON%20POSITION.InstrumentId%3DINSTRUMENT.InstrumentId%20WHERE%20INSTRUMENT.AddedDate%20%3E%20'2001-01-01'%20AND%20INSTRUMENT.AddedDate%20%3C%20'2018-12-31'" > /dev/null
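Hand-encoding the SQL is error-prone. curl can do the encoding itself: with -G, each --data-urlencode value is URL-encoded and appended to the query string. It can also report timing through -w. This is a minimal sketch. run_query and QUERY_URL are hypothetical names, and the optional format argument mirrors the format=json parameter used above.

```shell
#!/bin/sh
# Hypothetical helper (sketch): send SQL to the query service as a GET
# request, letting curl URL-encode it, and print "<http_code> <seconds>".
# Usage: run_query SQL [FORMAT]
run_query() {
  sql=$1
  url=${QUERY_URL:-http://localhost:8080/query}
  # Reuse the positional parameters to hold the optional format argument.
  if [ -n "${2-}" ]; then
    set -- --data-urlencode "format=$2"
  else
    set --
  fi
  curl -s -G "$url" --data-urlencode "sql=$sql" "$@" \
    -o /dev/null -w '%{http_code} %{time_total}\n'
}
```

For example: run_query "SELECT * FROM POSITION WHERE BusinessDate > '2017-01-01' AND BusinessDate < '2018-12-31'". For the full-table queries, pass json as the second argument. Reporting %{time_total} from curl replaces wrapping the call in time.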
Import test/tiers-poc.postman_collection.json and test/tiers-poc.postman_environment.json into Postman. You can then:
- Count/Search transaction records
- Count/Search instrument records
- Count/Search position records
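The same collection can also be run from the command line with Newman, Postman's CLI runner. Newman is installed separately (npm install -g newman) and is not part of this project. This is a minimal sketch. run_postman_checks is a hypothetical helper. It checks that both files exist before calling newman run with the environment passed via -e.

```shell
#!/bin/sh
# Hypothetical helper (sketch): run the tiers-poc Postman collection with
# Newman. Returns 2 if a file is missing, 3 if newman is not installed.
run_postman_checks() {
  dir=${1:-test}
  collection="$dir/tiers-poc.postman_collection.json"
  environment="$dir/tiers-poc.postman_environment.json"
  for f in "$collection" "$environment"; do
    if [ ! -f "$f" ]; then
      echo "Not found: $f" >&2
      return 2
    fi
  done
  if ! command -v newman >/dev/null 2>&1; then
    echo "newman is not installed (npm install -g newman)" >&2
    return 3
  fi
  newman run "$collection" -e "$environment"
}
```

From the project directory, with the services running: run_postman_checks test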