HDFS文件

一、操作方式

Hadoop支持的文件系统由很多(见下图),HDFS只是其中一种实现。Java抽象类org.apache.hadoop.fs.FileSystem定义了Hadoop中一个文件系统的客户端接口,并且该抽象类有几个具体实现。Hadoop一般使用URI(下图)方案来选取合适的文件系统实例进行交互。

比如想要列出本地文件系统根目录下的文件,可以输入命令 hadoop fs -ls file:///

Hadoop支持的文件系统

特别的,HDFS文件系统的操作可以使用 FsSystem shell 、客户端(http rest api、Java api、C api等)。

FsSystem shell

FsSystem shell 的用法基本同本地shell类似,命令可参考 FsSystem shell

JAVA

Hadoop是用Java写的,通过Java Api(FileSystem类)可以调用大部分Hadoop文件系统的交互操作。更详细的介绍可参考 hadoop Filesystem

HTTP

非Java开发的应用可以使用由WebHDFS协议提供的HTTP REST API,但是HTTP比原生的Java客户端要慢,所以不到万不得已尽量不要使用HTTP传输特大数据。通过HTTP来访问HDFS有两种方法:

  • 直接访问 HDFS守护进程直接服务于来自客户端的HTTP请求
  • 通过代理访问 客户端通常使用DistributedFileSystem API访问HDFS。

两种如图

Http访问

在第一种情况中,namenode和datanode内嵌的web服务作为WebHDFS的端节点运行(是否启用WebHDFS可通过dfs.webhdfs.enabled设置,默认为true)。文件元数据在namenode上,文件读写操作首先被发往namenode,有namenode发送一个HTTP重定向至某个客户端,指示以流的方式传输文件数据的目的或源datanode。

第二种方法依靠一个或多个独立代理服务器通过HTTP访问HDFS。所有集群的网络通信都需要通过代理,因此客户端从来不直接访问namenode或datanode。使用代理后可以使用更严格的防火墙策略和带宽策略。

HttpFs代理提供和WebHDFS相同的HTTP接口,这样客户端能够通过webhdfs URI访问接口。HttpFS代理启动独立于namenode和datanode的守护进程,使用httpfs.sh 脚本,默认在一个不同的端口上监听(14000)。

二、文件读取

数据流解析

下图描述了

读文件时客户端与 HDFS 中的 namenode, datanode 之间的数据流动。

客户端从HDFS读数据

对上图的解释如下:

  • 客户端首先通过在 FileSystem 上调用 open() 方法打开它想要打开的文件, 对于 HDFS 来说, 就是在 DistributedFileSystem 的实例上调用(第1步)。
  • 之后 DistributedFileSystem 就使用 remote procedure call(RPCs)去呼叫 namenode,去查明组成文件的前几个块的位置(第2步)。对于每一个块,namenode 返回拥有块复本的 datanode 的地址。幸运的是,这些 datanode 会按照与客户端的接近度来排序(接近度是按照集群网络中的拓扑结构来计算的,后面会说到)。如果客户端节点自己就是一个 datanode,而且该节点的存了一个块的复本,客户端就直接从本地 datanode 读取块。
  • DistributedFileSystem 返回一个 FSDataInputStream(支持文件 seek 的输入流)给客户端,客户端就能从流中读取数据了。 FSDataInputStream 中封装了一个管理了 datanode 与 namenode I/O 的 DFSInputStream
  • 然后客户端就调用 read() 方法(第3步)。
  • 存储了文件的前几个块的地址的 DFSInputStream,就会连接存储了第一个块的第一个(最近的) datanode。 然后 DFSInputStream 就通过重复调用 read() 方法,数据就从 datanode 流动到了客户端(第4步)。
  • 当 该 datanode 中最后一个块的读取完成了, DFSInputStream 会关闭与 datanode 的连接,然后为下一块寻找最佳节点(第5步)。这个过程对客户端来说是透明的,在客户端那边看来,就像是只读取了一个连续不断的流。
  • 块是按顺序读的,通过 DFSInputStream 在 datanode 上打开新的连接去作为客户端读取的流。他也将会呼叫 namenode 来取得下一批所需要的块所在的 datanode 的位置(注意刚才说的只是从 namenode 获取前几个块的)。当客户端完成了读取,就在 FSDataInputStream 上调用 close() 方法结束整个流程(第6步)。

