这种方法应该是有问题的.行不通,原因如下:
1)在beginTransaction在恢复或者做快照之前调用,这里返回的是一个mysql的connection对象,在这里事务的载体就是这个connection对象
2)precommit是第一次预提交,如果你看过两阶段的源码你就会发现,这个是在给你传入的那个事物的载体对象做快照,对应你这里的就是mysql的connection,,接下来flink会将这个connection存储到状态后端,connection可不是一个普通对象,是一个数据库的连接,能序列化成功?
3)commit,这个是快照成功了.flink内部已经将你上次的事务的载体对象给保存完毕了,在notifyCheckpointComplete里回调用户的commit方法,通知用户checkpoint成功了
connection根本就不能序列化,这是其一,
其二:即便这种方式可行,也会非常不稳定,因为每一次commit的提交都是一个checkpoin的过程,checkpoint的时间有长有短,如果遇到数据量大,checkpoint的时间长,那就意味着你在beginTransaction创建的这个链接得一直是打开的状态,只到checkpoin完成才可以,那你得设置足够长的connectionTimeOut时间和socketTimeOut时间
其三:.如果flink某次任务失败了,你是从checkpoint中恢复上一个mysql的连接?
IP属地:湖北