代码如下,详细请参见注释。
class Transaction { //class Transaction
//结构体Write
struct Write {
Row row; //行
Column col; //列
string value; //列值
};//Write Struct
vector<Write> writes_;//数据缓存Write
int start_ts_;//事务开始时间
Transaction() : start_ts_(oracle.GetTimestamp()) {} //构造函数,初始化变量start_ts_
/*
输入:Write结构体
输出:无
实现:简单的把Write对象(列值)push到Vector中
*/
void Set(Write w) {//Set函数
writes_.push_back(w);
}
/*
输入:Row-行标识,Column-列标识
输出:value-列值,成功返回true,失败(如没有获取值)返回false
*/
bool Get(Row row, Column c, string* value) {
while (true) {
//Bigtable提供的行级事务
bigtable::Txn T = bigtable::StartRowTransaction(row);
// Check for locks that signal concurrent writes.
// 检查是否存在并发事务在写数据
// 注:SI的特点是写不阻塞读,读不阻塞写,但在这里却需要等待?
// 原因是SI保证读到的是事务开始(start_ts)之前已提交的数据,
// 存在锁意味着写操作未完成且该操作的commit_ts可能在事务开始之前,
// 但需要在写入之后才能知道是否在start_ts之前,因此需要等待
if (T.Read(row, c+"lock", [0, start_ts_])) { //判断[0, start_ts_]内是否存在lock?
// There is a pending lock; try to clean it and wait
// 仍存在lock,等待
BackoffAndMaybeCleanupLock(row, c);
continue;
}
// Find the latest write below our start timestamp.
//读取最近已提交的数据版本
latest write = T.Read(row, c+"write", [0, start_ts_]);
if (!latest_write.found())
//没有数据,返回false
return false; // no data
//从Column+write中获取start_ts
int data_ts = latest_write.start_timestamp();
//获取真正的数据:Row+Column(column+"data")+start_ts
*value = T.Read(row, c+"data", [data_ts, data_ts]);
return true;
}
}
// Prewrite tries to lock cell w, returning false in case of conflict.
// 预写入(理论基础:通过意向表缓存数据,执行延迟更新)
/*
输入:Write结构体,Write主节点
输出:成功返回true,失败返回false
*/
bool Prewrite(Write w, Write primary) {
//获取列
Column c = w.col;
//启动Bigtable行事务
bigtable::Txn T = bigtable::StartRowTransaction(w.row);
// Abort on writes after our start timestamp ...
// 存在比事务启动时间start_ts更大的值,存在ww冲突,按照FUW原则,本事务回滚
if (T.Read(w.row, c+"write", [start_ts_, ∞]))
return false;
// ... or locks at any timestamp.
// 存在锁,说明未完成的写,即存在ww冲突,且其他事务比本事务更"早"获得锁,本事务回滚
if (T.Read(w.row, c+"lock", [0, ∞]))
return false;
//校验完毕,可以写数据
//写入数据:key=Row+Column(data)+start_ts,value=需写入的值
T.Write(w.row, c+"data", start_ts_, w.value);
//上锁,key=Row+Column(lock)+start_ts,value=主节点的行&列
T.Write(w.row, c+"lock", start_ts_,
{primary.row, primary.col}); // The primary’s location.
//执行提交操作
return T.Commit();
}
//提交操作
/*
输入:无
输出:成功返回true,失败返回false
*/
bool Commit() {
// The primary’s location.
// 数组writes_的第一个元素为主节点
Write primary = writes_[0];
// 除第一个元素外,其他为从节点
vector<Write> secondaries(writes_.begin()+1, writes_.end());
//主节点预写入失败
if (!Prewrite(primary, primary))
return false;
//遍历从节点,执行预写入,一个节点不成功则全部失败
for (Write w : secondaries)
if (!Prewrite(w, primary))
return false;
//获取事务提交时间戳
int commit_ts = oracle_.GetTimestamp();
// Commit primary first.
// 主节点首先提交
Write p = primary;
//启动Bigtable事务
bigtable::Txn T = bigtable::StartRowTransaction(p.row);
//谨慎起见,判断是否存在锁(本事务,start_ts唯一),避免重复写入
if (!T.Read(p.row, p.col+"lock", [start_ts_, start_ts_]))
return false; // aborted while working
//写入:key=Row+Column(write)+commit_ts,value=start_ts,实际的值在key=Row+Column(data)+start_ts中
T.Write(p.row, p.col+"write", commit_ts, start_ts_); // Pointer to data written at start_ts_.
//删除锁
T.Erase(p.row, p.col+"lock", commit_ts);
//Bigtable事务提交
if (!T.Commit())
return false; // commit point
// Second phase: write out write records for secondary cells.
//遍历从节点,写key=Row+Column(write)+commit_ts,value=start_ts,同时删除锁
for (Write w : secondaries) {
bigtable::Write(w.row, w.col+"write", commit_ts, start_ts_);
bigtable::Erase(w.row, w.col+"lock", commit_ts);
}
return true;
}
} // class Transaction