在读取过程中, 如果 FSDataInputStream 在和一个 datanode 进行交流时出现了一个错误,他就去试一试下一个最接近的块,他当然也会记住刚才发生错误的 datanode 以至于之后不会再在这个 datanode 上进行没必要的尝试。 DFSInputStream 也会在 datanode 上传输出的数据上核查检查数(checknums).如果损坏的块被发现了,DFSInputStream 就试图从另一个拥有备份的 datanode 中去读取备份块中的数据。

在这个设计中一个重要的方面就是客户端直接从 datanode 上检索数据,并通过 namenode 指导来得到每一个块的最佳 datanode。这种设计允许 HDFS 扩展大量的并发客户端,因为数据传输只是集群上的所有 datanode 展开的。期间,namenode 仅仅只需要服务于获取块位置的请求(块位置信息是存放在内存中,所以效率很高)。如果不这样设计,随着客户端数据量的增长,数据服务就会很快成为一个瓶颈。

集群网络拓扑

我们知道,相对于客户端(之后就是 mapreduce task 了),块的位置有以下可能性:

  • 在客户端所在节点上(0,也就是本地化的)
  • 和客户端不在同一个节点上,但在同一个机架上(2)。
  • 和客户端不在同一个机架上,但是在同一个数据中心里(4)。
  • 就是与客户端不在一个数据中心(6)。

我们认为他们对于客户端的带宽递减,距离递增(括号中表示距离)。示意图如下:

网络拓扑

如果集群中的机器都在同一个机架上,我们无需其他配置,若集群比较复杂,由于hadoop无法自动发现网络拓扑,所以需要额外配置网络拓扑。

读取举例(Java)

基本读取程序,将文件内容输出到console

FileSystemCat

// cc FileSystemCat Displays files from a Hadoop filesystem on standard output by using the FileSystem directly
import java.io.InputStream;
import java.net.URI;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
 
// vv FileSystemCat 
public class FileSystemCat {
 
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    InputStream in = null;
    try {
      in = fs.open(new Path(uri));
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
    }
  }
}
// ^^ FileSystemCat

随机读取

// cc FileSystemDoubleCat Displays files from a Hadoop filesystem on standard output twice, by using seek
import java.net.URI;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
 
// vv FileSystemDoubleCat
public class FileSystemDoubleCat {
 
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    FSDataInputStream in = null;
    try {
      in = fs.open(new Path(uri));
      IOUtils.copyBytes(in, System.out, 4096, false);
      in.seek(0); // go back to the start of the file
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
    }
  }
}
// ^^ FileSystemDoubleCat

展开原码

三、文件写入

数据流解析

下图描述了写文件时客户端与 HDFS 中的 namenode, datanode 之间的数据流动。

写文件

对上图的解释如下:

  • 首先客户端通过在 DistributedFileSystem 上调用 create()方法(第1步)来创建一个文件。

  • DistributedFileSystem 使用 RPC 呼叫 namenode ,让他在文件系统的命名空间上创建一个没有与任何块关联的新文件(第2步), namenode 会执行各种各样的检查以确认文件之前是不存在的,并确认客户端是否拥有创建文件的权限。如果检查通过。 namenode 就会为新文件生成一条记录;否则,文件创建就会失败,客户端会抛出一个 IOException。 成功以后,DistributedFileSystem 会返回一个 FSDataOutputStream给客户端以让他开始写数据。和读流程中一样,FSDataOutputStream 包装了一个 DFSOutputStream,他掌握了与 datanode 与 namenode 的联系。

  • 当客户端开始写数据(第3步),DFSOutputStream 将文件分割成很多很小的数据,然后将每个小块放进一个个包(数据包,包中除了数据还有描述数据用的标识)中, 包们会写进一个名为数据队列(data quence)的内部队列。数据队列被 DataStreamr 消费,他负责要求 namenode 去挑选出适合存储块备份的 datanode 的一个列表(注意,这里是文件的一个块,而不是整个文件)。这个列表会构成一个 pipeline(管线),这里假定备份数为3,所以在 pipeline 中就会有三个 datanode , DataStreamer 将能够组成块的的包先流入 pipeline 中的第一个 datanode ,第一个 datanode 会先存储来到的包,然后继续将所有的包转交到 pipeline 中的第二个 datanode 中。相似的,第二个 datande 也会存储这些包,并将他们转交给 pipeline 中的第三个(最后一个) datanode (第4步)。

  • DFSOutputStream 也会维护一个包们的内部队列,其中也会有所有的数据包,该队列等待 datanode们 的写入确认,所以叫做确认队列(ack quence)。当一个包已经被 pipeline 中的所有 datanode 确认了写如磁盘成功,这个包才会从 确认队列中移除(第5步)。

  • 当客户端完成了数据写入,会在流上调用 close() 方法(第6步),这个行为会将所有剩下的包刷新(flush)进 datanode 中,然后等待确认信息达到后,客户端就联系 namenode 告诉他文件数据已经放好了(第七步)。namenode 也一直知道文件被分成了哪些块(因为在之前是 DataStreamer 请求了块分配),所以现在在成功之前,只需要等待块满足最低限度的备份。

