akka编程demo

AKKA

akka基于actor模型, 是一个用于构建可扩展的弹性的快速响应的应用程序的平台;
actor模型:是一个并行计算模型。 它把actor作为基本元素来对待:未响应一个接收到的消息,一个actor能够自己做出一些决策,如创建更多的actor或者发送更多的消息

image.png

概念介绍

Actor:

actor是akka中最核心的概念,它是一个封装了状态和行为的对象,actor之间可以通过交换消息的方式进行通信,每个actor都有自己的收件箱,通过actor能够简化锁及线程管理,actor具有如下特性:

  • 提供了一种高级抽象,能够简化在并发/并行应用场景下的编程开发
  • 提供了异步非阻塞、高性能的事件驱动编程模型
  • 超轻量级事件处理(每GB堆内存几百万actor)

类介绍

ActorSystem

在Akka中,ActorSystem是一个重量级的结构,他需要分配多个线程,所以实际应用中,actorSystem一般是单例对象,我们通过ActorSystem创建很多actor,负责创建和监督actor

Actor

Actor负责通信,它包含一些重要的生命周期方法:

  • preStart(): 在Actor对象构造方法执行后执行
  • receive(): 在actor的preStart方法执行完成后执行,用于接收消息,会被反复执行

Demo

使用akka做一个简易的通信模型,实现一个主从结构通信

Master

主对象类,即注册中心,统计当前在线的worker数目

package akkaDemo

import akka.actor._
import com.typesafe.config.ConfigFactory
import org.apache.commons.cli.{GnuParser, Options}

import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._
/**
  * @author phil.zhang
  */
class Master extends Actor {

  // workMap
  private val workerMap = new mutable.HashMap[Int, WorkerInfo]()

  private val workerList = new ListBuffer[WorkerInfo]()

  override def preStart(): Unit = {
    println("master 已经启动")

    import context.dispatcher
    // 循环检查心跳
    context.system.scheduler.schedule(0 millis, 10 seconds, self, Check)
  }

  override def receive: Receive = {
    // 接受注册信息并统计
    case RegisterMessage(workId, memory, cores) => {
      val info = new WorkerInfo(workId, memory, cores)
      info.lastHeartBeatTime = System.currentTimeMillis()
      workerMap.put(workId, info)
      workerList += info
      val size = workerList.size
      println(info)
      println(s"worker$workId 注册成功,当前worker共:$size")
      sender ! RegisterdMessage("注册成功")
    }
      // 检查心跳
    case Check => {
      val now = System.currentTimeMillis()
      val outTimeList = workerList.filter(worker => now - worker.lastHeartBeatTime > 5000)
      outTimeList.foreach(workerInfo => {
        workerList -= workerInfo
        workerMap.remove(workerInfo.workerId)
        println("移除" + workerInfo.workerId)
      })
    }
      // 接受心跳后更新心跳时间
    case SendHeartBeat(workId) => {
      if (workerMap.contains(workId)) {
        val workerInfo = workerMap(workId)
        workerInfo.lastHeartBeatTime=System.currentTimeMillis()
      }
    }
  }
}

object Master {

