42.Storm drpc及kafka

DRPC (Distributed RPC) remote procedure call
分布式远程过程调用

DRPC 是通过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。
DRPC Server 负责接收 RPC 请求,并将该请求发送到 Storm中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。
(其实,从客户端的角度来说,DPRC 与普通的 RPC 调用并没有什么区别。)

DRPC设计目的:
为了充分利用Storm的计算能力实现高密度的并行实时计算。
(Storm接收若干个数据流输入,数据在Topology当中运行完成,然后通过DRPC将结果进行输出。)

客户端通过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。实现该函数的拓扑使用一个DRPCSpout 从 DRPC 服务器中接收一个函数调用流。DRPC 服务器会为每个函数调用都标记了一个唯一的 id。随后拓扑会执行函数来计算结果,并在拓扑的最后使用一个名为 ReturnResults 的 bolt 连接到 DRPC 服务器,根据函数调用的 id 来将函数调用的结果返回。


图片1.png

定义DRPC拓扑:
方法1:
通过LinearDRPCTopologyBuilder (该方法也过期,不建议使用)
该方法会自动为我们设定Spout、将结果返回给DRPC Server等,我们只需要将Topology实现



方法2:
直接通过普通的拓扑构造方法TopologyBuilder来创建DRPC拓扑
需要手动设定好开始的DRPCSpout以及结束的ReturnResults


图片3.png

运行模式:
1、本地模式
图片4.png

运行模式:
2、远程模式(集群模式)

修改配置文件conf/storm.yaml
drpc.servers:
- "node1“

启动DRPC Server
bin/storm drpc &

通过StormSubmitter.submitTopology提交拓扑


图片5.png

kafka

kafka是消息队列

一、Kafka简介

Kafka是一个分布式的消息队列系统(Message Queue)。

官网:<u>https://kafka.apache.org/</u>

图片1.png

kafka集群有多个Broker服务器组成,每个类型的消息被定义为topic。
同一topic内部的消息按照一定的key和算法被分区(partition)存储在不同的Broker上。
消息生产者producer和消费者consumer可以在多个Broker上生产/消费topic

概念理解:
Topics and Logs:
Topic即为每条发布到Kafka集群的消息都有一个类别,topic在Kafka中可以由多个消费者订阅、消费。
每个topic包含一个或多个partition(分区),partition数量可以在创建topic时指定,每个分区日志中记录了该分区的数据以及索引信息。如下图:


图片2.png

Kafka只保证一个分区内的消息有序,不能保证一个主题的不同分区之间的消息有序。如果你想要保证所有的消息都绝对有序可以只为一个主题分配一个分区。
分区会给每个消息记录分配一个顺序ID号(偏移量), 能够唯一地标识该分区中的每个记录。Kafka集群保留所有发布的记录,不管这个记录有没有被消费过,Kafka提供相应策略通过配置从而对旧数据处理。


图片3.png

实际上,每个消费者唯一保存的元数据信息就是消费者当前消费日志的位移位置。位移位置是由消费者控制,即、消费者可以通过修改偏移量读取任何位置的数据。

Ø Distribution -- 分布式

Ø Producers -- 生产者

指定topic来发送消息到Kafka Broker

Ø Consumers -- 消费者

根据topic消费相应的消息

二、Kafka集群部署

集群规划

Zookeeper集群共三台服务器,分别为:node06、node07、node08。

Kafka集群共三台服务器,分别为:node06、node07、node08。

1、Zookeeper集群准备

kafka是一个分布式消息队列,需要依赖ZooKeeper,请先安装好zk集群。

Zookeeper集群安装步骤略。

2、安装Kafka

