kafka——Producer API

一、Kafka 核心 API

下图是官方文档中的一个图,形象的描述了能与 Kafka集成的客户端类型


Kafka的五类客户端API类型如下:

  • AdminClient API:允许管理和检测Topic、broker以及其他Kafka实例,与Kafka自带的脚本命令作用类似。
  • Producer API:发布消息到1个或多个Topic,也就是生产者或者说发布方需要用到的API。
  • Consumer API:订阅1个或多个Topic,并处理产生的消息,也就是消费者或者说订阅方需要用到的API。
  • Stream API:高效地将输入流转换到输出流,通常应用在一些流处理场景。
  • Connector API:从一些源系统或应用程序拉取数据到Kafka,如上图中的DB。

本文中,我们将主要介绍 Producer API。

二、生产者客户端的基本架构图

由上图可以看出:KafkaProducer有两个基本线程。

Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了
两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。

main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。

  • 主线程:负责消息创建,拦截器,序列化器,分区器等操作,并将消息追加到消息收集器RecoderAccumulator中(这里可以看出拦截器确实在序列化和分区之前执行)。

    • 消息收集器主要的作用是缓存消息,让发送线程可以批量发送,减少网络传输资源消耗提升性能,缓存大小可以通过buffer.memory配置,默认值为32MB,如果生产者发送消息的速度超过发送到服务器的速度,则send()方法要么被阻塞,要么抛出异常,取决于参数max.block.ms,默认值为60000ms
    • 主线程发送的消息被追加到消息累加器的一个双端队列中,消息收集器RecoderAccumulator为每个分区都维护了一个 Deque<ProducerBatch> 类型的双端队列,队列中是ProducerBatch,包含多个ProducerRecord。
    • ProducerBatch 可以暂时理解为是 ProducerRecord 的集合,批量发送有利于提升吞吐量,降低网络影响。
    • 由于生产者客户端使用 java.io.ByteBuffer 在发送消息之前进行消息保存,并维护了一个 BufferPool 实现 ByteBuffer 的复用;该缓存池只针对特定大小( batch.size 指定)的 ByteBuffer进行管理,对于消息过大的缓存,不能做到重复利用。
    • 每次追加一条ProducerRecord消息,会寻找/新建对应的双端队列,从其尾部获取一个ProducerBatch,判断当前消息的大小是否可以写入该批次中。若可以写入则写入;若不可以写入,则新建一个ProducerBatch,判断该消息大小是否超过客户端参数配置 batch.size 的值,不超过,则以 batch.size建立新的ProducerBatch,这样方便进行缓存重复利用;若超过,则以计算的消息大小建立对应的 ProducerBatch ,缺点就是该内存不能被复用了。
  • Sender线程

    • 该线程从消息收集器获取缓存的消息,将其处理为 <Node, List<ProducerBatch> 的形式, Node 表示集群的broker节点。
    • 进一步将<Node, List<ProducerBatch>转化为<Node, Request>形式,此时才可以向服务端发送数据。
    • 在发送之前,Sender线程将消息以 Map<NodeId, Deque<Request>> 的形式保存到 InFlightRequests 中进行缓存,可以通过其获取 leastLoadedNode ,即当前Node中负载压力最小的一个,以实现消息的尽快发出。

相关参数:

  • batch.size:只有数据积累到 batch.size 之后,sender 才会发送数据。
  • linger.ms:如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。

三、Producer API

3.1、导入相关依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>

3.2、Producer异步发送演示

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Producer异步发送演示
 */
public static void producerSend(){
    Properties properties = new Properties();
    //kafka集群
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    //ack应答级别
    properties.put(ProducerConfig.ACKS_CONFIG,"all");
    //重试次数
    properties.put(ProducerConfig.RETRIES_CONFIG,"3");
    //批次大小
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
    //等待时间
    properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
    //RecoderAccumulator缓冲区大小
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
    //key,value的序列化类
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    //创建生产者对象
    Producer<String,String> producer = new KafkaProducer<>(properties);
    //消息对象 ProducerRecoder
    for (int i = 0; i < 10; i++) {
        ProducerRecord<String,String> record =
                new ProducerRecord<>(TOPIC_NAME,"key-"+i,"value-"+i);
        producer.send(record);
    }
    producer.close();
}

3.3、Producer异步发送演示

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是
RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果
Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Producer异步发送带回调函数演示
 */
