Spark开发--Spark SQL--内置函数(十四)

一、窗口函数

  窗口函数是spark sql模块从1.4之后开始支持的,主要用于解决对一组数据进行操作,同时为每条数据返回单个结果,比如计算指定访问数据的均值、计算累加和或访问当前行之前行数据等,这些场景使用普通函数实现是比较困难的。
窗口函数计算的一组行,被称为Frame。每一个被处理的行都有一个唯一的frame相关联。

Spark SQL支持三类窗口函数:排名函数、分析函数和聚合函数。
以下汇总了Spark SQL支持的排名函数和分析函数。
对于聚合函数来说,普通的聚合函数都可以作为窗口聚合函数使用。
spark支持两种方式使用窗口函数:
在SQL语句中的支持的函数中添加OVER语句。例如avg(revenue) OVER (…)
使用DataFrame API在支持的函数调用over()方法。例如rank().over(…)。

  当一个函数被作为窗口函数使用时,需要为该窗口函数定义相关的窗口规范。窗口规范定义了哪些行会包括到给定输入行相关联的帧(frame)中。窗口规范包括三部分:
分区规范:定义哪些行属于相同分区,这样在对帧中数据排序和计算之前相同分区的数据就可以被收集到同一台机器上。如果没有指定分区规范,那么所有数据都会被收集到单个机器上处理。
排序规范:定义同一个分区中所有数据的排序方式,从而确定了给定行在他所属分区中的位置
帧规范:指定哪些行会被当前输入行的帧包括,通过其他行对于当前行的相对位置实现。

  如果使用sql语句的话,PARTITION BY关键字用来为分区规范定义分区表达式、 ORDER BY关键字用来为排序规范定义排序表达式。格式:OVER (PARTITION BY ... ORDER BY ... )。
如果使用DataFrame API的话,API提供了函数来定义窗口规范。实例如下:

import org.apache.spark.sql.expressions.Window
val windowSpec = Window.partitionBy(...).orderBy(...)

为了分区和排序操作,需要定义帧的开始边界、结束边界和帧的类型,这也是一个帧规范的三部分。
一共有五种边界:
UNBOUNDED PRECEDING(分区第一行),
UNBOUNDED FOLLOWING(分区最后一行),
CURRENT ROW 当前行
<value> PRECEDING(当前行之前行)
<value> FOLLOWING(当前行之后行)

有两种帧类型:ROW帧和RANGE帧。
  ROW帧是基于当前输入行的位置的物理偏移量:
比如:CURRENT ROW被用作边界表示当前输入行,<value> PRECEDING和<value> FOLLOWING分别表示出现在当前行之前和之后的行数。
例如:
SQL语句ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING表示一个包括当前行、当前行之前1行和之后1行的帧。

  RANGE帧是基于当前行位置的逻辑偏移。逻辑偏移为当前输入行的排序表达式的值和帧边界行的排序表达式的值之差。也因为这种定义,使用RANGE帧时,只能允许使用单个排序表达式。对于RANGE帧,就边界计算而言,和当前输入行的排序表达式的值相同的行都被认为是相同行。例如:排序表达式为revenue,SQL语句为RANGE BETWEEN 2000 PRECEDING AND 1000 FOLLOWING,则边界为[current revenue value - 2000, current revenue value + 1000]。
使用DataFrame API,可以使用以下方法实现ROW帧和RANGE帧:

Window.partitionBy(...).orderBy(...).rowBetween(start, end)
Window.partitionBy(...).orderBy(...).rangeBetween(start, end)

测试数据:

