Usage
If you are not installing the operator using Helm then after installation the CRD for this operator must be created:
kubectl apply -f /etc/stackable/kafka-operator/crd/kafkacluster.crd.yaml
To create an Apache Kafka (v3.2.0) cluster named simple-kafka
assuming that you already have a Zookeeper cluster named simple-zk
:
---
apiVersion: zookeeper.stackable.tech/v1alpha1
kind: ZookeeperZnode
metadata:
name: simple-kafka-znode
spec:
clusterRef:
name: simple-zk
namespace: default
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
name: simple-kafka
spec:
version: 3.2.0-stackable0.1.0
zookeeperConfigMapName: simple-kafka-znode
brokers:
roleGroups:
default:
replicas: 1
If you wish to include integration with Open Policy Agent and already have an OPA cluster, then you can include an opa
field pointing to the OPA cluster discovery ConfigMap
and the required package. The package is optional and will default to the metadata.name
field:
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
name: simple-kafka
spec:
version: 3.2.0-stackable0.1.0
zookeeperConfigMapName: simple-kafka-znode
opa:
configMapName: simple-opa
package: kafka
brokers:
roleGroups:
default:
replicas: 1
You can change some opa cache properties by overriding:
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
name: simple-kafka
spec:
version: 3.2.0-stackable0.1.0
zookeeperConfigMapName: simple-kafka-znode
opa:
configMapName: simple-opa
package: kafka
brokers:
configOverrides:
server.properties:
opa.authorizer.cache.initial.capacity: "100"
opa.authorizer.cache.maximum.size: "100"
opa.authorizer.cache.expire.after.seconds: "10"
roleGroups:
default:
replicas: 1
A full list of settings and their respective defaults can be found here.
Monitoring
The managed Kafka instances are automatically configured to export Prometheus metrics. See Monitoring for more details.
Provide log4j.properties
Per default, the log4j.properties
from the kafka package is used. However, you can provide your own log4j.properties
via the custom resource:
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
name: simple-kafka
spec:
version: 3.2.0-stackable0.1.0
zookeeperConfigMapName: simple-kafka-znode
log4j: |-
log4j.rootLogger=INFO, stdout, kafkaAppender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
brokers:
roleGroups:
default:
replicas: 3
Encryption
The internal and client communication can be encrypted TLS. This requires the Secret Operator to be present in order to provide certificates. The utilized certificates can be changed in a top-level config.
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
name: simple-kafka
spec:
version: 3.2.0-stackable0.1.0
zookeeperConfigMapName: simple-kafka-znode
config:
tls:
secretClass: tls (1)
internalTls:
secretClass: kafka-internal-tls (2)
brokers:
roleGroups:
default:
replicas: 3
1 | The tls.secretClass refers to the client-to-server encryption. Defaults to the tls secret. It can be deactivated by setting config.tls to null . |
2 | The internalTls.secretClass refers to the broker-to-broker internal encryption. This must be explicitly set or defaults to tls . Can be disabled by setting config.internalTls to null . |
The tls
secret is deployed from the Secret Operator and looks like this:
---
apiVersion: secrets.stackable.tech/v1alpha1
kind: SecretClass
metadata:
name: tls
spec:
backend:
autoTls:
ca:
secret:
name: secret-provisioner-tls-ca
namespace: default
autoGenerate: true
You can create your own secrets and reference them e.g. in the tls.secretClass
or internalTls.secretClass
to use different certificates.
Authentication
The internal or broker-to-broker communication is authenticated via TLS. In order to enforce TLS authentication for client-to-server communication, you can set an AuthenticationClass
reference in the custom resource provided by the Commons Operator.
---
apiVersion: authentication.stackable.tech/v1alpha1
kind: AuthenticationClass
metadata:
name: kafka-client-tls (2)
spec:
provider:
tls:
clientCertSecretClass: kafka-client-auth-secret (3)
---
apiVersion: secrets.stackable.tech/v1alpha1
kind: SecretClass
metadata:
name: kafka-client-auth-secret (4)
spec:
backend:
autoTls:
ca:
secret:
name: secret-provisioner-tls-kafka-client-ca
namespace: default
autoGenerate: true
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
name: simple-kafka
spec:
version: 3.2.0-stackable0.1.0
zookeeperConfigMapName: simple-kafka-znode
config:
tls:
secretClass: tls
clientAuthentication:
authenticationClass: kafka-client-tls (1)
brokers:
roleGroups:
default:
replicas: 3
1 | The config.clientAuthentication.authenticationClass can be set to use TLS for authentication. This is optional. |
2 | The referenced AuthenticationClass that references a SecretClass to provide certificates. |
3 | The reference to a SecretClass . |
4 | The SecretClass that is referenced by the AuthenticationClass in order to provide certificates. |
Configuration & Environment Overrides
The cluster definition also supports overriding configuration properties and environment variables, either per role or per role group, where the more specific override (role group) has precedence over the less specific one (role).
Overriding certain properties which are set by operator (such as the ports) can interfere with the operator and can lead to problems. |
Configuration Properties
For a role or role group, at the same level of config
, you can specify: configOverrides
for the server.properties
. For example, if you want to set the auto.create.topics.enable
to disable automatic topic creation, it can be configured in the KafkaCluster
resource like so:
brokers:
roleGroups:
default:
configOverrides:
server.properties:
auto.create.topics.enable: "false"
replicas: 1
Just as for the config
, it is possible to specify this at role level as well:
brokers:
configOverrides:
server.properties:
auto.create.topics.enable: "false"
roleGroups:
default:
replicas: 1
All override property values must be strings.
For a full list of configuration options we refer to the Apache Kafka Configuration Reference.
Environment Variables
In a similar fashion, environment variables can be (over)written. For example per role group:
servers:
roleGroups:
default:
envOverrides:
MY_ENV_VAR: "MY_VALUE"
replicas: 1
or per role:
servers:
envOverrides:
MY_ENV_VAR: "MY_VALUE"
roleGroups:
default:
replicas: 1
Storage for data volumes
You can mount volumes where data is stored by specifying PersistentVolumeClaims for each individual role group:
brokers:
roleGroups:
default:
config:
resources:
storage:
data:
capacity: 2Gi
In the above example, all Kafka brokers in the default group will store data (the location of the property log.dirs
) on a 2Gi
volume.
By default, in case nothing is configured in the custom resource for a certain role group, each Pod will have a 1Gi
large local volume mount for the data location.
Memory requests
You can request a certain amount of memory for each individual role group as shown below:
brokers:
roleGroups:
default:
config:
resources:
memory:
limit: '2Gi'
In this example, each Kafka container in the "default" group will have a maximum of 2 gigabytes of memory. To be more precise, these memory limits apply to the containers running the Kafka daemons but not to any sidecar containers that are part of the pod.
Setting this property will also automatically set the maximum Java heap size for the corresponding process to 80% of the available memory. Be aware that if the memory constraint is too low, the cluster might fail to start. If pods terminate with an 'OOMKilled' status and the cluster doesn’t start, try increasing the memory limit.
For more details regarding Kubernetes memory requests and limits see: Assign Memory Resources to Containers and Pods.
CPU requests
Similarly to memory resources, you can also configure CPU limits, as shown below:
brokers:
roleGroups:
default:
config:
resources:
cpu:
max: '500m'
min: '250m'
For more details regarding Kubernetes CPU limits see: Assign CPU Resources to Containers and Pods.