Kafka之生产者

[TOC]
从编程的角度而言,生产者就是负责向 Kafka 发送消息的应用程序。在 Kafka 的历史变迁 中, 一共有两个大版本的生产者客户端: 第-个是于 Kafka开源之初使用 Scala语言编写的客户 端,我们可以称之为旧生产者客户端(OldProducer)或 Scala版生产者客户端;第二个是从 Kafka 0.9.x 版本开始推出的使用 Java 语言编写的客户端,我们可以称之为新生产者客户端( New Producer)或 Java 版生产者客户端,它弥补了 旧版客户端中存在的诸 多设计缺陷

虽然Kafka是用 Java/Scalai吾言编写的,但这并不妨碍它对于多语言的支持,在 Kafka官网中,“CLIENTS”的入口 l提供了一份多语言的支持列表,其中包括常用的CIC++、 Python、 Go等语 言 ,不过这些其他类语 言 的客户端并非由 Kafka社区维护,如果使用则需要另行下载 。本章主要针对现下流行的新生产者 CJava语言编写的)客户端做详细介绍,而旧生产者客户端己被湖汰, 故不再做相应的介绍了。

客户端开发

一个正常的生产逻辑需要具备以下几个步骤 :
(1 )配置生产者客户端参数及创建相应的生产者实例。
(2)构建待发送的消息 。
(3 )发送消息。
(4)关闭生产者实例。

代码清单 1-2 中己经简单对生产者客户端 的编码做了 一个基本演示 如代码清单 2-1 所示 。

public class KafkaProducerAnalysis {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";

    public static Properties initConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("client.id", "producer.client.id.demo");
        return props;
    }

    public static Properties initNewConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
        return props;
    }

    public static Properties initPerferConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        return props;
    }

    public static void main(String[] args) throws InterruptedException {
        Properties props = initConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

//        KafkaProducer<String, String> producer = new KafkaProducer<>(props,
//                new StringSerializer(), new StringSerializer());

        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello, Kafka!");
        try {
            producer.send(record);
//            producer.send(record, new Callback() {
//                @Override
//                public void onCompletion(RecordMetadata metadata, Exception exception) {
//                    if (exception == null) {
//                        System.out.println(metadata.partition() + ":" + metadata.offset());
//                    }
//                }
//            });
        } catch (Exception e) {
            e.printStackTrace();
        }

//        TimeUnit.SECONDS.sleep(5);
    }
}

这里有必要单独说明的是构建的消息对象 ProducerRecord,它并不是单纯意义上的消息, 它包含了多个属性 , 原本需要发送的与业务 相 关的消息体只是其 中 的一个 value 属性 ,比 如 “ Hello, Kafka!”只是 ProducerRecord对象中的一个属性。 ProducerRecord类的定义如下(只截
取成员变量) :

public class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
    川 省略其他成员方法和构造方法
    }

其中 topic 和 partition 字段分别代表消息要发往的主题和分区号。 headers 字段是 消息的头部, Kafka 0.11.x 版本才引入这个属性,它大多用来设定 一些与 应用相关的信息,如无 需要也可以不用设置。 key 是用来指定消息的键,它不仅是消息的附加信息,还可以用来计算 分区号进而可以让消息发往特定的分区。前面提及消息以主题为单位进行归类,而这个 key 可 以让消息再进行二次归类,同 一个 key 的消息会被划分到同一个分区中,value 是指消息体, 一般不为空, 如果为空则表示特定的消息一一墓碑消息,timestamp 是指消息的时间戳, 它有 CreateTime 和 LogAppendTime 两种类型,前者表示消息创建的时间,后者表示消息追加到日志文件的时间。

2.1.1 必要的参数配置

在创建真正的生产者实例前需要配置相应的参数,比如需要连接的 Kafka集群地址。参照代码清单 2-1 中的 initConfig()方法,在 Kafka 生产者客户端 KatkaProducer 中有 3 个参数是必填的。

  • bootstrap.servers:该参数用来指定生产者客户端连接 Kafka 集群所需的 broker地址清单,具体的内容格式为hostl:portl,host2:port2,可以设置一个或多个 地址,中间以逗号隔开,此 参数 的默认值为“” 。 注意这里并非需要所有的 broker 地 址,因为生产者会从给定的 broker 里查找到其他 broker 的信息 。不过建议至少要设置 两个以上的 broker 地址信息,当其中任意 一个岩机时,生产者仍然可以连接到 Kafka 集群上。

  • key. serializer 和 value . serializer: broker 端接收的消息必须以字节数组 (byte[])的形式存在。代码清单 2-1 中生产者使用的 KatkaProducer<Stri吨, String>和 ProducerRecord<String, String>中的泛型 <Stri吨, String>对应 的就是消息中 key 和value 的类型,生产者客户端使用这种方式可以让代码具有良好 的可读性 ,不过在发 往 broker之前需要将消息中对应的 key 和 value 做相应的序列化操作来转换成字节 数组。 key . serial工zer 和 value .serializer 这两个参数分 别用来指定 key 和 value 序列化操作 的序列化器,这两个参数无默认值。注意这里必须填写序列化器的 全限定名 ,如代码清单 2-1 中的org.apache.kafka.common.serialization.StringSerializer, 单单指定 StringSerializer是错误的

注意到代码清单 2-1 中的 initConfig()方法里还设置了 一个参数 client.id,这个参数用来 设定 KafkaProducer 对应的客户端 id, 默认值为“” 。