如果在任何一个 datanode 在写入数据的时候失败了,接下来所做的一切对客户端都是透明的:首先, pipeline 被关闭,在确认队列中的剩下的包会被添加进数据队列的起始位置上,以至于在失败的节点下游的任 何节点都不会丢失任何的包。然后与 namenode 联系后,当前在一个好的 datanode 会联系 namenode, 给失败节点上还未写完的块生成一个新的标识ID, 以至于如果这个失败的 datanode 不久后恢复了,这个不完整的块将会被删除。失败节点会从 pipeline 中移除,然后剩下两个好的 datanode 会组成一个的新的 pipeline ,剩下的 这些块的包(也就是刚才放在数据队列队首的包)会继续写进 pipeline中好的 datanode 中。最后,namenode 注意到块备份数小于规定的备份数,他就安排在另一个节点上创建完成备份,直接从已有的块中复制就可以。然后一直到满足了备份数(dfs.replication)。如果有多个节点的写入失败了,如果满足了最小备份数的设置(dfs.namenode.repliction.min),写入也将会成功,然后剩下的备份会被集群异步的执行备份,直到满足了备份数(dfs.replication)。

写入举例(Java)

// cc FileCopyWithProgress Copies a local file to a Hadoop filesystem, and shows progress
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
 
// vv FileCopyWithProgress
public class FileCopyWithProgress {
  public static void main(String[] args) throws Exception {
    String localSrc = args[0];
    String dst = args[1];
     
    InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
     
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(dst), conf);
    OutputStream out = fs.create(new Path(dst), new Progressable() {
      public void progress() {
        System.out.print(".");
      }
    });
     
    IOUtils.copyBytes(in, out, 4096, true);
  }
}
// ^^ FileCopyWithProgress

四、其他文件操作

创建目录

创建目录

public blooean mkdir(Path f) throws IOException

查看文件元数据

// cc ShowFileStatusTest Demonstrates file status information
import static org.junit.Assert.*;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
import java.io.*;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.*;
 
// vv ShowFileStatusTest
public class ShowFileStatusTest {
   
  private MiniDFSCluster cluster; // use an in-process HDFS cluster for testing
  private FileSystem fs;
 
  @Before
  public void setUp() throws IOException {
    Configuration conf = new Configuration();
    if (System.getProperty("test.build.data") == null) {
      System.setProperty("test.build.data", "/tmp");
    }
    cluster = new MiniDFSCluster.Builder(conf).build();
    fs = cluster.getFileSystem();
    OutputStream out = fs.create(new Path("/dir/file"));
    out.write("content".getBytes("UTF-8"));
    out.close();
  }
   
  @After
  public void tearDown() throws IOException {
    if (fs != null) { fs.close(); }
    if (cluster != null) { cluster.shutdown(); }
  }
   
  @Test(expected = FileNotFoundException.class)
  public void throwsFileNotFoundForNonExistentFile() throws IOException {
    fs.getFileStatus(new Path("no-such-file"));
  }
   
  @Test
  public void fileStatusForFile() throws IOException {
    Path file = new Path("/dir/file");
    FileStatus stat = fs.getFileStatus(file);
    assertThat(stat.getPath().toUri().getPath(), is("/dir/file"));
    assertThat(stat.isDirectory(), is(false));
    assertThat(stat.getLen(), is(7L));
    assertThat(stat.getModificationTime(),
        is(lessThanOrEqualTo(System.currentTimeMillis())));
    assertThat(stat.getReplication(), is((short) 1));
    assertThat(stat.getBlockSize(), is(128 * 1024 * 1024L));
    assertThat(stat.getOwner(), is(System.getProperty("user.name")));
    assertThat(stat.getGroup(), is("supergroup"));
    assertThat(stat.getPermission().toString(), is("rw-r--r--"));
  }
   
