有这样的一个业务场景-客户端通过接口访问impala Daemon,impala做查询并返回数据给到客户端;
下面通过impala jdbc访问服务方式来介绍客户端调用接口访问impala场景
访问实例前,会做kerberos认证; 通过后就允许访问相关服务
在实施方案前,假设读者已经基本熟悉以下技术 (不细说)
- Java,maven
- impala, hdfs,kerberos
方案实施
- 把kdc服务端krb5.conf拷贝到本地工程目录
[logging]
default = FILE:/var/log/krb5libs.log
kdc = FILE:/var/log/krb5kdc.log
admin_server = FILE:/var/log/kadmind.log
[libdefaults]
default_realm = WONHIGH.COM
dns_lookup_realm = false
dns_lookup_kdc = false
ticket_lifetime = 24h
renew_lifetime = 7d
forwardable = true
[realms]
WONHIGH.COM = {
#注意这里最好改成IP,因为你部署工程的机器有可能并不知道host对应的ip
kdc = 172.17.194.20
admin_server = 172.17.194.20
}
- 生成的keytab文件并拷贝到工程目录下
kadmin.local: xst -norandkey -k wms_dev.keytab wms_dev@WONHIGH.COM
-
工程目录
- 然后就是代码了,不多说,直接上 (头晕的直接拉到最下面看效果即可)
- pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>deng.yb</groupId>
<artifactId>impalaJdbc</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>impalaJdbc</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<hive.version>2.5.42</hive.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.cloudera</groupId>
<artifactId>impala-jdbc41</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.5</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
</project>
- 主类
package impala.kerberos;
import impala.conf.KbsConfiguration;
import impala.kerberos.callback.CallBack;
import impala.utils.Tools;
import java.io.IOException;
import java.security.PrivilegedAction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.text.MessageFormat;
import org.apache.hadoop.security.UserGroupInformation;
/**
 * Runs SQL against an Impala daemon over JDBC after performing a Kerberos
 * (keytab-based) login via Hadoop's {@code UserGroupInformation}.
 *
 * <p>Usage: configure via the fluent setters ({@link #user(String)},
 * {@link #krb5Dest(String)}, {@link #keytabDest(String)}) and then call
 * {@link #runWithKrbs(String, CallBack)}.
 */
public class KBImpalaJBDC {
	private static final String JDBC_DRIVER = "com.cloudera.impala.jdbc41.Driver";
	// {0} = impala daemon host, {1} = Kerberos realm (see MessageFormat below).
	private static final String CONNECTION_URL_TEMPLATE = "jdbc:impala://{0}:21050/;AuthMech=1;KrbRealm={1};KrbHostFQDN={0};KrbServiceName=impala";
	private static final String SECURITY_KRB5_CONF = "java.security.krb5.conf";
	private static final String HADOOP_SECURITY_AUTH = "hadoop.security.authentication";
	private static final String DEFAULT_REALM = "WONHIGH.COM";

	private String user;
	private String realm;
	private String krb5ConfDest = "krb5.conf";
	private String keytabDest;
	// Per-instance connection URL. The original stored the formatted URL back
	// into the static template, so constructing a second instance silently
	// corrupted the URL of every other instance.
	private final String connectionUrl;

	static {
		try {
			Class.forName(JDBC_DRIVER);
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		}
	}

	/**
	 * @param deamonHost hostname (or IP) of the Impala daemon to connect to
	 * @param realm      Kerberos realm, e.g. {@code WONHIGH.COM}
	 */
	public KBImpalaJBDC(String deamonHost, String realm) {
		this.realm = realm;
		this.connectionUrl = MessageFormat.format(CONNECTION_URL_TEMPLATE,
				deamonHost, realm);
	}

	/** Convenience constructor using {@link #DEFAULT_REALM}. */
	public KBImpalaJBDC(String deamonHost) {
		this(deamonHost, DEFAULT_REALM);
	}

	/** Sets the Kerberos principal, e.g. {@code wms_dev@WONHIGH.COM}. */
	public KBImpalaJBDC user(String user) {
		this.user = user;
		return self();
	}

	/** Sets the classpath location of krb5.conf (defaults to "krb5.conf"). */
	public KBImpalaJBDC krb5Dest(String krb5ConfDest) {
		this.krb5ConfDest = krb5ConfDest;
		return self();
	}

	/** Sets the classpath location of the keytab file (optional). */
	public KBImpalaJBDC keytabDest(String keytabDest) {
		this.keytabDest = keytabDest;
		return self();
	}