{"EMPNO": 7369,"ENAME": "SMITH","JOB": "CLERK","MGR": 7902,"HIREDATE": "1980-12-17 00:00:00","SAL": 800.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7499,"ENAME": "ALLEN","JOB": "SALESMAN","MGR": 7698,"HIREDATE": "1981-02-20 00:00:00","SAL": 1600.00,"COMM": 300.00,"DEPTNO": 30}
{"EMPNO": 7521,"ENAME": "WARD","JOB": "SALESMAN","MGR": 7698,"HIREDATE": "1981-02-22 00:00:00","SAL": 1250.00,"COMM": 500.00,"DEPTNO": 30}
{"EMPNO": 7566,"ENAME": "JONES","JOB": "MANAGER","MGR": 7839,"HIREDATE": "1981-04-02 00:00:00","SAL": 2975.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7654,"ENAME": "MARTIN","JOB": "SALESMAN","MGR": 7698,"HIREDATE": "1981-09-28 00:00:00","SAL": 1250.00,"COMM": 1400.00,"DEPTNO": 30}
{"EMPNO": 7698,"ENAME": "BLAKE","JOB": "MANAGER","MGR": 7839,"HIREDATE": "1981-05-01 00:00:00","SAL": 2850.00,"COMM": null,"DEPTNO": 30}
{"EMPNO": 7782,"ENAME": "CLARK","JOB": "MANAGER","MGR": 7839,"HIREDATE": "1981-06-09 00:00:00","SAL": 2450.00,"COMM": null,"DEPTNO": 10}
{"EMPNO": 7788,"ENAME": "SCOTT","JOB": "ANALYST","MGR": 7566,"HIREDATE": "1987-04-19 00:00:00","SAL": 1500.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7839,"ENAME": "KING","JOB": "PRESIDENT","MGR": null,"HIREDATE": "1981-11-17 00:00:00","SAL": 5000.00,"COMM": null,"DEPTNO": 10}
{"EMPNO": 7844,"ENAME": "TURNER","JOB": "SALESMAN","MGR": 7698,"HIREDATE": "1981-09-08 00:00:00","SAL": 1500.00,"COMM": 0.00,"DEPTNO": 30}
{"EMPNO": 7876,"ENAME": "ADAMS","JOB": "CLERK","MGR": 7788,"HIREDATE": "1987-05-23 00:00:00","SAL": 1100.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7900,"ENAME": "JAMES","JOB": "CLERK","MGR": 7698,"HIREDATE": "1981-12-03 00:00:00","SAL": 950.00,"COMM": null,"DEPTNO": 30}
{"EMPNO": 7902,"ENAME": "FORD","JOB": "ANALYST","MGR": 7566,"HIREDATE": "1981-12-03 00:00:00","SAL": 3000.00,"COMM": null,"DEPTNO": 20}
{"EMPNO": 7934,"ENAME": "MILLER","JOB": "CLERK","MGR": 7782,"HIREDATE": "1982-01-23 00:00:00","SAL": 1300.00,"COMM": null,"DEPTNO": 10}

以编程的方式使用:

scala> val df=spark.read.json("/root/emp.json")

1. 排名函数

1) rank

为相同组的数据计算排名,如果相同组中排序字段相同,当前行的排名值和前一行相同;如果相同组中排序字段不同,则当前行的排名值为该行在当前组中的行号;因此排名序列会出现间隙。

scala> spark.sql("SELECT deptno,sal,rank() OVER(partition by deptno ORDER BY sal) AS rank from emp").show
+------+------+----+
|deptno|   sal|rank|
+------+------+----+
|    10|1300.0|   1|
|    10|2450.0|   2|
|    10|5000.0|   3|
|    30| 950.0|   1|
|    30|1250.0|   2|
|    30|1250.0|   2|
|    30|1500.0|   4|
|    30|1600.0|   5|
|    30|2850.0|   6|
|    20| 800.0|   1|
|    20|1100.0|   2|
|    20|1500.0|   3|
|    20|2975.0|   4|
|    20|3000.0|   5|
+------+------+----+

说明:
部门30因有两个工资同为:1250.0 所以排名丢失了3。

2) dense_rank

为相同组内数据计算排名,如果相同组中排序字段相同,当前行的排名值和前一行相同;如果相同组中排序字段不同,则当前行的排名值为前一行排名值加1;排名序列不会出现间隙。

scala> spark.sql("SELECT deptno,sal,dense_rank() OVER(partition by deptno ORDER BY sal) AS rank from emp").show
+------+------+----+
|deptno|   sal|rank|
+------+------+----+
|    10|1300.0|   1|
|    10|2450.0|   2|
|    10|5000.0|   3|
|    30| 950.0|   1|
|    30|1250.0|   2|
|    30|1250.0|   2|
|    30|1500.0|   3|
|    30|1600.0|   4|
|    30|2850.0|   5|
|    20| 800.0|   1|
|    20|1100.0|   2|
|    20|1500.0|   3|
|    20|2975.0|   4|
|    20|3000.0|   5|
+------+------+----+

说明:
部门30因有两个工资同为:1250.0 所以排名同为2。

3) percent_rank

该值的计算公式(组内排名-1)/(组内行数-1),如果组内只有1行,则结果为0。