  def main(args: Array[String]): Unit = {
    val options = new Options()
    options.addOption("h", true, "host")
    options.addOption("p", true, "port")

    val parser = new GnuParser()
    val line = parser.parse(options, args)

    val host = line.getOptionValue("h")
    val port = line.getOptionValue("p").toInt

    val configStr=
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
      """.stripMargin

    val config=ConfigFactory.parseString(configStr)
    val actorSystem = ActorSystem("actorSystem", config)
    val master = actorSystem.actorOf(Props(new Master), "master")
  }

}

Worker

工作对象类, 向主类注册,并保持心跳

package akkaDemo

import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import org.apache.commons.cli.{GnuParser, Options}

import scala.concurrent.duration._

/**
  * @author phil.zhang
  * @date 2020/2/6
  */
class Worker(val memory:Int,val cores:Int,val masterHost:String,val masterPort:String) extends Actor{

  var master:ActorSelection = _

  override def preStart(): Unit = {

    // actorSystem 是master的ActorSystem的名字, master是masterActor的名字
    master=context.actorSelection(s"akka.tcp://actorSystem@$masterHost:$masterPort/user/master")
    // 向主类注册
    master ! RegisterMessage(1, memory, cores)
    println("worker注册")
  }

  override def receive: Receive = {
    // 主类注册返回信息
    case RegisterdMessage(message) => {
      println("worker" + message)

      import context.dispatcher
      // 循环发起心跳
      context.system.scheduler.schedule(0 millis, 2 seconds,self, HeartBeat)
    }
      // 发送心跳
    case HeartBeat => {
      master ! SendHeartBeat(1)
    }
  }
}

object Worker {

  def main(args: Array[String]): Unit = {
    val options = new Options()
    options.addOption("mh",true, "master host")
    options.addOption("mp",true, "master port")
    options.addOption("h",true, "host")
    options.addOption("p",true, "host")
    options.addOption("m",true, "memory")
    options.addOption("c",true, "cores")

    val parser = new GnuParser()
    val line = parser.parse(options, args)

    val m_host = line.getOptionValue("mh")
    val m_port = line.getOptionValue("mp")
    val host = line.getOptionValue("h")
    val port = line.getOptionValue("p")
    val memory = line.getOptionValue("m").toInt
    val cores = line.getOptionValue("c").toInt

    val configStr=
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
      """.stripMargin

    val config=ConfigFactory.parseString(configStr)

    val actorSystem = ActorSystem("workerActorSystem", config)

    val worker = actorSystem.actorOf(Props(new Worker(memory,cores,m_host,m_port)),"worker")

  }
}

WorkerInfo

工作对象信息类, 用于描述工作对象

package akkaDemo

/**
  * @author phil.zhang
  * @date 2020/2/6
  */
class WorkerInfo(val workerId:Int,val memory: Int,val cores:Int) {

  // 用于记录上次心跳时间
  var lastHeartBeatTime:Long = _
  
  override def toString = s"$workerId,$memory,$cores"
}

Message

定义了一些信息类型

package akkaDemo

/**
  * @author phil.zhang
  * @date 2020/2/6
  */
trait Message extends Serializable{

}

// slave发给master的心跳信息
case class SendHeartBeat(workId: Int) extends Message

// slave发给master的注册信息
case class RegisterMessage(workId: Int, memory: Int, cores: Int) extends Message

// master发给slave的注册反馈信息
case class RegisterdMessage(message: String) extends Message

// master发给自己的检查信息, 所以不需要序列化
case object Check

// slave发给自己的心跳信息,所以不需要序列化
case object HeartBeat

主要maven依赖

    <dependency>
      <groupId>commons-cli</groupId>
      <artifactId>commons-cli</artifactId>
      <version>1.2</version>
    </dependency>

    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-actor_2.11</artifactId>
      <version>2.5.3</version>
    </dependency>
    <dependency>
      <groupId>com.typesafe.akka</groupId>
      <artifactId>akka-remote_2.11</artifactId>
      <version>2.5.3</version>
    </dependency>
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容

  • 持久化 当我们在集群系统中,一台机器向另一台机器发送一段数据,负责接收的机器在接收数据前突然宕机,就会造成数据丢失...
    mango_knight阅读 4,494评论 0 4
  • 前言 一 不得不说的Actor模型 1.1 Actor模型的诞生与发展 1.2 Actor模型是什么? 1.3 A...
    hedgehog1112阅读 401评论 0 0
  • 这篇文章主要介绍了Flink通过Akka实现的分布式通信。它第一次在0.9版本中出现。通过Akka,所有的远程程序...
    alvin_wang阅读 9,611评论 0 12
  • 传统的游戏服务器要么是单线程要么是多线程,过去几十年里CPU一直遵循摩尔定律发展,带来的结果是单核频率越来越高。而...
    JunChow520阅读 66,423评论 14 58
  • Actor系统的实体 在Actor系统中,actor之间具有树形的监管结构,并且actor可以跨多个网络节点进行透...
    JasonDing阅读 3,326评论 2 6