Skip to content

Commit

Permalink
replace output.image with image for using connect without self building
Browse files Browse the repository at this point in the history
  • Loading branch information
mabulgu committed Jul 8, 2021
1 parent 67c04d0 commit b61bf0b
Show file tree
Hide file tree
Showing 13 changed files with 159 additions and 62 deletions.
2 changes: 1 addition & 1 deletion examples/5_connect/connect.properties
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,5 @@ status.storage.replication.factor=1
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=connectors

output.image=quay.io/systemcraftsman/demo-connect-cluster:latest
image=quay.io/systemcraftsman/demo-connect-cluster:latest
plugin.url=https://github.com/jcustenborder/kafka-connect-twitter/releases/download/0.2.26/kafka-connect-twitter-0.2.26.tar.gz
2 changes: 1 addition & 1 deletion examples/5_connect/connect_v2.properties
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,5 @@ status.storage.replication.factor=1
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=connectors

output.image=quay.io/systemcraftsman/demo-connect-cluster:latest
image=quay.io/systemcraftsman/demo-connect-cluster:latest
plugin.url=https://github.com/jcustenborder/kafka-connect-twitter/releases/download/0.2.26/kafka-connect-twitter-0.2.26.tar.gz,https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-elasticsearch-rest-kafka-connector/0.10.0/camel-elasticsearch-rest-kafka-connector-0.10.0-package.tar.gz
6 changes: 3 additions & 3 deletions examples/5_connect/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,12 @@ By comparing to the original repository, you can see in the `connectors` folder
The url that you set above has the same resources archived.
Strimzi extracts them while building the Connect image in the Kubernetes/OpenShift cluster.

Speaking of the image, we have to set a `output.image`, actually a image repository path, that Strimzi can push the built image into.
Speaking of the image, we have to set a `image`, actually an image repository path, that Strimzi can push the built image into.
This can be either an internal registry of yours, or a public one like Docker Hub or Quay.
In this example we will use Quay and we should set the image URL like the following:

```properties
output.image=quay.io/systemcraftsman/demo-connect-cluster:latest
image=quay.io/systemcraftsman/demo-connect-cluster:latest
```

Here you can set the repository URL of your choice instead of `quay.io/systemcraftsman/demo-connect-cluster:latest`.
Expand Down Expand Up @@ -227,7 +227,7 @@ status.storage.replication.factor=1

...output omitted...

