The previous post described writing data to Kudu in update mode, using Kudu as the columnar store.
This post walks through the Java demos for operating Kudu. There are Java demos for the Kudu client on GitHub, but none for operating Kudu from Spark; the Cloudera documentation only shows the Scala version. This post lists a complete set of Java examples for operating Kudu, intended as a getting-started reference. (The painful part was the Java implementation of querying Kudu with Spark SQL: there is no official example, and Google is not much help either.)
1) POM dependencies
<!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-client -->
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-client</artifactId>
    <version>1.5.0-cdh5.13.1</version>
    <scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-client-tools -->
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-client-tools</artifactId>
    <version>1.5.0-cdh5.13.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kudu/kudu-spark2 -->
<dependency>
    <groupId>org.apache.kudu</groupId>
    <artifactId>kudu-spark2_2.11</artifactId>
    <version>1.6.0</version>
</dependency>
This article uses the Cloudera build, so also add the Cloudera repository:
<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>
2) Feature list:
Create a table with KuduClient;
Insert data with KuduClient;
Update data with KuduClient;
Query data with KuduClient;
Delete a table with KuduClient;
Query data with Spark SQL;
Check whether a table exists with the Spark KuduContext.
PS: for querying data with Spark SQL, the Cloudera site only provides a Scala version, and a concrete Java version is hard to find even with Google. Looking at the source, format() simply takes a package path; as long as the package at that path contains a DefaultSource class implementing the Spark SQL data source interfaces, Spark SQL can use it. For example, the org.apache.kudu.spark.kudu package contains such a DefaultSource class, so Spark SQL can recognize it.
By the same logic, other data sources can be accessed this way, and exposing your own library to Spark SQL queries can also be done by providing such a DefaultSource. The decompiled stub of Kudu's DefaultSource looks like this:
package org.apache.kudu.spark.kudu

@org.apache.yetus.audience.InterfaceStability.Unstable
class DefaultSource() extends scala.AnyRef
  with org.apache.spark.sql.sources.RelationProvider
  with org.apache.spark.sql.sources.CreatableRelationProvider
  with org.apache.spark.sql.sources.SchemaRelationProvider {

  val TABLE_KEY: java.lang.String = { /* compiled code */ }
  val KUDU_MASTER: java.lang.String = { /* compiled code */ }
  val OPERATION: java.lang.String = { /* compiled code */ }
  val FAULT_TOLERANT_SCANNER: java.lang.String = { /* compiled code */ }
  val SCAN_LOCALITY: java.lang.String = { /* compiled code */ }

  def defaultMasterAddrs: scala.Predef.String = { /* compiled code */ }

  override def createRelation(sqlContext: org.apache.spark.sql.SQLContext, parameters: scala.Predef.Map[scala.Predef.String, scala.Predef.String]): org.apache.spark.sql.sources.BaseRelation = { /* compiled code */ }
  override def createRelation(sqlContext: org.apache.spark.sql.SQLContext, mode: org.apache.spark.sql.SaveMode, parameters: scala.Predef.Map[scala.Predef.String, scala.Predef.String], data: org.apache.spark.sql.DataFrame): org.apache.spark.sql.sources.BaseRelation = { /* compiled code */ }
  override def createRelation(sqlContext: org.apache.spark.sql.SQLContext, parameters: scala.Predef.Map[scala.Predef.String, scala.Predef.String], schema: org.apache.spark.sql.types.StructType): org.apache.spark.sql.sources.BaseRelation = { /* compiled code */ }
}
3) Code examples:
import org.junit.Ignore;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import org.apache.kudu.spark.kudu.KuduContext;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * @ClassName: KuduUtil
 * @Description: sample code for operating Kudu
 * @author jason.li
 * @date 2018-01-11 15:45:06
 */
@Ignore
public class KuduUtil {
    private static final String KUDU_MASTER = "10.1.0.20:7051";
    private static String tableName = "TestKudu";

    @Test
    public void kuduCreateTableTest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            List<ColumnSchema> columns = new ArrayList<>(2);
            columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.STRING)
                    .key(true)
                    .build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING)
                    .build());
            List<String> rangeKeys = new ArrayList<>();
            rangeKeys.add("key");
            Schema schema = new Schema(columns);
            client.createTable(tableName, schema,
                    new CreateTableOptions().setRangePartitionColumns(rangeKeys));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    @Test
    public void kuduSaveTest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            KuduSession session = client.newSession();
            System.out.println("-------start--------" + System.currentTimeMillis());
            for (int i = 30000; i < 31000; i++) {
                Insert insert = table.newInsert();
                PartialRow row = insert.getRow();
                row.addString(0, i + "");
                row.addString(1, "aaa");
                OperationResponse operationResponse = session.apply(insert);
            }
            System.out.println("-------end--------" + System.currentTimeMillis());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
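    /**
     * Not in the original post: a sketch of the same insert loop using MANUAL_FLUSH,
     * which buffers operations client-side and sends them in batches, usually much
     * faster than applying rows one at a time. The buffer size and the key range used
     * here are illustrative assumptions, not tuned values.
     */
    @Test
    public void kuduBatchInsertTest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            KuduSession session = client.newSession();
            // Buffer operations locally instead of flushing on every apply().
            session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
            session.setMutationBufferSpace(5000);
            for (int i = 40000; i < 41000; i++) {
                Insert insert = table.newInsert();
                PartialRow row = insert.getRow();
                row.addString(0, i + "");
                row.addString(1, "bbb");
                session.apply(insert);
            }
            // Push all buffered operations to the tablet servers, then close the session.
            session.flush();
            session.close();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }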
    @Test
    public void kuduUpdateTest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            KuduSession session = client.newSession();
            Update update = table.newUpdate();
            PartialRow row = update.getRow();
            row.addString("key", 4 + "");
            row.addString("value", "value " + 10);
            OperationResponse operationResponse = session.apply(update);
            System.out.print(operationResponse.getRowError());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
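    /**
     * Not in the original post: a sketch of an upsert, which inserts the row when the
     * key does not exist and updates it when it does, so the caller does not need to
     * choose between newInsert() and newUpdate(). The key/value here are arbitrary
     * sample data.
     */
    @Test
    public void kuduUpsertTest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            KuduSession session = client.newSession();
            Upsert upsert = table.newUpsert();
            PartialRow row = upsert.getRow();
            row.addString("key", "4");
            row.addString("value", "value upserted");
            OperationResponse operationResponse = session.apply(upsert);
            System.out.println(operationResponse.getRowError());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }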
    @Test
    public void kuduSearchTest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            List<String> projectColumns = new ArrayList<>(1);
            projectColumns.add("value");
            KuduScanner scanner = client.newScannerBuilder(table)
                    .setProjectedColumnNames(projectColumns)
                    .build();
            while (scanner.hasMoreRows()) {
                RowResultIterator results = scanner.nextRows();
                while (results.hasNext()) {
                    RowResult result = results.next();
                    System.out.println(result.getString(0));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
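    /**
     * Not in the original post: a sketch of a scan with a push-down predicate, so the
     * filtering happens on the tablet servers rather than in the client. The literal
     * "30001" is just a sample key from the insert test above.
     */
    @Test
    public void kuduScanWithPredicateTest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            KuduTable table = client.openTable(tableName);
            KuduPredicate predicate = KuduPredicate.newComparisonPredicate(
                    table.getSchema().getColumn("key"),
                    KuduPredicate.ComparisonOp.EQUAL,
                    "30001");
            KuduScanner scanner = client.newScannerBuilder(table)
                    .addPredicate(predicate)
                    .build();
            while (scanner.hasMoreRows()) {
                RowResultIterator results = scanner.nextRows();
                while (results.hasNext()) {
                    RowResult result = results.next();
                    System.out.println(result.getString("key") + " = " + result.getString("value"));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }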
    @Test
    public void kuduDelTabletest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            client.deleteTable(tableName);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
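    /**
     * Not in the original post: a sketch of checking table existence with the plain
     * Kudu client (the KuduContext test further down does the same through Spark);
     * handy as a guard before deleteTable().
     */
    @Test
    public void kuduTableExistsTest() {
        KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTER).build();
        try {
            System.out.println(tableName + " exists = " + client.tableExists(tableName));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                client.shutdown();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }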
    @Test
    public void searchBysparkSql() {
        SparkSession sparkSession = getSparkSession();
        List<StructField> fields = Arrays.asList(
                DataTypes.createStructField("key", DataTypes.StringType, true),
                DataTypes.createStructField("value", DataTypes.StringType, true));
        StructType schema = DataTypes.createStructType(fields);
        Dataset ds = sparkSession.read().format("org.apache.kudu.spark.kudu")
                .schema(schema)
                .option("kudu.master", "10.1.0.20:7051")
                .option("kudu.table", "TestKudu")
                .load();
        ds.createOrReplaceTempView("abc");
        sparkSession.sql("select * from abc").show();
    }
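    /**
     * Not in the original post: a sketch of writing a DataFrame back to Kudu through
     * KuduContext.insertRows() (upsertRows()/deleteRows() follow the same pattern).
     * It reads the test table and inserts its rows into a second table named
     * "TestKuduCopy", which is assumed to already exist with the same schema.
     */
    @Test
    public void writeBySparkKuduContext() {
        SparkSession sparkSession = getSparkSession();
        KuduContext context = new KuduContext("10.1.0.20:7051", sparkSession.sparkContext());
        // Schema is inferred from the Kudu table, so no explicit StructType is needed here.
        Dataset ds = sparkSession.read().format("org.apache.kudu.spark.kudu")
                .option("kudu.master", "10.1.0.20:7051")
                .option("kudu.table", tableName)
                .load();
        context.insertRows(ds, "TestKuduCopy");
    }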
    @Test
    public void checkTableExistByKuduContext() {
        SparkSession sparkSession = getSparkSession();
        KuduContext context = new KuduContext("10.1.0.20:7051", sparkSession.sparkContext());
        System.out.println(tableName + " exists = " + context.tableExists(tableName));
    }
    public SparkSession getSparkSession() {
        SparkConf conf = new SparkConf().setAppName("test")
                .setMaster("local[*]")
                .set("spark.driver.userClassPathFirst", "true");
        conf.set("spark.sql.crossJoin.enabled", "true");
        SparkContext sparkContext = new SparkContext(conf);
        return SparkSession.builder().sparkContext(sparkContext).getOrCreate();
    }
}