  @Test
  public void fileStatusForDirectory() throws IOException {
    Path dir = new Path("/dir");
    FileStatus stat = fs.getFileStatus(dir);
    assertThat(stat.getPath().toUri().getPath(), is("/dir"));
    assertThat(stat.isDirectory(), is(true));
    assertThat(stat.getLen(), is(0L));
    assertThat(stat.getModificationTime(),
        is(lessThanOrEqualTo(System.currentTimeMillis())));
    assertThat(stat.getReplication(), is((short) 0));
    assertThat(stat.getBlockSize(), is(0L));
    assertThat(stat.getOwner(), is(System.getProperty("user.name")));
    assertThat(stat.getGroup(), is("supergroup"));
    assertThat(stat.getPermission().toString(), is("rwxr-xr-x"));
  }
   
}
// ^^ ShowFileStatusTest

查看文件状态

// cc ListStatus Shows the file statuses for a collection of paths in a Hadoop filesystem
import java.net.URI;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
 
// vv ListStatus
public class ListStatus {
 
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
     
    Path[] paths = new Path[args.length];
    for (int i = 0; i < paths.length; i++) {
      paths[i] = new Path(args[i]);
    }
     
    FileStatus[] status = fs.listStatus(paths);
    Path[] listedPaths = FileUtil.stat2Paths(status);
    for (Path p : listedPaths) {
      System.out.println(p);
    }
  }
}
// ^^ ListStatus

文件模式匹配

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
 
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
 
public class DateRangePathFilter implements PathFilter {
   
  private final Pattern PATTERN = Pattern.compile("^.*/(\\d\\d\\d\\d/\\d\\d/\\d\\d).*$");
   
  private final Date start, end;
 
  public DateRangePathFilter(Date start, Date end) {
    this.start = new Date(start.getTime());
    this.end = new Date(end.getTime());
  }
   
  public boolean accept(Path path) {
    Matcher matcher = PATTERN.matcher(path.toString());
    if (matcher.matches()) {
      DateFormat format = new SimpleDateFormat("yyyy/MM/dd");
      try {
        return inInterval(format.parse(matcher.group(1)));
      } catch (ParseException e) {
        return false;
      }
    }
    return false;
  }
 
  private boolean inInterval(Date date) {
    return !date.before(start) && !date.after(end);
  }
 
}

文件删除

import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
 
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Test;
 
public class FileSystemDeleteTest {
 
  private FileSystem fs;
   
  @Before
  public void setUp() throws Exception {
    fs = FileSystem.get(new Configuration());
    writeFile(fs, new Path("dir/file"));
  }
   
  private void writeFile(FileSystem fileSys, Path name) throws IOException {
    FSDataOutputStream stm = fileSys.create(name);
    stm.close();
  }
   
  @Test
  public void deleteFile() throws Exception {
    assertThat(fs.delete(new Path("dir/file"), false), is(true));
    assertThat(fs.exists(new Path("dir/file")), is(false));
    assertThat(fs.exists(new Path("dir")), is(true));
    assertThat(fs.delete(new Path("dir"), false), is(true));
    assertThat(fs.exists(new Path("dir")), is(false));
  }
 
  @Test
  public void deleteNonEmptyDirectoryNonRecursivelyFails() throws Exception {
    try {
      fs.delete(new Path("dir"), false);
      fail("Shouldn't delete non-empty directory");
    } catch (IOException e) {
      // expected
    }
  }
   
  @Test
  public void deleteDirectory() throws Exception {
    assertThat(fs.delete(new Path("dir"), true), is(true));
    assertThat(fs.exists(new Path("dir")), is(false));
  }
   
}

五、文件压缩

压缩格式

文件压缩有两大好处:

  • 减少存储文件所需要的磁盘空间
  • 加速数据在网络和磁盘上的传输。

Hadoop 对于压缩格式的是自动识别。如果我们压缩的文件有相应压缩格式的扩展名(比如 lzo,gz,bzip2 等)。Hadoop 会根据压缩格式的扩展名自动选择相对应的解码器来解压数据,此过程完全是 Hadoop 自动处理,我们只需要确保输入的压缩文件有扩展名。

