Usage

If you are not installing the operator using Helm then after installation the CRD for this operator must be created:

kubectl apply -f /etc/stackable/kafka-operator/crd/kafkacluster.crd.yaml

To create an Apache Kafka (v3.2.0) cluster named simple-kafka assuming that you already have a Zookeeper cluster named simple-zk:

---
apiVersion: zookeeper.stackable.tech/v1alpha1
kind: ZookeeperZnode
metadata:
  name: simple-kafka-znode
spec:
  clusterRef:
    name: simple-zk
    namespace: default
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  version: 3.2.0-stackable0.1.0
  zookeeperConfigMapName: simple-kafka-znode
  brokers:
    roleGroups:
      default:
        replicas: 1

If you wish to include integration with Open Policy Agent and already have an OPA cluster, then you can include an opa field pointing to the OPA cluster discovery ConfigMap and the required package. The package is optional and will default to the metadata.name field:

---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  version: 3.2.0-stackable0.1.0
  zookeeperConfigMapName: simple-kafka-znode
  opa:
    configMapName: simple-opa
    package: kafka
  brokers:
    roleGroups:
      default:
        replicas: 1

You can change some opa cache properties by overriding:

---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  version: 3.2.0-stackable0.1.0
  zookeeperConfigMapName: simple-kafka-znode
  opa:
    configMapName: simple-opa
    package: kafka
  brokers:
    configOverrides:
      server.properties:
        opa.authorizer.cache.initial.capacity: "100"
        opa.authorizer.cache.maximum.size: "100"
        opa.authorizer.cache.expire.after.seconds: "10"
    roleGroups:
      default:
        replicas: 1

A full list of settings and their respective defaults can be found here.

Monitoring

The managed Kafka instances are automatically configured to export Prometheus metrics. See Monitoring for more details.

Provide log4j.properties

Per default, the log4j.properties from the kafka package is used. However, you can provide your own log4j.properties via the custom resource:

---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  version: 3.2.0-stackable0.1.0
  zookeeperConfigMapName: simple-kafka-znode
  log4j: |-
    log4j.rootLogger=INFO, stdout, kafkaAppender

    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

    log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
    log4j.appender.kafkaAppender.File=${kafka.logs.dir}/server.log
    log4j.appender.kafkaAppender.layout=org.apache.log4j.PatternLayout
    log4j.appender.kafkaAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
  brokers:
    roleGroups:
      default:
        replicas: 3

Encryption

The internal and client communication can be encrypted TLS. This requires the Secret Operator to be present in order to provide certificates. The utilized certificates can be changed in a top-level config.

---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  version: 3.2.0-stackable0.1.0
  zookeeperConfigMapName: simple-kafka-znode
  config:
    tls:
      secretClass: tls (1)
    internalTls:
      secretClass: kafka-internal-tls (2)
  brokers:
    roleGroups:
      default:
        replicas: 3
1 The tls.secretClass refers to the client-to-server encryption. Defaults to the tls secret. It can be deactivated by setting config.tls to null.
2 The internalTls.secretClass refers to the broker-to-broker internal encryption. This must be explicitly set or defaults to tls. Can be disabled by setting config.internalTls to null.

The tls secret is deployed from the Secret Operator and looks like this:

---
apiVersion: secrets.stackable.tech/v1alpha1
kind: SecretClass
metadata:
  name: tls
spec:
  backend:
    autoTls:
      ca:
        secret:
          name: secret-provisioner-tls-ca
          namespace: default
        autoGenerate: true

You can create your own secrets and reference them e.g. in the tls.secretClass or internalTls.secretClass to use different certificates.

Authentication

The internal or broker-to-broker communication is authenticated via TLS. In order to enforce TLS authentication for client-to-server communication, you can set an AuthenticationClass reference in the custom resource provided by the Commons Operator.

---
apiVersion: authentication.stackable.tech/v1alpha1
kind: AuthenticationClass
metadata:
  name: kafka-client-tls (2)
