集群规划:
Zookeeper集群共三台服务器,分别为:sto1、sto2、sto3。
Kafka集群共三台服务器,分别为:sto1、sto2、sto3。
1、Zookeeper集群准备
kafka是一个分布式消息队列,需要依赖ZooKeeper,请先安装好zk集群。
Zookeeper集群安装步骤略。
2、安装Kafka
下载压缩包(官网地址:http://kafka.apache.org/downloads.html)
解压:
tar zxvf kafka_2.10-0.9.0.1.tgz -C /opt/sgb
mv kafka_2.10-0.9.0.1/ kafka
修改配置文件:config/server.properties
broker.id=0 #参考zookeeper的myid ,每个id不能一样,0,1,2集群中唯一标识id,0、1、2、3依次增长(broker即Kafka集群中的一台服务器) zookeeper.connect=sto1:2181,sto2:2181,sto3:2181 #zookeeper集群地址
代码分发 scp -r /opt/sgb/kafka/ sto2:/opt scp -r /opt/sgb/kafka/ sto3:/opt
修改sto2、sto3上Kafka配置文件中的broker.id(分别在sto2、3服务器上执行以下命令修改broker.id) sed -i -e 's/broker.id=.*/broker.id=1/' /opt/sgb/kafka/config/server.properties sed -i -e 's/broker.id=.*/broker.id=2/' /opt/sgb/kafka/config/server.properties
3、启动Kafka集群
A、启动Zookeeper集群。
B、启动Kafka集群。
分别在三台服务器上执行以下命令启动:
bin/kafka-server-start.sh config/server.properties
4、测试
创建topic: bin/kafka-topics.sh --zookeeper sto1:2181,sto2:2181,sto3:2181 --create --replication-factor 2 --partitions 3 --topic test 查看topic列表 bin/kafka-topics.sh --zookeeper sto1:2181,sto2:2181,sto3:2181 --list 查看“test”topic描述 bin/kafka-topics.sh --zookeeper sto1:2181,sto2:2181,sto3:2181 --describe --topic test 创建生产者 bin/kafka-console-producer.sh --broker-list sto1:9092,sto2:9092,sto3:9092 --topic test 创建消费者 bin/kafka-console-consumer.sh --zookeeper sto1:2181,sto2:2181,sto3:2181 --from-beginning --topic test
----------------------------------------------------------------------------------------------------------
消費者 package bhz.kafka.example; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.serializer.StringDecoder; import kafka.utils.VerifiableProperties; public class KafkaConsumer { public static final String topic = "test"; public static void main(String[] args) { Properties props = new Properties(); props.put("zookeeper.connect", "sto1:2181,sto2:2181,sto3:2181"); //group 代表一个消费组 props.put("group.id", "group1"); //zk连接超时 props.put("zookeeper.session.timeout.ms", "4000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "smallest"); //序列化类 props.put("serializer.class", "kafka.serializer.StringEncoder"); ConsumerConfig config = new ConsumerConfig(props); ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config); //设置订阅主题。 Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, new Integer(1)); StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties()); Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder); KafkaStream<String, String> stream = consumerMap.get(topic).get(0); ConsumerIterator<String, String> it = stream.iterator(); while (it.hasNext()) System.out.println(it.next().message()); } }
生產者 package bhz.kafka.example; import java.util.Properties; import java.util.concurrent.TimeUnit; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import kafka.serializer.StringEncoder; public class KafkaProducer { public static final String topic = "test"; public static void main(String[] args) throws Exception { Properties properties = new Properties(); properties.put("zookeeper.connect", "sto1:2181,sto2:2181,sto3:2181"); //声明zk properties.put("serializer.class", StringEncoder.class.getName()); properties.put("metadata.broker.list", "sto1:9092"); // 声明kafka broker properties.put("request.required.acks", "1"); Producer producer = new Producer<Integer, String>(new ProducerConfig(properties)); for(int i=0; i < 10; i++){ producer.send(new KeyedMessage<Integer, String>(topic, "hello kafka" + i)); System.out.println("send message: " + "hello kafka" + i); TimeUnit.SECONDS.sleep(1); } producer.close(); } }
--------------------------------------------------------------------------------------------------------------------------------
基于docker环境搭建kafka集群(单机版)
1.kafka依赖于zookeeper ,所以需要拉取kafka和zookeeper镜像。
docker pull wurstmeister/kafka docker pull wurstmeister/zookeeper
2.启动zookeeper容器(wurstmeister/zookeeper镜像拥有默认命令“/usr/sbin/sshd && bash /usr/bin/start-zk.sh”,所以只需启动一个守护式容器即可)
docker run --name zookeeper -p 12181:2181 -d wurstmeister/zookeeper:latest
3.启动三个kafka容器
docker run -p 19092:9092 --name kafka1 -d -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=宿主机IP:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://宿主机IP:19092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest docker run -p 19093:9093 --name kafka2 -d -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=宿主机IP:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://宿主机IP:19093 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest docker run -p 19094:9094 --name kafka3 -d -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_CONNECT=宿主机IP:12181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://宿主机IP:19094 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka:latest
4.分别进入容器kafka1、kafka2、kafka3创建主题test1、test2、test3。下面以kafka3为例。
#在守护式容器中启动一个交互式进程 docker exec -i -t kafka3 /bin/bash cd /opt/kafka_2.12-2.5.0/bin #创建主题test3 ./kafka-topics.sh --zookeeper 192.168.181.163:12181 --create --topic test3 --replication-factor 1 --partitions 3 Created topic "test3" #查看主题test3 ./kafka-topics.sh --zookeeper 192.168.181.163:12181 --describe --topic test3
可以看到,已经是集群环境,可以看到leader机器、副本在分区上的保存情况,和ISR列表成员
5.测试集群,在kafka3上向test1发送消息,在kafka2上消费test1
kafka3
./kafka-console-producer.sh --broker-list 192.168.181.163:19092,192.168.181.163:19093,192.168.181.163:19094 --topic test1 >testword >test
kafka1
./kafka-console-consumer.sh --bootstrap-server 192.168.181.163:19092,192.168.181.163:19093,192.168.181.163:19094 --topic test1 --from-beginning
6.依次关闭kafka2、test1后查看集群状态
./kafka-topics.sh --zookeeper 192.168.181.163:12181 --describe --topic test3 ./kafka-topics.sh --zookeeper 192.168.181.163:12181 --describe --topic test3 ./kafka-topics.sh --zookeeper 192.168.181.163:12181 --describe --topic test3
相关推荐
kafka集群部署文档及kafka详解,包括kafka常用命令,详细部署说明,如何运维以及一些FAQ,下载app注册免费获取:http://m3w.cn/jcsh
zookeeper集群部署,kafka集群部署,kafka介绍,topic创建、删除、kafka监控
kafka集群部署步骤
zookeeper配置、集群部署 kafka配置、集群部署 Window平台下
《Kafka集群部署》配置文件。有需要的同学可以下载下来看看。
KAFKA集群部署文档,详细说明见文档,请大家多多分享。
redhat linux 部署Kafka集群
第5章 Kafka集群部署 根据前面章节的介绍,知道了Fabric组网过程的第一步是需要生成证书等文件,而这些默认配置信息的生成依赖于configtx.yaml及crypto-config.yaml配置文件。 在采用Kafka作为启动过类型的Fabric...
kafka集群版部署说明,该文档详细的说明了kafka消息中间件的集群部署说明,为想要使用kafka的人员提供了有效的参考
kafka集群部署
Kafka配置构建创建文件在130、131、132服务器rm -rf /opt/deploy/data/kafka && mkdir -p /opt/deplo
本文不讲kafka集群原理,只谈部署步骤。 默认读者已对kafka有最基本的认知,纯粹作为部署笔记,方便回忆。 另外本文是基于Windows部署的,Linux的步骤是基本相同的(只是启动脚本位置不同)。 kafka集群类型: ...
10.2 克隆虚拟机从1704E中克隆出三个虚拟机 ip如下:one 192.168.73.200 two 192.168.73.201 three 192.1
云计算基础架构应用
搭建基于sasl的安全认证的kafka集群,并配置acl,使用户能分权分域接受发送消息
包好zk集群部署、kafka集群部署、kafka/zk的注册服务,开机启动设置。自己搭建总结的文档,照着做就能搭成功。
现在我们就来看看在生产环境中的Kafka集群方案该怎么做。既然是集群,那必然就要有多个Kafka节点机 器,因为只有单台机器构成的Kafka伪集群只能用于日常测试之用,根本无法满足实际的线上生产需求。而 真正的线上...
kakfa,kafka集群安装部署全量安装包,及部署文档,详见附件,包含kafka_2.11-1.1.1,zookeeper-3.4.9.tar,kafka快速安装
Kafka集群及Kafka-Manager安装部署