public static void producerSendWithCallback(){
    Properties properties = new Properties();
    //kafka集群
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    //ack应答级别
    properties.put(ProducerConfig.ACKS_CONFIG,"all");
    //重试次数
    properties.put(ProducerConfig.RETRIES_CONFIG,"0");
    //批次大小
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
    //等待时间
    properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
    //RecoderAccumulator缓冲区大小
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
    //key,value的序列化类
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    //创建生产者对象
    Producer<String,String> producer = new KafkaProducer<>(properties);
    //消息对象 ProducerRecoder
    ProducerRecord<String,String> record =
            new ProducerRecord<>(TOPIC_NAME,"key-hello","value-hello world");
    producer.send(record, new Callback() {
        //回调函数,该方法会在 Producer 收到 ack 时调用,为异步调用
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e == null) {
                log.info("send success partition: {}, offset: {}",recordMetadata.partition(),recordMetadata.offset());
            } else {
                log.error("exception",e);
            }
        }
    });
    producer.close();
}

3.4、Producer异步发送带回调函数和Partition负载均衡

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Producer异步发送带回调函数和Partition负载均衡
 */
public static void producerSendWithCallbackAndPartition(){
    Properties properties = new Properties();
    //kafka集群
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    //ack应答级别
    properties.put(ProducerConfig.ACKS_CONFIG,"all");
    //重试次数
    properties.put(ProducerConfig.RETRIES_CONFIG,"0");
    //批次大小
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
    //等待时间
    properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
    //RecoderAccumulator缓冲区大小
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
    //key,value的序列化类
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    //Partition负载均衡
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.yibo.kafka.producer.SamplePartition");
    //创建生产者对象
    Producer<String,String> producer = new KafkaProducer<>(properties);
    //消息对象 ProducerRecoder
    ProducerRecord<String,String> record =
            new ProducerRecord<>(TOPIC_NAME,"key-hello","value-hello world");
    producer.send(record, new Callback() {
        //回调函数,该方法会在 Producer 收到 ack 时调用,为异步调用
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e == null) {
                log.info("send success partition: {}, offset: {}",recordMetadata.partition(),recordMetadata.offset());
            } else {
                log.error("exception",e);
            }
        }
    });
    producer.close();
}

/**
 * @Description: Partitioner分区接口,以实现自定义的消息分区
 *
 * 默认分区器DefaultPartitioner org.apache.kafka.clients.producer.internals.DefaultPartitioner
 *
 * 如果消息的key为null,此时producer会使用默认的partitioner分区器将消息随机分布到topic的可用partition中。
 * 如果key不为null,并且使用了默认的分区器,kafka会使用自己的hash算法对key取hash值,
 * 使用hash值与partition数量取模,从而确定发送到哪个分区。
 * 注意:此时key相同的消息会发送到相同的分区(只要partition的数量不变化)
 */
public class SamplePartition implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        /**
         *由于我们按key分区,在这里我们规定:key值不允许为null。在实际项目中,key为null的消息*,可以发送到同一个分区。
         */
        if(keyBytes == null) {
            throw new InvalidRecordException("key cannot be null");
        }
        if(((String)key).equals("1")) {
            return 1;
        }
        System.out.println("key: " + key);
        //如果消息的key值不为1,那么使用hash值取模,确定分区。
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

3.5、Producer异步阻塞发送演示

由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同
步发送的效果,只需在调用 Future 对象的 get 方发即可。

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Producer异步阻塞发送演示
 */
public static void producerSyncSend() throws Exception {
    Properties properties = new Properties();
    //kafka集群
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    //ack应答级别
    properties.put(ProducerConfig.ACKS_CONFIG,"all");
    //重试次数
    properties.put(ProducerConfig.RETRIES_CONFIG,"0");
    //批次大小
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG,"16384");
    //等待时间
    properties.put(ProducerConfig.LINGER_MS_CONFIG,"1");
    //RecoderAccumulator缓冲区大小
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
    //key,value的序列化类
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
    //创建生产者对象
    Producer<String,String> producer = new KafkaProducer<>(properties);
    //消息对象 ProducerRecoder
    ProducerRecord<String,String> record = new ProducerRecord<>(TOPIC_NAME,"hello","hello world");
    Future<RecordMetadata> send = producer.send(record);
    RecordMetadata recordMetadata = send.get();
    log.info("send success partition: {}, offset: {}",recordMetadata.partition(),recordMetadata.offset());
    producer.close();
}

四、自定义 Interceptor

4.1、拦截器原理

Producer 拦截器(interceptor)是在 Kafka 0.10 版本被引入的,主要用于实现 clients 端的定制化控制逻辑。

