HBase——批量装载 Bulk load

概述

很多时候,我们需要将外部的数据导入到HBase集群中,例如:将一些历史的数据导入到HBase做备份。可以使用HBase的Java API,通过put方式可以将数据写入到HBase中,也可以通过MapReduce编写代码将HDFS中的数据导入到HBase。但这些方式都是基于HBase的原生API方式进行操作的。这些方式有一个共同点,就是需要与HBase连接,然后进行操作。HBase服务器要维护、管理这些连接,以及接受来自客户端的操作,会给HBase的存储、计算、网络资源造成较大消耗。此时,在需要将海量数据写入到HBase时,通过Bulk load(大容量加载)的方式,会变得更高效。可以这么说,进行大量数据操作,Bulk load是必不可少的。

HBase的数据最终是需要持久化到HDFS。HDFS是一个文件系统,那么数据可定是以一定的格式存储到里面的。例如:Hive我们可以以ORC、Parquet等方式存储。而HBase也有自己的数据格式,那就是HFile。Bulk Load就是直接将数据写入到StoreFile(HFile)中,从而绕开与HBase的交互,HFile生成后,直接一次性建立与HBase的关联即可。使用BulkLoad,绕过了Write to WAL,Write to MemStore及Flush to disk的过程。

更多可以参考官方对Bulk load的描述:https://hbase.apache.org/book.html#arch.bulk.load

一、使用java API的方式

java API中的put操作可以将数据导入到hbase中 其中包含单条和批量导入两种方式

@Test
public void test5() throws IOException {

    // 获取Hbase配置文件的对象
    // HBaseConfiguration conf=(HBaseConfiguration) HBaseConfiguration.create();
    Configuration conf = HBaseConfiguration.create();
    // 设置conf的zk访问路径
    conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");
    // 创建hbase连接
    Connection conn = ConnectionFactory.createConnection(conf);
    System.out.println(conn);
    // 获取dml的句柄
    // 一个Htable对象代表一个表
    HTable table = (HTable) conn.getTable(TableName.valueOf("test1"));

    // 数据导入 重点****************
    // 插入单条数据 Put对象是封装需要插入的数据,每一条数据都要封装一个普通对象
    Put put = new Put("rk001".getBytes());
    // 参数1是列簇 参数2 是列 参数3 是值
    put.addColumn("info1".getBytes(), "age".getBytes(), "100".getBytes());
    table.put(put);
}


// 批量数据导入 list
// 先将插入的数据放在list集合(也就是放在内存中)并没有提交,等放置完成之后一起提交,这种情况有可能出现内存溢出,因为list集合太大的话就Juin占满内存
@Test
public void test6() throws IOException {
    long start = System.currentTimeMillis();
    // 获取Hbase配置文件的对象
    // HBaseConfiguration conf=(HBaseConfiguration) HBaseConfiguration.create();
    Configuration conf = HBaseConfiguration.create();
    // 设置conf的zk访问路径
    conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");
    // 创建hbase连接
    Connection conn = ConnectionFactory.createConnection(conf);
    System.out.println(conn);
    // 获取dml的句柄
    // 一个Htable对象代表一个表
    HTable table = (HTable) conn.getTable(TableName.valueOf("test1"));
    // 创建list
    List<Put> list = new ArrayList<>();
    for (int i = 0; i < 10000; i++) {
        Put put = new Put(("rk" + i).getBytes());
        put.addColumn("info1".getBytes(), "age".getBytes(), ("" + i).getBytes());
        list.add(put);
    }
    table.put(list);

    long end = System.currentTimeMillis();
    System.out.println("用时:" + (end - start));
}


// 利用本地缓存批量数据导入,本地缓存是基于磁盘的,不会占用太多的内存,但是这种方式是没有list集合的方法速度快
@Test
public void test7() throws IOException {
    long start = System.currentTimeMillis();
    // 获取Hbase配置文件的对象
    // HBaseConfiguration conf=(HBaseConfiguration) HBaseConfiguration.create();
    Configuration conf = HBaseConfiguration.create();
    // 设置conf的zk访问路径
    conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");
    // 创建hbase连接
    Connection conn = ConnectionFactory.createConnection(conf);
    System.out.println(conn);
    // 获取dml的句柄
    // 一个Htable对象代表一个表
    HTable table = (HTable) conn.getTable(TableName.valueOf("test1"));
    // 设置是否需要自动刷新自动提交put对象,默认true,默认一条数据就会提交一次
    // 将参数改为false 不会立即提交,达到我们设定的值才会提交
    table.setAutoFlushTo(false);
    for (int i = 0; i < 10000; i++) {
        Put put = new Put(("rk" + i).getBytes());
        put.addColumn("info1".getBytes(), "age".getBytes(), ("" + i).getBytes());
        // 这时候不会自动提交到hbase了 提交到本地缓存了
        table.put(put);
        // 如果设置缓存的大小一般就不用设置指定条数提交了,但是这两种方式注意最后提交一次
        table.setWriteBufferSize(10 * 1024 * 1024);// 这是设置缓存的大小,
        if (i % 3000 == 0) {
            table.flushCommits();
        }
    }
    table.flushCommits();
    long end = System.currentTimeMillis();
    System.out.println("用时:" + (end - start));
}

