FLINK 高性能JDBC多线程插入数据库

···
package com.loongair.linky.app.example;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.loongair.linky.utils.GetSqlUtil1;
import com.loongair.linky.utils.HikariUtil;
import com.ververica.cdc.connectors.oceanbase.OceanBaseSource;
import com.ververica.cdc.connectors.oceanbase.table.StartupMode;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.;
import java.util.concurrent.
;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

public class LongAirJdbcSinkObToOb {
static String jdbcURL = "jdbc:mysql://10.1.128.113:22883/db_d_ods?useSSL=false&rewriteBatchedStatements=true&allowMultiQueries=true&useServerPrepStmts=true&serverTimezone=UTC";
static String jdbcUser = "uatods@loongair_ob_cx#loongair_ssd:3";
static String jdbcPassword = "UATods123!!";

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Properties properties = new Properties();
    // properties.setProperty("scan.startup.mode", "latest_offset");
    // properties.setProperty("snapshot.locking.mode", "none");
    // properties.setProperty("scan.incremental.snapshot.enabled","true");
    // properties.setProperty("scan.incremental.snapshot.chunk.size","4096");
    // properties.setProperty("scan.snapshot.fetch.size","10000");
    // properties.setProperty("debezium.min.row. count.to.stream.result","10000");
    // MySqlSource<String> stringMySqlSource = MySqlSource.<String>builder()
    //         // .startupOptions(StartupOptions.latest())
    //         .hostname("10.1.147.147")
    //         .username("uatads")
    //         .password("UATads123!")
    //         .port(3308)
    //         .databaseList("db_t_ads") // monitor all tables under inventory database
    //         .tableList("db_t_ads.ads_rd_foc_t2018") // 如果不写则监控库下的所有表,需要使用【库名.表名】
    //         .serverTimeZone("Asia/Shanghai")
    //         .startupOptions(StartupOptions.initial())
    //         .deserializer(new JsonDebeziumDeserializationSchema()) // 自定义返回数据格式
    //         .debeziumProperties(DebeziumProperties.getDebeziumProperties())
    //         .build();

    env.enableCheckpointing(3000);
    String serverTimeZone = "+08:00";
    SourceFunction<String> oceanBaseSource =
            OceanBaseSource.<String>builder()
                    .rsList("10.1.129.154:2882:2881;10.1.129.155:2882:2881;10.1.129.156:2882:2881")
                    .startupMode(StartupMode.LATEST_OFFSET)
                    .username("uathsd@loongair_ob_cx#loongair_ssd:3")
                    .password("UAThsd123!!")

// .username("uatods@loongair_ob_cx#loongair_ssd:3")
// .password("UATods123!!")
.tenantName("loongair_ob_cx")
// .databaseName("ob_db_psg_data_center_test")
// .tableName("flight_info")
// .tableList("ob_db_psg_data_center_test.flight_info,ob_db_psg_data_center_test.market_flight_info,ob_db_psg_data_center_test.passenger_frequent_info,ob_db_psg_data_center_test.vip_boarding_record")
.tableList("ob_db_psg_data_center_test.vip_boarding_record")
.hostname("10.1.128.113")
.port(22883)
.compatibleMode("mysql")
.jdbcDriver("com.mysql.jdbc.Driver")
.logProxyHost("10.1.129.157")
.logProxyPort(2983)
.serverTimeZone(serverTimeZone)
.workingMode("memory")
.deserializer(new OceanBaseDeserializer())
.build();
DataStreamSource<String> OBsource = env.addSource(oceanBaseSource, "flight_info1");
// // 模拟源数据流
// DataStream<String> source = env.fromSource(stringMySqlSource, WatermarkStrategy.noWatermarks(), "mysql-source");

    OBsource.print(">>>>>OBsource");

// env.execute();
DataStream<JSONObject> jsonStream = OBsource.map((MapFunction<String, JSONObject>) value -> {
JSONObject jsonObject = JSON.parseObject(value);
// System.out.println("==="+jsonObject.toJSONString()+"===");
return jsonObject;
});