	/**
	 * Logs in from the keytab, executes {@code sql} and hands the
	 * {@link ResultSet} to {@code func}.
	 *
	 * @param sql  query to execute
	 * @param func callback that consumes the ResultSet; may be null
	 * @return the callback's result, or null on error / null callback
	 * @throws RuntimeException if no user was configured or login yields no user
	 */
	public Object runWithKrbs(final String sql, final CallBack func) {
		if (null == user || user.length() == 0) {
			throw new RuntimeException("用户不能为空!");
		}
		System.out.println("通过JDBC连接访问Kerberos环境下的Impala");
		// 登录Kerberos账号
		try {
			System.setProperty(SECURITY_KRB5_CONF,
					Tools.getPath(krb5ConfDest));
			UserGroupInformation.setConfiguration(KbsConfiguration
					.newInstance().setPro(HADOOP_SECURITY_AUTH,
							"Kerberos"));
			// Default keytab name is derived from the principal. The original
			// used user.replace(realm, ""), which left a trailing '@'
			// ("wms_dev@.keytab"); strip "@realm" as a unit instead.
			UserGroupInformation.loginUserFromKeytab(
					user,
					Tools.getPath(keytabDest == null
							? (user.replace("@" + realm, "") + ".keytab")
							: keytabDest));
			UserGroupInformation logUser = UserGroupInformation.getLoginUser();
			if (null == logUser) {
				throw new RuntimeException("登录用户为空!");
			}
			System.out.println(UserGroupInformation.getCurrentUser() + "------"
					+ logUser);
			return logUser.doAs(new PrivilegedAction<Object>() {
				public Object run() {
					Connection connection = null;
					ResultSet rs = null;
					PreparedStatement ps = null;
					try {
						connection = DriverManager
								.getConnection(connectionUrl);
						ps = connection.prepareStatement(sql);
						rs = ps.executeQuery();
						if (null == func) {
							return null;
						} else {
							return func.deal(rs);
						}
					} catch (Exception e) {
						e.printStackTrace();
					} finally {
						// Close in reverse order of acquisition, each in its
						// own try so one failure does not leak the others.
						// (Original closed the connection first and leaked
						// ps/rs if connection.close() threw.)
						if (rs != null) {
							try {
								rs.close();
							} catch (SQLException e) {
								e.printStackTrace();
							}
						}
						if (ps != null) {
							try {
								ps.close();
							} catch (SQLException e) {
								e.printStackTrace();
							}
						}
						if (connection != null) {
							try {
								connection.close();
							} catch (SQLException e) {
								e.printStackTrace();
							}
						}
					}
					return null;
				}
			});
		} catch (IOException e) {
			e.printStackTrace();
		}
		return null;
	}

	private KBImpalaJBDC self() {
		return this;
	}
}
- 工具类
package impala.utils;
/** Small helpers for resolving classpath resources to filesystem paths. */
public class Tools {

	/**
	 * Resolves a classpath resource name to its filesystem path.
	 *
	 * @param fileName resource name relative to the classpath root
	 * @return the resource's path as reported by {@code URL.getPath()}
	 * @throws IllegalArgumentException if fileName is null/empty or the
	 *                                  resource is not on the classpath
	 */
	public static String getPath(String fileName) {
		if (null == fileName || fileName.length() == 0) {
			// Original code was "throw null;", which raises a bare,
			// message-less NullPointerException.
			throw new IllegalArgumentException("fileName must not be null or empty");
		}
		java.net.URL url = currentLoader().getResource(fileName);
		if (null == url) {
			// Original dereferenced the null URL, producing an unexplained NPE.
			throw new IllegalArgumentException("resource not found on classpath: " + fileName);
		}
		return url.getPath();
	}

	/** Returns the current thread's context class loader. */
	public static ClassLoader currentLoader() {
		return Thread.currentThread().getContextClassLoader();
	}
}
- 辅助类和接口
package impala.conf;
import org.apache.hadoop.conf.Configuration;
/**
 * Thin fluent wrapper over Hadoop's {@code Configuration}: lets callers set a
 * property and keep chaining in a single expression.
 */
public class KbsConfiguration extends Configuration {

	/** Static factory; equivalent to {@code new KbsConfiguration()}. */
	public static KbsConfiguration newInstance() {
		return new KbsConfiguration();
	}

	/**
	 * Sets the given property and returns this configuration for chaining.
	 *
	 * @param name  property key
	 * @param value property value
	 * @return this instance, typed as {@code Configuration}
	 */
	public Configuration setPro(String name, String value) {
		this.set(name, value);
		return this;
	}
}
package impala.kerberos.callback;
/**
 * Callback handed the query result by {@code KBImpalaJBDC.runWithKrbs}.
 * The argument passed in practice is a {@code java.sql.ResultSet}; the
 * return value is propagated back to the caller of runWithKrbs.
 */
