Actor就是Scala中的多线程编程,Actor尽可能避免锁和共享状态,从而避免多线程并发时出现资源争用的情况,进而提升多线程的性能。此外,Actor的这种模型还可以避免死锁等一系列传统多线程编程的问题。
Spark中使用的分布式多线程框架,是Akka,Akka也实现了Scala Actor的模型,其核心概念也是ACtor。
1、Actor的创建、启动和消息收发(案例:Actor Hello World)
Scala提供了Actor trait 来进行actor多线程编程,我们只需要重写Actor trait的方法,即可实现自己的线程执行体,与Java中重写run方法类似。
此外,使用start()方法启动actor,使用!符号向actor发送消息,actor内部使用receive和模式匹配接收消息
代码如下:
import scala.actors.Actor
class HelloActor extends Actor{
def act(): Unit ={
while(true){
receive{
case name:String =>println("Hello,"+name)
}
}
}
}
object testObject{
def main(args: Array[String]): Unit = {
val helloActor = new HelloActor
helloActor.start()
helloActor!"hello actor"
helloActor!"yangyang"
}
}
2、收发case class类型的消息(案例:用户注册登陆后台接口)
Scala天然支持线程之间的精准通信,即一个actor可以给其他actor发送消息。
import scala.actors.Actor
case class Login(username:String,psd:String)
case class Register(username:String,psd:String)
class UserManageActor extends Actor{
override def act(): Unit = {
while (true){
receive{
case Login(username,psd)=>println("login,usname:"+username+",psd:"+psd)
case Register(username,psd)=>println("Register,usname:"+username+",psd:"+psd)
}
}
}
}
object UserManageActor{
def main(args: Array[String]): Unit = {
val userManageActor = new UserManageActor
userManageActor.start()
userManageActor!Register("leo","1234")
userManageActor!Login("leo","1234")
}
}
3、Actor之间互相收发消息(案例:打电话)
一个actor向另外一个actor发送消息时,同时带上自己的引用,其他actor收到消息后,可以通过发送消息的引用,给发送者回复消息
import scala.actors.Actor
case class Message(content:String,sender:Actor)
class LeoActor extends Actor{
override def act(): Unit = {
while (true){
receive{
case Message(content,sender)=>{
println("leo receive content:"+content)
sender!"leo receive your content"
}
}
}
}
}
class JackActor(val leoActor: Actor) extends Actor{
override def act(): Unit = {
leoActor!Message("Hello Leo,I'm Jack",this)
receive{
case response:String=>{
println("Jack receive content:"+response)
}
}
}
}
object UserManageActor{
def main(args: Array[String]): Unit = {
val leoActor = new LeoActor
val jackActor = new JackActor(leoActor)
leoActor.start()
jackActor.start()
}
}
4、同步消息和Future
默认情况,消息都是异步的,如果希望发送消息是同步的,即对方接收到消息后,立马给个返回结果,那么可以使用!?的方式发送消息,即val reply = actor!?message
如果要异步发送消息,但是在后面要获取消息的返回值,那么可以使用Future。即!!语法。val future = actor!!messgae
import scala.actors.Actor
case class SyncMsg(id:Int,msg:String)
case class AsyncMsg(id:Int,msg:String)
case class ReplyMsg(id:Int,msg:String)
class AppleActor extends Actor{
override def act(): Unit = {
while (true){
receive{
case "start" => println("starting ....")
case SyncMsg(id,msg) =>{//同步
println(id+",sync"+msg)
Thread.sleep(5000)
sender !ReplyMsg(id,"ReplyMsg finished")
}
case AsyncMsg(id,msg) =>{//异步
println(id +",AsyncMsg"+msg)
Thread.sleep(5000)
}
}
}
}
}
object AppleActor{
def main(args: Array[String]): Unit = {
val a=new AppleActor
a.start()
//异步消息,没有返回值
a ! AsyncMsg(1,"AsyncMsg !")
// a !? AsyncMsg(2,"AsyncMsg !?")//异步消息没有返回消息,会阻塞
println("异步消息发送完成")
//同步消息
val reply1= a ! SyncMsg(1 ,"SyncMsg !")//无返回值的适用于异步的
println("同步接收:reply1:"+reply1)
val reply2= a !? SyncMsg(2 ,"SyncMsg !?")//阻塞
println("同步接收:reply2:"+reply2)
val reply3= a !! SyncMsg(3,"SyncMsg !!")
println("reply3:"+reply3)
println(reply3.isSet)
val c=reply3.apply()//阻塞
println(reply3.isSet)
println("异步接收 reply3.apply:"+c)
}
}
运行结果如下:
异步消息发送完成
同步接收:reply1:()
1,AsyncMsgAsyncMsg !
1,syncSyncMsg !
2,syncSyncMsg !?
同步接收:reply2:ReplyMsg(2,ReplyMsg finished)
reply3:<function0>
false
3,syncSyncMsg !!
true
异步接收 reply3.apply:ReplyMsg(3,ReplyMsg finished)