    // 定义一个滚动窗口,窗口大小为5秒
    DataStream<List<JSONObject>> windowedStream = jsonStream
            // .keyBy((key) -> key.getString("ts_ms")) // 使用你的key选择器
            .keyBy((key) -> key.getString("tableName")) // 使用你的key选择器
            .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
            .aggregate(new AggregateFunction<JSONObject, List<JSONObject>, List<JSONObject>>() {
                @Override
                public List<JSONObject> createAccumulator() {
                    return new ArrayList<>();
                }

                @Override
                public List<JSONObject> add(JSONObject value, List<JSONObject> accumulator) {
                    accumulator.add(value);
                    return accumulator;
                }

                @Override
                public List<JSONObject> getResult(List<JSONObject> accumulator) {
                    return accumulator;
                }

                @Override
                public List<JSONObject> merge(List<JSONObject> a, List<JSONObject> b) {
                    a.addAll(b);
                    return a;
                }
            });

// windowedStream.print(">>>>>>>windowedStream");

    windowedStream.addSink(new RichSinkFunction<List<JSONObject>>() {
        Connection conn = null;


        @Override
        public void open(Configuration parameters) throws Exception {
            conn = HikariUtil.getDruidDataSource();
            conn.setAutoCommit(false);
        }

        @Override
        public void close() throws Exception {
            if (conn != null) {
                conn.close();
            }
        }

        @Override
        public void invoke(List<JSONObject> value, Context context) throws Exception {
            long start = new Date().getTime();

// CopyOnWriteArrayList<CopyOnWriteArrayList<String>> jdbcsql = new CopyOnWriteArrayList<>();
List<String> sqllist = GetSqlUtil1.getSqlByArray(JSON.toJSONString(value));
// System.out.println("sqllist" + sqllist.size());
// 开始时间
long start1 = System.currentTimeMillis();
// 每500条数据开启一条线程
int threadSize = 5000;
// 总数据条数
int dataSize = sqllist.size();
// 线程数
int threadNum = dataSize / threadSize + 1;
// 定义标记,过滤threadNum为整数
boolean special = dataSize % threadSize == 0;
// 创建一个线程池
ExecutorService exec = Executors.newFixedThreadPool(threadNum);
List<Callable<Integer>> tasks = new ArrayList<Callable<Integer>>();
Callable<Integer> task = null;
CopyOnWriteArrayList<String> cutList = null;
final int[] num = {0};
for (int n = 0; n < threadNum; n++) {
if (n == threadNum - 1) {
if (special) {
break;
}
cutList = new CopyOnWriteArrayList(sqllist.subList(threadSize * n, dataSize).toArray());
} else {
cutList = new CopyOnWriteArrayList(sqllist.subList(threadSize * n, threadSize * (n + 1)).toArray());
}
final CopyOnWriteArrayList<String> listStr = cutList;
task = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
AtomicInteger count = new AtomicInteger(0);
listStr.parallelStream().forEach(sql -> {
try (Statement statement = conn.createStatement()) {
System.out.println("sql---->"+sql);
statement.addBatch(sql);
if (count.incrementAndGet() % 1 == 0) {
synchronized (conn) { // Synchronize on the connection to ensure safe batch execution
int[] i = statement.executeBatch();
System.out.println("更新了" + i[0] + "数据");
conn.commit();
}
}
} catch (SQLException e) {
try {
conn.rollback();
} catch (SQLException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
});
if (count.get() % 500 != 0) {
try (Statement statement = conn.createStatement()) {
int[] i = statement.executeBatch();
System.out.println("外面更新了" + i[0] + "数据");
conn.commit();
}
}

                        return 1;
                    }
                };
                tasks.add(task);
            }
            System.out.println("tasksSize=" + tasks.size());
            List<Future<Integer>> results = exec.invokeAll(tasks);
            for (Future<Integer> future : results) {
                System.out.println("results.size=" + results.size());
                //       System.out.println(future.get());
            }
            // 关闭线程池
            exec.shutdown();
            System.out.println("线程任务执行结束");
            System.err.println("执行任务消耗了 :" + (System.currentTimeMillis() - start1) + "毫秒");

// AtomicInteger count = new AtomicInteger(0);
// sqllist.parallelStream().forEach(sql->{
// try (Statement statement = conn.createStatement()) {
// // If you have parameters in SQL, set them here
// statement.addBatch(sql);
// if (count.incrementAndGet() % 500 == 0) {
// synchronized (conn) { // Synchronize on the connection to ensure safe batch execution
// int[] i = statement.executeBatch();
// System.out.println("更新了"+i[0]+"数据");
// conn.commit();
// count.set(0);
// }
// }
// } catch (SQLException e) {
// throw new RuntimeException(e);
// }
// });
// if (count.get() % 500 != 0) {
// try (Statement statement = conn.createStatement()) {
// int[] i = statement.executeBatch();
// System.out.println("外面更新了"+i[0]+"数据");
// conn.commit();
// }
// }
// long cost = new Date().getTime()-start;
// System.out.println("解析好了"+cost/1000+"秒");
// try {
//// value.forEach(json -> {
//// jdbcsql.add(GetSqlUtil1.getSqlByArray(JSON.toJSONString(json)));
//// });
//// int count = 0;
//// for(List<String> sqllist:jdbcsql){
//// for(String sql:sqllist){
//// count++;
//// try (PreparedStatement pstmt = conn.prepareStatement(sql)){
//// pstmt.addBatch();
//// if(count%500==0){
//// int[] i = pstmt.executeBatch();
//// System.out.println("更新了"+i[0]+"数据");
//// conn.commit();
//// count=0;
//// }
//// }
//// }
//// }
//// if (count > 0) {
//// try (Statement statement = conn.createStatement()) {
//// int[] i = statement.executeBatch();
//// System.out.println("外面更新了"+i[0]+"数据");
//// conn.commit();
//// }
//// }
// // Use a thread-safe structure to accumulate results
//// AtomicInteger count = new AtomicInteger(0);
//// // Using try-with-resources for auto-closing of Statement
//// jdbcsql.parallelStream().forEach(sqlObj -> {
//// sqlObj.parallelStream().forEach(sql->{
//// try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
//// // If you have parameters in SQL, set them here
//// pstmt.addBatch();
//// if (count.incrementAndGet() % 500 == 0) {
//// synchronized (conn) { // Synchronize on the connection to ensure safe batch execution
//// int[] i = pstmt.executeBatch();
//// System.out.println("更新了"+i[0]+"数据");
//// conn.commit();
//// count.set(0);
//// }
//// }
//// } catch (SQLException e) {
//// throw new RuntimeException(e);
//// }
//// });
////
//// });
////
//// if (count.get() % 500 != 0) {
//// try (Statement statement = conn.createStatement()) {
//// int[] i = statement.executeBatch();
//// System.out.println("外面更新了"+i[0]+"数据");
//// conn.commit();
//// }
//// }
//
// } catch (Exception e) {
// conn.rollback(); // Rollback transaction on exception
// e.printStackTrace();
// throw e;
// }
}
}).setParallelism(1);
try {
env.execute("JDBC Sink Example");
} catch (
Exception e) {
throw new RuntimeException(e);
}

}

}

