61_Java程序员、缓存架构以及Storm大数据实时计算之间的关系
接下来,我们是要讲解这个商品详情页缓存架构,缓存预热问题和解决方案,缓存热点数据可能导致整个系统崩溃的问题,以及解决方案
缓存,热,预热,热数据
解决方案,和架构设计中,会引入大数据的实时计算的技术,storm
为什么要引入这个storm,难道必须是storm吗?我们后面去讲解那个解决方案的时候再说
java工程师,storm的关系是什么呢,缓存架构和storm的关系
缓存架构和storm的关系,因为有些热点数据相关的一些实时处理的一些方案,比如快速预热,热点数据的实时感知和快速降级,全部要用到storm
因为我们可能需要实时的去计算出热点缓存数据,实时计算,亿级流量,高并发,大量的请求过来
这个时候,你要做一些实时的计算,那么必须涉及到分布式的一些技术,分布式的技术,才能处理高并发,大量的请求
目前在时候计算的领域,最成熟的大数据的技术,就是storm
storm分布式的大数据实时计算的技术/系统
java工程师,我跟storm之间的关系是什么?
1、介绍,我自己本身这么多年,一直在大公司,BAT公司,一线的大互联网公司,我认识的很多的java工程师
java开发和架构,后来开始大数据的架构
大公司里的很多java工程师,都是会用一些大数据的一些技术的,比如storm,或者hbase,或者zookeeper,或者hive,spark
因为在大公司里,容易遇到一些复杂的挑战和场景,比如高并发,海量数据,场景
你做一些java先关的项目,和系统,可能也会遇到这种问题,很多时候,直接用大数据的一些技术,实时计算,你是自己去写个系统,还是用现成的storm
更好的选择时用storm,成熟
我也只是说部分java的人,但是也有很多搞java的工程师就是纯java技术栈
2、java系统跟大数据技术的关系
(1)大数据不仅仅只是大数据工程师要关注的东西
(2)大数据也是Java程序员在构建各类系统的时候一种全新的思维,以及架构理念,比如Storm,Hive,Spark,ZooKeeper,HBase,Elasticsearch,等等
(3)举例说明
Storm:实时缓存热点数据统计->缓存预热->缓存热点数据自动降级
Hive:Hadoop生态栈里面,做数据仓库的一个系统,高并发访问下,海量请求日志的批量统计分析,日报周报月报,接口调用情况,业务使用情况,等等
我所知,在一些大公司里面,是有些人是将海量的请求日志打到hive里面,做离线的分析,然后反过来去优化自己的系统
Spark:离线批量数据处理,比如从DB中一次性批量处理几亿数据,清洗和处理后写入Redis中供后续的系统使用,大型互联网公司的用户相关数据
ZooKeeper:分布式系统的协调,分布式锁,分布式选举->高可用HA架构,轻量级元数据存储
用java开发了分布式的系统架构,你的整套系统拆分成了多个部分,每个部分都会负责一些功能,互相之间需要交互和协调
服务A说,我在处理某件事情的时候,服务B你就别处理了
服务A说,我一旦发生了某些状况,希望服务B你立即感知到,然后做出相应的对策
HBase:海量数据的在线存储和简单查询,替代MySQL分库分表,提供更好的伸缩性
java底层,对应的是海量数据,然后要做一些简单的存储和查询,同时数据增多的时候要快速扩容
mysql分库分表就不太合适了,mysql分库分表扩容,还是比较麻烦的
Elasticsearch:海量数据的复杂检索以及搜索引擎的构建,支撑有大量数据的各种企业信息化系统的搜索引擎,电商/新闻等网站的搜索引擎,等等
mysql的like "%xxxx%",更加合适一些,性能更加好
大家不要说觉得来听课程,就必须每堂课都是代码,代码,代码,就不喜欢听我这些废话
我告诉大家,这些还真不是废话,代码很重要,手写代码,copy。我可能做为一个过来人,很多项目都做过,很多技术都用过,也做过。
比较我的角度,去给大家讲一讲,行业,一些技术领域的问题
62 大白话介绍
这块给大家解释一下,就是说,有些技术我们可能就是简单带着大家去用一下就好了
nginx,java,一般都会一些
kafka,zookeeper,lua,我觉得,那些东西的话,主要是讲解基于他们的一些架构,和解决方案的设计还有开发
redis:跟我们的这个topic是很有关系的,大型缓存架构,高并发高性能高可用的缓存架构的底层支持,redis,细致的去讲解,那块redis技术和知识是本套课程的一个重点
数据库+缓存双写,多级缓存架构,大家重点去理解里面的方案设计和架构思想
热数据的处理,缓存雪崩 --> storm,hystrix
对于这两个技术,都是关键性的会去影响你的热数据,缓存雪崩时的系统可用性和稳定性
对这两个技术,storm,hystrix,都很重要
会类似redis,花费较多的篇幅去给大家讲解一下,让大家可以把这两个技术同时也学习的非常好
正好跟着我们的大的项目实战在走,学完以后,直接可以学以致用,用到我们的系统架构中去
kafka,消息队列,用起来很简单,而且搞java得一般来说,对消息队列都有一些了解吧,而且到了真实的生产环境中,kafka你是可以换成其他的技术,Active MQ,Rabbit MQ,Rocket MQ
zookeeper,分布式锁,分布式锁,搞java一般也会知道一些,zk去做,redis去做锁也是可以的
lua,大家后面真的是要用到lua,觉得课程里讲解的东西不够,可以自己去网上查一些lua的语法可以了,语法是最最简单的
storm,说句实话,在做热数据这块,如果要做复杂的热数据的统计和分析,亿流量,高并发的场景下,我还真觉得,最合适的技术就是storm,没有其他
缓存架构,热数据先关的架构设计,热数据相关的架构中最重要的唯一的可选技术,storm,好好的去讲一下的
hystrix,分布式系统的高可用性的限流,熔断,降级,等等,一些措施,缓存雪崩的方案,限流的技术
讲给Java工程师的史上最通俗易懂Storm教程
讲给Java工程师:我知道你没什么大数据的背景和经验,基础,那么我就把你当做一个大数据小白,主要是java背景和基础
史上最通俗易懂:市面上其他的storm视频课程,或者是一些书籍,我告诉,storm还是挺难的,事务,云里雾里,云里雾里
搞storm大数据的,连这个并行度和流分组的本质它都说不清楚,因为市面上的资料也说不清楚
会把你当做小白,用最最通俗易懂的语言,给你去讲解这块的知识,画图
一、Storm到底是什么?
1、mysql,hadoop与storm
mysql:事务性系统,面临海量数据的尴尬
hadoop:离线批处理
storm:实时计算
2、我们能不能自己搞一套storm?
来一条数据,我理解就算一条,来一条,算一条
坑,海量高并发大数据,高并发的请求数据,分布式的系统,流式处理的分布式系统
如果自己搞一套实时流系统出来,也是可以的,但是。。。。
(1)花费大量的时间在底层技术细节上:如何部署各种中间队列,节点间的通信,容错,资源调配,计算节点的迁移和部署,等等
(2)花费大量的时间在系统的高可用上问题上:如何保证各种节点能够高可用稳定运行
(3)花费大量的时间在系统扩容上:吞吐量需要扩容的时候,你需要花费大量的时间去增加节点,修改配置,测试,等等
5万/s,10万/s,扩容
国内,国产的实时大数据计算系统,唯一做出来的,做得好的,做得影响力特别大,特别牛逼的,就是JStorm,阿里
阿里,技术实力,世界一流,顶尖,国内顶尖,一流
JStorm,clojure编程预压,Java重新写了一遍,Galaxy流式计算的系统,百度,腾讯,也都自己做了,也能做得很好
3、storm的特点是什么?
(1)支撑各种实时类的项目场景:实时处理消息以及更新数据库,基于最基础的实时计算语义和API(实时数据处理领域);对实时的数据流持续的进行查询或计算,同时将最新的计算结果持续的推送给客户端展示,同样基于最基础的实时计算语义和API(实时数据分析领域);对耗时的查询进行并行化,基于DRPC,即分布式RPC调用,单表30天数据,并行化,每个进程查询一天数据,最后组装结果
storm做各种实时类的项目都ok
(2)高度的可伸缩性:如果要扩容,直接加机器,调整storm计算作业的并行度就可以了,storm会自动部署更多的进程和线程到其他的机器上去,无缝快速扩容
扩容起来,超方便
(3)数据不丢失的保证:storm的消息可靠机制开启后,可以保证一条数据都不丢
数据不丢失,也不重复计算
(4)超强的健壮性:从历史经验来看,storm比hadoop、spark等大数据类系统,健壮的多的多,因为元数据全部放zookeeper,不在内存中,随便挂都不要紧
特别的健壮,稳定性和可用性很高
(5)使用的便捷性:核心语义非常的简单,开发起来效率很高
用起来很简单,开发API还是很简单的
63 storm 集群架构
64 Storm 的并行度以及流分组
因为我们在这里,是讲给java工程师的storm教程
所以我期望的场景是,你们所在的公司,基本上来说,已经有大数据团队,有人在维护storm集群
我觉得,对于java工程师来说,先不说精通storm
至少说,对storm的核心的基本原理,门儿清,你都很清楚,集群架构、核心概念、并行度和流分组
接下来,掌握最常见的storm开发范式,spout消费kafka,后面跟一堆bolt,bolt之间设定好流分组的策略
在bolt中填充各种代码逻辑
了解如何将storm拓扑打包后提交到storm集群上去运行
掌握如何能够通过storm ui去查看你的实时计算拓扑的运行现状
你在一个公司里,如果说,需要在你的java系统架构中,用到一些类似storm的大数据技术,如果已经有人维护了storm的集群
那么此时你就可以直接用,直接掌握如何开发和部署即可
但是,当然了,如果说,恰巧没人负责维护storm集群,也没什么大数据的团队,那么你可能需要说再去深入学习一下storm
当然了,如果你的场景不是特别复杂,整个数据量也不是特别大,其实自己主要研究一下,怎么部署storm集群
你自己部署一个storm集群,也ok
好多年前,我第一次接触storm的时候,真的,我觉得都没几个人能彻底讲清楚,用一句话讲清楚什么是并行度,什么是流分组
很多时候,你以外你明白了,其实你不明白
比如我经常面试一些做过storm的人过来,我就问一个问题,就知道它的水深水浅,流分组的时候,数据在storm集群中的流向,你画一下
比如你自己随便设想一个拓扑结果出来,几个spout,几个bolt,各种流分组情况下,数据是怎么流向的,要求具体画出集群架构中的流向
worker,executor,task,supervisor,流的
几乎没几个人能画对,为什么呢,很多人就没搞明白这个并行度和流分组到底是什么
并行度:Worker->Executor->Task,没错,是Task
流分组:Task与Task之间的数据流向关系
Shuffle Grouping:随机发射,负载均衡
Fields Grouping:根据某一个,或者某些个,fields,进行分组,那一个或者多个fields如果值完全相同的话,那么这些tuple,就会发送给下游bolt的其中固定的一个task
你发射的每条数据是一个tuple,每个tuple中有多个field作为字段
比如tuple,3个字段,name,age,salary
{"name": "tom", "age": 25, "salary": 10000} -> tuple -> 3个field,name,age,salary
All Grouping
Global Grouping
None Grouping
Direct Grouping
Local or Shuffle Grouping
65 WordCount
storm核心的基本原理,都了解了
写一下代码,去体验一下,storm的程序是怎么开发的,通过了解了代码之后,再回头,你去看一下之前讲解的一些基本原理,就很清楚了
大数据,入门程序,wordcount,单词计数
你可以认为,storm源源不断的接收到一些句子,然后你需要实时的统计出句子中每个单词的出现次数
(1)搭建工程环境
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>storm-wordcount</artifactId>
<packaging>jar</packaging>
<name>storm-wordcount</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>test/main/java</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.sf</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.dsa</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>META-INF/*.rsa</exclude>
<exclude>META-INF/*.EC</exclude>
<exclude>META-INF/*.ec</exclude>
<exclude>META-INF/MSFTSIG.SF</exclude>
<exclude>META-INF/MSFTSIG.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>java</executable>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>false</includePluginDependencies>
<classpathScope>compile</classpathScope>
<mainClass></mainClass>
</configuration>
</plugin>
</plugins>
</build>
</project>
(2)编写代码
public class WordCountTopology {
public static class RandomSentenceSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
Random _rand;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
_rand = new Random();
}
@Override
public void nextTuple() {
Utils.sleep(100);
String[] sentences = new String[]{sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"),
sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")};
final String sentence = sentences[_rand.nextInt(sentences.length)];
_collector.emit(new Values(sentence));
}
protected String sentence(String input) {
return input;
}
@Override
public void ack(Object id) {
}
@Override
public void fail(Object id) {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
public static class SplitSentence implements IRichBolt {
public SplitSentence() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
public static class WordCount extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null)
count = 0;
count++;
counts.put(word, count);
collector.emit(new Values(word, count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
}
(4)测试代码
讲了手写了storm wordcount程序
蕴含了很多的知识点
(1)Spout
(2)Bolt
(3)OutputCollector,Declarer
(4)Topology
(5)设置worker,executor,task,流分组
storm的核心基本原理,基本的开发,学会了
storm集群部署,怎么将storm的拓扑扔到storm集群上去跑
66 部署一个storm集群
(1)安装Java 7和Pythong 2.6.6
(2)下载storm安装包,解压缩,重命名,配置环境变量
(3)修改storm配置文件
mkdir /var/storm
conf/storm.yaml
storm.zookeeper.servers:
- "111.222.333.444"
- "555.666.777.888"
storm.local.dir: "/mnt/storm"
nimbus.seeds: ["111.222.333.44"]
slots.ports,指定每个机器上可以启动多少个worker,一个端口号代表一个worker
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
(4)启动storm集群和ui界面
一个节点,storm nimbus >/dev/null 2>&1 &
三个节点,storm supervisor >/dev/null 2>&1 &
一个节点,storm ui >/dev/null 2>&1 &
(5)访问一下ui界面,8080端口