1 非docker 安装
https://www-eu.apache.org/dist/kafka/2.3.0/kafka_2.11-2.3.0.tgz
tar -zxvf kafka_2.11-2.3.0.tgz
mv kafka_2.11-2.3.0 /usr/local/kafka
vim /usr/local/kafka/config/server.properties
broker.id=1
log.dir=/data/kafka/logs
配置zookeeper 使用kafka/bin/下自带的zk
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
kafka-zk 需要依赖java环境
apt-get install openjdk-8-jdk
运行 报错 卒。配置低了
bin/kafka-server-start.sh config/server.properties
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 1073741824 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /usr/local/kafka/hs_err_pid25796.log
2 重新开始
使用docker安装
sudo docker pull wurstmeister/kafka
sudo docker pull wurstmeister/zookeeper
docker-compose.yml
version: "3"
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 10.1.1.1
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
sudo docker-compose up
报错
Creating kafka_zookeeper_1 ... error
ERROR: for kafka_zookeeper_1 Cannot start service zookeeper: no status provided on response: unknown
ERROR: for zookeeper Cannot start service zookeeper: no status provided on response: unknown
ERROR: Encountered errors while bringing up the project.
换云搬瓦工的机器试一下
docker-compose up -d
Creating network "kafka_default" with the default driver
Creating kafka_zookeeper_1 ... done
Creating kafka_kafka_1 ... done
但是docker ps -a 发现只有zookeeper启动了,kafka失败, 检查日志 发现kafka运行需要java环境,而且对内存有要求,搬瓦工的vps不足够
docker-compose logs -f | grep "kafka"
Attaching to kafka_kafka_1, kafka_zookeeper_1
kafka_1 | waiting for kafka to be ready
kafka_1 | [Configuring] 'advertised.port' in '/opt/kafka/config/server.properties'
kafka_1 | Excluding KAFKA_HOME from broker config
kafka_1 | [Configuring] 'advertised.host.name' in '/opt/kafka/config/server.properties'
kafka_1 | [Configuring] 'port' in '/opt/kafka/config/server.properties'
kafka_1 | [Configuring] 'broker.id' in '/opt/kafka/config/server.properties'
kafka_1 | Excluding KAFKA_VERSION from broker config
kafka_1 | [Configuring] 'zookeeper.connect' in '/opt/kafka/config/server.properties'
kafka_1 | [Configuring] 'log.dirs' in '/opt/kafka/config/server.properties'
kafka_1 | #
kafka_1 | # There is insufficient memory for the Java Runtime Environment to continue.
kafka_1 | # Native memory allocation (mmap) failed to map 1073741824 bytes for committing reserved memory.
kafka_1 | # An error report file with more information is saved as:
kafka_1 | # //hs_err_pid1.log
kafka_1 | OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Out of memory' (errno=12)
kafka_1 | waiting for kafka to be ready
kafka_1 | [Configuring] 'advertised.port' in '/opt/kafka/config/server.properties'
kafka_1 | Excluding KAFKA_HOME from broker config
kafka_1 | [Configuring] 'advertised.host.name' in '/opt/kafka/config/server.properties'
kafka_1 | [Configuring] 'port' in '/opt/kafka/config/server.properties'
kafka_1 | [Configuring] 'broker.id' in '/opt/kafka/config/server.properties'
kafka_1 | Excluding KAFKA_VERSION from broker config
kafka_1 | [Configuring] 'zookeeper.connect' in '/opt/kafka/config/server.properties'
kafka_1 | [Configuring] 'log.dirs' in '/opt/kafka/config/server.properties'
kafka_1 | OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Out of memory' (errno=12)
kafka_1 | #
kafka_1 | # There is insufficient memory for the Java Runtime Environment to continue.
kafka_1 | # Native memory allocation (mmap) failed to map 1073741824 bytes for committing reserved memory.
kafka_1 | # An error report file with more information is saved as:
kafka_1 | # /tmp/hs_err_pid1.log
kafka_kafka_1 exited with code 1
^CERROR: Aborting.
因此修改docker-compose.yml 加入以下
KAFKA_HEAP_OPTS: "-Xmx256M -Xms128M"
stop 再启动
docker-compose stop
docker-compose up -d
完美
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
08a4907a4ada wurstmeister/kafka "start-kafka.sh" 22 minutes ago Up 22 minutes 0.0.0.0:9092->9092/tcp kafka_kafka_1
162253966fde wurstmeister/zookeeper "/bin/sh -c '/usr/sb…" About an hour ago Up 22 minutes 22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp kafka_zookeeper_1
测试
进入容器
docker exec -it kafka_kafka_1 bash
查看已经建好的topic (docker-compose.yml)
$KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper kafka_zookeeper_1:2181
test
发送消息
$KAFKA_HOME/bin/kafka-console-producer.sh --topic=test --broker-list kafka_kafka_1:9092
>
>xx
>yy
>zz
接收消息
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka_kafka_1:9092 --from-beginning --topic test
xx
yy
zz
接下来是golang接入kafka了
package main
import (
"context"
"fmt"
"time"
"github.com/segmentio/kafka-go"
)
// Kafka connection settings shared by the producer and consumer below.
var (
addr = "10.1.1.1:9092" // broker address; matches KAFKA_ADVERTISED_HOST_NAME in docker-compose.yml
topic = "test" // topic pre-created by KAFKA_CREATE_TOPICS ("test:1:1")
)
// main starts the consumer and producer as goroutines and then parks the
// main goroutine so they can run.
func main() {
	go Receive()
	go Publish()
	// Block forever. The original slept for a fixed 100000 seconds, which is
	// an arbitrary finite delay; an empty select is the idiomatic way to
	// park main while background goroutines do the work.
	select {}
}
// Receive consumes messages from the configured topic (partition 0) and
// prints each offset/key/value until reading fails.
func Receive() {
	fmt.Println("receive connecting ...")
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{addr},
		Topic:     topic,
		Partition: 0,
		MinBytes:  10e3, // 10KB
		MaxBytes:  10e6, // 10MB
	})
	// Ensure the reader is closed on every exit path.
	defer r.Close()

	// Bug fix: the original created a single context.WithTimeout of 2 seconds
	// (and discarded its cancel func, a vet "lostcancel" violation), so the
	// consumer loop always died ~2s after startup. Use a background context
	// so the consumer keeps reading indefinitely.
	ctx := context.Background()
	for {
		fmt.Println("receive start ...")
		m, err := r.ReadMessage(ctx)
		if err != nil {
			fmt.Println("readmessage error", err)
			break
		}
		fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
	}
	fmt.Println("receive close")
}
// Publish produces one timestamped message per second to the configured
// topic until a write fails.
func Publish() {
	fmt.Println("publish connecting ...")
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers:  []string{addr},
		Topic:    topic,
		Balancer: &kafka.LeastBytes{},
	})
	// Bug fix: the original called w.Close() AFTER an infinite for-loop, so
	// it was unreachable (go vet flags this). defer guarantees it runs on
	// every exit path, flushing buffered messages.
	defer w.Close()

	fmt.Println("publish start ...")
	for {
		message := "msg" + time.Now().String()
		err := w.WriteMessages(context.Background(),
			kafka.Message{
				Key:   []byte("Key-A"),
				Value: []byte(message),
			},
		)
		if err != nil {
			// Bug fix: the original used the builtin print (unformatted,
			// writes to stderr) and misspelled the messages ("sucess",
			// "public close").
			fmt.Println("publish err:", err)
			return
		}
		fmt.Println("publish success")
		time.Sleep(time.Second)
	}
}
运行
go run main.go
receive connecting ...
publish connecting ...
publish start ...
receive start ...
message at offset 0: =
message at offset 1: = xx
message at offset 2: = yy
message at offset 3: = zz
message at offset 4: Key-A = msg2019-10-11 17:51:32.305919841 +0800 CST m=+0.000266091
message at offset 5: Key-A = msg2019-10-11 17:53:22.818207089 +0800 CST m=+0.000368552
message at offset 6: Key-A = msg2019-10-11 17:53:26.847210435 +0800 CST m=+4.029372111
message at offset 7: Key-A = msg2019-10-11 17:53:42.670721312 +0800 CST m=+0.000364712