Usage
If you did not install the operator with Helm, you must create the CRD for this operator after installation:
kubectl apply -f /etc/stackable/kafka-operator/crd/kafkacluster.crd.yaml
To create an Apache Kafka cluster named simple-kafka, assuming that you already have a ZooKeeper cluster named simple-zk:
---
apiVersion: zookeeper.stackable.tech/v1alpha1
kind: ZookeeperZnode
metadata:
  name: simple-kafka-znode
spec:
  clusterRef:
    name: simple-zk
    namespace: default
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  image:
    productVersion: 3.3.1
    stackableVersion: "23.4.0-rc2"
  clusterConfig:
    zookeeperConfigMapName: simple-kafka-znode
  brokers:
    roleGroups:
      default:
        replicas: 1
To integrate with Open Policy Agent (OPA) when you already have an OPA cluster, add an opa field that points to the discovery ConfigMap of the OPA cluster and, optionally, names the package to use. If package is omitted, it defaults to the value of the metadata.name field:
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  image:
    productVersion: 3.3.1
    stackableVersion: "23.4.0-rc2"
  clusterConfig:
    authorization:
      opa:
        configMapName: simple-opa
        package: kafka
    zookeeperConfigMapName: simple-kafka-znode
  brokers:
    roleGroups:
      default:
        replicas: 1
You can change some OPA cache properties by overriding them in server.properties:
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  image:
    productVersion: 3.3.1
    stackableVersion: "23.4.0-rc2"
  clusterConfig:
    authorization:
      opa:
        configMapName: simple-opa
        package: kafka
    zookeeperConfigMapName: simple-kafka-znode
  brokers:
    configOverrides:
      server.properties:
        opa.authorizer.cache.initial.capacity: "100"
        opa.authorizer.cache.maximum.size: "100"
        opa.authorizer.cache.expire.after.seconds: "10"
    roleGroups:
      default:
        replicas: 1
A full list of settings and their respective defaults can be found here.
Monitoring
The managed Kafka instances are automatically configured to export Prometheus metrics. See Monitoring for more details.
Log aggregation
The logs can be forwarded to a Vector log aggregator by providing a discovery ConfigMap for the aggregator and by enabling the log agent:
spec:
  clusterConfig:
    vectorAggregatorConfigMapName: vector-aggregator-discovery
  brokers:
    config:
      logging:
        enableVectorAgent: true
Further information on how to configure logging can be found in Logging.
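The discovery ConfigMap referenced above has to exist in the cluster. The following is only a sketch of what it might look like: the Service name vector-aggregator, the port 6000, and the ADDRESS key are assumptions that are not stated on this page, so verify them against the Logging documentation before use.

```yaml
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: vector-aggregator-discovery   # must match vectorAggregatorConfigMapName
data:
  # Assumed key and address: host and port where the Vector aggregator listens
  ADDRESS: vector-aggregator:6000
```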
Encryption
Internal and client communication can be encrypted with TLS. This requires the Secret Operator to be present in order to provide certificates. The certificates that are used can be changed in the top-level clusterConfig.
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  image:
    productVersion: 3.3.1
    stackableVersion: "23.4.0-rc2"
  clusterConfig:
    zookeeperConfigMapName: simple-kafka-znode
    tls:
      serverSecretClass: tls # (1)
      internalSecretClass: kafka-internal-tls # (2)
  brokers:
    roleGroups:
      default:
        replicas: 3
(1) spec.clusterConfig.tls.serverSecretClass configures client-to-server encryption. It defaults to the tls SecretClass and can be disabled by setting serverSecretClass to null.
(2) spec.clusterConfig.tls.internalSecretClass configures broker-to-broker (internal) encryption. If it is not set explicitly, it defaults to tls. It can be disabled by setting internalSecretClass to null.
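As a minimal sketch of the deactivation described in the callouts, the following fragment would switch off client-to-server encryption while keeping broker-to-broker encryption. It reuses the names from the example above and is meant as an illustration, not a recommended setup.

```yaml
spec:
  clusterConfig:
    zookeeperConfigMapName: simple-kafka-znode
    tls:
      serverSecretClass: null                  # client-to-server TLS disabled
      internalSecretClass: kafka-internal-tls  # broker-to-broker TLS stays enabled
```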
The tls SecretClass is deployed by the Secret Operator and looks like this:
---
apiVersion: secrets.stackable.tech/v1alpha1
kind: SecretClass
metadata:
  name: tls
spec:
  backend:
    autoTls:
      ca:
        secret:
          name: secret-provisioner-tls-ca
          namespace: default
        autoGenerate: true
You can create your own SecretClasses and reference them, for example in spec.clusterConfig.tls.serverSecretClass or spec.clusterConfig.tls.internalSecretClass, to use different certificates.
Authentication
Internal (broker-to-broker) communication is authenticated via TLS. To enforce TLS authentication for client-to-server communication as well, reference an AuthenticationClass (a custom resource provided by the Commons Operator) in the KafkaCluster resource.
---
apiVersion: authentication.stackable.tech/v1alpha1
kind: AuthenticationClass
metadata:
  name: kafka-client-tls # (2)
spec:
  provider:
    tls:
      clientCertSecretClass: kafka-client-auth-secret # (3)
---
apiVersion: secrets.stackable.tech/v1alpha1
kind: SecretClass
metadata:
  name: kafka-client-auth-secret # (4)
spec:
  backend:
    autoTls:
      ca:
        secret:
          name: secret-provisioner-tls-kafka-client-ca
          namespace: default
        autoGenerate: true
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  image:
    productVersion: 3.3.1
    stackableVersion: "23.4.0-rc2"
  clusterConfig:
    authentication:
      - authenticationClass: kafka-client-tls # (1)
    zookeeperConfigMapName: simple-kafka-znode
  brokers:
    roleGroups:
      default:
        replicas: 3
(1) clusterConfig.authentication.authenticationClass can be set to use TLS for authentication. This is optional.
(2) The referenced AuthenticationClass, which in turn references a SecretClass to provide certificates.
(3) The reference to a SecretClass.
(4) The SecretClass that is referenced by the AuthenticationClass in order to provide certificates.
Configuration & Environment Overrides
The cluster definition also supports overriding configuration properties and environment variables, either per role or per role group, where the more specific override (role group) has precedence over the less specific one (role).
Overriding certain properties which are set by the operator (such as the ports) can interfere with the operator and can lead to problems.
Configuration Properties
For a role or role group, at the same level as config, you can specify configOverrides for server.properties. For example, to disable automatic topic creation by setting auto.create.topics.enable, configure the KafkaCluster resource like so:
brokers:
  roleGroups:
    default:
      configOverrides:
        server.properties:
          auto.create.topics.enable: "false"
      replicas: 1
Just as for the config, it is possible to specify this at role level as well:
brokers:
  configOverrides:
    server.properties:
      auto.create.topics.enable: "false"
  roleGroups:
    default:
      replicas: 1
All override property values must be strings.
For a full list of configuration options, see the Apache Kafka Configuration Reference.
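To illustrate the precedence rule stated at the start of this section, here is a sketch that sets the same property at both levels. The second role group, named experimental, is hypothetical and exists only for this illustration.

```yaml
brokers:
  configOverrides:
    server.properties:
      auto.create.topics.enable: "false"     # role level: applies to all role groups
  roleGroups:
    default:
      replicas: 1                            # inherits "false" from the role
    experimental:                            # hypothetical second role group
      configOverrides:
        server.properties:
          auto.create.topics.enable: "true"  # role group level: takes precedence
      replicas: 1
```

With this definition, brokers in the default group would run with automatic topic creation disabled, while brokers in the experimental group would have it enabled.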
Environment Variables
In a similar fashion, environment variables can be (over)written. For example per role group:
brokers:
  roleGroups:
    default:
      envOverrides:
        MY_ENV_VAR: "MY_VALUE"
      replicas: 1
or per role:
brokers:
  envOverrides:
    MY_ENV_VAR: "MY_VALUE"
  roleGroups:
    default:
      replicas: 1
Storage for data volumes
You can mount volumes where data is stored by specifying PersistentVolumeClaims for each individual role group:
brokers:
  roleGroups:
    default:
      config:
        resources:
          storage:
            data:
              capacity: 2Gi
In the above example, all Kafka brokers in the default group will store data (the location given by the property log.dirs) on a 2Gi volume.
By default, if nothing is configured in the custom resource for a role group, each Pod gets a 1Gi local volume mounted at the data location.
Resource Requests
Stackable operators handle resource requests in a slightly different manner than Kubernetes. Resource requests are defined on role or role group level. See Roles and role groups for details on these concepts. On a role level this means that, for example, all workers will use the same resource requests and limits. This can be further specified on role group level (which takes priority over the role level) to apply different resources.
The following example shows how to specify CPU and memory resources using the Stackable custom resources:
---
apiVersion: example.stackable.tech/v1alpha1
kind: ExampleCluster
metadata:
  name: example
spec:
  workers: # role-level
    config:
      resources:
        cpu:
          min: 300m
          max: 600m
        memory:
          limit: 3Gi
    roleGroups: # role-group-level
      resources-from-role: # role-group 1
        replicas: 1
      resources-from-role-group: # role-group 2
        replicas: 1
        config:
          resources:
            cpu:
              min: 400m
              max: 800m
            memory:
              limit: 4Gi
In this case, the role group resources-from-role inherits the resources specified on the role level, resulting in a maximum of 3Gi memory and 600m CPU.
The role group resources-from-role-group has a maximum of 4Gi memory and 800m CPU, overriding the values set on the role level.
For Java products, the heap memory actually used is lower than the specified memory limit, because other processes in the container also require memory. Currently, 80% of the specified memory limit is passed to the JVM.
For memory, only a limit can be specified; it is set as both the memory request and the memory limit of the container. This guarantees the container the full amount of memory during Kubernetes scheduling.
If no resource requests are configured explicitly, the Kafka operator uses the following defaults:
brokers:
  roleGroups:
    default:
      config:
        resources:
          memory:
            limit: '2Gi'
          cpu:
            min: '500m'
            max: '4'
          storage:
            log_dirs:
              capacity: 1Gi
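To make the mapping concrete, the sketch below shows how these defaults might appear in the resources section of a broker container. It assumes that cpu.min becomes the CPU request and cpu.max the CPU limit, which this page implies but does not state outright; the fragment is illustrative and not copied from operator output.

```yaml
# Hypothetical excerpt of a broker container spec
resources:
  requests:
    cpu: 500m     # assumed to come from cpu.min
    memory: 2Gi   # memory.limit is used as the request
  limits:
    cpu: "4"      # assumed to come from cpu.max
    memory: 2Gi   # memory.limit is also used as the limit
```

With a 2Gi limit and the 80% rule described above, roughly 1.6Gi (0.8 × 2048Mi ≈ 1638Mi) would be passed to the JVM.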