Step 2: Use Vector to Read Data from Kafka
Deploy the vector-aggregator
In the cluster where Kafka is located, execute the following command to create the vector namespace and deploy the vector-aggregator in that namespace.
helm install vector-aggregator aggregator-0.30.0.tgz -n vector --create-namespace --set vectorConfig.image.tag=v0.2.1 --set image.tag=0.36.0-debian
Required images:
docker.io/timberio/vector:0.36.0-debian
docker.io/kubesphere/kubectl:v1.26.13
docker.io/kubesphere/vector-config:v0.2.1
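After the chart is installed, you can verify that the aggregator started successfully. This is a minimal check, assuming the release name vector-aggregator and the vector namespace used in the command above:

# Confirm the Helm release was deployed
helm status vector-aggregator -n vector

# Confirm the aggregator pods are running
kubectl get pods -n vector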
Obtain Certificates
On a node in the cluster where Kafka is located, execute the following commands.
Note: <kafka cluster> is the name of the Kafka cluster, <kafka namespace> is the namespace where Kafka resides, and <kafka user> is the Kafka user created earlier.
export kafka_cluster=<kafka cluster>
export kafka_namespace=<kafka namespace>
export kafka_user=<kafka user>

echo -e "apiVersion: v1\ndata:" > kafka-aggregator-ca.yaml
echo "  ca.crt: $(kubectl get secret -n $kafka_namespace ${kafka_cluster}-cluster-ca-cert \
  -o jsonpath='{.data.ca\.crt}')" >> kafka-aggregator-ca.yaml
echo -e "kind: Secret\nmetadata:\n  name: kafka-aggregator-cluster-ca\n  labels:\n    logging.whizard.io/certification: 'true'\n    logging.whizard.io/vector-role: Aggregator\n  namespace: vector\ntype: Opaque" >> kafka-aggregator-ca.yaml
echo "---" >> kafka-aggregator-ca.yaml
echo -e "apiVersion: v1\ndata:" >> kafka-aggregator-ca.yaml
echo "  user.p12: $(kubectl get secret -n $kafka_namespace ${kafka_user} \
  -o jsonpath='{.data.user\.p12}')" >> kafka-aggregator-ca.yaml
echo -e "kind: Secret\nmetadata:\n  name: kafka-aggregator-user-ca\n  labels:\n    logging.whizard.io/certification: 'true'\n    logging.whizard.io/vector-role: Aggregator\n  namespace: vector\ntype: Opaque" >> kafka-aggregator-ca.yaml

These commands generate the kafka-aggregator-ca.yaml file, which contains two Secrets: kafka-aggregator-cluster-ca and kafka-aggregator-user-ca. They hold the ca.crt and user.p12 data from the previous step, respectively. Example:

apiVersion: v1
data:
  ca.crt: xxx
kind: Secret
metadata:
  name: kafka-aggregator-cluster-ca
  labels:
    logging.whizard.io/certification: 'true'
    logging.whizard.io/vector-role: Aggregator
  namespace: vector
type: Opaque
---
apiVersion: v1
data:
  user.p12: xxx
kind: Secret
metadata:
  name: kafka-aggregator-user-ca
  labels:
    logging.whizard.io/certification: 'true'
    logging.whizard.io/vector-role: Aggregator
  namespace: vector
type: Opaque
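The generated manifest still needs to be applied so the aggregator can mount the certificates. A minimal sketch, assuming the vector-aggregator runs in this same cluster (the target namespace vector is already set in the manifest):

# Create the two certificate Secrets in the vector namespace
kubectl apply -f kafka-aggregator-ca.yaml

# Verify both Secrets exist
kubectl get secret -n vector kafka-aggregator-cluster-ca kafka-aggregator-user-ca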
Configure vector-aggregator to Send Messages to OpenSearch
Create the Vector configuration. Fill in the Kafka cluster address in bootstrap_servers, the Kafka user's keystore password in ssl.keystore.password, and the OpenSearch address in sinks.kafka_to_opensearch.endpoints.
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Secret
metadata:
  name: vector-aggregator-opensearch
  namespace: vector
  labels:
    logging.whizard.io/vector-role: Aggregator
    logging.whizard.io/enable: "true"
stringData:
  kafka-pipeline.yaml: >-
    sources:
      kafka_source:
        type: "kafka"
        group_id: "ks"
        topics: [ "^(vector)-.+" ]
        bootstrap_servers: "172.31.53.102:32476"
        librdkafka_options:
          security.protocol: "ssl"
          ssl.endpoint.identification.algorithm: "none"
          ssl.ca.location: "/etc/vector/custom/certification/ca.crt"
          ssl.keystore.location: "/etc/vector/custom/certification/user.p12"
          ssl.keystore.password: "yj5nwJLVqyII1ZHZCW2RQwJcyjKo3B9o"
          max.poll.interval.ms: "600000"
          partition.assignment.strategy: roundrobin
        decoding:
          codec: json
        session_timeout_ms: 20000
        socket_timeout_ms: 90000
    transforms:
      kafka_remapped:
        inputs:
          - kafka_source
        source: |-
          .event.original = encode_json(.)
          ts = parse_timestamp!(.timestamp, format: "%+")
          .timestamp = format_timestamp!(ts, format: "%+", timezone: "local")
          .topictime = to_unix_timestamp(ts, unit: "milliseconds")
          .logstamp = from_unix_timestamp!(.topictime, unit: "milliseconds")
          .logdate = .timestamp
          .idxdate = format_timestamp!(ts, format: "%Y.%m.%d", timezone: "local")
          tmp = split!(.topic, "-")
          .index = join!(remove!(tmp, [0]), "-")
        type: remap
    sinks:
      kafka_to_opensearch:
        api_version: v8
        auth:
          password: admin
          strategy: basic
          user: admin
        batch:
          timeout_secs: 5
        buffer:
          max_events: 10000
        endpoints:
          - https://<opensearch-url>:<port>
        tls:
          verify_certificate: false
        type: elasticsearch
        inputs:
          - kafka_remapped
        bulk:
          index: "{{ .index }}-%Y.%m.%d"
        request:
          timeout_secs: 180
type: Opaque
EOF
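Once the Secret is created, you can sanity-check the pipeline end to end. The commands below are a rough verification sketch; the label selector and the admin credentials are taken from assumptions and from the example configuration above, so adjust them to match your deployment:

# Confirm the pipeline Secret exists
kubectl get secret -n vector vector-aggregator-opensearch

# Watch the aggregator logs for Kafka consumption or TLS errors
# (the label selector is an assumption; adjust it to match your aggregator pods)
kubectl logs -n vector -l app.kubernetes.io/name=vector --tail=100

# List the indices written by the sink; index names are the topic name with the
# leading "vector" segment removed, suffixed with the date (see bulk.index above)
curl -k -u admin:admin "https://<opensearch-url>:<port>/_cat/indices?v"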