// ------------JdbcSink.sink 示例 -------------------

// SinkFunction<JSONObject> jdbcSink = JdbcSink.sink(
// "INSERT INTO ads_rd_foc_t2001 (FLIGHT_ID, FLIGHT_DATE, FLIGHT_TYPE,
// FLIGHT_NO, AC_TYPE, LAYOUT, AC_REG, DEPARTURE_AIRPORT, ARRIVAL_AIRPORT, HTD,
// STD, ETD, OUT, ACARS_ATD, ATD, HTA, STA, ETA, ACARS_ATA, ATA, INN, BAY,
// BAY_2, ON_BOARD_TIME, CLOSE_DOOR_TIME, OPEN_DOOR_TIME, D_OR_I, P_OR_C,
// ADJUST_TYPE, FLG_DELAY, FLG_VR, FLG_PATCH, FLG_CS, BASE, CARRIER, FILIALE,
// SERIAL, AC_LINK_LINE, CREW_LINK_LINE, DEPA_DIV_AIRPORT, FPL_DIV_AIRPORT1,
// FPL_DIV_AIRPORT2, FLAG_FPL, FLAG_RLS, FLAG_TAKEOFF, FLY_TIMES, FLIGHT_STATUS,
// REMARK, ESTIMATE_SK_TIME, BUSINESS_DYNAMIC_TIME, REMARK_BUSINESS,
// PROVIDER_ID, DIET_TYPE, CREW_PROVIDER_ID, CREW_DIET_TYPE, JW_OPERATOR,
// SELECT_FLG, TOTAL_FUEL, TRIP_FUEL, SLIDE_FUEL, IMPT_FLT, RR_COUNT, DIET_FLAG,
// READ_OK, CHK_STEP, PTD, PTA, DISPATCH_IMPT, TELE_REMARK, CREW_TICKET,
// FLY_DEVICE, ICE_FLAG, WATER_FLAG, CLOSE_CARGODOOR_TIME,
// CLOSE_CARGODOOR_REMARK, REAL_FUEL, REAL_FUEL_MODI, REAL_SEGMENT_FUEL,
// REAL_SEGMENT_FUEL_MODI, PZ_FUEL_FLAG, RLS_FUEL_FLAG, REAL_RLS_FUEL,
// REAL_RLS_FUEL_MODI, FLAG_CREWZL, UNITS, SLIDE_TIME, ROUTE_DIV_AIRPORT,
// CHECK_IN_COUNT, NCKQ_TIME, WCKQ_TIME, CLOSE_DOOR_COUNT, CDM, FLG_DELAY_SF,
// PASSFAST, FLIGHT_DATE1, FLG_DELAY_CG, FLG_DELAY_JS, FLG_DELAY_JG, CDM_FLAG,
// LAST_MODIFY_TIME, SECRECY, CTOT, PTDNEW, PTANEW, OPEN_CARGO_DOOR_TIME,
// PRESET_FUEL, FLG_DELAY_CLOSE_TIME, CLOSE_DELAY_TYPE, TOBT, FRC_PLANLINE_TID)
// VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
// ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
// ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
// ?, ?, ?) ON DUPLICATE KEY UPDATE FLIGHT_ID=VALUES(FLIGHT_ID),
// FLIGHT_DATE=VALUES(FLIGHT_DATE), FLIGHT_TYPE=VALUES(FLIGHT_TYPE),
// FLIGHT_NO=VALUES(FLIGHT_NO), AC_TYPE=VALUES(AC_TYPE), LAYOUT=VALUES(LAYOUT),
// AC_REG=VALUES(AC_REG), DEPARTURE_AIRPORT=VALUES(DEPARTURE_AIRPORT),
// ARRIVAL_AIRPORT=VALUES(ARRIVAL_AIRPORT), HTD=VALUES(HTD), STD=VALUES(STD),
// ETD=VALUES(ETD), OUT=VALUES(OUT), ACARS_ATD=VALUES(ACARS_ATD),
// ATD=VALUES(ATD), HTA=VALUES(HTA), STA=VALUES(STA), ETA=VALUES(ETA),
// ACARS_ATA=VALUES(ACARS_ATA), ATA=VALUES(ATA), INN=VALUES(INN),
// BAY=VALUES(BAY), ON_BOARD_TIME=VALUES(ON_BOARD_TIME),
// CLOSE_DOOR_TIME=VALUES(CLOSE_DOOR_TIME),
// OPEN_DOOR_TIME=VALUES(OPEN_DOOR_TIME), D_OR_I=VALUES(D_OR_I),
// P_OR_C=VALUES(P_OR_C), ADJUST_TYPE=VALUES(ADJUST_TYPE),
// FLG_DELAY=VALUES(FLG_DELAY), BASE=VALUES(BASE), CARRIER=VALUES(CARRIER),
// FILIALE=VALUES(FILIALE), SERIAL=VALUES(SERIAL),
// AC_LINK_LINE=VALUES(AC_LINK_LINE), CREW_LINK_LINE=VALUES(CREW_LINK_LINE),
// FPL_DIV_AIRPORT1=VALUES(FPL_DIV_AIRPORT1),
// FPL_DIV_AIRPORT2=VALUES(FPL_DIV_AIRPORT2), FLAG_FPL=VALUES(FLAG_FPL),
// FLAG_RLS=VALUES(FLAG_RLS), FLAG_TAKEOFF=VALUES(FLAG_TAKEOFF),
// FLIGHT_STATUS=VALUES(FLIGHT_STATUS),
// ESTIMATE_SK_TIME=VALUES(ESTIMATE_SK_TIME), SELECT_FLG=VALUES(SELECT_FLG),
// TOTAL_FUEL=VALUES(TOTAL_FUEL), TRIP_FUEL=VALUES(TRIP_FUEL),
// SLIDE_FUEL=VALUES(SLIDE_FUEL), RR_COUNT=VALUES(RR_COUNT),
// READ_OK=VALUES(READ_OK), CHK_STEP=VALUES(CHK_STEP), PTD=VALUES(PTD),
// PTA=VALUES(PTA), TELE_REMARK=VALUES(TELE_REMARK),
// FLY_DEVICE=VALUES(FLY_DEVICE),
// CLOSE_CARGODOOR_TIME=VALUES(CLOSE_CARGODOOR_TIME),
// REAL_SEGMENT_FUEL=VALUES(REAL_SEGMENT_FUEL),
// PZ_FUEL_FLAG=VALUES(PZ_FUEL_FLAG), RLS_FUEL_FLAG=VALUES(RLS_FUEL_FLAG),
// REAL_RLS_FUEL=VALUES(REAL_RLS_FUEL), FLAG_CREWZL=VALUES(FLAG_CREWZL),
// SLIDE_TIME=VALUES(SLIDE_TIME), CHECK_IN_COUNT=VALUES(CHECK_IN_COUNT),
// NCKQ_TIME=VALUES(NCKQ_TIME), WCKQ_TIME=VALUES(WCKQ_TIME),
// FLIGHT_DATE1=VALUES(FLIGHT_DATE1), LAST_MODIFY_TIME=VALUES(LAST_MODIFY_TIME),
// SECRECY=VALUES(SECRECY), FLG_DELAY_CLOSE_TIME=VALUES(FLG_DELAY_CLOSE_TIME),
// CLOSE_DELAY_TYPE=VALUES(CLOSE_DELAY_TYPE);\n",
// (ps, t) -> {
// String[] fields = t.keySet().toArray(new String[0]);
// for (int i = 0; i < fields.length; i++) {
// if (Objects.isNull(t.get(fields[i]))) {
// ps.setNull(i + 1, Types.VARCHAR); // Adjust type as needed
// } else {
// ps.setObject(i + 1, t.get(fields[i])); // Will automatically call the proper
// ps.set* method based on the type of value
// }
// }
// },
// new JdbcExecutionOptions.Builder()
// .withBatchSize(5000) // 按照 500 条数据进行批处理
// .build(),
// new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
// .withUrl(jdbcURL)
//// .withDriverName("org.h2.Driver") // 使用合适的 JDBC 驱动
// .withUsername(jdbcUser)
// .withPassword(jdbcPassword)
// .build()
// );