二、使用MAPREDUCE JOB的方式进行导入

具体的导入的方式可以参考下面的博客
https://blog.csdn.net/CHANGGUOLONG/article/details/90732931
这篇博客详细介绍了Hbase的Mapreduce操作,并有相关案例

三、采用BULKLOAD的方式进行导入

在put数据时会先将数据的更新操作信息和数据信息写入WAL,在写入到WAL后,数据就会被放到MemStore中,当MemStore满后数据就会被flush到磁盘(即形成HFile文件),在这过程涉及到的flush,split,compaction等操作都容易造成节点不稳定,数据导入慢,耗费资源等问题,在海量数据的导入过程极大的消耗了系统性能,避免这些问题最好的方法就是使用BlukLoad的方式来加载数据到HBase中。

首先明白一点:Hbase中的内容再hdfs中是以Hfile文件格式进行存储的, HBase中每张Table在根目录(/HBase)下用一个文件夹存储,Table名为文件夹名,在Table文件夹下每个Region同样用一个文件夹存储,每个Region文件夹下的每个列族也用文件夹存储,而每个列族下存储的就是一些HFile文件,HFile就是HBase数据在HFDS下存储格式,其整体目录结构如下:

/hbase/data/default/<tbl_name>/<region_id>/<cf>/<hfile_id>

3.1 Bulk load MapReduce程序开发

Bulk load的流程主要分为两步:

  • 通过MapReduce准备好数据文件(Store Files)
  • 加载数据文件到HBase

3.2 银行转账记录海量冷数据存储案例

银行每天都产生大量的转账记录,超过一定时期的数据,需要定期进行备份存储。本案例,在MySQL中有大量转账记录数据,需要将这些数据保存到HBase中。因为数据量非常庞大,所以采用的是Bulk Load方式来加载数据。

  • 项目组为了方便数据备份,每天都会将对应的转账记录导出为CSV文本文件,并上传到HDFS。我们需要做的就将HDFS上的文件导入到HBase中。
  • 因为我们只需要将数据读取出来,然后生成对应的Store File文件。所以,我们编写的MapReduce程序,只有Mapper,而没有Reducer。

3.2.1 数据集

字段 字段名称
id ID
code 流水单号
rec_account 收款账户
rec_bank_name 收款银行
rec_name 收款人姓名
pay_account 付款账户
pay_name 付款人姓名
pay_comments 转账附言
pay_channel 转账渠道
pay_way 转账方式
status 转账状态
timestamp 转账时间
money 转账金额

3.2.2 项目准备工作

HBase中创建银行转账记录表

create_namespace "ITCAST_BANK"
# disable "TRANSFER_RECORD"
# drop "TRANSFER_RECORD"
create "ITCAST_BANK:TRANSFER_RECORD", { NAME => "C1", COMPRESSION => "GZ"}, { NUMREGIONS => 6, SPLITALGO => "HexStringSplit"}

3.2.3 导入POM依赖

<dependencies>

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-mapreduce</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
        <version>2.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-auth</artifactId>
        <version>2.7.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.5</version>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.6</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <target>1.8</target>
                <source>1.8</source>
            </configuration>
        </plugin>
    </plugins>
</build>

3.2.4 导入配置文件

将 core-site.xml、hbase-site.xml、log4j.properties三个配置文件拷贝到resources目录中。

如果添加下面属性可以运行的话,可以不用导入上面配置文件

//创建配置文件对象
Configuration conf=new Configuration();
//设置zk的
conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");

3.2.5 确保Windows环境变量配置正确

需要装一个windows版本Hadoop客户端

  • HADOOP_HOME


  • HADOOP_USER_NAME


3.2.6 编写实体类

实现步骤:

  • 创建实体类TransferRecord
  • 添加一个parse静态方法,用来将逗号分隔的字段,解析为实体类
  • 使用以下数据测试解析是否成功
7e59c946-b1c6-4b04-a60a-f69c7a9ef0d6,SU8sXYiQgJi8,6225681772493291,杭州银行,丁杰,4896117668090896,卑文彬,老婆,节日快乐,电脑客户端,电子银行转账,转账完成,2020-5-13 21:06:92,11659.0

参考代码:

public class TransferRecord {
    private String id;
    private String code;
    private String rec_account;
    private String rec_bank_name;
    private String rec_name;
    private String pay_account;
    private String pay_name;
    private String pay_comments;
    private String pay_channel;
    private String pay_way;
    private String status;
    private String timestamp;
    private String money;

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getCode() {
        return code;
    }

    public void setCode(String code) {
        this.code = code;
    }

    public String getRec_account() {
        return rec_account;
    }

    public void setRec_account(String rec_account) {
        this.rec_account = rec_account;
    }

    public String getRec_bank_name() {
        return rec_bank_name;
    }

    public void setRec_bank_name(String rec_bank_name) {
        this.rec_bank_name = rec_bank_name;
    }

    public String getRec_name() {
        return rec_name;
    }

    public void setRec_name(String rec_name) {
        this.rec_name = rec_name;
    }

    public String getPay_account() {
        return pay_account;
    }

    public void setPay_account(String pay_account) {
        this.pay_account = pay_account;
    }

    public String getPay_name() {
        return pay_name;
    }

    public void setPay_name(String pay_name) {
        this.pay_name = pay_name;
    }

    public String getPay_comments() {
        return pay_comments;
    }

    public void setPay_comments(String pay_comments) {
        this.pay_comments = pay_comments;
    }

    public String getPay_channel() {
        return pay_channel;
    }

    public void setPay_channel(String pay_channel) {
        this.pay_channel = pay_channel;
    }

    public String getPay_way() {
        return pay_way;
    }

    public void setPay_way(String pay_way) {
        this.pay_way = pay_way;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(String timestamp) {
        this.timestamp = timestamp;
    }

    public String getMoney() {
        return money;
    }

    public void setMoney(String money) {
        this.money = money;
    }

    @Override
    public String toString() {
        return "TransferRecord{" +
                "id='" + id + '\'' +
                ", code='" + code + '\'' +
                ", rec_account='" + rec_account + '\'' +
                ", rec_bank_name='" + rec_bank_name + '\'' +
                ", rec_name='" + rec_name + '\'' +
                ", pay_account='" + pay_account + '\'' +
                ", pay_name='" + pay_name + '\'' +
                ", pay_comments='" + pay_comments + '\'' +
                ", pay_channel='" + pay_channel + '\'' +
                ", pay_way='" + pay_way + '\'' +
                ", status='" + status + '\'' +
                ", timestamp='" + timestamp + '\'' +
                ", money='" + money + '\'' +
                '}';
    }

    public static TransferRecord parse(String line) {
        TransferRecord transferRecord = new TransferRecord();
        String[] fields = line.split(",");

        transferRecord.setId(fields[0]);
        transferRecord.setCode(fields[1]);
        transferRecord.setRec_account(fields[2]);
        transferRecord.setRec_bank_name(fields[3]);
        transferRecord.setRec_name(fields[4]);
        transferRecord.setPay_account(fields[5]);
        transferRecord.setPay_name(fields[6]);
        transferRecord.setPay_comments(fields[7]);
        transferRecord.setPay_channel(fields[8]);
        transferRecord.setPay_way(fields[9]);
        transferRecord.setStatus(fields[10]);
        transferRecord.setTimestamp(fields[11]);
        transferRecord.setMoney(fields[12]);

        return transferRecord;
    }

    public static void main(String[] args) {
        String str = "7e59c946-b1c6-4b04-a60a-f69c7a9ef0d6,SU8sXYiQgJi8,6225681772493291,杭州银行,丁杰,4896117668090896,卑文彬,老婆,节日快乐,电脑客户端,电子银行转账,转账完成,2020-5-13 21:06:92,11659.0";
        TransferRecord tr = parse(str);

        System.out.println(tr);
    }
}

3.2.7 构建读取数据的Mapper

HBase提供了两个类来专门对MapReduce支持:

  • ImmutableBytesWritable:对应rowkey
  • MapReduceExtendedCell:对应 列 → 值(键值对)

实现步骤:

  • 1、创建一个BankRecordMapper的类继承Mapper类,Mapper的泛型为
    • a)输入key:LongWritable
    • b)输入value:Text
    • c)输出key:ImmutableBytesWritable
    • d)输出value:MapReduceExtendedCell
  • 2、将Mapper获取到Text文本行,转换为TransferRecord实体类
  • 3、从实体类中获取ID,并转换为rowkey
  • 4、使用KeyValue类构建单元格,每个需要写入到表中的字段都需要构建出来单元格
  • 5、使用context.write将输出输出
    • a)构建输出key:new ImmutableBytesWrite(rowkey)
    • b)构建输出的value:new MapReduceExtendedCell(keyvalue对象)