scala> spark.sql("SELECT deptno,sal,percent_rank() OVER(partition by deptno ORDER BY sal) AS rank from emp").show
+------+------+----+
|deptno|   sal|rank|
+------+------+----+
|    10|1300.0| 0.0|
|    10|2450.0| 0.5|
|    10|5000.0| 1.0|
|    30| 950.0| 0.0|
|    30|1250.0| 0.2|
|    30|1250.0| 0.2|
|    30|1500.0| 0.6|
|    30|1600.0| 0.8|
|    30|2850.0| 1.0|
|    20| 800.0| 0.0|
|    20|1100.0|0.25|
|    20|1500.0| 0.5|
|    20|2975.0|0.75|
|    20|3000.0| 1.0|
+------+------+----+

说明:
第1行:10部门:只有一行结果为:0
第2行:10部门: (2-1)/(3-1)=0.5

4) ntile

将组内数据排序然后按照指定的n切分成n个桶,该值为当前行的桶号(桶号从1开始)。

# 指定2个桶
scala> spark.sql("SELECT deptno,sal,ntile(2) OVER(partition by deptno ORDER BY sal) AS rank from emp").show
+------+------+----+
|deptno|   sal|rank|
+------+------+----+
|    10|1300.0|   1|
|    10|2450.0|   1|
|    10|5000.0|   2|
|    30| 950.0|   1|
|    30|1250.0|   1|
|    30|1250.0|   1|
|    30|1500.0|   2|
|    30|1600.0|   2|
|    30|2850.0|   2|
|    20| 800.0|   1|
|    20|1100.0|   1|
|    20|1500.0|   1|
|    20|2975.0|   2|
|    20|3000.0|   2|
+------+------+----+

5)row_number

将组内数据排序后,该值为当前行在当前组内的从1开始的递增的唯一序号值。

scala> spark.sql("SELECT deptno,sal,row_number() OVER(partition by deptno ORDER BY sal) AS rank from emp").show
+------+------+----+
|deptno|   sal|rank|
+------+------+----+
|    10|1300.0|   1|
|    10|2450.0|   2|
|    10|5000.0|   3|
|    30| 950.0|   1|
|    30|1250.0|   2|
|    30|1250.0|   3|
|    30|1500.0|   4|
|    30|1600.0|   5|
|    30|2850.0|   6|
|    20| 800.0|   1|
|    20|1100.0|   2|
|    20|1500.0|   3|
|    20|2975.0|   4|
|    20|3000.0|   5|
+------+------+----+

2. 分析函数

1)cume_dist():

该值的计算公式为:组内小于等于当前行值的行数/组内总行数
计算一个值相对于分区中所有值的位置。

scala> spark.sql("SELECT deptno,sal,CUME_DIST() OVER(ORDER BY deptno) AS rn1 from emp").show
20/04/04 09:34:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+------+------+-------------------+
|deptno|   sal|                rn1|
+------+------+-------------------+
|    10|2450.0|0.21428571428571427|
|    10|5000.0|0.21428571428571427|
|    10|1300.0|0.21428571428571427|
|    20| 800.0| 0.5714285714285714|
|    20|2975.0| 0.5714285714285714|
|    20|1500.0| 0.5714285714285714|
|    20|1100.0| 0.5714285714285714|
|    20|3000.0| 0.5714285714285714|
|    30|1600.0|                1.0|
|    30|1250.0|                1.0|
|    30|1250.0|                1.0|
|    30|2850.0|                1.0|
|    30|1500.0|                1.0|
|    30| 950.0|                1.0|
+------+------+-------------------+

没有分区,所有数据均为一组,总行数为14行,
10值的行:小于等于14的行数为3,因此,3/14=0.21428571428571427
20值的行:小于等于14的行数为8,因此,5+3/14=0.5714285714285714

2)lag

用法:lag(input, [offset, [default]]),计算组内当前行按照排序字段排序的之前offset行的input列的值,如果offset大于当前窗口(组内当前行之前行数)则返回default值,default值默认为null。