下载压缩包(官网地址:<u>http://kafka.apache.org/downloads.html</u>

解压:

tar zxvf kafka_2.10-0.9.0.1.tgz -C /opt/sxt

mv kafka_2.10-0.9.0.1/ kafka

修改配置文件:config/server.properties


图片4.png

图片5.png

核心配置参数说明:
broker.id: broker集群中唯一标识id,0、1、2、3依次增长(broker即Kafka集群中的一台服务器)
注:
当前Kafka集群共三台节点,分别为:node1、node2、node3。对应的broker.id分别为0、1、2。
zookeeper.connect: zk集群地址列表

将当前node1服务器上的Kafka目录同步到其他node2、node3服务器上:
scp -r /opt/kafka/ node2:/opt
scp -r /opt/kafka/ node3:/opt

修改node2、node3上Kafka配置文件中的broker.id(分别在node2、3服务器上执行以下命令修改broker.id)
sed -i -e 's/broker.id=./broker.id=1/' /opt/kafka/config/server.properties
sed -i -e 's/broker.id=.
/broker.id=2/' /opt/kafka/config/server.properties

3、启动Kafka集群
A、启动Zookeeper集群。
B、启动Kafka集群。
分别在三台服务器上执行以下命令启动:
bin/kafka-server-start.sh config/server.properties

4、测试
创建话题
(kafka-topics.sh --help查看帮助手册)
创建topic:
bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --create --replication-factor 2 --partitions 3 --topic test

(参数说明:
--replication-factor:指定每个分区的复制因子个数,默认1个
--partitions:指定当前创建的kafka分区数量,默认为1个
--topic:指定新建topic的名称)

查看topic列表:
bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --list

查看“test”topic描述:
bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --describe --topic test


图片6.png

创建生产者:
bin/kafka-console-producer.sh --broker-list node06:9092,node07:9092,node08:9092 --topic test

创建消费者:
bin/kafka-console-consumer.sh --zookeeper node06:2181,node07:2181,node08:2181 --from-beginning --topic test

注:
查看帮助手册:
bin/kafka-console-consumer.sh help

三、Flume & Kafka
1、Flume安装
Flume安装流程:
解压jar包
mv conf/flume-env.sh.template flume-env.sh
vi flume-env.sh java环境变量
./bin flume-ng version
/conf/下 创建配置文件fk.conf内容如下:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node06
a1.sources.r1.port = 41414

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = testflume
a1.sinks.k1.brokerList = node06:9092,node07:9092,node08:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2、Flume****+****Kafka

启动zk集群

A、启动Kafka集群。

bin/kafka-server-start.sh config/server.properties

B、配置Flume集群,并启动Flume集群。

bin/flume-ng agent -n a1 -c conf -f conf/fk.conf -Dflume.root.logger=DEBUG,console

3、测试

Ø 分别启动Zookeeper、Kafka、Flume集群。

zkServer.sh start

bin/kafka-server-start.sh config/server.properties

bin/flume-ng agent -n a1 -c conf -f conf/fk.conf -Dflume.root.logger=DEBUG,console

Ø 创建topic:(不用)

bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --create --replication-factor 2 --partitions 1--topic testflume

bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --create --replication-factor 2 --partitions 1--topic LogError

bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --list

bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --create --replication-factor 2 --partitions 1--topic LogError

bin/kafka-console-consumer.sh --zookeeper node06:2181,node07:2181,node08:2181 --from-beginning --topic LogError

Ø 启动消费者:

bin/kafka-console-consumer.sh --zookeeper node06:2181,node07:2181,node08:2181 --from-beginning --topic testflume

启动生产者

bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --create --replication-factor 2 --partitions 1 --topic mylog_cmcc

查看topic列表:

bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --list

启动消费者

bin/kafka-console-consumer.sh --zookeeper node06:2181,node07:2181,node08:2181 --from-beginning --topic mylog_cmcc

bin/kafka-console-consumer.sh --zookeeper node06:2181,node07:2181,node08:2181 --topic mylog_cmcc

Ø 运行“RpcClientDemo”代码,通过rpc请求发送数据到Flume集群。

Flume中source类型为AVRO类型,此时通过Java发送rpc请求,测试数据是否传入Kafka。

其中,Java发送Rpc请求Flume代码示例如下:

(参考Flume官方文档:<u>http://flume.apache.org/FlumeDeveloperGuide.html</u>

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
import java.nio.charset.Charset;

/**
 * Flume官网案例
 * http://flume.apache.org/FlumeDeveloperGuide.html 
 * @author root
 */
public class RpcClientDemo {
    
    public static void main(String[] args) {
        MyRpcClientFacade client = new MyRpcClientFacade();
        // Initialize client with the remote Flume agent's host and port
        client.init("node1", 41414);

        // Send 10 events to the remote Flume agent. That agent should be
        // configured to listen with an AvroSource.
        String sampleData = "Hello Flume!";
        for (int i = 0; i < 10; i++) {
            client.sendDataToFlume(sampleData);
            System.out.println("发送数据:" + sampleData);
        }

        client.cleanUp();
    }
}

class MyRpcClientFacade {
    private RpcClient client;
    private String hostname;
    private int port;

    public void init(String hostname, int port) {
        // Setup the RPC connection
        this.hostname = hostname;
        this.port = port;
        this.client = RpcClientFactory.getDefaultInstance(hostname, port);
        // Use the following method to create a thrift client (instead of the
        // above lin

四、Storm****&****Kafka

官网地址:

<u>http://storm.apache.org/about/integrates.html</u>

五、flume+kafka+spout

bin/kafka-topics.sh --zookeeper node06:2181,node07:2181,node08:2181 --create --replication-factor 2 --partitions 1 --topic LogError

六。kafka安装实战


传输并解压



修改配置文件



修改zk配置

传输文件到其他节点
5.png

修改broker的id分别为1,2,3


6.png
7.png

开启zk




开启后输入数据。产生生产者


11.png

另一边是消费者,接受数据,可以看到数据是分区有序,因为刚开始,数据一窝蜂进入

之后数据有序,生产者输入,消费者产生相应输出

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,271评论 5 466
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,725评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,252评论 0 328
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,634评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,549评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,985评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,471评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,128评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,257评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,233评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,235评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,940评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,528评论 3 302
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,623评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,858评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,245评论 2 344
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,790评论 2 339

推荐阅读更多精彩内容

  • 一、入门1、简介Kafka is a distributed,partitioned,replicated com...
    HxLiang阅读 3,339评论 0 9
  • kafka安装目录下的bin目录包含了很多运维可操作的shell脚本,列举如下: 接下来详细说明每个脚本的使用方法...
    阿飞的博客阅读 9,697评论 5 15
  • 转载自:https://www.cnblogs.com/saneri/p/8762168.html 主机规划: 1...
    Hmcf阅读 361评论 0 0
  • 我主要使用的代码编辑器是vscode,这边便专门拉出一个文档,记录一下,关于平时我使用vscode的心得记录等等....
    嗨姑娘_大个子阅读 615评论 0 0
  • 今天上午参加东区教研室组织的同课异构活动,年青教师用心地备课,精心地上课。在课堂上,看到她们扎实的基本功,...
    聆听栀子花开阅读 887评论 2 17