Deploy vector-aggregator

In the cluster where Kafka is located, run the following command to create the vector namespace and deploy vector-aggregator in it.

Note
  • Do not deploy the vector aggregator to the kubesphere-logging-system namespace to avoid conflicts with the built-in vector aggregator in KubeSphere.

  • Please contact KubeSphere delivery service experts to obtain the Helm chart package for the vector aggregator.

helm install vector-aggregator aggregator-0.30.0.tgz -n vector --create-namespace --set vectorConfig.image.tag=v0.2.1 --set image.tag=0.36.0-debian

Required images:

docker.io/timberio/vector:0.36.0-debian
docker.io/kubesphere/kubectl:v1.26.13
docker.io/kubesphere/vector-config:v0.2.1
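
To check that the deployment succeeded, list the pods in the vector namespace (pod names depend on the Helm release, so this check is intentionally generic):

kubectl get pods -n vector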

Obtain Certificates

  1. On a node in the cluster where Kafka is located, execute the following commands.

    Note

    In the commands below, <kafka cluster> is the name of the Kafka cluster, <kafka namespace> is the namespace where Kafka resides, and <kafka user> is the Kafka user created earlier.

    export kafka_cluster=<kafka cluster>
    export kafka_namespace=<kafka namespace>
    export kafka_user=<kafka user>
    # Secret 1: the Kafka cluster CA certificate (ca.crt)
    echo -e "apiVersion: v1\ndata:" > kafka-aggregator-ca.yaml
    echo "  ca.crt: $(kubectl get secret -n $kafka_namespace ${kafka_cluster}-cluster-ca-cert -o jsonpath='{.data.ca\.crt}')" >> kafka-aggregator-ca.yaml
    echo -e "kind: Secret\nmetadata:\n  name: kafka-aggregator-cluster-ca\n  labels:\n    logging.whizard.io/certification: 'true'\n    logging.whizard.io/vector-role: Aggregator\n  namespace: vector\ntype: Opaque" >> kafka-aggregator-ca.yaml
    echo "---" >> kafka-aggregator-ca.yaml
    # Secret 2: the Kafka user keystore (user.p12)
    echo -e "apiVersion: v1\ndata:" >> kafka-aggregator-ca.yaml
    echo "  user.p12: $(kubectl get secret -n $kafka_namespace ${kafka_user} -o jsonpath='{.data.user\.p12}')" >> kafka-aggregator-ca.yaml
    echo -e "kind: Secret\nmetadata:\n  name: kafka-aggregator-user-ca\n  labels:\n    logging.whizard.io/certification: 'true'\n    logging.whizard.io/vector-role: Aggregator\n  namespace: vector\ntype: Opaque" >> kafka-aggregator-ca.yaml

    These commands generate the file kafka-aggregator-ca.yaml, which contains two Secret manifests: kafka-aggregator-cluster-ca and kafka-aggregator-user-ca. They carry the ca.crt and user.p12 data retrieved above, respectively. Example:

    apiVersion: v1
    data:
      ca.crt: xxx
    kind: Secret
    metadata:
      name: kafka-aggregator-cluster-ca
      labels:
        logging.whizard.io/certification: 'true'
        logging.whizard.io/vector-role: Aggregator
      namespace: vector
    type: Opaque
    ---
    apiVersion: v1
    data:
      user.p12: xxx
    kind: Secret
    metadata:
      name: kafka-aggregator-user-ca
      labels:
        logging.whizard.io/certification: 'true'
        logging.whizard.io/vector-role: Aggregator
      namespace: vector
    type: Opaque
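
    The generated Secrets still need to be created in the cluster. A minimal follow-up, assuming the vector namespace from the deployment step already exists: apply the file and confirm that both Secrets are present.

    kubectl apply -f kafka-aggregator-ca.yaml
    kubectl get secret -n vector kafka-aggregator-cluster-ca kafka-aggregator-user-ca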

Configure vector-aggregator to Send Messages to OpenSearch

Create the vector configuration. In the manifest below, set bootstrap_servers to the address of your Kafka cluster and the endpoints list under sinks.kafka_to_opensearch to the address of your OpenSearch cluster; also replace the OpenSearch auth credentials and ssl.keystore.password with the values for your environment.
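
The ssl.keystore.password value must be the password protecting user.p12. Assuming a Strimzi-managed Kafka user, whose Secret stores this value in the user.password field (adjust if your user Secret differs), it can be read with:

# Assumes a Strimzi-style user Secret with a user.password field
kubectl get secret -n $kafka_namespace ${kafka_user} -o jsonpath='{.data.user\.password}' | base64 -d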

cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Secret
metadata:
  name: vector-aggregator-opensearch
  namespace: vector
  labels:
    logging.whizard.io/vector-role: Aggregator
    logging.whizard.io/enable: "true"
stringData:
  kafka-pipeline.yaml: |-
    sources:
      kafka_source:
        type: "kafka"
        group_id: "ks"
        topics: [ "^(vector)-.+" ]
        bootstrap_servers: "172.31.53.102:32476"
        librdkafka_options:
          security.protocol: "ssl"
          ssl.endpoint.identification.algorithm: "none"
          ssl.ca.location: "/etc/vector/custom/certification/ca.crt"
          ssl.keystore.location: "/etc/vector/custom/certification/user.p12"
          ssl.keystore.password: "yj5nwJLVqyII1ZHZCW2RQwJcyjKo3B9o"
          max.poll.interval.ms: "600000"
          partition.assignment.strategy: roundrobin
        decoding:
          codec: json
        session_timeout_ms: 20000
        socket_timeout_ms: 90000
    transforms:
      kafka_remapped:
        inputs:
        - kafka_source
        source: |-
          .event.original = encode_json(.)
          ts = parse_timestamp!(.timestamp, format: "%+")
          .timestamp = format_timestamp!(ts, format: "%+", timezone: "local")
          .topictime = to_unix_timestamp(ts, unit: "milliseconds")
          .logstamp = from_unix_timestamp!(.topictime, unit: "milliseconds")
          .logdate = .timestamp
          .idxdate = format_timestamp!(ts, format: "%Y.%m.%d", timezone: "local")
          tmp = split!(.topic, "-")
          .index = join!(remove!(tmp, [0]), "-")
        type: remap
    sinks:
      kafka_to_opensearch:
        api_version: v8
        auth:
          password: admin
          strategy: basic
          user: admin
        batch:
          timeout_secs: 5
        buffer:
          max_events: 10000
        endpoints:
        - https://<opensearch-url>:<port>
        tls:
          verify_certificate: false
        type: elasticsearch
        inputs:
        - kafka_remapped
        bulk:
          index: "{{ .index }}-%Y.%m.%d"
        request:
      timeout_secs: 180
type: Opaque
EOF
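
The kafka_remapped transform derives the target index from the Kafka topic name by dropping the first hyphen-separated segment: a topic named vector-auditing, for example, yields .index = auditing, and the sink then writes to a daily index such as auditing-2024.05.01. Once data is flowing, you can check that indexes are being created in OpenSearch (an illustrative check, assuming the admin credentials and self-signed certificate from the configuration above):

curl -k -u admin:admin "https://<opensearch-url>:<port>/_cat/indices?v"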