// // 自定义触发器 (未使用)
// public static class TimeOrCountTrigger extends Trigger<Object, TimeWindow> {
//
// private static final long serialVersionUID = 1L;
//
// private final long maxCount;
// private final long intervalMillis;
//
// private final ValueStateDescriptor<Long> countDescriptor = new ValueStateDescriptor<>("count", Long.class, 0L);
//
// private TimeOrCountTrigger(long maxCount, long intervalMillis) {
// this.maxCount = maxCount;
// this.intervalMillis = intervalMillis;
// }
//
// @Override
// public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx)
// throws Exception {
// ValueState<Long> count = ctx.getPartitionedState(countDescriptor);
// long currentCount = count.value() + 1;
// count.update(currentCount);
//
// if (currentCount >= maxCount) {
// count.update(0L);
// return TriggerResult.FIRE;
// }
//
// long nextFireTimestamp = window.getStart() + intervalMillis;
//
// if (nextFireTimestamp <= ctx.getCurrentProcessingTime()) {
// return TriggerResult.FIRE;
// } else {
// ctx.registerProcessingTimeTimer(nextFireTimestamp);
// }
//
// return TriggerResult.CONTINUE;
// }
//
// @Override
// public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
// return TriggerResult.FIRE;
// }
//
// @Override
// public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
// return TriggerResult.CONTINUE;
// }
//
// @Override
// public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
// ctx.getPartitionedState(countDescriptor).clear();
// ctx.deleteProcessingTimeTimer(window.getStart() + intervalMillis);
// }
//
// public static TimeOrCountTrigger create(long maxCount, Duration interval) {
// return new TimeOrCountTrigger(maxCount, interval.toMillis());
// }
// }

···

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,524评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,869评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,813评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,210评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,085评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,117评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,533评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,219评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,487评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,582评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,362评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,218评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,589评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,899评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,176评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,503评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,707评论 2 335

推荐阅读更多精彩内容