scala> spark.sql("SELECT deptno,sal,lag(sal,2) OVER(partition by deptno ORDER BY deptno) AS row from emp").show
+------+------+------+
|deptno|   sal|   row|
+------+------+------+
|    10|2450.0|  null|
|    10|5000.0|  null|
|    10|1300.0|2450.0|
|    30|1600.0|  null|
|    30|1250.0|  null|
|    30|1250.0|1600.0|
|    30|2850.0|1250.0|
|    30|1500.0|1250.0|
|    30| 950.0|2850.0|
|    20| 800.0|  null|
|    20|2975.0|  null|
|    20|1500.0| 800.0|
|    20|1100.0|2975.0|
|    20|3000.0|1500.0|
+------+------+------+

3)lead

用法:lead(input, [offset, [default]]),计算组内当前行按照排序字段排序的之后offset行的input列的值,如果offset大于当前窗口(组内当前行之后行数)则返回default值,default值默认为null。

scala> spark.sql("SELECT deptno,sal,lead(sal,1) OVER(partition by deptno ORDER BY deptno) AS row from emp").show
+------+------+------+
|deptno|   sal|   row|
+------+------+------+
|    10|2450.0|5000.0|
|    10|5000.0|1300.0|
|    10|1300.0|  null|
|    30|1600.0|1250.0|
|    30|1250.0|1250.0|
|    30|1250.0|2850.0|
|    30|2850.0|1500.0|
|    30|1500.0| 950.0|
|    30| 950.0|  null|
|    20| 800.0|2975.0|
|    20|2975.0|1500.0|
|    20|1500.0|1100.0|
|    20|1100.0|3000.0|
|    20|3000.0|  null|
+------+------+------+

3. 聚合函数

1)分组排序

row_number为行号。

scala> spark.sql("SELECT deptno,sal,row_number() OVER(partition by deptno ORDER BY deptno) AS row from emp").show
+------+------+---+                                                             
|deptno|   sal|row|
+------+------+---+
|    10|2450.0|  1|
|    10|5000.0|  2|
|    10|1300.0|  3|
|    30|1600.0|  1|
|    30|1250.0|  2|
|    30|1250.0|  3|
|    30|2850.0|  4|
|    30|1500.0|  5|
|    30| 950.0|  6|
|    20| 800.0|  1|
|    20|2975.0|  2|
|    20|1500.0|  3|
|    20|1100.0|  4|
|    20|3000.0|  5|
+------+------+---+

2)分组sum
scala> spark.sql("SELECT deptno,sal,sum(sal) OVER(partition by deptno ORDER BY deptno) AS row from emp").show
+------+------+------+                                                          
|deptno|   sal|   row|
+------+------+------+
|    10|2450.0|8750.0|
|    10|5000.0|8750.0|
|    10|1300.0|8750.0|
|    30|1600.0|9400.0|
|    30|1250.0|9400.0|
|    30|1250.0|9400.0|
|    30|2850.0|9400.0|
|    30|1500.0|9400.0|
|    30| 950.0|9400.0|
|    20| 800.0|9375.0|
|    20|2975.0|9375.0|
|    20|1500.0|9375.0|
|    20|1100.0|9375.0|
|    20|3000.0|9375.0|
+------+------+------+

3)分组avg
scala> spark.sql("SELECT deptno,sal,avg(sal) OVER(partition by deptno ORDER BY deptno) AS row from emp").show
+------+------+------------------+                                              
|deptno|   sal|               row|
+------+------+------------------+
|    10|2450.0|2916.6666666666665|
|    10|5000.0|2916.6666666666665|
|    10|1300.0|2916.6666666666665|
|    30|1600.0|1566.6666666666667|
|    30|1250.0|1566.6666666666667|
|    30|1250.0|1566.6666666666667|
|    30|2850.0|1566.6666666666667|
|    30|1500.0|1566.6666666666667|
|    30| 950.0|1566.6666666666667|
|    20| 800.0|            1875.0|
|    20|2975.0|            1875.0|
|    20|1500.0|            1875.0|
|    20|1100.0|            1875.0|
|    20|3000.0|            1875.0|
+------+------+------------------+

4)分组count
scala> spark.sql("SELECT deptno,sal,count(deptno) OVER(partition by deptno ORDER BY deptno) AS row from emp").show
+------+------+---+
|deptno|   sal|row|
+------+------+---+
|    10|2450.0|  3|
|    10|5000.0|  3|
|    10|1300.0|  3|
|    30|1600.0|  6|
|    30|1250.0|  6|
|    30|1250.0|  6|
|    30|2850.0|  6|
|    30|1500.0|  6|
|    30| 950.0|  6|
|    20| 800.0|  5|
|    20|2975.0|  5|
|    20|1500.0|  5|
|    20|1100.0|  5|
|    20|3000.0|  5|
+------+------+---+

