k8s下部署kafka

1、下载strimzi

一定要下载与k8s同版本的strimzi的版本,我的k8s版本是v1.21.5,因此选用0.36版本。

wget https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.36.0/strimzi-0.36.0.tar.gz
tar zxvf strimzi-0.36.0.tar.gz
cd /root/deploy/kafka/strimzi-0.36.0

2、替换一些镜像地址

查找出所有镜像,将其替换为不需要科学上网的私服地址

grep -r "image: quay.io/strimzi" .

可以使用下面的命令进行替换为你自己的私服镜像地址

grep -rl "quay.io/strimzi/operator:0.36.0" . | xargs sed -i 's|quay.io/strimzi/operator:0.36.0|hb.xxxx.com/library/strimzi/operator:0.36.0|g'

grep -rl "quay.io/strimzi/drain-cleaner:0.5.0" . | xargs sed -i 's|quay.io/strimzi/drain-cleaner:0.5.0|hb.xxxx.com/library/strimzi/drain-cleaner:0.5.0|g'

grep -rl "quay.io/strimzi/canary:0.6.0" . | xargs sed -i 's|quay.io/strimzi/canary:0.6.0|hb.xxxx.com/library/strimzi/canary:0.6.0|g'

3、安装 CRD 、RBAC 资源及kafka集群控制器(用于自动构建kafa集群)

kubectl create namespace kafka

#替换安装文件中的命名空间
sed -i 's/namespace: .*/namespace: kafka/' install/cluster-operator/*RoleBinding*.yaml

kubectl create -f install/cluster-operator -n kafka
kubectl get deployments -n kafka

4、创建kafka集群

使用examples中的示例文件,创建一个使用永久存储,三个zk,三个kafka的集群

cp examples/kafka/kafka-jbod.yaml ./

编辑kafka-jbod.yaml文件,添加命名空间并指定私服中的kafka镜像

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-cluster
  namespace: kafka
spec:
  kafka:
    version: 3.5.0
    replicas: 3
    template:
      pod:
        affinity:
          podAntiAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              - labelSelector:
                  matchExpressions:
                    - key: strimzi.io/name
                      operator: In
                      values:
                        - kafka-cluster-kafka
                topologyKey: "kubernetes.io/hostname"
    image: hb.xxxx.com/library/strimzi/kafka:0.36.0-kafka-3.5.0
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
      - name: external
        port: 9094
        type: nodeport
        tls: false
        configuration:
          brokers:
            - broker: 0
              advertisedHost: 192.168.88.21
              advertisedPort: 32317
            - broker: 1
              advertisedHost: 192.168.88.21
              advertisedPort: 30093
            - broker: 2
              advertisedHost: 192.168.88.21
              advertisedPort: 32367

    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.5"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
      - id: 1
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
  zookeeper:
    replicas: 3
    template:
      pod:
        affinity:
          podAntiAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              - labelSelector:
                  matchExpressions:
                    - key: strimzi.io/name
                      operator: In
                      values:
                        - kafka-cluster-zookeeper
                topologyKey: "kubernetes.io/hostname"
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
  • 修改创建资源的命名空间
  • 如果不需要对外提供维护服务,可以将nodeport部分删除
  • template.pod.affinity.podAntiAffinity:配置kafka及zookeeper的pod亲后度,避免将所有的pod都部署到一个节点上
  • 另外一定要注意:你的storageclass的VOLUMEBINDINGMODE模式必须是WaitForFirstConsumer,否则会导致创建afka的pvc时遇到问题,也可能造成所有的集群节点(kafka及zookeeper)都会分配到一个node节点上
  • advertisedHost:如果你想用其中的一个节点的vpn服务对外提供nodeport服务,那么就需要你配置advertisedHost字段。ip地址是你想对外提供服务的vpn的地址;端口号是strimzi随机生成的nodeport地址,你可以随意设置创建完集群后通过kubectl get pods -n kafka查看这些ip,再新到文件后执行kubectl apply -f ./xxx.yaml来自动生效。

创建集群

kubectl apply -f ./kafka-jbod.yaml

5、配置nginx

stream {
    #kakfa
    upstream kafka30005 {
        hash $remote_addr consistent;
        server k8s1:30343 weight=5 max_fails=1 fail_timeout=10s;
        server k8s2:30343 weight=5 max_fails=1 fail_timeout=10s;
        server k8s3:30343 weight=5 max_fails=1 fail_timeout=10s;

        server k8s1:32679 weight=5 max_fails=1 fail_timeout=10s;
        server k8s2:32679 weight=5 max_fails=1 fail_timeout=10s;
        server k8s3:32679 weight=5 max_fails=1 fail_timeout=10s;

        server k8s1:30049 weight=5 max_fails=1 fail_timeout=10s;
        server k8s2:30049 weight=5 max_fails=1 fail_timeout=10s;
        server k8s3:30049 weight=5 max_fails=1 fail_timeout=10s;

    }

    server {
        listen 30005;
        proxy_connect_timeout 5s;
        proxy_timeout 10s;
        proxy_pass kafka30005;
    }
}

strimzi会为每个broker创建不同的nodeport端口,而且这些端口也是随机的,我们可以使用【kubectl get svc -n kafka】查看所有的service并找出对应的nodeport端口号。

6、测试

(1)在同一个k8s集群中运行kafka容器进行测试

#创建并进入kafka测试pod
kubectl run kafka-test -n kafka --rm -ti --image=hb.xxxx.com/library/strimzi/kafka:0.36.0-kafka-3.5.0 -- bash

#列出所有的主题
bin/kafka-topics.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --list

#发送测试消息
echo "Hello Kafka! 这是我的第一条消息!" | bin/kafka-console-producer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic quickstart-events

#消费上面的测试消息
bin/kafka-console-consumer.sh --bootstrap-server kafka-cluster-kafka-bootstrap:9092 --topic quickstart-events --from-beginning

(2)使用linux下的kafa客户端

下载kafka_2.13-4.1.0.tgz后运行相关命令

bin/kafka-topics.sh --bootstrap-server 192.168.3.109:31092 --list

7、相关文档

https://strimzi.io/docs/operators/0.36.0/deploying.html#cluster-operator-str