了解Kafka
Kafka是一款分布式消息发布和订阅的系统,具有高性能和高吞吐率。
1、 消息的发布称为 producer,即生产者。消息的订阅称作 consumer,即消费者。一台 kafka服务器就是一个broker。一个集群由多个broker组成。
2、Producer和broker之间没有负载均衡机制。broker和consumer之间利用zookeeper进行负载均衡。所有broker和consumer都会在zookeeper中进行注册,且 zookeeper会保存他们的一些元数据信息。如果某个broker和consumer发生了变化,所有其他的broker和consumer都会得到通知。
3、producer产生和推送(push)数据到broker,consumer从broker拉取(pull)数据并进行处理。
4、使用磁盘进行存储,线性读写,速度快:避免了数据在JVM内存和系统内存之间的复制,减少性能的创建对象和垃圾回收。
Kafka的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。
kafka单机实例部署(预准备java环境)
zhupengfei@debian10:/opt/kafka$ sudo wget https://archive.apache.org/dist/zookeeper/zookeeper-3.3.6/zookeeper-3.3.6.tar.gz # zookeeper包
zhupengfei@debian10:/opt/kafka$ sudo tar -zxvf zookeeper-3.3.6.tar.gz # 解压
zhupengfei@debian10:/opt/kafka$ sudo vim /etc/profile
#zookeeper
export zookeeper_home=/opt/kafka/zookeeper-3.3.6
zhupengfei@debian10:/opt/kafka$ source /etc/profile
zhupengfei@debian10:/opt/kafka$ sudo vim /opt/kafka/zookeeper-3.3.6/conf/zoo.cfg
tickTime=2000
dataDir=/opt/kafka/zookeeper-3.3.6/data
clientPort=2181
initLimit=10
syncLimit=5
server.1=10.0.2.15:2888:3888
zhupengfei@debian10:/opt/kafka$ sudo mkdir -p /opt/kafka/zookeeper-3.3.6/data/
zhupengfei@debian10:/opt/kafka$ cd /opt/kafka/zookeeper-3.3.6/bin
zhupengfei@debian10:/opt/kafka/zookeeper-3.3.6/bin$ sudo ./zkServer.sh start
JMX enabled by default
Using config: /opt/kafka/zookeeper-3.3.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
zhupengfei@debian10:/opt/kafka$ sudo wget https://archive.apache.org/dist/kafka/0.8.0/kafka_2.8.0-0.8.0.tar.gz #kafka包
zhupengfei@debian10:/opt/kafka$ sudo tar -zxvf kafka_2.8.0-0.8.0.tar.gz
zhupengfei@debian10:/opt/kafka$ cd /opt/kafka/kafka_2.8.0-0.8.0/config/
zhupengfei@debian10:/opt/kafka/kafka_2.8.0-0.8.0/config$ sudo vim server.properties
host.name=10.0.2.15
log.dirs=/opt/kafka/kafka_2.8.0-0.8.0/logs
zhupengfei@debian10:/opt/kafka/kafka_2.8.0-0.8.0/config$ sudo mkdir -p /opt/kafka/kvafka_2.8.0-0.8.0/logscd
zhupengfei@debian10:/opt/kafka/kafka_2.8.0-0.8.0/config$ sudo mkdir -p /opt/kafka/kafka_2.8.0-0.8.0/zookeeper # #创建zookeeper目录
zhupengfei@debian10:/opt/kafka/kafka_2.8.0-0.8.0/config$ sudo mkdir -p /opt/kafka/kafka_2.8.0-0.8.0/log/zookeeper # 创建zookeeper日志目录
zhupengfei@debian10:/opt/kafka/kafka_2.8.0-0.8.0/config$ cd /opt/kafka/kafka_2.8.0-0.8.0/config
zhupengfei@debian10:/opt/kafka/kafka_2.8.0-0.8.0/config$ sudo vim zookeeper.properties # 配置kafka,修改文件server.properties
dataDir=/opt/kafka/kafka_2.8.0-0.8.0/zookeeper
dataLogDir=/opt/kafka/kafka_2.8.0-0.8.0/log/zookeeper
zhupengfei@debian10:/opt/kafka/kafka_2.8.0-0.8.0/config$ cd /opt/kafka/kafka_2.8.0-0.8.0/bin
zhupengfei@debian10:/opt/kafka/kafka_2.8.0-0.8.0/bin$./kafka-server-start.sh ../config/server.properties
zhupengfei@debian10:/opt/kafka/kafka_2.8.0-0.8.0/bin$ netstat -anptu | grep 9092
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
tcp6 0 0 10.0.2.15:9092 :::* LISTEN -
tcp6 0 0 10.0.2.15:57152 10.0.2.15:9092 ESTABLISHED -
tcp6 0 0 10.0.2.15:9092 10.0.2.15:57152 ESTABLISHED -
测试kafka
1、创建 topic
zhupengfei@debian10:/opt/kafka/bin$ sudo ./kafka-create-topic.sh -partition 1 -replica 1 -zookeeper localhost:2181 -topic test # 创建一个叫做"test"的topic,它只有一个分区,一个副本
creation succeeded!
2、通过list命令查看创建的topic
zhupengfei@debian10:/opt/kafka/kafka_2.8.0-0.8.0/bin$ sudo ./kafka-list-topic.sh -zookeeper localhost:2181
topic: test partition: 0 leader: 0 replicas: 0 isr: 0
3、启动producer 发送消息
zhupengfei@debian10:/opt/kafka/kafka_2.8.0-0.8.0/bin$ sudo ./kafka-console-producer.sh -broker-list 10.0.2.15:9092 -topic test # 运行producer并在控制台中输一些消息,这些消息将被发送到服务端
this is a message
4、启动consumer消费消息
zhupengfei@debian10:/opt/kafka/kafka_2.8.0-0.8.0/bin$ sudo ./kafka-console-consumer.sh -zookeeper localhost:2181 -topic test # 在一个终端中运行consumer命令行,在另一个终端中运行producer命令行,就可以在一个终端输入消息,另一个终端读取消息
this is a message
结合脚本启停kafka
zhupengfei@debian10:/home/zhupengfei/scripts$ cat stop_zookeeper-kafka.sh
#!/bin/sh
MID=`ps -ef |grep zookeeper |awk '{print $2}'`
echo "准备停止 【zookeeper-3.3.6】 主进程..."
if [ ! $MID ]
then
echo "警告:没有正在运行的 【zookeeper-3.3.6】 "
else
echo "正在结束 【zookeeper-3.3.6】..."" 应用进程号为:"$MID
kill -9 $MID
fi
zhupengfei@debian10:/home/zhupengfei/scripts$ cat start_zookeeper-kafka.sh
#! /bin/sh
echo "准备启动 【zookeeper-3.3.6】和 【kafka_2.8.0-0.8.0】..."
cd /opt/kafka/zookeeper-3.3.6/bin
./zkServer.sh start &
sleep 10
cd /opt/kafka/kafka_2.8.0-0.8.0/bin
./kafka-server-start.sh ../config/server.properties & > /opt/kafka/run.log
echo "成功启动 【zookeeper-3.3.6】和 【kafka_2.8.0-0.8.0】!!!"