5)分组min
scala> spark.sql("SELECT deptno,sal,min(sal) OVER(partition by deptno ORDER BY deptno) AS row from emp").show
+------+------+------+
|deptno|   sal|   row|
+------+------+------+
|    10|2450.0|1300.0|
|    10|5000.0|1300.0|
|    10|1300.0|1300.0|
|    30|1600.0| 950.0|
|    30|1250.0| 950.0|
|    30|1250.0| 950.0|
|    30|2850.0| 950.0|
|    30|1500.0| 950.0|
|    30| 950.0| 950.0|
|    20| 800.0| 800.0|
|    20|2975.0| 800.0|
|    20|1500.0| 800.0|
|    20|1100.0| 800.0|
|    20|3000.0| 800.0|
+------+------+------+

6)分组max

同min。

4. 帧规范

currentRow(): 返回表示窗口分区中当前行的特殊帧边界。
unboundedFollowing():返回表示窗口分区中最后一行的特殊帧边界。

scala> spark.sql("SELECT deptno,sal, count(*) OVER(PARTITION BY deptno ORDER BY sal RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as row from emp").show
+------+------+---+
|deptno|   sal|row|
+------+------+---+
|    10|1300.0|  1|
|    10|2450.0|  2|
|    10|5000.0|  3|
|    30| 950.0|  1|
|    30|1250.0|  3|
|    30|1250.0|  3|
|    30|1500.0|  4|
|    30|1600.0|  5|
|    30|2850.0|  6|
|    20| 800.0|  1|
|    20|1100.0|  2|
|    20|1500.0|  3|
|    20|2975.0|  4|
|    20|3000.0|  5|
+------+------+---+

scala> spark.sql("SELECT deptno,sal, max(sal) OVER(PARTITION BY deptno ORDER BY sal ROWS BETWEEN 1 PRECEDING AND 2 FOLLOWING) as row from emp").show
+------+------+------+
|deptno|   sal|   row|
+------+------+------+
|    10|1300.0|5000.0|
|    10|2450.0|5000.0|
|    10|5000.0|5000.0|
|    30| 950.0|1250.0|
|    30|1250.0|1500.0|
|    30|1250.0|1600.0|
|    30|1500.0|2850.0|
|    30|1600.0|2850.0|
|    30|2850.0|2850.0|
|    20| 800.0|1500.0|
|    20|1100.0|2975.0|
|    20|1500.0|3000.0|
|    20|2975.0|3000.0|
|    20|3000.0|3000.0|
+------+------+------+

二、混杂(misc)函数

1.crc32(e: Column) 计算CRC32,返回bigint

scala> spark.sql("SELECT crc32('Spark')").show
+----------------------------+
|crc32(CAST(Spark AS BINARY))|
+----------------------------+
|                  1557323817|
+----------------------------+

2.hash(cols: Column*) 计算 hash code,返回int

scala> spark.sql("SELECT hash('Spark')").show
+-----------+
|hash(Spark)|
+-----------+
|  228093765|
+-----------+

3.md5(e: Column) 计算MD5摘要,返回32位,16进制字符串

scala> spark.sql("SELECT md5('Spark')").show(false)
+--------------------------------+
|md5(CAST(Spark AS BINARY))      |
+--------------------------------+
|8cde774d6f7333752ed72cacddb05126|
+--------------------------------+

4.sha1(e: Column) 计算SHA-1摘要,返回40位,16进制字符串

scala> spark.sql("SELECT sha1('Spark')").show(false)
+----------------------------------------+
|sha1(CAST(Spark AS BINARY))             |
+----------------------------------------+
|85f5955f4b27a9a4c2aab6ffe5d7189fc298b92c|
+----------------------------------------+

5.sha2(e: Column, numBits: Int) 计算SHA-1摘要,返回numBits位,16进制字符串。numBits支持224, 256, 384, or 512

scala> spark.sql("SELECT sha2('Spark',256)").show(false)
+----------------------------------------------------------------+
|sha2(CAST(Spark AS BINARY), 256)                                |
+----------------------------------------------------------------+
|529bc3b07127ecb7e53a4dcf1991d9152c24537d919178022b2c42657f79a26b|
+----------------------------------------------------------------+

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342

推荐阅读更多精彩内容