output.image=quay.io/systemcraftsman/demo-connect-cluster:latest
image=quay.io/systemcraftsman/demo-connect-cluster:latest
plugin.url=https://github.com/jcustenborder/kafka-connect-twitter/releases/download/0.2.26/kafka-connect-twitter-0.2.26.tar.gz
```

Expand Down
89 changes: 50 additions & 39 deletions kfk/commands/connect/clusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from kfk.commands.connect import connectors

CONNECT_SKIPPED_PROPERTIES = (
SpecialTexts.CONNECT_BOOTSTRAP_SERVERS, SpecialTexts.CONNECT_OUTPUT_IMAGE, SpecialTexts.CONNECT_PLUGIN_URL,
SpecialTexts.CONNECT_BOOTSTRAP_SERVERS, SpecialTexts.CONNECT_IMAGE, SpecialTexts.CONNECT_PLUGIN_URL,
SpecialTexts.CONNECT_PLUGIN_PATH)


Expand Down Expand Up @@ -73,30 +73,33 @@ def create(cluster, replicas, registry_username, registry_password, config_file,

connect_properties = get_properties_from_file(config_file)

del cluster_dict["spec"]["tls"]
cluster_dict["spec"].pop("tls")

cluster_dict["spec"]["bootstrapServers"] = connect_properties.get(
SpecialTexts.CONNECT_BOOTSTRAP_SERVERS).data

cluster_dict["spec"]["build"] = {}
cluster_dict["spec"]["build"]["output"] = {}
cluster_dict["spec"]["build"]["output"]["type"] = CONNECT_OUTPUT_TYPE_DOCKER
cluster_dict["spec"]["build"]["output"]["image"] = connect_properties.get(
SpecialTexts.CONNECT_OUTPUT_IMAGE).data
cluster_dict["spec"]["build"]["output"]["pushSecret"] = f"{cluster}-push-secret"
cluster_dict["spec"]["build"]["plugins"] = []
if connect_properties.get(SpecialTexts.CONNECT_PLUGIN_URL) is None:
cluster_dict["spec"]["image"] = connect_properties.get(
SpecialTexts.CONNECT_IMAGE).data
else:
cluster_dict["spec"]["build"] = {}
cluster_dict["spec"]["build"]["output"] = {}
cluster_dict["spec"]["build"]["output"]["type"] = CONNECT_OUTPUT_TYPE_DOCKER
cluster_dict["spec"]["build"]["output"]["image"] = connect_properties.get(SpecialTexts.CONNECT_IMAGE).data
cluster_dict["spec"]["build"]["output"]["pushSecret"] = f"{cluster}-push-secret"
cluster_dict["spec"]["build"]["plugins"] = []

for i, plugin_url in enumerate(
get_list_by_split_string(connect_properties.get(SpecialTexts.CONNECT_PLUGIN_URL).data, COMMA),
start=1):
for i, plugin_url in enumerate(
get_list_by_split_string(connect_properties.get(SpecialTexts.CONNECT_PLUGIN_URL).data, COMMA),
start=1):

if not is_valid_url(plugin_url):
raise click.ClickException(Errors.NOT_A_VALID_URL + f": {plugin_url}")
if not is_valid_url(plugin_url):
raise click.ClickException(Errors.NOT_A_VALID_URL + f": {plugin_url}")

plugin_dict = {"name": f"connector-{i}",
"artifacts": [{"type": _get_plugin_type(plugin_url), "url": plugin_url}]}
plugin_dict = {"name": f"connector-{i}",
"artifacts": [{"type": _get_plugin_type(plugin_url), "url": plugin_url}]}

cluster_dict["spec"]["build"]["plugins"].append(plugin_dict)
cluster_dict["spec"]["build"]["plugins"].append(plugin_dict)

cluster_dict["spec"]["config"] = {}

Expand All @@ -113,17 +116,20 @@ def create(cluster, replicas, registry_username, registry_password, config_file,
is_confirmed = click.confirm(Messages.CLUSTER_CREATE_CONFIRMATION)

if is_confirmed:
username = registry_username if registry_username is not None else click.prompt(
Messages.IMAGE_REGISTRY_USER_NAME, hide_input=False)
password = registry_password if registry_password is not None else click.prompt(
Messages.IMAGE_REGISTRY_PASSWORD, hide_input=True)

return_code = os.system(
Kubectl().create().secret("docker-registry", f"{cluster}-push-secret",
"--docker-username={username}", "--docker-password={password}",
"--docker-server={server}").namespace(
namespace).build().format(username=username, password=password, server=connect_properties.get(
SpecialTexts.CONNECT_OUTPUT_IMAGE).data))
return_code = 0

if connect_properties.get(SpecialTexts.CONNECT_PLUGIN_URL) is not None:
username = registry_username if registry_username is not None else click.prompt(
Messages.IMAGE_REGISTRY_USER_NAME, hide_input=False)
password = registry_password if registry_password is not None else click.prompt(
Messages.IMAGE_REGISTRY_PASSWORD, hide_input=True)

return_code = os.system(
Kubectl().create().secret("docker-registry", f"{cluster}-push-secret",
"--docker-username={username}", "--docker-password={password}",
"--docker-server={server}").namespace(
namespace).build().format(username=username, password=password, server=connect_properties.get(
SpecialTexts.CONNECT_IMAGE).data))

if return_code == 0:
return_code = os.system(
Expand Down Expand Up @@ -172,22 +178,27 @@ def alter(cluster, replicas, config_file, namespace):
cluster_dict["spec"]["bootstrapServers"] = connect_properties.get(
SpecialTexts.CONNECT_BOOTSTRAP_SERVERS).data

cluster_dict["spec"]["build"]["output"]["image"] = connect_properties.get(
SpecialTexts.CONNECT_OUTPUT_IMAGE).data
if connect_properties.get(SpecialTexts.CONNECT_PLUGIN_URL) is None:
cluster_dict["spec"].pop("build", None)
cluster_dict["spec"]["image"] = connect_properties.get(SpecialTexts.CONNECT_IMAGE).data
else:
cluster_dict["spec"].pop("image", None)
cluster_dict["spec"]["build"]["output"]["image"] = connect_properties.get(
SpecialTexts.CONNECT_IMAGE).data

cluster_dict["spec"]["build"]["plugins"] = []
cluster_dict["spec"]["build"]["plugins"] = []

for i, plugin_url in enumerate(
get_list_by_split_string(connect_properties.get(SpecialTexts.CONNECT_PLUGIN_URL).data, COMMA),
start=1):
for i, plugin_url in enumerate(
get_list_by_split_string(connect_properties.get(SpecialTexts.CONNECT_PLUGIN_URL).data, COMMA),
start=1):

if not is_valid_url(plugin_url):
raise click.ClickException(Errors.NOT_A_VALID_URL + f": {plugin_url}")
if not is_valid_url(plugin_url):
raise click.ClickException(Errors.NOT_A_VALID_URL + f": {plugin_url}")

plugin_dict = {"name": f"connector-{i}",
"artifacts": [{"type": _get_plugin_type(plugin_url), "url": plugin_url}]}
plugin_dict = {"name": f"connector-{i}",
"artifacts": [{"type": _get_plugin_type(plugin_url), "url": plugin_url}]}

cluster_dict["spec"]["build"]["plugins"].append(plugin_dict)
cluster_dict["spec"]["build"]["plugins"].append(plugin_dict)

cluster_dict["spec"]["config"] = {}

Expand Down
4 changes: 2 additions & 2 deletions kfk/commands/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def create(user, authentication_type, quota, cluster, namespace):
user_dict["metadata"]["labels"]["strimzi.io/cluster"] = cluster

user_dict["spec"]["authentication"]["type"] = authentication_type
del user_dict["spec"]["authorization"]
user_dict["spec"].pop("authorization")

if len(quota) > 0:
if user_dict["spec"].get("quotas") is None:
Expand Down Expand Up @@ -125,7 +125,7 @@ def alter(user, authentication_type, authorization_type, add_acl, delete_acl, op
if authorization_type != "none":
user_dict["spec"]["authorization"]["type"] = authorization_type
else:
del user_dict["spec"]["authorization"]
user_dict["spec"].pop("authorization")

if add_acl:
if user_dict["spec"].get("authorization") is not None:
Expand Down
5 changes: 2 additions & 3 deletions kfk/commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,9 @@ def delete_resource_config(config, dict_part, *converters):
for config_str in config:
for converter in converters:
config_str = converter(config_str)
if config_str in dict_part:
del dict_part[config_str]
dict_part.pop(config_str, None)
else:
del dict_part[config]
dict_part.pop(config, None)


def resource_exists(resource_type=None, resource_name=None, cluster=None, namespace=None):
Expand Down
2 changes: 1 addition & 1 deletion kfk/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
class SpecialTexts:
BROKER_CONFIG_FILE_USER_CONFIG_HEADER = "\n##########\n# User provided configuration\n##########\n"
CONNECT_BOOTSTRAP_SERVERS = "bootstrap.servers"
CONNECT_OUTPUT_IMAGE = "output.image"
CONNECT_IMAGE = "image"
CONNECT_PLUGIN_URL = "plugin.url"
CONNECT_PLUGIN_PATH = "plugin.path"
CONNECTOR_NAME = "name"
Expand Down
2 changes: 1 addition & 1 deletion tests/files/connect.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

output.image=quay.io/systemcraftsman/test-connect-cluster:latest
image=quay.io/systemcraftsman/test-connect-cluster:latest
plugin.url=https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.3.1.Final/debezium-connector-postgres-1.3.1.Final-plugin.tar.gz,https://github.com/jcustenborder/kafka-connect-twitter/releases/download/0.2.26/kafka-connect-twitter-0.2.26.tar.gz
2 changes: 1 addition & 1 deletion tests/files/connect_with_invalid_url.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

output.image=quay.io/systemcraftsman/test-connect-cluster:latest
image=quay.io/systemcraftsman/test-connect-cluster:latest
plugin.url=https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.3.1.Final/debezium-connector-postgres-1.3.1.Final-plugin.tar.gz,notavalidurl
13 changes: 13 additions & 0 deletions tests/files/connect_with_only_image.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
bootstrap.servers=my-cluster-kafka-bootstrap:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-cluster-offsets
config.storage.topic=connect-cluster-configs
status.storage.topic=connect-cluster-status
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

image=quay.io/systemcraftsman/test-connect-cluster:latest
2 changes: 1 addition & 1 deletion tests/files/connect_with_zip_jar_plugins.properties
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

output.image=quay.io/systemcraftsman/test-connect-cluster:latest
image=quay.io/systemcraftsman/test-connect-cluster:latest
plugin.url=https://test.com/file.zip,https://test.com/file.jar
22 changes: 22 additions & 0 deletions tests/files/yaml/kafka-connect_with_image.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
annotations:
strimzi.io/use-connector-resources: 'true'
name: my-connect-cluster
spec:
bootstrapServers: my-cluster-kafka-bootstrap:9092
config:
config.storage.replication.factor: 1
config.storage.topic: connect-cluster-configs
key.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: true
offset.storage.replication.factor: 1
offset.storage.topic: connect-cluster-offsets
status.storage.replication.factor: 1
status.storage.topic: connect-cluster-status
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: true
image: quay.io/systemcraftsman/test-connect-cluster:latest
replicas: 1
version: 2.8.0
Loading

0 comments on commit b61bf0b

Please sign in to comment.