Hadoop中有多种压缩格式、算法和工具,下图列出了常用的压缩方法。

压缩格式

表中的“是否可切分”表示对应的压缩算法是否支持切分,也就是说是否可以搜索数据流的任意位置并进一步往下读取数据,可切分的压缩格式尤其适合MapReduce。

比较

所有的压缩算法都需要权衡空间/时间:压缩和解压缩速度更快,其代价通常是只能节省少量的空间。不同的压缩工具有不同的特性:

  • gzip是一个通用的压缩工具,在空间/时间性能权衡中,居于其他两个压缩方法之间。
  • bzip2的压缩能力强于gzip,但压缩速度更慢一些,尽管bzip2的解压速度比压缩速度快,但仍比其他压缩格式慢。
  • LZO、LZ4和Snappy均优化压缩速度,其速度比gzip快一个数量级,但压缩效率低一点。
  • Snappy和LZ4的压缩速度比LZO高出很多。

更详细的比较如下

1.压缩性能比较


性能

2.优缺点

优缺点

另外使用hadoop原生(native)类库比其他java实现有更快的压缩和解压缩速度。特征比较如下:

特征

使用容器文件格式结合压缩算法也能更好的提高效率。顺序文件、Arvo文件、ORCFiles、Parqurt文件同时支持压缩和切分。

压缩举例(Java)

压缩

// cc PooledStreamCompressor A program to compress data read from standard input and write it to standard output using a pooled compressor
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.util.ReflectionUtils;
 
// vv PooledStreamCompressor
public class PooledStreamCompressor {
 
  public static void main(String[] args) throws Exception {
    String codecClassname = args[0];
    Class<?> codecClass = Class.forName(codecClassname);
    Configuration conf = new Configuration();
    CompressionCodec codec = (CompressionCodec)
      ReflectionUtils.newInstance(codecClass, conf);
    /*[*/Compressor compressor = null;
    try {
      compressor = CodecPool.getCompressor(codec);/*]*/
      CompressionOutputStream out =
        codec.createOutputStream(System.out, /*[*/compressor/*]*/);
      IOUtils.copyBytes(System.in, out, 4096, false);
      out.finish();
    /*[*/} finally {
      CodecPool.returnCompressor(compressor);
    }/*]*/
  }
}
// ^^ PooledStreamCompressor

解压缩

// cc FileDecompressor A program to decompress a compressed file using a codec inferred from the file's extension
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
 
// vv FileDecompressor
public class FileDecompressor {
 
  public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
     
    Path inputPath = new Path(uri);
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    CompressionCodec codec = factory.getCodec(inputPath);
    if (codec == null) {
      System.err.println("No codec found for " + uri);
      System.exit(1);
    }
 
    String outputUri =
      CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());
 
    InputStream in = null;
    OutputStream out = null;
    try {
      in = codec.createInputStream(fs.open(inputPath));
      out = fs.create(new Path(outputUri));
      IOUtils.copyBytes(in, out, conf);
    } finally {
      IOUtils.closeStream(in);
      IOUtils.closeStream(out);
    }
  }
}
// ^^ FileDecompressor

六、文件序列化

序列化是指将结构化数据转换为字节流以便在网络上传输或写到磁盘进行永久存储。反序列化狮子将字节流转换回结构化对象的逆过程。

序列化用于分布式数据处理的两大领域:进程间通信和永久存储。

对序列化的要求时是格式紧凑(高效使用存储空间)、快速(读写效率高)、可扩展(可以透明地读取老格式数据)且可以互操作(可以使用不同的语言读写数据)。

Hadoop使用的是自己的序列化格式Writable,它绝对紧凑、速度快,但不太容易用java以外的语言进行扩展或使用。

当然,用户也可以使用其他序列化框架或者自定义序列化方式,如Avro框架。

Hadoop内部还使用了Apache ThriftProtocal Buffers 来实现RPC和数据交换。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 198,932评论 5 466
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,554评论 2 375
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 145,894评论 0 328
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,442评论 1 268
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,347评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,899评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,325评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,980评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,196评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,163评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,085评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,826评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,389评论 3 302
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,501评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,753评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,171评论 2 344
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,616评论 2 339

推荐阅读更多精彩内容