Skip to content

Commit

Permalink
This patch allows for testing of hive operators and hooks. Sasl is used (NoSasl in connection string is not possible). Tests have been adjusted.
Browse files Browse the repository at this point in the history
  • Loading branch information
bolkedebruin committed Mar 18, 2016
1 parent ae8062a commit ba15d3a
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 63 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ install:
before_script:
- mysql -e 'drop database if exists airflow; create database airflow' -u root
- psql -c 'create database airflow;' -U postgres
- export PATH=${PATH}:/tmp/hive/bin
script:
- pip --version
- ls -l $HOME/.wheelhouse
Expand Down
48 changes: 48 additions & 0 deletions airflow/minihivecluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# -*- coding: utf-8 -*-
# Copyright (c) 2016 Bolke de Bruin
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
import os
import subprocess
import select
import re


class MiniHiveCluster(object):
    """Launch and supervise a standalone Hive minicluster for tests.

    Spawns the ``com.ing.minicluster.MiniCluster`` Java process using the
    jars found under ``$MINICLUSTER_HOME`` and blocks until the embedded
    HiveServer2 Thrift endpoint announces startup.
    """

    def __init__(self):
        # MINICLUSTER_HOME must point at the directory containing the
        # minicluster jars; a KeyError here means the test env is not set up.
        self._minicluster_home = os.environ['MINICLUSTER_HOME']
        self._minicluster_class = "com.ing.minicluster.MiniCluster"
        self._start_mini_cluster()
        self._is_started()

    def _start_mini_cluster(self):
        """Start the minicluster JVM with every jar in $MINICLUSTER_HOME on the classpath."""
        classpath = os.path.join(self._minicluster_home, "*")
        cmd = ["java", "-cp", classpath, self._minicluster_class]

        # bufsize=0 + universal_newlines so readline() yields text lines promptly.
        self.hive = subprocess.Popen(cmd, bufsize=0, stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE,
                                     universal_newlines=True)

    def terminate(self):
        """Send SIGTERM to the minicluster process."""
        self.hive.terminate()

    def _is_started(self):
        """Block until HiveServer2 reports it is starting.

        Echoes process output while waiting. Returns True once
        "Starting ThriftBinaryCLIService" appears on stdout or stderr,
        or False if the process exits before printing it (the original
        fell off the loop returning None in that case).
        """
        while self.hive.poll() is None:
            rlist, _, _ = select.select(
                [self.hive.stderr, self.hive.stdout], [], [])
            for f in rlist:
                line = f.readline()
                # Bug fix: original `print (line,)` printed a 1-tuple
                # like ('...\n',) instead of the line itself. The line
                # already carries its newline, so suppress print's own.
                print(line, end='')
                if re.match(".*Starting ThriftBinaryCLIService", line):
                    return True
        # JVM died before the Thrift service came up.
        return False


8 changes: 4 additions & 4 deletions airflow/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,13 +180,13 @@ def initdb():
merge_conn(
models.Connection(
conn_id='airflow_db', conn_type='mysql',
host='localhost', login='root', password='',
host='localhost', login='root',
schema='airflow'))
merge_conn(
models.Connection(
conn_id='beeline_default', conn_type='beeline',
host='localhost',
schema='airflow'))
conn_id='beeline_default', conn_type='beeline', port="10000",
host='localhost', extra="{\"use_beeline\": true, \"auth\": \"\"}",
schema='default'))
merge_conn(
models.Connection(
conn_id='bigquery_default', conn_type='bigquery'))
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ PyOpenSSL
PySmbClient
psycopg2
python-dateutil
# pyhs2 -> not compatible with Python 3 because of sasl
pyhs2
redis
requests
setproctitle
Expand Down
23 changes: 23 additions & 0 deletions scripts/ci/setup_env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ HADOOP_DISTRO=${HADOOP_DISTRO:-"hdp"}
ONLY_DOWNLOAD=${ONLY_DOWNLOAD:-false}
ONLY_EXTRACT=${ONLY_EXTRACT:-false}

MINICLUSTER_URL=https://github.com/bolkedebruin/minicluster/releases/download/1.1/minicluster-1.1-SNAPSHOT-bin.zip

HIVE_HOME=/tmp/hive

while test $# -gt 0; do
case "$1" in
-h|--help)
Expand Down Expand Up @@ -46,6 +50,7 @@ while test $# -gt 0; do
done

HADOOP_HOME=/tmp/hadoop-${HADOOP_DISTRO}
MINICLUSTER_HOME=/tmp/minicluster

if $ONLY_DOWNLOAD && $ONLY_EXTRACT; then
echo "Both only-download and only-extract specified - abort" >&2
Expand All @@ -54,9 +59,15 @@ fi

mkdir -p ${HADOOP_HOME}
mkdir -p ${TRAVIS_CACHE}/${HADOOP_DISTRO}
mkdir -p ${TRAVIS_CACHE}/minicluster
mkdir -p ${TRAVIS_CACHE}/hive
mkdir -p ${HIVE_HOME}
chmod -R 777 ${HIVE_HOME}
mkdir -p /user/hive/warehouse

if [ $HADOOP_DISTRO = "cdh" ]; then
URL="http://archive.cloudera.com/cdh5/cdh/5/hadoop-latest.tar.gz"
HIVE_URL="http://archive.cloudera.com/cdh5/cdh/5/hive-latest.tar.gz"
elif [ $HADOOP_DISTRO = "hdp" ]; then
URL="http://public-repo-1.hortonworks.com/HDP/centos6/2.x/updates/2.0.6.0/tars/hadoop-2.2.0.2.0.6.0-76.tar.gz"
else
Expand Down Expand Up @@ -86,4 +97,16 @@ if [ $? != 0 ]; then
exit 1
fi

echo "Downloading and unpacking hive"
curl -z ${TRAVIS_CACHE}/hive/hive.tar.gz -o ${TRAVIS_CACHE}/hive/hive.tar.gz -L ${HIVE_URL}
tar zxf ${TRAVIS_CACHE}/hive/hive.tar.gz --strip-components 1 -C ${HIVE_HOME}


echo "Downloading and unpacking minicluster"
rm -rf ${TRAVIS_CACHE}/minicluster/minicluster.zip
curl -z ${TRAVIS_CACHE}/minicluster/minicluster.zip -o ${TRAVIS_CACHE}/minicluster/minicluster.zip -L ${MINICLUSTER_URL}
unzip ${TRAVIS_CACHE}/minicluster/minicluster.zip -d /tmp

echo "Path = ${PATH}"

java -cp "/tmp/minicluster-1.1-SNAPSHOT/*" com.ing.minicluster.MiniCluster &
Loading

0 comments on commit ba15d3a

Please sign in to comment.