Foreword:
Presto is an open-source big-data OLAP engine developed by Facebook. It is built on in-memory computation and distributed task scheduling, lets you analyze data directly with SQL without any pre-processing, and is similar in spirit to Impala and Spark SQL. Presto itself stores no data; it reads from external data sources through connectors.
The Presto architecture has several roles:
Coordinator:
The Coordinator parses statements, plans query execution, and manages the Presto worker nodes. A Presto installation must have one Coordinator and can have multiple workers.
The Coordinator tracks the activity of each worker and coordinates query execution. For each query it builds an execution model consisting of multiple stages; each stage is then translated into tasks that are distributed to the workers. Finally, the Coordinator aggregates the results.
Worker:
Workers are responsible for executing tasks and processing data.
Discovery Server:
Usually embedded in the coordinator node, though it can also be deployed standalone. It handles worker registration, so the Coordinator knows how many workers exist and at which addresses.
The problem:
As mentioned above, a Presto cluster executes analytical queries in memory with parallel task processing. For a single cluster with few machines, concurrency is low, but that turned out to be the lesser issue. In many cases CPU and memory were completely saturated, which directly hurt availability (the root cause being a tight budget: only three machines, all low-spec). The most obvious bottleneck appeared on the Coordinator: when several queries ran concurrently, some were simply blocked, memory was exhausted, and according to jstat the GC never stopped running. The workers, by contrast, were fine.
My initial idea was to add a second coordinator that reuses the cluster's Discovery Server, and then put nginx in front as a load balancer to split requests between the two coordinators, relieving the Coordinator pressure while increasing concurrency.
nginx.conf
upstream backend {
    server 10.100.218.110:8090;
    server 10.101.126.93:8090;
}
server {
    listen      8060;
    server_name tt;
    location / {
        root  html;
        index index.html index.htm;
        proxy_pass http://backend;
        proxy_redirect off;
        proxy_set_header Host $http_host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }
    error_page 500 502 503 504 /50x.html;
    location = /50x.html {
        root html;
    }
}
Coordinator-1 (the original coordinator, which also embeds the Discovery Server) config:
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8090
discovery-server.enabled=true
discovery.uri=http://10.100.218.110:8090
query.max-memory-per-node=6GB
query.max-total-memory-per-node=8GB
query.low-memory-killer.policy=total-reservation
http-server.max-request-header-size=32MB
http-server.max-response-header-size=32MB
task.concurrency=16
Coordinator-2 (the newly added coordinator) config — note that it sets discovery-server.enabled=false and points discovery.uri at Coordinator-1, so both coordinators share the single embedded Discovery Server:
coordinator=true
node-scheduler.include-coordinator=false
http-server.http.port=8090
discovery-server.enabled=false
discovery.uri=http://10.100.218.110:8090
query.max-memory-per-node=4GB
query.max-total-memory-per-node=5GB
query.low-memory-killer.policy=total-reservation
http-server.max-request-header-size=32MB
http-server.max-response-header-size=32MB
task.concurrency=16
But this broke everything: all queries failed. The reason is that presto-jdbc does not execute a query with a single HTTP request. Instead it sends a series of requests: preparing the statement, binding values, polling the queue status, fetching the results, and so on. Capturing the traffic with Fiddler confirmed this.
To solve this, all requests belonging to one query must be routed to the same Coordinator node. My first idea was to intercept each request inside the presto-jdbc driver, force the thread id into a header, and have nginx route by taking that thread id modulo the number of coordinators.
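On the nginx side, that header-based sticky routing could be expressed with the upstream module's hash directive rather than hand-rolled modulo logic. This is a sketch only, assuming the client were modified to send a hypothetical X-Thread-Id header (this was not part of my actual setup):

```nginx
upstream backend {
    # all requests carrying the same X-Thread-Id header value
    # are consistently routed to the same coordinator
    hash $http_x_thread_id;
    server 10.100.218.110:8090;
    server 10.101.126.93:8090;
}
```

With this, presto-jdbc's follow-up requests for a query would reach the same coordinator as the initial request, provided they all carry the same header value.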
Later I realized that routing on the application side also works: register two data sources, and when obtaining a connection, pick one of them based on the thread id modulo the number of data sources.
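The application-side routing boils down to a stable thread-to-coordinator mapping. A minimal runnable sketch of that idea (class and method names here are illustrative, and it uses a plain modulo rather than the digit-trimming in the real implementation below):

```java
// Minimal sketch: sticky thread-id -> coordinator-index routing.
public class StickyRouteDemo {

    // Map a thread id onto one of n coordinators.
    static int route(long threadId, int n) {
        return (int) (threadId % n);
    }

    public static void main(String[] args) {
        long id = Thread.currentThread().getId();
        int first = route(id, 2);
        int second = route(id, 2);
        // The same thread always maps to the same coordinator, so all
        // HTTP round trips of one presto-jdbc query land on one node.
        System.out.println(first == second);           // prints "true"
        System.out.println(first == 0 || first == 1);  // prints "true"
    }
}
```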
PrestoRouteDataSource (the multi-datasource implementation)
import com.zaxxer.hikari.HikariDataSource;
import javax.sql.DataSource;
import java.io.PrintWriter;
import java.sql.*;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Logger;
public class PrestoRouteDataSource implements DataSource {

    // one pooled DataSource per coordinator; effectively read-only after construction
    private final CopyOnWriteArrayList<DataSource> corAddr = new CopyOnWriteArrayList<>();

    public PrestoRouteDataSource(String addr) {
        // addr is a ';'-separated list of JDBC URLs, one per coordinator
        String[] split = addr.split(";");
        for (String s : split) {
            HikariDataSource hikariDataSource = new HikariDataSource();
            hikariDataSource.setJdbcUrl(s);
            hikariDataSource.setUsername("root");
            hikariDataSource.setDriverClassName("com.facebook.presto.jdbc.PrestoDriver");
            hikariDataSource.setPoolName("presto:" + s);
            corAddr.add(hikariDataSource);
        }
    }

    private DataSource getPrestoRouteDataSource() {
        // Route on the current thread id: presto-jdbc issues all HTTP requests of
        // one query from the calling thread, so this pins a query to one coordinator.
        long sessionId = Thread.currentThread().getId();
        String as = String.valueOf(sessionId);
        // keep at most the last three digits of the id before taking the modulo
        int subIndex = (as.length() >= 4) ? (as.length() - 3) : (as.length() - 1);
        int delSessionId = Integer.parseInt(as.substring(subIndex));
        int index = delSessionId % corAddr.size();
        return corAddr.get(index);
    }

    @Override
    public Connection getConnection() throws SQLException {
        return getPrestoRouteDataSource().getConnection();
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        // forward the credentials instead of silently dropping them
        return getPrestoRouteDataSource().getConnection(username, password);
    }

    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public PrintWriter getLogWriter() throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void setLogWriter(PrintWriter out) throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public void setLoginTimeout(int seconds) throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public int getLoginTimeout() throws SQLException {
        throw new UnsupportedOperationException();
    }

    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        throw new UnsupportedOperationException();
    }
}
Data source configuration
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import javax.sql.DataSource;
import java.io.IOException;
import java.util.*;
@Configuration
@MapperScan(basePackages = {"lvye.java.datamiddle.dao.presto"}, sqlSessionFactoryRef = "prestoSqlSessionFactory")
public class PrestoDataSourceConfig {

    @Value("${presto.jdbc-url}")
    private String url;

    @Bean(name = "prestoDataSource")
    public DataSource prestoDataSource() {
        return new PrestoRouteDataSource(url);
    }

    @Bean(name = "prestoSqlSessionFactory")
    public SqlSessionFactory prestoSqlSessionFactory(@Qualifier("prestoDataSource") DataSource prestoDataSource)
            throws Exception {
        final SqlSessionFactoryBean sessionFactory = new SqlSessionFactoryBean();
        sessionFactory.setTypeAliasesPackage("lvye.java.datamiddle.model.entity");
        sessionFactory.setDataSource(prestoDataSource);
        sessionFactory.setMapperLocations(resolveMapperLocations());
        try {
            // enable underscore-to-camel-case column mapping
            Objects.requireNonNull(sessionFactory.getObject()).getConfiguration().setMapUnderscoreToCamelCase(true);
            sessionFactory.getObject().getConfiguration().setJdbcTypeForNull(null);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return sessionFactory.getObject();
    }

    private Resource[] resolveMapperLocations() {
        ResourcePatternResolver resourceResolver = new PathMatchingResourcePatternResolver();
        List<String> mapperLocations = new ArrayList<>();
        mapperLocations.add("classpath*:/mapper/presto/*.xml");
        List<Resource> resources = new ArrayList<>();
        for (String mapperLocation : mapperLocations) {
            try {
                Resource[] mappers = resourceResolver.getResources(mapperLocation);
                resources.addAll(Arrays.asList(mappers));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return resources.toArray(new Resource[0]);
    }

    @Bean(name = "prestoTransactionManager")
    public DataSourceTransactionManager prestoTransactionManager(@Qualifier("prestoDataSource") DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Bean(name = "prestoTemplate")
    public SqlSessionTemplate prestoJdbcTemplate(@Qualifier("prestoSqlSessionFactory") SqlSessionFactory sqlSessionFactory) {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
application.properties
presto.jdbc-url=jdbc:presto://10.100.218.110:8090/kudu/tclydm;jdbc:presto://10.101.126.93:8090/kudu/tclydm
The two JDBC URLs are separated by ';', matching the split(";") in PrestoRouteDataSource.
Test 1 (multiple coordinators)
Test 2 (single coordinator)
Performance comparison:
(multiple coordinators) total 37.66s, average 3.13s, max 6.67s, min 764ms
(single coordinator) total 131.01s, average 10.91s, max 15.78s, min 1.6s
Overall, the two-coordinator setup finished the same batch of queries roughly 3.5x faster (131.01s / 37.66s ≈ 3.48).
Afterword: the Presto developers have clearly recognized this problem as well. Multi-coordinator clusters are now on the official roadmap, but at the time of writing no released version supports them yet.