1. 参考 https://github.com/wurstmeister/kafka-docker 的实现。
2. 参考 https://github.com/simplesteph/kafka-stack-docker-compose 的实现。
3. 基于上述两个参考，实现如下 docker-compose 部署文件：
# Docker Compose stack: a three-node ZooKeeper ensemble (zoo1-zoo3), three
# Kafka brokers (kafka1-kafka3) and a Kafka Manager web UI.
version: '3.1'

services:
  # ZooKeeper nodes. Each node's ZOO_PORT equals the port it publishes and the
  # port the brokers use for it in KAFKA_ZOOKEEPER_CONNECT.
  zoo1:
    image: zookeeper:3.4.9
    hostname: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: "1"
      ZOO_PORT: "2181"
      ZOO_SERVERS: "server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888"
    volumes:
      - ./zk-multiple-kafka-multiple/zoo1/data:/data
      - ./zk-multiple-kafka-multiple/zoo1/datalog:/datalog

  zoo2:
    image: zookeeper:3.4.9
    hostname: zoo2
    ports:
      - "2182:2182"
    environment:
      ZOO_MY_ID: "2"
      ZOO_PORT: "2182"
      ZOO_SERVERS: "server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888"
    volumes:
      - ./zk-multiple-kafka-multiple/zoo2/data:/data
      - ./zk-multiple-kafka-multiple/zoo2/datalog:/datalog

  zoo3:
    image: zookeeper:3.4.9
    hostname: zoo3
    ports:
      - "2183:2183"
    environment:
      ZOO_MY_ID: "3"
      ZOO_PORT: "2183"
      ZOO_SERVERS: "server.1=zoo1:2888:3888 server.2=zoo2:2888:3888 server.3=zoo3:2888:3888"
    volumes:
      - ./zk-multiple-kafka-multiple/zoo3/data:/data
      - ./zk-multiple-kafka-multiple/zoo3/datalog:/datalog

  # Kafka brokers. Every broker listens on 9092 inside its container; the
  # container port is published on a distinct host port (9092/9093/9094) and
  # that host port is what the broker advertises.
  # NOTE(review): 192.168.10.100 is presumably the Docker host's address -
  # adjust it to the machine running this stack.
  kafka1:
    image: wurstmeister/kafka:2.12-2.0.1
    container_name: kafka1
    hostname: kafka1
    ports:
      - "9092:9092"
      - "1099:1099"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: "1"
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_LISTENERS: "PLAINTEXT://:9092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://192.168.10.100:9092"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "3"
      KAFKA_DEFAULT_REPLICATION_FACTOR: "3"
      # Folded scalar: the lines below are joined with single spaces.
      KAFKA_JMX_OPTS: >-
        -Dcom.sun.management.jmxremote
        -Dcom.sun.management.jmxremote.authenticate=false
        -Dcom.sun.management.jmxremote.ssl=false
        -Djava.rmi.server.hostname=127.0.0.1
        -Dcom.sun.management.jmxremote.rmi.port=1099
      JMX_PORT: "1099"
    volumes:
      - ./zk-multiple-kafka-multiple/kafka1:/kafka
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  # NOTE(review): kafka2 and kafka3 publish JMX on host ports 2099/3099 while
  # the JMX options still name RMI port 1099 and hostname 127.0.0.1; remote JMX
  # clients may not be able to connect through the mapped ports - confirm.
  kafka2:
    image: wurstmeister/kafka:2.12-2.0.1
    container_name: kafka2
    hostname: kafka2
    ports:
      - "9093:9092"
      - "2099:1099"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: "2"
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_LISTENERS: "PLAINTEXT://:9092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://192.168.10.100:9093"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "3"
      KAFKA_DEFAULT_REPLICATION_FACTOR: "3"
      KAFKA_JMX_OPTS: >-
        -Dcom.sun.management.jmxremote
        -Dcom.sun.management.jmxremote.authenticate=false
        -Dcom.sun.management.jmxremote.ssl=false
        -Djava.rmi.server.hostname=127.0.0.1
        -Dcom.sun.management.jmxremote.rmi.port=1099
      JMX_PORT: "1099"
    volumes:
      - ./zk-multiple-kafka-multiple/kafka2:/kafka
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka3:
    image: wurstmeister/kafka:2.12-2.0.1
    container_name: kafka3
    hostname: kafka3
    ports:
      - "9094:9092"
      - "3099:1099"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: "3"
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_LISTENERS: "PLAINTEXT://:9092"
      KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://192.168.10.100:9094"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "3"
      KAFKA_DEFAULT_REPLICATION_FACTOR: "3"
      KAFKA_JMX_OPTS: >-
        -Dcom.sun.management.jmxremote
        -Dcom.sun.management.jmxremote.authenticate=false
        -Dcom.sun.management.jmxremote.ssl=false
        -Djava.rmi.server.hostname=127.0.0.1
        -Dcom.sun.management.jmxremote.rmi.port=1099
      JMX_PORT: "1099"
    volumes:
      - ./zk-multiple-kafka-multiple/kafka3:/kafka
      - /var/run/docker.sock:/var/run/docker.sock
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  # Kafka Manager web UI, published on host port 9000.
  # NOTE(review): the application secret and the login credentials below are
  # placeholder values stored in plain text; replace them (for example through
  # environment substitution) before using this outside a local test setup.
  manager:
    image: hlebalbau/kafka-manager:2.0.0.2
    hostname: manager
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: "zoo1:2181,zoo2:2182,zoo3:2183"
      APPLICATION_SECRET: "random-secret"
      KAFKA_MANAGER_AUTH_ENABLED: "true"
      KAFKA_MANAGER_USERNAME: "abc"
      KAFKA_MANAGER_PASSWORD: "123"
    command: -Dpidfile.path=/dev/null