public interface CallBack {
/**
 * Processes the query result.
 *
 * @param obj the result object (a ResultSet when invoked by KBImpalaJBDC)
 * @return any value to surface to the caller, or null
 */
Object deal (Object obj);
}
方案验证
- 测试类
package impalaJdbc.testCase;
import impala.kerberos.KBImpalaJBDC;
import impala.kerberos.callback.CallBack;
import java.sql.ResultSet;
/**
* Hello world!
*
*/
/**
 * Demo entry point: logs into Kerberos, runs a count query against Impala and
 * prints the result (one count per line) to stdout.
 */
public class App {

	public static void main(String[] args) {
		KBImpalaJBDC client = new KBImpalaJBDC("bi-slave1");

		// Collects every first-column value of the result set, one per line.
		CallBack rowCollector = new CallBack() {
			public Object deal(Object obj) {
				try {
					if (obj instanceof ResultSet) {
						ResultSet rows = (ResultSet) obj;
						StringBuilder lines = new StringBuilder();
						while (rows.next()) {
							lines.append(rows.getString(1));
							lines.append("\n");
						}
						return lines.toString();
					}
				} catch (Exception e) {
					e.printStackTrace();
				}
				return null;
			}
		};

		Object result = client.user("wms_dev@WONHIGH.COM")
				.krb5Dest("krb5.conf")
				.keytabDest("wms_dev.keytab")
				.runWithKrbs("select count(1) from gtp.ods_item;", rowCollector);

		// Print the collected text, or an empty line when nothing came back.
		String output = (result instanceof java.lang.String) ? result.toString() : "";
		System.out.println(output);
	}
}
-
代码结果
-
直接登录服务查询结果
结果一致!
遗留问题
- 有经验的开发会发现一个问题;就是代码里面写死访问同一个impala实例,并发量一大会不会导致impala Daemon服务罢工。答案是肯定的!
- 解决思路:实现软均衡负载,
- 具体方案:在客户端与服务端之间搭建 HAProxy服务, 监听相应接口,分发请求;以下是通过HAProxy实现impala均衡负载方案
均衡方案实施前,假设满足以下条件
- impala服务正常
- 集成kerberos正常
HAProxy安装和配置
yum -y install haproxy
- 启动与停止HAProxy服务
service haproxy start
service haproxy stop
chkconfig haproxy on
- 配置Impala负载均衡
mv /etc/haproxy/haproxy.cfg /etc/haproxy/haproxy.cfg.bak
vi /etc/haproxy/haproxy.cfg
#---------------------------------------------------------------------
# Example configuration for a possible web application. See the
# full configuration options online.
#
# http://haproxy.1wt.eu/download/1.4/doc/configuration.txt
#
#---------------------------------------------------------------------
#---------------------------------------------------------------------
# Global settings
#---------------------------------------------------------------------
global
log 127.0.0.1 local2
chroot /var/lib/haproxy
pidfile /var/run/haproxy.pid
maxconn 4000
user haproxy
group haproxy
daemon
# turn on stats unix socket
stats socket /var/lib/haproxy/stats
#---------------------------------------------------------------------
# common defaults that all the 'listen' and 'backend' sections will
# use if not designated in their block
#---------------------------------------------------------------------
defaults
mode http
log global
option httplog
option dontlognull
#option http-server-close
#option forwardfor except 127.0.0.0/8
option redispatch
retries 3
timeout http-request 10s
timeout queue 1m
timeout connect 10s
timeout client 1m
timeout server 1m
timeout http-keep-alive 10s
timeout check 10s
maxconn 3000
listen stats
bind 0.0.0.0:1080
mode http
option httplog
maxconn 5000
stats refresh 30s
stats uri /stats
listen impalashell
bind 0.0.0.0:25003
mode tcp
option tcplog
balance leastconn
server bi-slave1 bi-slave1:21000 check
server bi-slave2 bi-slave2:21000 check
server bi-slave3 bi-slave3:21000 check
listen impalajdbc
bind 0.0.0.0:25004
mode tcp
option tcplog
balance leastconn
server bi-slave1 bi-slave1:21050 check
server bi-slave2 bi-slave2:21050 check
server bi-slave3 bi-slave3:21050 check
- 启动HAProxy服务
service haproxy restart
-
查看是否启动正常
impala 配置
- 保存重启
Impala Shell测试
-
打开两个窗口,同时访问impala-shell -i bi-master:25003
- 发现请求会分发到不同impala daemon上,证明均衡负载配置成功
-
同理:java jdbc连接把Connect url的节点改为bi-master:25004即可
hue配置
-
impala均衡负载配置后,进入impala查询会有可能报以下错
-
解决办法,在hue配置中修改
- 重启hue服务即可