对于 producer 而言,interceptor 使得用户在消息发送前以及 producer 回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer 允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain)。Intercetpor 的实现接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定义的方法包括:

  • 1、configure(configs):
    获取配置信息和初始化数据时调用。

  • 2、onSend(ProducerRecord):
    该方法封装进 KafkaProducer.send 方法中,即它运行在用户主线程中。Producer 确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的 topic 和分区,否则会影响目标分区的计算。

  • 3、onAcknowledgement(RecordMetadata, Exception):
    该方法会在消息从 RecordAccumulator 成功发送到 Kafka Broker 之后,或者在发送过程
    中失败时调用。并且通常都是在 producer 回调逻辑触发之前。onAcknowledgement 运行在producer 的 IO 线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer 的消息发送效率。

  • 4、close:
    关闭 interceptor,主要用于执行一些资源清理工作。

如前所述,interceptor 可能被运行在多个线程中,因此在具体实现时用户需要自行确保
线程安全。另外倘若指定了多个 interceptor,则 producer 将按照指定顺序调用它们,并仅仅是捕获每个 interceptor 可能抛出的异常记录到错误日志中而非在向上传递。这在使用过程中要特别留意。

4.2、 拦截器案例

  • 1、需求:
    实现一个简单的双 interceptor 组成的拦截链。第一个 interceptor 会在消息发送前将时间
    戳信息加到消息 value 的最前部;第二个 interceptor 会在消息发送后更新成功发送消息数或失败发送消息数。
  • 2、案例实操

  • 1)增加时间戳拦截器

public class TimeInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public void configure(Map<String, ?> map) {

    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        // 创建一个新的 record,把时间戳写入消息体的最前部
        return new ProducerRecord(producerRecord.topic(),
                producerRecord.partition(), producerRecord.timestamp(), producerRecord.key(),
                System.currentTimeMillis() + "," + producerRecord.value().toString());

    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {

    }

    @Override
    public void close() {

    }
}
  • 2)统计发送消息成功和发送失败消息数,并在 producer 关闭时打印这两个计数器
public class CounterInterceptor implements ProducerInterceptor<String, String> {

    private int errorCounter = 0;
    private int successCounter = 0;

    @Override
    public void configure(Map<String, ?> map) {

    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        return null;
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        // 统计成功和失败的次数
        if (e == null) {
            successCounter++;
        } else {
            errorCounter++;
        }

    }

    @Override
    public void close() {
        // 保存结果
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
    }
}
  • 3)producer 主程序
public class InterceptorProducer {

    private static final String TOPIC_NAME = "yibo_topic";

    public static void main(String[] args) {
        // 1 设置配置信息
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop102:9092");
        props.put("acks", "all");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2 构建拦截链
        List<String> interceptors = new ArrayList<>();
        interceptors.add("com.yibo.kafka.producer.TimeInterceptor");
        interceptors.add("com.yibo.kafka.producer.CounterInterceptor");
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

        Producer<String, String> producer = new KafkaProducer<>(props);

        // 3 发送消息
        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "message" + i);
            producer.send(record);
        }

        // 4 一定要关闭 producer,这样才会调用 interceptor 的 close 方法
        producer.close();
    }
}

五、SpringBoot 集成 Kafka

5.1、添加maven依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.1</version>
</dependency>

5.2、配置 application.properties

# 指定kafka server的地址,集群配多个,中间,逗号隔开
spring.kafka.bootstrap-servers=192.168.174.128:9092

#=============== provider  =======================
# 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
# 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
spring.kafka.producer.retries=0
# 每次批量发送消息的数量,produce积累到一定数据,一次发送
spring.kafka.producer.batch-size=16384
# produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
spring.kafka.producer.buffer-memory=33554432

#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
#可以设置的值为:all, -1, 0, 1
spring.kafka.producer.acks=1

# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
spring.kafka.consumer.group-id=testGroup
# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
spring.kafka.consumer.auto-offset-reset=earliest
# enable.auto.commit:true --> 设置自动提交offset
spring.kafka.consumer.enable-auto-commit=false
#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
spring.kafka.consumer.auto-commit-interval=100

# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#=============== listener  =======================
# 在侦听器容器中运行的线程数。
spring.kafka.listener.concurrency=5
#listner负责ack,每调用一次,就立即commit
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.missing-topics-fatal=false

5.3、新建Producer

@Component
@Slf4j
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    private static final String TOPIC_NAME = "yibo_topic";

    public void send(Object obj) {
        String obj2String = JSONObject.toJSONString(obj);
        log.info("准备发送消息为:{}", obj2String);
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_NAME, obj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                //发送失败的处理
                log.info(TOPIC_NAME + " - 生产者 发送消息失败:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                //成功的处理
                log.info(TOPIC_NAME + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
            }
        });
    }
}

参考:
https://www.cnblogs.com/L-Test/p/13443178.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,905评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,140评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,791评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,483评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,476评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,516评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,905评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,560评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,778评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,557评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,635评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,338评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,925评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,898评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,142评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,818评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,347评论 2 342

推荐阅读更多精彩内容