参考代码:

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MapReduceExtendedCell;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class BankRecordMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, MapReduceExtendedCell> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // HBase需要有rowkey,列名 => 值
        TransferRecord transferRecord = TransferRecord.parse(value.toString());
        String rowkey = transferRecord.getId();

        // 列蔟
        byte[] cf = Bytes.toBytes("C1");
        byte[] colId = Bytes.toBytes("id");
        byte[] colCode = Bytes.toBytes("code");
        byte[] colRec_account = Bytes.toBytes("rec_account");
        byte[] colRec_bank_name = Bytes.toBytes("rec_bank_name");
        byte[] colRec_name = Bytes.toBytes("rec_name");
        byte[] colPay_account = Bytes.toBytes("pay_account");
        byte[] colPay_name = Bytes.toBytes("pay_name");
        byte[] colPay_comments = Bytes.toBytes("pay_comments");
        byte[] colPay_channel = Bytes.toBytes("pay_channel");
        byte[] colPay_way = Bytes.toBytes("pay_way");
        byte[] colStatus = Bytes.toBytes("status");
        byte[] colTimestamp = Bytes.toBytes("timestamp");
        byte[] colMoney = Bytes.toBytes("money");

        KeyValue idKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getId()));
        KeyValue codeKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getCode()));
        KeyValue rec_accountKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getRec_account()));
        KeyValue rec_bank_nameKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getRec_bank_name()));
        KeyValue rec_nameKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getRec_name()));
        KeyValue pay_accountKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getPay_account()));
        KeyValue pay_nameKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getPay_name()));
        KeyValue pay_commentsKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getPay_comments()));
        KeyValue pay_channelKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getPay_channel()));
        KeyValue pay_wayKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getPay_way()));
        KeyValue statusKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getStatus()));
        KeyValue timestampKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getTimestamp()));
        KeyValue moneyKeyValue = new KeyValue(Bytes.toBytes(rowkey), cf, colId, Bytes.toBytes(transferRecord.getMoney()));

        ImmutableBytesWritable rowkeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowkey));
        context.write(rowkeyWritable, new MapReduceExtendedCell(idKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(codeKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(rec_accountKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(rec_bank_nameKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(rec_nameKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(pay_accountKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(pay_nameKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(pay_commentsKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(pay_channelKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(pay_wayKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(statusKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(timestampKeyValue));
        context.write(rowkeyWritable, new MapReduceExtendedCell(moneyKeyValue));
    }
}

3.2.8 编写驱动类

分析

  • MapReduce执行时,需要读取HBase表的region相关信息,故需要获取HBase的表

实现步骤

  • 1.使用HBaseConfiguration.create()加载配置文件
  • 2.创建HBase连接
  • 3.获取HTable
  • 4.构建MapReduce JOB
    • a)使用Job.getInstance构建一个Job对象
    • b)调用setJarByClass设置要执行JAR包的class
    • c)调用setInputFormatClass为TextInputFormat.class
    • d)设置MapperClass
    • e)设置输出键Output Key Class
    • f)设置输出值Output Value Class
    • g)设置输入输出到HDFS的路径,输入路径/bank/input,输出路径/bank/output
      • FileInputFormat.setInputPaths
      • FileOutputFormat.setOutputPath
    • h)使用connection.getRegionLocator获取HBase Region的分布情况
    • i)使用HFileOutputFormat2.configureIncrementalLoad配置HFile输出
  • 5.调用job.waitForCompletion执行MapReduce程序
public class BulkloadDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 加载配置文件
        Configuration configuration = HBaseConfiguration.create();

        // 配置JOB
        Job instance = Job.getInstance(configuration);
        instance.setJarByClass(BulkloadDriver.class);
        instance.setMapperClass(BankRecordMapper.class);

        // 配置输入
        instance.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.setInputPaths(instance, new Path("hdfs://node1.itcast.cn:8020/bank/input"));

        // 配置输出
        FileOutputFormat.setOutputPath(instance, new Path("hdfs://node1.itcast.cn:8020/bank/output"));
        instance.setOutputKeyClass(ImmutableBytesWritable.class);
        instance.setOutputValueClass(MapReduceExtendedCell.class);

        // 配置HFileoutputFormat2
        Connection connection = ConnectionFactory.createConnection(configuration);
        Table table = connection.getTable(TableName.valueOf("ITCAST_BANK:TRANSFER_RECORD"));
        // 获取表的Region检索对象
        RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf("ITCAST_BANK:TRANSFER_RECORD"));
        HFileOutputFormat2.configureIncrementalLoad(instance, table, regionLocator);

        // 执行job
        if (instance.waitForCompletion(true)) {
            System.out.println("任务执行成功!");
        }
        else {
            System.out.println("任务执行失败!");
            System.exit(1);
        }
    }
}

参考:
https://blog.csdn.net/CSDNGuoYuying/article/details/128803798

https://www.freesion.com/article/44581297343/

https://www.cnblogs.com/smartloli/p/9501887.html

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345

推荐阅读更多精彩内容