注意到代码清单 2-1 中的 initConfig()方法里还设置了 一个参数 client.id,这个参数用来 设定 KafkaProducer 对应的客户端 id, 默认值为“” 。 如果客户端不设置, 则 KafkaProducer 会 自动生成一个非空字符串,内容形式如“producer-I”“producer-2”

KafkaProducer 中的参数众多,远非示例 initConfig()方法 中的那样只有 4 个,开发人员可以 根据业务应用的实际需求来修改这些参数的默认值,以达到灵活调配的目的。一般情况下,普 通开发人员无法记住所有的参数名称,只能有个大致的印象。在实际使用过程中,诸如
“ key.serializer”“max.request.size” “interceptor.classes” 之类的字符串经常 由于人为因素而书 写错误。为 此,我们可以 直接使用客户端中的 org.apache.kafka.clients.producer.ProducerConfig 类来做一定程度上的预防措施,每个参数在 ProducerConfig 类中都有对应 的名称,以代码清单 2-1 中的 initConfig()方法为例 ,引入 ProducerConfig 后的修改结果如 下:

    public static Properties initNewConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo");
        return props;
    }

注意到上面的代码中 key . serializer 和 value . serializer 参数对应类 的全限定名 比较长,也比较容易写错, 这里通过 Java 中的技巧来做进一步的改进, 相关代码如下:

props.put (ProducerConfig . KEY_SERIALIZER_CLASS_CONFIG , StringSerializer .class.getName());
props.put(ProducerConfig .VALUE SERIALIZER CLASS CONFIG, StringSerializer .class .getName()) ;

如此代码便简洁了许多,同时进一步降低了人为出错的可能性。在配置完参数之后,我们就可以使用它来创建一个生产者实例,示例如下:

KafkaProducer<String, String> producer= new KafkaProducer<>(props) ;

KafkaProducer 是线程安全的,可以在多个线程中共享单个 KafkaProducer 实例,也可以将KafkaProducer 实例进行池化来供其他线程调用。

KafkaProducer 中有多个构造方法,比如在创 建 KafkaProducer 实例时并没有设定 key.serializer 和 value .serializer 这两个配置参数,那么就需要在构造方法中添加 对应的序列化器,示例如下:

KafkaProducer<String, String> producer= new KafkaProducer<>(props, new StringSerializer() , new StringSerializer());

其内部原理和无序列化器的构造方法 - 样,不过就实际应用而言,一般都选用 KafkaProducer(Properties properties)这个构造方法来创建 KafkaProducer 实例。

2.1.2 消息的发送

在创建完生产者实例之后,接下来的工作就是构建消息,即创建 ProducerRecord 对象 。通 过代码清单 2-1 中我们己经了解了 ProducerRecord 的属性结构, 其中 topic 属性和 value 属 性是必填项,其余属性是选填项,对应的 ProducerRecord 的构造方法也有多种,参考如下:

image.png

代码清单 2-1 中使用的是最后一种构造方法,也是最简单的一种,这种方式相当于将 ProducerRecord 中除 topic 和 value 外的属性全部值设置为 null。在实际 的应用中,还会用到 其他构造方法,比如要指定 key,或者添加 headers 等。