spec:
  provider:
    tls:
      clientCertSecretClass: kafka-client-auth-secret (3)
---
apiVersion: secrets.stackable.tech/v1alpha1
kind: SecretClass
metadata:
  name: kafka-client-auth-secret (4)
spec:
  backend:
    autoTls:
      ca:
        secret:
          name: secret-provisioner-tls-kafka-client-ca
          namespace: default
        autoGenerate: true
---
apiVersion: kafka.stackable.tech/v1alpha1
kind: KafkaCluster
metadata:
  name: simple-kafka
spec:
  version: 3.2.0-stackable0.1.0
  zookeeperConfigMapName: simple-kafka-znode
  config:
    tls:
      secretClass: tls
    clientAuthentication:
      authenticationClass: kafka-client-tls (1)
  brokers:
    roleGroups:
      default:
        replicas: 3
1 The config.clientAuthentication.authenticationClass can be set to use TLS for authentication. This is optional.
2 The referenced AuthenticationClass that references a SecretClass to provide certificates.
3 The reference to a SecretClass.
4 The SecretClass that is referenced by the AuthenticationClass in order to provide certificates.

Configuration & Environment Overrides

The cluster definition also supports overriding configuration properties and environment variables, either per role or per role group, where the more specific override (role group) has precedence over the less specific one (role).

Overriding certain properties which are set by operator (such as the ports) can interfere with the operator and can lead to problems.

Configuration Properties

For a role or role group, at the same level of config, you can specify: configOverrides for the server.properties. For example, if you want to set the auto.create.topics.enable to disable automatic topic creation, it can be configured in the KafkaCluster resource like so:

brokers:
  roleGroups:
    default:
      configOverrides:
        server.properties:
          auto.create.topics.enable: "false"
      replicas: 1

Just as for the config, it is possible to specify this at role level as well:

brokers:
  configOverrides:
    server.properties:
      auto.create.topics.enable: "false"
  roleGroups:
    default:
      replicas: 1

All override property values must be strings.

For a full list of configuration options we refer to the Apache Kafka Configuration Reference.

Environment Variables

In a similar fashion, environment variables can be (over)written. For example per role group:

servers:
  roleGroups:
    default:
      envOverrides:
        MY_ENV_VAR: "MY_VALUE"
      replicas: 1

or per role:

servers:
  envOverrides:
    MY_ENV_VAR: "MY_VALUE"
  roleGroups:
    default:
      replicas: 1

Storage for data volumes

You can mount volumes where data is stored by specifying PersistentVolumeClaims for each individual role group:

brokers:
  roleGroups:
    default:
      config:
        resources:
          storage:
            data:
              capacity: 2Gi

In the above example, all Kafka brokers in the default group will store data (the location of the property log.dirs) on a 2Gi volume.

By default, in case nothing is configured in the custom resource for a certain role group, each Pod will have a 1Gi large local volume mount for the data location.

Memory requests

You can request a certain amount of memory for each individual role group as shown below:

brokers:
  roleGroups:
    default:
      config:
        resources:
          memory:
            limit: '2Gi'

In this example, each Kafka container in the "default" group will have a maximum of 2 gigabytes of memory. To be more precise, these memory limits apply to the containers running the Kafka daemons but not to any sidecar containers that are part of the pod.

Setting this property will also automatically set the maximum Java heap size for the corresponding process to 80% of the available memory. Be aware that if the memory constraint is too low, the cluster might fail to start. If pods terminate with an 'OOMKilled' status and the cluster doesn’t start, try increasing the memory limit.

For more details regarding Kubernetes memory requests and limits see: Assign Memory Resources to Containers and Pods.

CPU requests

Similarly to memory resources, you can also configure CPU limits, as shown below:

brokers:
  roleGroups:
    default:
      config:
        resources:
          cpu:
            max: '500m'
            min: '250m'

For more details regarding Kubernetes CPU limits see: Assign CPU Resources to Containers and Pods.