diff --git a/.travis.yml b/.travis.yml index 757f99e..52667e7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -7,6 +7,7 @@ sudo: required language: python python: - 2.7 +cache: pip branches: only: @@ -14,8 +15,9 @@ branches: - master env: - - ANSIBLE_INSTALL_VERSION=2.5.9 - - ANSIBLE_INSTALL_VERSION=2.6.6 + - ANSIBLE_INSTALL_VERSION=2.4.6.0 + - ANSIBLE_INSTALL_VERSION=2.5.11 + - ANSIBLE_INSTALL_VERSION=2.6.7 - ANSIBLE_INSTALL_VERSION=2.7.1 services: @@ -26,7 +28,7 @@ before_install: - sudo apt-get install -o Dpkg::Options::="--force-confold" --force-yes -y docker-ce install: - - make deps + - make .venv script: - make test diff --git a/.version b/.version index c0ec837..d9fb9c7 100644 --- a/.version +++ b/.version @@ -1 +1 @@ -v2.0 +v3.0 diff --git a/Makefile b/Makefile index 246f380..09e360c 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -ANSIBLE_INSTALL_VERSION ?= 2.5.9 +ANSIBLE_INSTALL_VERSION ?= 2.6.7 PATH := $(PWD)/.venv_ansible$(ANSIBLE_INSTALL_VERSION)/bin:$(shell printenv PATH) SHELL := env PATH=$(PATH) /bin/bash @@ -6,19 +6,12 @@ SHELL := env PATH=$(PATH) /bin/bash .PHONY: all clean destroy help test -_check_venv: - @if [ ! -e .venv_ansible$(ANSIBLE_INSTALL_VERSION)/bin/activate ]; then \ - echo -e "\033[0;31mERROR: No virtualenv found - run 'make deps' first\033[0m"; \ - false; \ - fi - - ## Make deps, test all: deps test ## Activate the virtualenv -activate: _check_venv +activate: .venv_ansible$(ANSIBLE_INSTALL_VERSION) @echo -e "\033[0;32mINFO: Activating venv_ansible$(ANSIBLE_INSTALL_VERSION) (ctrl-d to exit)\033[0m" @exec $(SHELL) --init-file .venv_ansible$(ANSIBLE_INSTALL_VERSION)/bin/activate @@ -42,18 +35,24 @@ destroy: echo -e "\033[0;33mWARNING: molecule not found - either remove potential containers manually or run 'make deps' first\033[0m"; \ fi + ## Login to docker container named '%' -login_%: _check_venv +login_%: .venv_ansible$(ANSIBLE_INSTALL_VERSION) @echo -e "\033[0;32mINFO: Logging into $(subst login_,,$@) (ctrl-d to exit)\033[0m" @.venv_ansible$(ANSIBLE_INSTALL_VERSION)/bin/molecule login --host $(subst login_,,$@) + ## Run 'molecule test --destroy=never' (run 'make destroy' to destroy containers) -test: _check_venv +test: .venv_ansible$(ANSIBLE_INSTALL_VERSION) @.venv_ansible$(ANSIBLE_INSTALL_VERSION)/bin/molecule test --destroy=never +# shortcut for creating virtualenv +.venv: .venv_ansible$(ANSIBLE_INSTALL_VERSION) + + ## Create virtualenv, install dependencies -deps: +.venv_ansible$(ANSIBLE_INSTALL_VERSION): @if (python -V 2>&1 | grep -qv "Python 2.7"); then \ echo -e "\033[0;31mERROR: Only Python 2.7 is supported at this stage\033[0m"; \ false; \ @@ -66,7 +65,7 @@ deps: ## Run 'make test' on any file change -watch: _check_venv +watch: .venv_ansible$(ANSIBLE_INSTALL_VERSION) @while sleep 1; do \ find defaults/ files/ handlers/ meta/ molecule/*/*.yml molecule/*/test/*.py tasks/ templates/ vars/ 2> /dev/null \ | entr -d make test; \ diff --git a/defaults/main.yml b/defaults/main.yml index dbd3269..7818f2a 100644 --- a/defaults/main.yml +++ b/defaults/main.yml @@ -1,53 +1,26 @@ --- sansible_kafka_apache_mirror: https://archive.apache.org/dist/ -sansible_kafka_auto_create_topics: "false" sansible_kafka_aws_cluster_assigned_id_enabled: no sansible_kafka_aws_cluster_assigned_id_tag_name: instanceindex -# If enabled AWS will be used to figure out which host and id should be used -# Note that you must install the AWS CLI tools to use this feature -sansible_kafka_aws_cluster_autodiscover_enabled: no -sansible_kafka_aws_cluster_autodiscover_hosts: [] 
-sansible_kafka_aws_cluster_autodiscover_id_tag_name: KafkaID
-sansible_kafka_aws_cluster_autodiscover_lookup_filter: "Name=tag:Environment,Values=dev Name=tag:Role,Values=kafka"
-sansible_kafka_aws_cluster_autodiscover_r53_zone_id: ~
-sansible_kafka_aws_delay: 5
-sansible_kafka_aws_retries: 3
 sansible_kafka_conf_dir: /home/kafka/etc
-sansible_kafka_controlled_shutdown_enable: true
-sansible_kafka_controlled_shutdown_max_retries: 3
-sansible_kafka_controlled_shutdown_retry_backoff_ms: 2000
-sansible_kafka_data_dir: /home/kafka/data
-sansible_kafka_default_replication_factor: 1
 sansible_kafka_group: kafka
 sansible_kafka_heap_opts: "-Xmx{{ (ansible_memtotal_mb / 2) | int }}m -Xms{{ (ansible_memtotal_mb / 2) | int }}m"
-sansible_kafka_id: 1
 sansible_kafka_interface_advertise: ~
 sansible_kafka_interface_bind: ~
 sansible_kafka_java_vendor: openjdk
 sansible_kafka_java_version: 8
 sansible_kafka_jmx_port: 9999
-sansible_kafka_log_cleanup_interval_mins: 1
 sansible_kafka_log_dir: /home/kafka/log
-sansible_kafka_log_flush_interval_messages: 10000
-sansible_kafka_log_flush_interval_ms: 1000
 sansible_kafka_log_level: WARN
-sansible_kafka_log_retention_bytes: 104857600 # 100 M
-sansible_kafka_log_retention_hours: 24
-sansible_kafka_log_segment_bytes: 104857600
 sansible_kafka_max_open_files: 4098
-sansible_kafka_num_io_threads: 2
-sansible_kafka_num_network_threads: 2
-sansible_kafka_num_partitions: 2
 sansible_kafka_port: 9092
-sansible_kafka_socket_receive_buffer_bytes: 1048576
-sansible_kafka_socket_request_max_bytes: 104857600
-sansible_kafka_socket_send_buffer_bytes: 1048576
+sansible_kafka_server_properties: {}
 sansible_kafka_tarball_location: /home/kafka/tmp
 sansible_kafka_user: kafka
-sansible_kafka_version_kafka: 0.10.1.1
+sansible_kafka_version_kafka: 2.0.0
 sansible_kafka_version_scala: 2.11
 sansible_kafka_wait_for_kafka_port: 120
 sansible_kafka_zookeeper_connection_timeout_ms: 1000000
 sansible_kafka_zookeeper_hosts:
-  - localhost
+  - localhost:2181
diff --git a/files/aws_cluster_autodiscover b/files/aws_cluster_autodiscover
deleted file mode 100755
index 69ff552..0000000
--- a/files/aws_cluster_autodiscover
+++ /dev/null
@@ -1,136 +0,0 @@
-#!/bin/bash
-
-set -o nounset
-
-# Gaffa tape script that uses AWS tags to pick an available ID/Index from a list of hostnames
-# This ID can be used to assign then pick the hostname and machine ID for services that require such a setting
-# such as Kafka or Zookeeper.
-#
-# NOTE In the future we should replace this with Consul.
-#
-# Usage:
-# aws_cluster_autodiscover [Max number of machines in cluster] [AWS hostedzone ID] "[AWS ec2 describe-instances filter]" [Name of tag used to store id]
-# eg.
-# ./bin/aws_cluster_autodiscover "01.zookeeper.app.internal,02.zookeeper.app.internal,03.zookeeper.app.internal" "Z8LHX0FIHPY9F" "Name=tag:Environment,Values=dev Name=tag:Role,Values=zookeeper" "ZookeeperID"
-#
-# Returns:
-# Json array detaling the claimed ID and hostname
-# eg.
-# {"id": "1", "id_index0": "0", "hostname": "01.zookeeper.app.internal", "info": "Machine had no ID, claimed ID and R53 entry" } - -function die { - echo -e "\033[1;31mError: $1\033[0m" - exit 1 -} - -function output { - local -i CLAIMED_ID=$1 - local CLAIMED_HOST=$2 - local INFO=$3 - local -i CLAIMED_ID_INDEX0=$((CLAIMED_ID - 1 )) - echo "{\"id\": \"${CLAIMED_ID}\", \"id_index0\": \"${CLAIMED_ID_INDEX0}\", \"hostname\": \"${CLAIMED_HOST}\", \"info\": \"${INFO}\" }" - exit 0 -} - -function main { - # Arguments - if [ "$#" -ne 4 ]; then - die "This script takes 4 parameters, only $# given" - fi - local HOSTS=(${1//,/ }) - local HOSTED_ZONE_ID=$2 - local LOOKUP_FILTER=$3 - local ID_TAG_NAME=$4 - - # Initialise vars before usage since local always returns 0 - local MAX_HOSTS=${#HOSTS[@]} - local CLAIMED_ID="" - local CLAIMED_HOST="" - local INSTANCE_ID="" - local INSTANCE_IP="" - local INSTANCE_REGION="" - local AVAILABLE_IDS=( ) - local CLAIMED_IDS=( ) - local UNCLAIMED_IDS=( ) - local ID_CHECK="" - - # Hacky random sleep in an attempt to avoid id clashes in a cluster - sleep $[ ( $RANDOM % 60 ) + 1 ]s - - INSTANCE_ID=$( curl -f -s http://169.254.169.254/latest/meta-data/instance-id/ ) - INSTANCE_IP=$( curl -f -s http://169.254.169.254/latest/meta-data/local-ipv4/ ) - INSTANCE_REGION=$( curl -f -s http://169.254.169.254/latest/meta-data/placement/availability-zone | sed -e '$s/.$//' ) - if [[ -z ${INSTANCE_ID} || -z ${INSTANCE_IP} || -z ${INSTANCE_REGION} ]]; then - die "Could not grab instance ID or IP, are you running outside of EC2?" - fi - - # Check to see if this machine has an ID tag already - CLAIMED_ID=$( aws ec2 describe-tags \ - --region ${INSTANCE_REGION} \ - --filter "Name=resource-id,Values=${INSTANCE_ID}" \ - --output text "Name=key,Values=${ID_TAG_NAME}" \ - --query "Tags[*].Value" ) - if [[ -n ${CLAIMED_ID} ]]; then - output "${CLAIMED_ID}" "${HOSTS[$CLAIMED_ID - 1]}" "Machine already has an ID" - fi - - # Figure out which ID and host to claim - AVAILABLE_IDS=( $(seq 1 "${MAX_HOSTS}") ) - CLAIMED_IDS=( $( aws ec2 describe-instances \ - --region ${INSTANCE_REGION} \ - --filters ${LOOKUP_FILTER} Name=instance-state-name,Values=running \ - --output text \ - --query "Reservations[*].Instances[*].Tags[*] | [] | [] | [?Key=='${ID_TAG_NAME}'] | [*].Value" ) ) - - if [[ "${#CLAIMED_IDS[@]}" -gt 0 ]]; then - # Use comm to diff the ID arrays - UNCLAIMED_IDS=( $( comm -13 \ - <(for X in "${CLAIMED_IDS[@]}"; do echo "${X}"; done | sort) \ - <(for X in "${AVAILABLE_IDS[@]}"; do echo "${X}"; done | sort) ) ) - - if [ ${#UNCLAIMED_IDS[@]} -eq 0 ]; then - # No free IDs, throw an error - die "No free IDs found, aborting" - fi - - # Claim the first free ID - CLAIMED_ID="${UNCLAIMED_IDS[0]}" - else - # No IDs have been claimed, grab the first one - CLAIMED_ID="1" - fi - - # Double check that ID is still free, if not fail so caller can retry if desired - ID_CHECK=$( aws ec2 describe-instances \ - --region ${INSTANCE_REGION} \ - --filters ${LOOKUP_FILTER} \ - Name=instance-state-name,Values=running \ - Name=tag:${ID_TAG_NAME},Values=${CLAIMED_ID} \ - --output text \ - --query "Reservations[*].Instances[*].InstanceId" ) - if [[ -n "${ID_CHECK}" ]]; then - die "Attempted to claim ID but it was taken during discovery" - fi - - # Claim the ID - aws ec2 create-tags \ - --region ${INSTANCE_REGION} \ - --resources ${INSTANCE_ID} \ - --tags Key=${ID_TAG_NAME},Value=${CLAIMED_ID} \ - > /dev/null 2>&1 \ - || die "Failed to update Tag entry for with ID ${CLAIMED_ID}" - - CLAIMED_HOST="${HOSTS[$CLAIMED_ID - 
1]}" - - # Claim the hostname - aws route53 change-resource-record-sets \ - --region ${INSTANCE_REGION} \ - --hosted-zone-id ${HOSTED_ZONE_ID} \ - --change-batch "{ \"Changes\": [ { \"Action\": \"UPSERT\", \"ResourceRecordSet\": { \"Name\": \"${CLAIMED_HOST}\", \"Type\": \"A\", \"TTL\": 300, \"ResourceRecords\": [ { \"Value\": \"${INSTANCE_IP}\" } ] } } ] }" \ - > /dev/null 2>&1 \ - || die "Failed to update R53 entry for ${CLAIMED_HOST}" - - output "${CLAIMED_ID}" "${CLAIMED_HOST}" "Machine had no ID, claimed ID and R53 entry" -} - -main "$@" diff --git a/files/kafka_maintenance_at_start b/files/kafka_maintenance_at_start deleted file mode 100644 index bc973c9..0000000 --- a/files/kafka_maintenance_at_start +++ /dev/null @@ -1,117 +0,0 @@ -#!/bin/bash - -CO="__consumer_offsets" -PC="PartitionCount" -RF="ReplicationFactor" -KAFKA="/home/kafka" -BIN="$KAFKA/kafka/bin" -LOG="$KAFKA/log/shutdown.log" -SYNC_STATUS="$KAFKA/etc/syncStatus" -TMP_DIR="/tmp/kafka" -ZK_URL="zookeeper.app.internal" -ZK_PORT="2181" -BROKERS_ALL_FILE="brokers_all_file" -TOPICS_FILE="topics_file" -broker_id="cat $KAFKA/etc/brokerId" # This node - -prepare() { - printf "Preparation " - if [ ! -d $TMP_DIR ]; then mkdir $TMP_DIR && printf "Done\n"; fi -} - -first_check() { - printf "First check " - brokers_all=$(echo "ls /brokers/ids"|$BIN/zookeeper-shell.sh $ZK_URL:$ZK_PORT|tail -1| sed 's/,//g'| sed 's/\[//'| sed 's/\]//' 2>/dev/null > $TMP_DIR/$BROKERS_ALL_FILE) - topics_description=$($BIN/kafka-topics.sh --describe --zookeeper $ZK_URL:$ZK_PORT |grep -v $CO 2>/dev/null > $TMP_DIR/$TOPICS_FILE) - if [[ -f $TMP_DIR/$BROKERS_ALL_FILE ]] && [[ -f $TMP_DIR/$TOPICS_FILE ]]; then - if ! [[ -s $TMP_DIR/$TOPICS_FILE ]]; then - printf "Nothing to replicate. Exit\n" - echo "READY" > $SYNC_STATUS - exit - fi - read -a brokers <<< $(cat $TMP_DIR/$BROKERS_ALL_FILE) - printf "Done\n" - else - echo "Files don't exist" - sleep 1s - first_check - fi -} - -collect_info() { - printf "Collect info " - numbers='[0-9]' - if ! [[ $(cat $TMP_DIR/$BROKERS_ALL_FILE) =~ $numbers ]] ; then - echo "error: Not a number" - echo "Is Zookeeper available? Checking again..." 
- sleep 2s - else - printf "Done\n" - echo "Available brokers: ${brokers[@]}" - fi -} - -prepare_json() { - topics=$(grep $PC $TMP_DIR/$TOPICS_FILE|awk '{print $1}' |cut -f2 -d ":") - printf "Topics:" - for t in $topics; do - topic_partitions=$(grep $t $TMP_DIR/$TOPICS_FILE| head -1| grep $PC| awk '{print $2}'| cut -f2 -d ":") - replication_factor=$(grep $t $TMP_DIR/$TOPICS_FILE| head -1|grep $RF| awk '{print $3}'| cut -f2 -d ":") - printf "\n" - printf "$t" # topic - printf "\t\tTopic partitions: $topic_partitions\n" - printf "\t\t\tReplication factor: $replication_factor\n" - echo "{\"version\":1,\"partitions\":[" > $TMP_DIR/$t - # Create replicas - declare -a matrix - num_columns=$topic_partitions - num_rows=$replication_factor - index=0 - f1="%$((${#num_rows}+1))s " - f2="%1s" - for (( i=1; i<=num_columns; i++ )) do - partition_num=$(($i-1)) - printf "{\"topic\":\"$t\",\"partition\":"$partition_num",\"replicas\":[" >> $TMP_DIR/$t - printf "$f1" #$i - for (( j=1; j<=num_rows; j++ )) do - lenght=${#brokers[@]} - if [[ "$index" -ge $lenght ]]; then - ((index*=0)) - fi - matrix[$i,$j]=${brokers[$index]} - ((index++)) - printf "$f2," ${matrix[$i,$j]} >> $TMP_DIR/$t - if [[ "$j" -ge "$num_rows" ]]; then - sed -i '$s/,$//' $TMP_DIR/$t - fi - done - printf "]},\n" >> $TMP_DIR/$t - done - sed -i '$s/,$/\]\}/' $TMP_DIR/$t - done - printf "\n" - printf "Prepare Json file Done\n" -} - -assign_partitions () { - printf "Assign partitions " - for t in ${topics}; do - $BIN/kafka-reassign-partitions.sh --zookeeper $ZK_URL:$ZK_PORT --reassignment-json-file $TMP_DIR/$t --execute >> $LOG - echo "READY" > $SYNC_STATUS - done - printf "Done\n" -} - -clean () { - rm -rf $TMP_DIR && printf "Temp dir removed\n" -} - -main () { - prepare - first_check - collect_info - prepare_json - assign_partitions -} - -main diff --git a/files/kafka_maintenance_at_stop b/files/kafka_maintenance_at_stop deleted file mode 100755 index c10e067..0000000 --- a/files/kafka_maintenance_at_stop +++ /dev/null @@ -1,122 +0,0 @@ -#!/bin/bash - -CO="__consumer_offsets" -PC="PartitionCount" -RF="ReplicationFactor" -KAFKA="/home/kafka" -SYNC_STATUS="$KAFKA/etc/syncStatus" -BIN="$KAFKA/kafka/bin" -LOG="$KAFKA/log/shutdown.log" -TMP_DIR="/tmp/kafka" -ZK_URL="zookeeper.app.internal" -ZK_PORT="2181" -BROKERS_ALL_FILE="brokers_all_file" -TOPICS_FILE="topics_file" -broker_id=$(cat $KAFKA/etc/brokerId) # This node - -prepare() { - printf "Preparation " - if [ ! -d $TMP_DIR ]; then mkdir $TMP_DIR && printf "Done\n"; fi -} - -first_check() { - printf "First check " - if [ ! -e $SYNC_STATUS ]; then - echo "Kafka $broker_id doesn't have data. EXIT" - exit - fi - if ! grep -q "READY" $SYNC_STATUS ; then - echo "Kafka $broker_id is not READY. EXIT" - exit - fi - printf "Done\n" -} - -collect_info() { - printf "Collect info " - brokers_all=$(echo "ls /brokers/ids"|$BIN/zookeeper-shell.sh $ZK_URL:$ZK_PORT|tail -1| sed 's/,//g'| sed 's/\[//'| sed 's/\]//' > $TMP_DIR/$BROKERS_ALL_FILE) - brokers_available=$(cat $TMP_DIR/$BROKERS_ALL_FILE|tail -1| sed 's/,//g'| sed 's/\[//'| sed 's/\]//'| sed -e "s/$broker_id//") - topics_description=$($BIN/kafka-topics.sh --describe --zookeeper $ZK_URL:$ZK_PORT |grep -v $CO > $TMP_DIR/$TOPICS_FILE) - topics=$(grep $PC $TMP_DIR/$TOPICS_FILE|awk '{print $1}' |cut -f2 -d ":") - if [ ! -e $TMP_DIR/$BROKERS_ALL_FILE ] ; then - sleep 1s - collect_info - fi - numbers='[0-9]' - if ! [[ $(cat $TMP_DIR/$BROKERS_ALL_FILE) =~ $numbers ]] ; then - echo "error: Not a number" - echo "Is Zookeeper available? Checking again..." 
- sleep 2s - collect_info - else - printf "Done\n" - echo "All brokers: $(cat $TMP_DIR/$BROKERS_ALL_FILE)" - read -a brokers <<< $brokers_available - echo "Available brokers: ${brokers[@]}" - fi - } - -prepare_json () { - topics=$(grep $PC $TMP_DIR/$TOPICS_FILE|awk '{print $1}' |cut -f2 -d ":") - printf "Topics:" - for t in $topics; do - topic_partitions=$(grep $t $TMP_DIR/$TOPICS_FILE| head -1| grep $PC| awk '{print $2}'| cut -f2 -d ":") - replication_factor=$(grep $t $TMP_DIR/$TOPICS_FILE| head -1|grep $RF| awk '{print $3}'| cut -f2 -d ":") - printf "\n" - printf "$t" # topic - printf "\t\tTopic partitions: $topic_partitions\n" - printf "\t\t\tReplication factor: $replication_factor\n" - echo "{\"version\":1,\"partitions\":[" > $TMP_DIR/$t - # Create replicas - declare -a matrix - num_columns=$topic_partitions - num_rows=$replication_factor - index=0 - f1="%$((${#num_rows}+1))s " - f2="%1s" - for (( i=1; i<=num_columns; i++ )) do - partition_num=$(($i-1)) - printf "{\"topic\":\"$t\",\"partition\":"$partition_num",\"replicas\":[" >> $TMP_DIR/$t - printf "$f1" - for (( j=1; j<=num_rows; j++ )) do - lenght=${#brokers[@]} - if [[ "$index" -ge $lenght ]]; then - ((index*=0)) - fi - matrix[$i,$j]=${brokers[$index]} - ((index++)) - printf "$f2," ${matrix[$i,$j]} >> $TMP_DIR/$t - if [[ "$j" -ge "$num_rows" ]]; then - sed -i '$s/,$//' $TMP_DIR/$t - fi - done - printf "]},\n" >> $TMP_DIR/$t - done - sed -i '$s/,$/\]\}/' $TMP_DIR/$t - done - printf "\n" - printf "Prepare Json file Done\n" -} - -reassign_partitions() { - printf "Re-assign partitions " - for t in ${topics}; do - $BIN/kafka-reassign-partitions.sh --zookeeper $ZK_URL:$ZK_PORT --reassignment-json-file $TMP_DIR/$t --execute >> $LOG - done - printf "Done\n" -} - -clean() { - rm -rf $TMP_DIR && printf "Temp dir removed\n" -} - -main() { - prepare - first_check - collect_info - prepare_json - reassign_partitions - clean -} - -main diff --git a/files/remove_dns_record b/files/remove_dns_record deleted file mode 100644 index 9d5c46a..0000000 --- a/files/remove_dns_record +++ /dev/null @@ -1,29 +0,0 @@ -#!/bin/bash - -TMP_DIR=/tmp -INSTANCE_DETAILS_FILE="$TMP_DIR/instance_details_file" - -remove_dns_record() { - instance_details=$(curl -s http://169.254.169.254/latest/dynamic/instance-identity/document -o $INSTANCE_DETAILS_FILE) - hosted_zone=$(cat /home/kafka/etc/hostedZone) - hostname=$(cat /home/kafka/etc/hostname) - broker_id=$(cat /home/kafka/etc/brokerId) - - readarray -t aws_values < <(grep '"' $INSTANCE_DETAILS_FILE | cut -d '"' -f4) - - echo ${aws_values[@]} - echo "Hosted Zone" ${hosted_zone} - echo "Hostname" ${hostname} - echo "Region" ${aws_values[4]} - - /usr/local/bin/aws route53 change-resource-record-sets \ - --region ${aws_values[4]} \ - --hosted-zone-id ${hosted_zone} \ - --change-batch "{ \"Changes\": [ { \"Action\": \"DELETE\", \"ResourceRecordSet\": { \"Name\": \"${hostname}\", \"Type\": \"A\", \"TTL\": 300, \"ResourceRecords\": [ { \"Value\": \"${aws_values[1]}\" } ] } } ] }" -} - -main() { - remove_dns_record -} - -main diff --git a/files/stop_kafka b/files/stop_kafka deleted file mode 100644 index 6f38df0..0000000 --- a/files/stop_kafka +++ /dev/null @@ -1,24 +0,0 @@ -#!/bin/bash - -KAFKA="/home/kafka" -BIN="$KAFKA/bin" - -PIDS=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}') - -stop_kafka() { - if [ -z "$PIDS" ]; then - echo "No kafka server to stop" - exit 1 - else - echo "Prepare Kafka to stop" - /bin/bash $BIN/kafka_maintenance_at_stop && /bin/bash $BIN/remove_dns_record - echo "Kafka 
maintenance Done"
-        kill -s TERM $PIDS
-    fi
-}
-
-main() {
-    stop_kafka
-}
-
-main
diff --git a/meta/main.yml b/meta/main.yml
index 41330cf..1d5e04e 100644
--- a/meta/main.yml
+++ b/meta/main.yml
@@ -3,7 +3,9 @@
 galaxy_info:
   description: "Install Kafka server."
   license: MIT
-  min_ansible_version: 2.2
+  # SAnsible only supports Ansible 2.4+
+  min_ansible_version: 2.4
+  min_ansible_container_version: 2.4
   platforms:
     - name: Ubuntu
       versions:
diff --git a/molecule/default/molecule.yml b/molecule/default/molecule.yml
index 4f439d4..095a11e 100644
--- a/molecule/default/molecule.yml
+++ b/molecule/default/molecule.yml
@@ -15,6 +15,23 @@ provisioner:
   name: ansible
   lint:
     name: ansible-lint
+  inventory:
+    host_vars:
+      # currently there is no support for netplan, and no interface facts
+      sansible_kafka-xenial:
+        sansible_kafka_server_properties:
+          listeners: "PLAINTEXT://127.0.0.1:9092"
+          broker.id: 11
+      sansible_kafka-bionic:
+        sansible_kafka_server_properties:
+          listeners: "PLAINTEXT://127.0.0.1:9092"
+          broker.id: 11
+      # trusty is using good old ifconfig
+      sansible_kafka-trusty:
+        sansible_kafka_server_properties:
+          broker.id: 11
+        sansible_kafka_interface_bind: lo
+        sansible_kafka_interface_advertise: lo
 
 lint:
   name: yamllint
diff --git a/molecule/default/playbook.yml b/molecule/default/playbook.yml
index abc6710..72f34a8 100644
--- a/molecule/default/playbook.yml
+++ b/molecule/default/playbook.yml
@@ -3,6 +3,8 @@
 - name: Converge
   hosts: all
 
+  # See the host_vars defined in molecule.yml as well
+
   roles:
     - role: sansible.zookeeper
diff --git a/molecule/default/tests/test_default.py b/molecule/default/tests/test_default.py
index a6e412a..8f10a70 100644
--- a/molecule/default/tests/test_default.py
+++ b/molecule/default/tests/test_default.py
@@ -1,5 +1,4 @@
 import os
-
 import testinfra.utils.ansible_runner
 
 testinfra_hosts = testinfra.utils.ansible_runner.AnsibleRunner(
@@ -13,5 +12,18 @@ def test_users(host):
 
 def test_listening(host):
     assert host.socket('tcp://0.0.0.0:2181').is_listening
-    assert host.socket('tcp://0.0.0.0:9092').is_listening
+    assert host.socket('tcp://127.0.0.1:9092').is_listening
     assert host.socket('tcp://0.0.0.0:9999').is_listening
+
+
+def test_server_properties(host):
+    server_properties = host.file(
+        '/home/kafka/etc/server.properties'
+    ).content_string
+
+    assert 'listeners=PLAINTEXT://127.0.0.1:9092' \
+        in server_properties
+    assert 'broker.id=11' \
+        in server_properties
+    assert 'zookeeper.connect=' \
+        in server_properties
diff --git a/requirements.txt b/requirements.txt
index 4bd21a7..b77d35e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,4 @@
-ansible==2.5.9
 docker-py==1.10.6
 molecule==2.19.0
+# pytest 3.10.0 has a bug with its cache, so stay on 3.9.3
+pytest==3.9.3
diff --git a/tasks/configure.yml b/tasks/configure.yml
index 7f348e2..5868542 100644
--- a/tasks/configure.yml
+++ b/tasks/configure.yml
@@ -10,67 +10,6 @@
     - restart kafka
   when: ansible_virtualization_type != 'docker'
 
-- name: Copy AWS autodiscover script
-  become: yes
-  become_user: "{{ sansible_kafka_user }}"
-  copy:
-    src: aws_cluster_autodiscover
-    dest: "/home/{{ sansible_kafka_user }}/bin/aws_cluster_autodiscover"
-    owner: "{{ sansible_kafka_user }}"
-    group: "{{ sansible_kafka_group }}"
-    mode: 0750
-  when: sansible_kafka_aws_cluster_autodiscover_enabled
-
-- name: Run AWS autodiscover script and grab cluster settings
-  become: yes
-  become_user: "{{ sansible_kafka_user }}"
-  command: "./aws_cluster_autodiscover {{ sansible_kafka_aws_cluster_autodiscover_hosts | join(',') }} {{
sansible_kafka_aws_cluster_autodiscover_r53_zone_id }} \"{{ sansible_kafka_aws_cluster_autodiscover_lookup_filter }}\" {{ sansible_kafka_aws_cluster_autodiscover_id_tag_name }}" - args: - chdir: "/home/{{ sansible_kafka_user }}/bin" - register: aws_cluster_autodiscover - until: aws_cluster_autodiscover | success - retries: 4 - delay: 10 - when: sansible_kafka_aws_cluster_autodiscover_enabled - -# Combine is used with set fact as successive calls to the module -# seem to remove previous values added to hashes -- name: Set cluster facts based on AWS autodiscover script output - set_fact: - kafka: "{{ kafka | combine( { - 'aws_cluster_autodiscover': { - 'data': aws_cluster_autodiscover.stdout | from_json - } - }, recursive=True) }}" - when: sansible_kafka_aws_cluster_autodiscover_enabled - -- name: Export DNS hostname - become: yes - become_user: "{{ sansible_kafka_user }}" - lineinfile: - create: yes - dest: "/home/{{ sansible_kafka_user }}/etc/hostname" - line: "{{ sansible_kafka_aws_cluster_autodiscover_data_hostname }}" - when: sansible_kafka_aws_cluster_autodiscover_enabled - -- name: Export hosted zone ID from aws_cluster_autodiscover settings - become: yes - become_user: "{{ sansible_kafka_user }}" - lineinfile: - create: yes - dest: "/home/{{ sansible_kafka_user }}/etc/hostedZone" - line: "{{ sansible_kafka_aws_cluster_autodiscover_r53_zone_id }}" - when: sansible_kafka_aws_cluster_autodiscover_enabled - -- name: Export broker ID to local filesystem - become: yes - become_user: "{{ sansible_kafka_user }}" - lineinfile: - create: yes - dest: "/home/{{ sansible_kafka_user }}/etc/brokerId" - line: "{{ sansible_kafka_aws_cluster_autodiscover_data_hostname }}" - when: sansible_kafka_aws_cluster_autodiscover_enabled - - name: Assigned ID EC2 fact hunt action: ec2_instance_metadata register: ec2_vars @@ -93,24 +32,36 @@ - name: Assigned ID set Broker ID set_fact: - sansible_kafka_aws_cluster_assigned_id: "{{ assigned_id_instance_tags.tags[sansible_kafka_aws_cluster_assigned_id_tag_name] }}" + sansible_kafka_server_properties: "{{ {} | combine(sansible_kafka_server_properties, { + 'broker.id': assigned_id_instance_tags.tags[ + sansible_kafka_aws_cluster_assigned_id_tag_name + ] + }) }}" when: ansible_ec2_instance_id is defined and sansible_kafka_aws_cluster_assigned_id_enabled - name: Grab local facts for looking up interfaces - action: setup + setup: {} register: local_facts when: sansible_kafka_interface_bind is not none or sansible_kafka_interface_advertise is not none - name: Set listen_address if interface bind set set_fact: - sansible_kafka_listen_address: "{{ local_facts.ansible_facts['ansible_' + sansible_kafka_interface_bind]['ipv4']['address'] }}" + sansible_kafka_server_properties: "{{ {} | combine(sansible_kafka_server_properties, { + 'listeners': 'PLAINTEXT://' + local_facts.ansible_facts[ + 'ansible_' + sansible_kafka_interface_bind + ]['ipv4']['address'] + ':' + sansible_kafka_port|string + }) }}" when: sansible_kafka_interface_bind is not none - name: Set advertised_host_name if interface advertise set set_fact: - sansible_kafka_advertised_host_name: "{{ local_facts.ansible_facts['ansible_' + sansible_kafka_interface_advertise]['ipv4']['address'] }}" + sansible_kafka_server_properties: "{{ {} | combine(sansible_kafka_server_properties, { + 'advertised.listeners': 'PLAINTEXT://' + local_facts.ansible_facts[ + 'ansible_' + sansible_kafka_interface_advertise + ]['ipv4']['address'] + ':' + sansible_kafka_port|string + }) }}" when: sansible_kafka_interface_advertise is not none - 
name: Setup environment config @@ -140,6 +91,11 @@ dest: "/home/{{ sansible_kafka_user }}/etc/server.properties" mode: 0640 src: server.properties.j2 + vars: + server_properties: "{{ {} | combine( + sansible_kafka_server_properties_defaults, + sansible_kafka_server_properties + ) }}" notify: - restart kafka @@ -158,43 +114,3 @@ port: "{{ sansible_kafka_port }}" state: started timeout: "{{ sansible_kafka_wait_for_kafka_port }}" - -- name: Copy Kafka maintenance scripts - become: yes - become_user: "{{ sansible_kafka_user }}" - copy: - src: "{{ item }}" - mode: 0750 - dest: "/home/{{ sansible_kafka_user }}/bin/{{ item }}" - with_items: - - kafka_maintenance_at_stop - - kafka_maintenance_at_start - - remove_dns_record - when: sansible_kafka_aws_cluster_autodiscover_enabled - -- name: Run Kafka maintenance script at start - become: yes - become_user: "{{ sansible_kafka_user }}" - command: "/home/{{ sansible_kafka_user }}/bin/kafka_maintenance_at_start" - when: sansible_kafka_aws_cluster_autodiscover_enabled - -- name: Copy Kafka stop script - become: yes - copy: - dest: "/etc/init.d" - mode: 0755 - src: "stop_kafka" - when: sansible_kafka_aws_cluster_autodiscover_enabled - -- name: Add Kafka stop script to system runlevels - become: yes - file: - src: "/etc/init.d/stop_kafka" - dest: "/etc/{{ item }}/K10kafka-controlled-shutdown" - state: link - force: yes - with_items: - - "rc0.d" - - "rc1.d" - - "rc6.d" - when: sansible_kafka_aws_cluster_autodiscover_enabled diff --git a/tasks/main.yml b/tasks/main.yml index e3d0801..0fad20e 100644 --- a/tasks/main.yml +++ b/tasks/main.yml @@ -5,7 +5,7 @@ tags: - build -- name: Install kafka +- name: Configure kafka include: configure.yml tags: - configure diff --git a/templates/server.properties.j2 b/templates/server.properties.j2 index 886c694..0715c19 100644 --- a/templates/server.properties.j2 +++ b/templates/server.properties.j2 @@ -1,115 +1,6 @@ ## # {{ ansible_managed }} -############################# Server Basics ############################# - -# The id of the broker. This must be set to a unique integer for each broker. -# If left off one is auto-assigned -{% if sansible_kafka_aws_cluster_assigned_id is defined %} -broker.id={{ sansible_kafka_aws_cluster_assigned_id }} -{% else %} -broker.id={{ sansible_kafka_id }} -{% endif %} - -# Graceful shutdown. True to enable -controlled.shutdown.enable={{ sansible_kafka_controlled_shutdown_enable }} -controlled.shutdown.max.retries={{ sansible_kafka_controlled_shutdown_max_retries }} -controlled.shutdown.retry.backoff.ms={{ sansible_kafka_controlled_shutdown_retry_backoff_ms }} - -############################# Socket Server Settings ############################# - -# Hostname the broker will bind to. 
If not set, the server will bind to all interfaces -{% if sansible_kafka_listen_address is defined %} -listeners=PLAINTEXT://{{ sansible_kafka_listen_address }}:{{ sansible_kafka_port }} -{% else %} -listeners=PLAINTEXT://0.0.0.0:{{ sansible_kafka_port }} -{% endif %} - -{% if sansible_kafka_aws_cluster_autodiscover_data is defined %} -advertised.listeners=PLAINTEXT://{{ sansible_kafka_aws_cluster_autodiscover_data_hostname }}:{{ sansible_kafka_port }} -{% elif sansible_kafka_advertised_host_name is defined %} -advertised.listeners=PLAINTEXT://{{ sansible_kafka_advertised_host_name }}:{{ sansible_kafka_port }} -{% endif %} - -{# The number of threads handling network requests #} -num.network.threads={{ sansible_kafka_num_network_threads }} - -# The number of threads doing disk I/O -num.io.threads={{ sansible_kafka_num_io_threads }} - -# The send buffer (SO_SNDBUF) used by the socket server -socket.send.buffer.bytes={{ sansible_kafka_socket_send_buffer_bytes }} - -# The receive buffer (SO_RCVBUF) used by the socket server -socket.receive.buffer.bytes={{ sansible_kafka_socket_receive_buffer_bytes }} - -# The maximum size of a request that the socket server will accept (protection against OOM) -socket.request.max.bytes={{ sansible_kafka_socket_request_max_bytes }} - - -############################# Log Basics ############################# - -# A comma seperated list of directories under which to store log files -log.dirs={{ sansible_kafka_data_dir }} - -auto.create.topics.enable={{ sansible_kafka_auto_create_topics }} -# The number of logical partitions per topic per server. More partitions allow greater parallelism -# for consumption, but also mean more files. -num.partitions={{ sansible_kafka_num_partitions }} - -# Set the default replication factor, set to this to the number of kafka brokers in your cluster -default.replication.factor={{ sansible_kafka_default_replication_factor }} - -############################# Log Flush Policy ############################# - -# Messages are immediately written to the filesystem but by default we only fsync() to sync -# the OS cache lazily. The following configurations control the flush of data to disk. -# There are a few important trade-offs here: -# 1. Durability: Unflushed data may be lost if you are not using replication. -# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. -# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. -# The settings below allow one to configure the flush policy to flush data after a period of time or -# every N messages (or both). This can be done globally and overridden on a per-topic basis. - -# The number of messages to accept before forcing a flush of data to disk -log.flush.interval.messages={{ sansible_kafka_log_flush_interval_messages }} - -# The maximum amount of time a message can sit in a log before we force a flush -log.flush.interval.ms={{ sansible_kafka_log_flush_interval_ms }} - -############################# Log Retention Policy ############################# - -# The following configurations control the disposal of log segments. The policy can -# be set to delete segments after a period of time, or after a given size has accumulated. -# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens -# from the end of the log. 
-
-# The minimum age of a log file to be eligible for deletion
-log.retention.hours={{ sansible_kafka_log_retention_hours }}
-
-# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
-# segments don't drop below log.retention.bytes.
-log.retention.bytes={{ sansible_kafka_log_retention_bytes }}
-
-# The maximum size of a log segment file. When this size is reached a new log segment will be created.
-log.segment.bytes={{ sansible_kafka_log_segment_bytes }}
-
-# The interval at which log segments are checked to see if they can be deleted according
-# to the retention policies
-log.retention.check.interval.ms=60000
-
-# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
-# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
-log.cleaner.enable=false
-
-############################# Zookeeper #############################
-
-# Zookeeper connection string (see zookeeper docs for details).
-# This is a comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
-# You can also append an optional chroot string to the urls to specify the
-# root directory for all kafka znodes.
-zookeeper.connect={{ sansible_kafka_zookeeper_hosts | join(',') }}
-
-# Timeout in ms for connecting to zookeeper
-zookeeper.connection.timeout.ms={{ sansible_kafka_zookeeper_connection_timeout_ms }}
+{% for key, value in server_properties.iteritems() %}
+{{ key }}={{ value }}
+{% endfor %}
diff --git a/vars/main.yml b/vars/main.yml
new file mode 100644
index 0000000..45d814a
--- /dev/null
+++ b/vars/main.yml
@@ -0,0 +1,18 @@
+---
+
+# Visit https://kafka.apache.org/documentation/#brokerconfigs
+# for all possible parameters
+sansible_kafka_server_properties_defaults:
+  broker.id: 1
+  listeners: "PLAINTEXT://0.0.0.0:{{ sansible_kafka_port }}"
+  advertised.listeners: "PLAINTEXT://127.0.0.1:{{ sansible_kafka_port }}"
+
+  # A comma-separated list of directories under which to store log files
+  log.dirs: /home/kafka/data
+
+  # Zookeeper connection string (see zookeeper docs for details).
+  # This is a comma-separated list of host:port pairs, each corresponding to a zk
+  # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+  # You can also append an optional chroot string to the urls to specify the
+  # root directory for all kafka znodes.
+  zookeeper.connect: "{{ sansible_kafka_zookeeper_hosts | join(',') }}"
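For illustration only: with this change, any Kafka broker setting can be supplied through the new sansible_kafka_server_properties dictionary; keys set there are merged over sansible_kafka_server_properties_defaults (vars/main.yml) and rendered line by line into server.properties by the template loop above. A minimal playbook sketch follows; the role name sansible.kafka, the host group, and every value shown are assumptions for the example, not taken from this diff.

# Hypothetical playbook snippet; role name and values are assumptions.
# Each key under sansible_kafka_server_properties overrides the same key
# in sansible_kafka_server_properties_defaults before the template writes
# /home/kafka/etc/server.properties.
- hosts: kafka
  roles:
    - role: sansible.kafka
      sansible_kafka_zookeeper_hosts:
        - zookeeper01.example.internal:2181
      sansible_kafka_server_properties:
        broker.id: 3
        num.partitions: 2
        default.replication.factor: 3
        log.retention.hours: 24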