4. 测试文件
基于 https://github.com/segmentio/kafka-go 库的示例，实现如下：
package kaf
import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
"log"
"time"
)
// LeaderProduce dials the leader of partition 0 of "my-topic" through
// localhost:9092 and writes three timestamped messages to it.
//
// A dial failure terminates the process via log.Fatal. Failures while setting
// the write deadline, writing the messages, or closing the connection are
// logged rather than silently discarded.
func LeaderProduce() {
	topic := "my-topic"
	partition := 0

	conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
	if err != nil {
		log.Fatal(err)
	}
	// Deferred so the connection is released on every return path below.
	defer func() {
		if err := conn.Close(); err != nil {
			log.Println("failed to close connection:", err)
		}
	}()

	// Bound the write to 10 seconds; do not attempt an unbounded write if the
	// deadline could not be installed.
	if err := conn.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
		log.Println("failed to set write deadline:", err)
		return
	}

	if _, err := conn.WriteMessages(
		kafka.Message{Value: []byte(fmt.Sprint("one!", time.Now()))},
		kafka.Message{Value: []byte(fmt.Sprint("two!", time.Now()))},
		kafka.Message{Value: []byte(fmt.Sprint("three!", time.Now()))},
	); err != nil {
		log.Println("failed to write messages:", err)
	}
}
// LeaderConsumer dials the leader of partition 0 of "my-topic" through
// localhost:9092, reads one batch with a 10 second read deadline and prints
// the value of every message in that batch to standard output.
//
// A dial failure terminates the process via log.Fatal (matching
// LeaderProduce) instead of being ignored and causing a nil-pointer panic on
// the first use of the connection. Errors from setting the deadline and from
// closing the batch or the connection are logged.
func LeaderConsumer() {
	topic := "my-topic"
	partition := 0

	conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
	if err != nil {
		log.Fatal(err)
	}
	// Deferred so the connection is closed after the batch on every path.
	defer func() {
		if err := conn.Close(); err != nil {
			log.Println("failed to close connection:", err)
		}
	}()

	if err := conn.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil {
		log.Println("failed to set read deadline:", err)
		return
	}

	batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
	for {
		msg, err := batch.ReadMessage()
		if err != nil {
			// Any read error ends the loop; the batch is closed below.
			break
		}
		fmt.Println(string(msg.Value))
	}

	// Surface whatever error the batch reports when it is closed.
	if err := batch.Close(); err != nil {
		log.Println("failed to close batch:", err)
	}
}
// ClusterProduce writes two keyed, timestamped messages to "topic-A" through a
// writer configured with the three local brokers (localhost:9092-9094) and the
// least-bytes balancer.
//
// port is not used for networking; it only prefixes the printed error output
// so that concurrent callers can be told apart. Errors from writing and from
// closing the writer are both printed.
func ClusterProduce(port int) {
	// make a writer that produces to topic-A, using the least-bytes distribution
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{"localhost:9092", "localhost:9093", "localhost:9094"},
		Topic:    "topic-A",
		Balancer: &kafka.LeastBytes{},
	})
	// Deferred so the writer is always closed and a close failure is reported.
	defer func() {
		if err := w.Close(); err != nil {
			fmt.Println(port, "close error", err)
		}
	}()

	err := w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte("Key-A"),
			Value: []byte(fmt.Sprint("Hello World!", time.Now())),
		},
		kafka.Message{
			Key:   []byte("Key-B"),
			Value: []byte(fmt.Sprint("One!", time.Now())),
		},
	)
	if err != nil {
		fmt.Println(port, "error", err)
	}
}
// clusterConsume reads messages from "topic-A" as a member of consumer group
// "consumer-group-id" and prints each one. It loops forever: when a read
// fails, the error is printed and the next attempt is made after a 10 second
// pause.
//
// port is not used for networking; it only prefixes every printed line so that
// concurrent consumers can be told apart.
func clusterConsume(port int) {
	// make a new reader that consumes from topic-A
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{"localhost:9092", "localhost:9093", "localhost:9094"},
		GroupID:  "consumer-group-id",
		Topic:    "topic-A",
		MinBytes: 1024 * 10, // 10KB
		MaxBytes: 10e6,      // 10MB
	})
	// The loop below has no exit, so a Close call placed after it could never
	// execute; deferring it at least releases the reader if the loop panics.
	defer r.Close()

	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			fmt.Println(port, "error.....", err)
			// Back off before retrying.
			time.Sleep(time.Second * 10)
			continue
		}
		fmt.Printf("%v--message at topic/partition/offset %v/%v/%v: %s = %s\n", port, m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
	}
}