创建生产者实例和构建消息之后 , 就可以开始发送消息了。发送消息主要有三种模式 : 发 后 即忘( fire-and-forget〕、同步( sync)及异步 (async)。

代码清单 2-1 中的这种发送方式就是发后即忘,它只管往 Kafka 中发送消息而并不关心消 息是否正确到达。在大多数情况下,这种发送方式没有什么 问题 , 不过在某些时候( 比如发生 不可重试异常时〉会造成消息的丢失。这种发送方式的性能最高,可靠性也最差。

KafkaProducer 的 send()方法井非是 void 类型 , 而是 Future<RecordMetadata>类型 , send() 方法有 2 个重载方法,具体定义如下 :

publ工C Future<RecordMetadata> send(ProducerRecord<K, V> record)

public Future<RecordMetadata> send(ProducerRecord<K , V> record ,
Callback callback)

要实现同步的发送方式,可以利用返回的 Future对象实现, 示例如下:

try {
producer . send(record) .get() ;
} catch (ExecutionException I InterruptedException e) { 
e .printStackTrace() ;
}

实际上 send()方法本身就是异步的, send()方法返回的 Future对象可以使调用方稍后获得发 送的结果。示例中在执行 send()方法之后直接链式调用了 get()方法来阻塞等待 Kafka 的响应, 直到消息发送成功,或者发生异常。如果发生异常,那么就需要捕获异常并交 由外层逻辑处理。

也可以在执行完 send()方法之后不直接调用 get()方法, 比如下面的一种同步发送方式的实现

try {
Future<RecordMetadata> future= producer.send(record) ; 
RecordMetadata metadata =future .get();

System.out.println(metadata . top工c () + ” - " +
metadata.partition() + ”: ” + metadata .offset() );

这样可以获取一个 RecordMetadata 对象,在 RecordMetadata 对象里包含了消息的 一 些元数 据信息,比如当前消息的主题、分区号、分区中的偏移量( offset〕、时间戳等。如果在应用代 码中需要这些信息,则可以使用这个方式 。 如果不需要,则直接采用 producer.send(record).get() 的方式更省事。

Future 表示一个任务的生命周期,并提供了相应的方法来判断任务是否己经完成或取消, 以及获取任务的结果和取消任务等 。 既然 KafkaProduc巳r.send()方法的返回值是一个 Future 类型 的对象,那么完全可以用 Java 语言层面的技巧来丰富应用的实现,比如使用 Future 中的 get(long
timeout, TimeUnit unit)方法实现可超时的阻塞。

KafkaProducer 中一般会发生两种类型的异常 : 可重试的异常和不可重试的异常 。常见的可 重试异常有 : NetworkException、 LeaderNotAvailableException、 UnknownTopicOrPartitionException、 NotEnoughReplicasException、 NotCoordinatorException 等。 比如 NetworkException 表示网络异 常,这个有可能是由于网络瞬时故障而导致的异常,可以通过重试解决;又比如 LeaderNotAvailableException表示分区的 leader副本不可用,这个异常通常发生在 leader副本下 线而新的 leader 副本选举完成之前,重试之后可以重新恢复。不可重试的异常,比如 1.4 节中 提及的 RecordTooLargeException异常,暗示了所发送的消息太大, KafkaProducer对此不会进行 任何重试 , 直接抛 出异常 。

对于可重试的异常,如果配置了 retries 参数,那么只要在规定的重试次数内自行恢复 了,就不会抛出异常 。 retries 参数的默认值为 0,配置方式参考如下:

props . put(ProducerConf 工q . RETRIES_CONFIG, 1 0 ) ;

示例中配置了 10 次重试。如果重试了 10 次之后还没有恢复,那么仍会抛出异常,进而发 送的外层逻辑就要处理这些异常了。

同步发送的方式可 靠性高,要么消息被发送成功,要么发生异常。如果发生异常 ,则可以 捕获并进行相应的处理,而不会像“发后即忘”的方式直接造成消息的丢失。不过同步发送的 方式的性能会差很多,需要阻塞等待一条消息发送完之后才能发送下一条。

我们再来了解一下异步发送的方式,一般是在 send()方法里指定一个 Callback 的回调函数,
Kafka在返回响应时调用该函数来实现异步的发送确认。有读者或许会有疑问, send()方法的返 回值类型就是 Future,而 Future本身就可以用作异步的逻辑处理 。这样做不是不行,只不过 Future 里的 get()方法在何时调用,以及怎么调用都是需要面对的问题,消息不停地发送,那么诸多消息对应的 Future对象的处理难免会引起代码处理逻辑的混乱。使用 Callback的方式非常简洁明了, Kafka 有 响应时就会回调 , 要么发送成功,要么抛出异常。异步发送方式的示例如下 :


image.png

示例代码中遇到异常时( exception!=null)只是做了 简单的打印操作,在实际应用中应该使 用更加稳妥的方式来处理,比如可以将异常记录以便日后分析,也可以做 一定 的处理来进行消 息重发。 onCompletion()方法的两个参数是互斥的,消息发送成功时, metadata 不为 null 而 exception为 null:消息发送异常时, metadata为 null而 exception不为 null。

producer .send(recordl, callbackl) ; 
producer . send (record2 , callback2);

对于同一个分区而言,如果消息 recordl 于 record2 之前先发送(参考上面的示例代码〉, 那么 KafkaProducer就可以保证对应的 callbackl 在 callback2 之前调用,也就是说,因调函数的 调用也可以保证分 区有序。

通常,一个 KafkaProducer 不会只负 责发送单条消息,更多 的是发送多条消息,在发送完这 些消息之后,需要调用 KafkaProducer 的 close()方法来回收资源。 下面的示例中发送了 100 条消 息,之后就调用了 close()方法来回收所占用的资源:

int i = O;
while (i < 100) {
ProducerRecord<Str工ng , String> record =
new ProducerRecord<>{topic, ” msg” +i++) ;
try {
producer . send(record) .get() ;
} catch (InterruptedExcept工on I ExecutionException e) {

close()方法会阻塞等待之前所有的发送请求完成后再关闭 KafkaProducer。与此同时, KafkaProducer 还提供了一个带超时时间的 close()方法,具体定义如下 :

public void close(long timeout, TimeUnit timeUnit)

如果调用了带超时时 间 timeout 的 close()方法,那么只会在等待 timeout 时间 内来完成所有 尚未完成 的请求 处理 , 然后强行退出。在实际应用 中 ,一般使用 的都是无参的 close()方法。

2.1.3 序列化

生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给 Kafka。而 在对侧,消费者需要用反序列化器(Deserializer)把从 Kafka 中收到的字节数组转换成相应的 对象。在代码清单2-1 中,为了方便,消息的key和value都使用了字符串, 对应程序中的 序列化器也使用 了客户端自带的 org.apache.kafka.common.serialization.StringSerializer, 除了用于 String类型的序列化器, 还有 ByteAη町、 ByteBuffer、 Bytes、 Double、 Integer、 Long这几种类 型,它们都实现了 org.apache.kafka.common.serialization.Serializer接口, 此接口有 3个方法:

publiC void configure (Map<String , ?> configs , boolean isKey) public byte[] serialize(String topic , T data)
public void close()

configure()方法用来配置当前类 , serialize()方法用来执行序列化操作。而 close()方法用来关 闭当前的序列化器, 一般情况下 close()是一个空方法, 如果实现了此方法,则必须确保此方法 的幕等性,因为这个方法很可能会被 KafkaProducer调用多次。

生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的,如果生产者使用了 某种序列化器,比如 StringSerializer, 而消费者使用了另一种序列化器,比如 IntegerSerializer, 那么是无法解析 出想要的数据的。

package chapter2;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.io.UnsupportedEncodingException;
import java.util.Map;

/**
 * 代码清单2-2
 * 摘抄至Kafka源码:org.apache.kafka.common.serialization.StringSerializer
 *
 */
public class StringSerializer implements Serializer<String> {
    private String encoding = "UTF8";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null)
            encodingValue = configs.get("serializer.encoding");
        if (encodingValue instanceof String)
            encoding = (String) encodingValue;
    }

    @Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + encoding);
        }
    }

    @Override
    public void close() {
        // nothing to do
    }
}

首先是 configureO方法, 这个方法是在创建 KafkaProducer实例的时候调用的, 主要用来确 定编码类型, 不过一般客户端对于 key. serializer.encoding, val ue .seria lizer . encoding和 serializer.e口coding这几个参数都不会配置,在 KafkaProducer的参数集合( ProducerConfig)里也没有这几个 参数( 它们可以看作用户 自定义的参数〉,所以一般情况下 encoding 的值就为默认的“ UTF-8” 。 serialize()方法非常直观,就是将 String 类型转为 byte[] 类型。

如果 Kafka 客户端提供的几种序列化器都无法满足应用需求,则可以选择使用如 Avro、JSON、 Thri鱼、 ProtoBuf和 Protos阳ff等通用的序列化工具来实现 , 或者使用自定义类型的序列 化器来实现 。 下面就以 一个简单的例子来介绍自定义类型的使用方法 。

假设我们要发送的消息都是 Company对象,这个 Company的 定义很简单,只有名称 name和 地址address,示例代码参考如下(为了构建方便,示例中使用 了lombok1工具) :

package chapter2;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * 代码清单2-3中的Company类
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class Company {
    private String name;
    private String address;
//    private String telphone;
}

下面我们再来看一下 Company对应的序列化器 CompanySerializer,示例代码如代码清单 2-3 所示。

package chapter2;

import org.apache.kafka.common.serialization.Serializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

/**
 * 代码清单2-3
 */
public class CompanySerializer implements Serializer<Company> {
    @Override
    public void configure(Map configs, boolean isKey) {
    }

    @Override
    public byte[] serialize(String topic, Company data) {
        if (data == null) {
            return null;
        }
        byte[] name, address;
        try {
            if (data.getName() != null) {
                name = data.getName().getBytes("UTF-8");
            } else {
                name = new byte[0];
            }
            if (data.getAddress() != null) {
                address = data.getAddress().getBytes("UTF-8");
            } else {
                address = new byte[0];
            }
            ByteBuffer buffer = ByteBuffer.
                    allocate(4 + 4 + name.length + address.length);
            buffer.putInt(name.length);
            buffer.put(name);
            buffer.putInt(address.length);
            buffer.put(address);
            return buffer.array();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return new byte[0];
    }

    @Override
    public void close() {
    }
}

上面的这段代码的逻辑很简单, configure()和close()方法也都为空。与此对应的反序列化器 CompanyDeserializer 的详细实现参见 3.2.3 节。

如何使用自定义的序列化器 CompanySerializer 呢?只需将 KafkaProducer 的 value . serializer 参数设置为 CompanySerializ巳r类的全限定名即可。假如我们要发送一个 Company对象到 Kafka, 关键代码如代码清单 2-4所示。

package chapter2;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * 代码清单2-4
 */
public class ProducerSelfSerializer {
    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";

    public static void main(String[] args)
            throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                CompanySerializer.class.getName());
//        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
//                ProtostuffSerializer.class.getName());
        properties.put("bootstrap.servers", brokerList);

        KafkaProducer<String, Company> producer =
                new KafkaProducer<>(properties);
        Company company = Company.builder().name("hiddenkafka")
                .address("China").build();
//        Company company = Company.builder().name("hiddenkafka")
//                .address("China").telphone("13000000000").build();
        ProducerRecord<String, Company> record =
                new ProducerRecord<>(topic, company);
        producer.send(record).get();
    }
}

2.1.4 分区器

消息在通过 send()方法发往 broker 的过程 中, 有可能需要经过拦截器( Interceptor)、序列 化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。拦截器 (下一章会详细介绍〉一般不是必需的,而序列化器是必需的。消息经过序列 化 之后就需要确 定它发往的分区,如果消息 ProducerRecord 中指定了 partition 字段, 那么就不需要分区器
的作用 ,因 为 partition 代表的就是所要发往的分区号。

如 果 消 息 ProducerRecord 中没有指定 partition 字段,那么就需要依赖分区器 , 根据 key这个字段来计算 partition 的值。分区器的作用就是为消息分配分区。

Kafka 中提供的默认分区器是 org.apache.kafka.clients.producer.intemals.DefaultPartitioner, 它 实现了org.apache.kafka.clients.producer.Partitioner接口 , 这个接口中定义了 2 个方法 , 具体如下 所示。

public int partition(Str工ng topic , Object key, byte[] keyBytes ,
Ob] ect value , byte[] valueBytes , Cluster cluster) ;
public void close() ;

其中 partition()方法用来计算分区号,返回值为 int 类型。 partition()方法中的参数分别表示
主题 、键、序列化后的 键、值、序列 化后的值,以及集群的元数据信息,通过这些信息 可以实 现功能丰富的分区器。 close()方法在关闭分区器的时候用来回收一些资源。

在默认分区器 DefaultPartitioner 的实现中, close()是空方法,而在 partition()方法中定义了 主要的分区分配逻辑 。 如果 key 不为 null,那 么默认的分区器会对 key 进行哈 希(采 用 MurmurHash2 算法 ,具备高运算性能及低碰撞率),最终根据得到 的哈希值来计算分区号, 拥 有相同 key 的消息会被写入同一个分区 。 如果 key 为 null,那么消息将会以轮询的方式发往主 题内的各个可用分区。

在不改变主题分区数量的情况下 , key 与分区之间的映射可 以保持不变。不过, 一旦主题中增加了分区,那么就难 以保证 key 与分区之间的映射关系了。

除了使用 Kafka 提供的默认分区器进行分区分配,还可以使用自定义的分区器,只需同
DefaultPartitioner一样实现 Partitioner接口即可。默认的分区器在 key为 null时不会选择非可用 的分区,我们可以通过自 定义的分区器 DemoPartitioner来打破这一限制,具体的实现可以参考 下面的示例代码,如代码清单 2-5 所示。

package chapter2;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 代码清单2-5
 */
public class DemoPartitioner implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (null == keyBytes) {
            return counter.getAndIncrement() % numPartitions;
        } else
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

实现 自定义的 DemoPartitioner 类之后 , 需要通过配置参数 partitioner. class 来显式 指定 这个分区器。示例 如 下 :

props . put (ProducerConfig . PARTITIONER_CLASS_CONFIG , DemoPartitioner .class .getName() ) ;

2.1.5 生产者拦截器

拦截器 (Interceptor)是早在 Kafka0.10.0.0 中就已经引入的一个功能, Kafka一共有两种拦 截器 : 生产者拦截器和消费者拦截器。本节主要讲述生产者拦截器 的相关 内容

生产者拦截器既可以用来在消息发送前做一些准备工作, 比如按照某个规则过滤不符合要 求的消息、修改消息的内容等, 也可以用来在发送回调逻辑前做一些定制化的需求,比如统计 类工作。

生产者拦截器 的 使用 也 很方便,主要是自定义实现 org.apache.kafka.clients.producer. Producerlnterceptor接口。 ProducerInterceptor接口中包含3个方法:

public ProducerRecord<K, V> onSend (ProducerRecord<K, V> record);

public void onAcknowledgement(RecordMetadata metadata, Excepti on exception ); 

public void close() ;

KafkaProducer在将消息序列化和计算分区之前会调用生产者拦截器的 onSend()方法来对消 息进行相应 的定制化操作。一般来说最好不要修改消息 ProducerRecord 的 topic、 key 和partition 等信息,如果要修改,则需确保对其有准确的判断,否则会与预想的效果出现偏 差。比如修改 key 不仅会影响分区的计算,同样会影响 broker 端日志压缩( Log Compaction) 的功能 。

KafkaProducer 会在消息被应答( Acknowledgement)之前或消息发送失败时调用生产者拦 截器的 onAcknowledgement()方法,优先于用户设定的 Callback 之前执行。这个方法运行在 Producer 的 I/O 线程中,所以这个方法中实现的代码逻辑越简单越好, 否则会影响消息的发送 速度。

下面通过一个示例来 演示生产者拦截器的具体用法 , ProducerlnterceptorPrefix 中通过 onSend()方法来为每条消息添加一个前缀“prefixl-”,井且通过 onAcknowledgement()方法来计 算发送消息的成功率。 ProducerlnterceptorPrefix 类的具体实现如代码清单 2-6 所示 。

package chapter2;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

/**
 * 代码清单2-6
 */
public class ProducerInterceptorPrefix implements
        ProducerInterceptor<String, String> {
    private volatile long sendSuccess = 0;
    private volatile long sendFailure = 0;

    @Override
    public ProducerRecord<String, String> onSend(
            ProducerRecord<String, String> record) {
        String modifiedValue = "prefix1-" + record.value();
        return new ProducerRecord<>(record.topic(),
                record.partition(), record.timestamp(),
                record.key(), modifiedValue, record.headers());
//        if (record.value().length() < 5) {
//            throw new RuntimeException();
//        }
//        return record;
    }

    @Override
    public void onAcknowledgement(
            RecordMetadata recordMetadata,
            Exception e) {
        if (e == null) {
            sendSuccess++;
        } else {
            sendFailure++;
        }
    }

    @Override
    public void close() {
        double successRatio = (double) sendSuccess / (sendFailure + sendSuccess);
        System.out.println("[INFO] 发送成功率="
                + String.format("%f", successRatio * 100) + "%");
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}

实现自定义的 ProducerInterceptorPrefix 之后,需要在 KafkaProducer 的配置参数 川 terceptor.classes 中指定这个拦截器,此参数的默认值为“”。示例如下:

properties.put(ProducerConfiq .INTERCEPTOR_CLASSES_ CONFIG , ProducerinterceptorPrefix.class.getName());

然后使用指定了 ProducerInterceptorPrefix 的生产者连续发送 10 条内容为“ kafka”的消 息, 在发送完之后客户端打印出如下信息:

[INFO]发送成功率= 100.000000%

如果消费这 10 条消息,会发现消费了的消息都变成了“ prefix1-kafka”,而不是原来的
“kafka” 。

KafkaProducer 中不仅可以指定一个拦截器,还可以指定多个拦截器以形成拦截链。拦截链 会按照 interceptor.classes 参数配置的拦截器的顺序来一一执行(配置的时候,各个拦 截器之间使用逗号隔开)。下面我们再添加一个自定义拦截器 ProducerlnterceptorPrefixPlus,它 只实现了 Interceptor接口中的 onSend()方法,主要用来为每条消息添加另一个前缀“prefix2-”, 具体实现如下:

properties.put(ProducerConfig . INTERCEPTOR CLASSES CONFIG , ProducerinterceptorPrefix.class.getName( ) + ”,”
+ ProducerinterceptorPrefi xPlus . class.getName ());

在拦截链中,如果某个拦截器执行失败,那么下 一个拦截器会接着从上-个执行成功 的拦截器继续执行 。

原理分析

在前面的章节中,我们己经了解了 KafkaProducer的具体使用方法,而本节的内容主要是对 Kafka 生产者客户端的内部原理进行分析,通过了解生产者客户端的整体脉络可以让我们更好 地使用它,避免因为一些理解上的偏 差 而造成使用上的错误。

2.2.1 整体架构

在 2.1.4 节的 开头介绍了消息在真正发往 Kafka 之前,有可能 需要经历 拦截器 Clnterceptor)、 序列化器 (Serializer)和分区器(Partitioner)等一系列的作用,那么在此之后又会发生什么呢? 下面我们来看一下生产者客户端的整体架构,如图 2”l 所示。


image.png

整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和 Sender线程 (发送线 程)。在主线程中由 KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作 用之后缓存到消息累加器( RecordAccumulator,也称为消息收 集器〉中。 Sender 线程负责从 RecordAccumulator中获取消息并将其发送到 Kafka中。

RecordAccumulator 主要用来缓存消息 以便 Sender 线程可以批量发送,进而减少网络传输 的资源消耗以提升性能 。 RecordAccumulator 缓存的大 小可以通过生产者客户端参数 buffer.memory 配置,默认值为 33554432B,即 321\侣。 如果生产者发送消息的速度超过发 送到服务器的速度,则会导致生产者空间不足,这个时候 KafkaProducer的 send()方法调用要么 被阻塞,要么抛出异常,这个取决于参数 max.block.ms 的配置,此参数的默认值为 60000, 即 60 秒 。

主线程中发送过来的消息都会被迫加到 RecordAccumulator 的某个双端队列( Deque)中, 在 RecordAccumulator 的内部为每个分区都维护了 一 个双端队列,队列中的内容就是 ProducerBatch,即 Deque<ProducerBatch>。消息写入缓存 时,追加到双端队列的尾部: Sender读取消息时 ,从双端队列的头部读取。注意 ProducerBatch 不是 ProducerRecord, ProducerBatch 中可以包含一至多个 ProducerRecord。 通俗地说, ProducerRecord 是生产者中创建的消息,而 ProducerBatch 是指一个消息批次 , ProducerRecord 会被包含在 ProducerBatch 中,这样可以使 宇 节的使用更加紧凑。与此同时,将较小的 ProducerRecord 拼凑成一个较大 的 ProducerBatch,也 可以减少网络请求的次数以提升整体的吞吐量 。 ProducerBatch 和消息的具体格式有关,更多的 详细内容可以参考 5.2 节。如果生产者客户端需要向很多分区发送消息, 则可以 将buffer .memory 参数适当调大以增加整体的吞吐量 。

ProducerBatch 的大小和 batch . size 参数也有着密切的关系。当一条消息( ProducerRecord ) 流入 RecordAccumulator 时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从 这个双端队列的尾部获取一个 ProducerBatch (如果没有则新建),查看 ProducerBatch 中是否 还可以写入这个 ProducerRecord,如 果可以 则 写入,如果不可 以则 需要 创 建一个新 的 ProducerBatch。在新建 ProducerBatch时评估这条消息的大小是否超过 batch.size 参数的大 小,如果不超过,那么就以 batch. size 参数的大小来创建 ProducerBatch,这样在使用完这 段内存区域之后,可以通过 BufferPool 的管理来进行复用;如果超过,那么就以评估的大小来 创建 ProducerBatch, 这段内存区域不会被复用。

Sender 从 RecordAccumulator 中 获取缓存的消息之后,会进 一 步将原本<分区, Deque< ProducerBatch>>的保存形式转变成<Node, List< ProducerBatch>的形式,其中 Node 表示 Kafka 集群 的 broker 节点 。对于网络连接来说,生产者客户端是与具体 的 broker 节点建立 的连接,也 就是向具体的 broker 节点发送消息,而并不关心消息属于哪一个分区;而对于 KafkaProducer 的应用逻辑而 言 ,我们只 关注向哪个分区中发送哪些消息,所 以在这里需要做一个应用逻辑层 面到网络 1/0 层面的转换。

在转换成<Node, List<ProducerBatch>>的形式之后, Sender 还 会进一步封装成<Node, Request>的形式,这样就可以将 Request 请求发往各个 Nod巳了, 这里 的 Request 是指 Kafka 的 各种协议请求,对于消息发送而言就是指具体的 ProduceRequest

请求在从 Sender 线程发往 Kafka 之前还会保存到 InFlightRequests 中, InFlightRequests 保存对象的具体形式为 Map<Nodeld, Deque<R巳quest>>,它的主要作用是缓存 了已经发出去但还 没有收到响应的请求( Nodeld 是一个 String 类型,表示节点的 id 编号)。与此同时, InFlightRequests 还提供了许多管理类 的方法,并且通过配置参数还可 以限制每个连接(也就是 客户端与 Node 之间的连接)最多缓存的请求数。这个配置参数为 max. 工n . flight.requests . p e r . connection,默认值为 5,即每个连接最多只能缓存 5 个未响应的请求,超过该数值 之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应( Response)。通 过比较 Deque<Request>的 size 与这个参数的大小来判断对应的 Node 中是否己 经堆积了很多未 响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继续向 其发送请求会增大请求超时的可能。

2.2.2 元数据的更新

2.2.1 节中提及的 InFlightRequests 还可以获得 leastLoadedNode,即所有 Node 中负载最小的 那一个 。这里 的负载最小是通过每个 Node 在 InFlightRequests 中还未确认的请求决定的,未确 认的请求越多则认为负载越大 。 对于图 2-2 中的 InFlightRequests 来说,图中展示了 三个节点 NodeO、Nodel 和 Node2,很明显 Nodel 的 负载最 小 。也就是说, Nodel 为当前的 leastLoadedNode。 选择 leastLoadedNode 发送请求可以使它能够尽快发出,避免因网络拥塞 等异常 而影响整体的进 度。 leastLoadedNode 的概念可以用于多个应用场合,比如元数据请求、消费者组播协议的交互。


image.png

我们使用如下的方式创建了一条消息 ProducerRecord:
new ProducerRecord<>(topic, ” Hello, Kafka !”) ;

我们只知道主题的名称,对于其他一些必要的信息却一无所知 。 KafkaProducer要将此消息 追加到指定主题的某个分区所对应的 leader 副本之前,首先需要知道主题的分区数量,然后经 过计算得出(或者直接指定〉目标分区,之后 KafkaProducer 需要知道目标分区的 leader 副本所 在的 broker 节点的地址、端口等信息才能建立连接,最终才能将消息发送到 Kafka,在这一过 程中 所 需要 的信息都属于元数据信息。

在 2.1.1 节中我们了解了 bootstrap.servers 参数只需要配置部分 broker节点的地址即 可,不需要配置所有 broker节点的地址,因为客户端可以自己发现其他 broker节点的地址, 这 一过程也属于元数据相关的更新操作 。与此同时 ,分区数量及 leader 副本的分布都会动态地变 化, 客户端也需要动态地捕捉这些变化。

元数据是指 Kafka 集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有 哪些分区,每个分区的 lead巳r副本分配在哪个节点上, follower副本分配在哪些节点上,哪些副 本在 AR、 ISR 等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。

当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息,或者超 过 rnetadata .rnax.age.rns 时间没有更新元数据都会引起元数据的更新操作 。客 户端参数 rnetadata .rnax.age.rns 的默认值为 300000,即 5 分钟。元数据的更新操作是在客户端 内部 进行的,对客户端的外部使用者不可见。当需要更新元数据时,会先挑选出 leastLoadedNode, 然后 向这个 Node发送 MetadataRequest请求来获取具体的元数据信息。这个更新操作是由 Sender 线程发起的, 在创建完 MetadataRequest之后同样会存入 InF!ightRequests,之后的步骤就和发送 消息时的类似 。 元数据虽然由 Sender 线程负责更新,但是主线程也需要读取这些信息,这里的 数据同步通过 synchronized 和 final 关键字来保障。

2.3 重要的生产者参数

在 KafkaProducer 中 ,除了 2.1.1 节提及的 3 个默认的客户端参数,大部分 的参数都有合理 的默认值, 一般不需要修改它们。不过了解这些参数可以让我们更合理地使用生产者客户端, 其中还有一些重要的参数涉及程序的可用性和性能,如果能够熟练掌握它 们,也可以让我们在 编写相关的程序时能够更好地进行性能调优与故障排查 。 下面挑选一些重要的参数进行讲解。

1. acks

这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消 息是成功写入的。 acks 是生产者客户端中一个非常重要 的参数 ,它涉及消息的可靠性和吞吐 量之间的权衡。 acks参数有3种类型的值(都是字符串类型)。

acks = 1。默认值即为 l。生产者发送消息之后,只要分区的 leader副本成功写入消 息,那么它就会收到来自服务端的成功响应 。 如果消息无法写入 leader 副本,比如在 leader 副本崩溃、重新选举新的 leader 副本的过程中,那么生产者就会收到一个错误 的响应,为了避免消息丢失,生产者可以选择重发消息 。如果消息写入 leader 副本并 返回成功响应给生产者,且在被其他 follower 副本拉取之前 leader 副本崩溃,那么此 时消息还是会丢失,因为新选举的 leader 副本中并没有这条对应的消息 。 acks 设置为 l,是消息可 靠性和吞吐量之 间的折中方案。

acks = 0。生产者发送消 息之后不需要等待任何服务端的响应 。如果在消息从发送到 写入 Kafka 的过程中出现某些异常,导致 Kafka 并没有收到这条消息,那么生产者也 无从得知,消息也就丢失了。在其他配置环境相同的情况下, acks 设置为 0 可以达 到最大的吞吐量。

acks =一l 或 acks =all。生产者在消息发送之后,需要等待 ISR 中的所有副本都成功 写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下, acks 设置为 1 (all)可以达到最强的可靠性。但这并不意味着消息就一定可靠,因 为 JSR 中可 能只有 leader 副本,这样就退化成了 acks=l 的情况。要获得更高的消息 可靠性需要配合 min.insync.replicas 等参数的联动,消息可靠性分析的具体内 容可以参考 8.3节。

注意 acks 参数配置的值是一个字符串类型,而不是整数类型。举个例子,将 acks 参数 设置为 0, 需要采用下面这两种形式 :

properties . put( " acks ”, ” 。 ” );

2. max.「equest.size

这个参数用来限制生产者客户端能发送的消息的最大值,默认值为 1048576B,即 lMB。 一般情况下,这个默认值就可以满足大多数的应用场景了。笔者并不建议读者盲目地增大这个 参数的配置值,尤其是在对 Kafka 整体脉络没有足够把控的时候。因为这个参数还涉及一些其 他参数 的联动,比如 broker 端的 message.max .bytes 参数,如果配置错误可能会引起一些不 必要的异常。比如将broker端的message.max.bytes参数配置为 10,而max.request.size 参数配置为 20,那么当我们发送一条大小为 15B 的消息时,生产者客户端就会报出如下的异常:

org . apache . kafka.commo口 . errors.RecordTooLargeExcept工on : The request included a message larger than the max message size the server will accept.

3. retries 和 retry.backoffms

retries 参数用来配置生产者重试的次数,默认值为 0,即在发生异常的时候不进行任何 重试动作。消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常, 比如网 络 抖动、 leader副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置 retries 大于 0 的值,以此通过 内部重试来恢复而不是一昧地将异常抛给生产者的应用程序。 如果重试 达到设定的 次数 ,那么生产者就会放弃重试并返回异常。不过并不是所有的异常都是可以通过 重试来解决的,比如消息太大,超过 max . request . size 参数配置的值时,这种方式就不可 行了 。

重试还和另一个参数 retry.backoff.ms 有关,这个参数的默认值为 100, 它用来设定 两次重试之间的时间间隔,避免无效的频繁重试。在配置 retries 和 retry . backoff.ms 之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时 间,以此来避免生产者过早地放弃重试 。

Kafka 可以保证同一个分区中的消息是有序的。如果生产者按照一定的顺序发送消息,那 么这些消息也会顺序地写入分区,进而消费者也可以按照同样的顺序消费它们。对于某些应用 来说,顺序性非常重要,比如 MySQL 的 binlog传输,如果出现错误就会造成非常严重的后果。 如 果将 acks 参数配置为非零值,并且 max . i口 .flight.requests . per . connection 参数 配置为大于 l 的值,那么就会出现错序的现象: 如果第一批次消息写入失败, 而第二批次消息 写入成功,那么生产者会重试发送第一批次的消息, 此时如果第一批次的消息写入成功,那么 这两个批次的消息就出现了错序 。 一般而言,在需要保证消息顺序的场合建议把参数 max.in.flight . requests .per.connection 配置为 1,而不是把 acks 配置为 0, 不过 这样也会影响整体的吞吐。

4. compression.type

这个参数用来指定消息的压缩方式,默认值为“ none”,即默认情况下,消息不会被压缩。

该参数还可以配置为“ gzip” “snappy” 和“ lz4” 。 对消息进行压缩可以极大地减少网络传输 量、降低网络 1/0,从而提高整体的性能。消息压缩是一种使用时间换空间的优化方式,如果对 时延有一定的要求,则不推荐对消息进行压缩 。

6. linger.ms

这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息( ProducerRecord)加入 ProducerBatch 的时间,默认值为 0。生产者客户端会在 ProducerBatch 被填满或等待时间超过 linger .ms 值时发迭出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞 吐量。 这个linger.ms参数与TCP协议中的Nagle算法有异曲同工之妙。

7. receive.buffe「.bytes

这个参数用来设置 Socket 接收消息缓冲区( SO 阻 CBUF)的大小,默认值为 32768 (岛, 即 32阻。如果设置为一l,则使用操作系统的默认值。如果 Producer与 Kafka处于不同的机房, 则可以 适地调大这个参数值 。

8. send.buffer.bytes

这个参数用来设置Socket发送消息缓冲区CSO SNDBUF)的大小,默认值为131072CB),
即 128KB。与 receive . buffer .bytes 参数一样 , 如果设置为 l,则使用操作系统的默认值。

9. request.timeout.ms

这个参数用来配置 Producer等待请求响应的最长时间,默认值为 30000 (ms)。请求超时 之后可以选择进行重试 。注意这个参数需要 比 broker 端参数 replica.lag.time.max.ms 的 值要大 ,这样 可 以减少因客户端重试而引起的消息重复的概率。

还有一些生产者客户端的参数在本节中没有提及,这些 参数同样非常重要 ,它们 需要单独 的章节或场景来描述。部分参数在前面的章节 中 己经提及 ,比如 bootstrap . servers,还有 部分参数会在后面的 章节 中提及,比如 transactional . id。表 2-1 中罗列 了一份详细的参 数列表以供读者参阅。


image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,390评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,821评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,632评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,170评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,033评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,098评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,511评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,204评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,479评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,572评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,341评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,213评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,576评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,893评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,171评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,486评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,676评论 2 335