集成Akka

Integrating with Akka

Akka 使用Actor模型来提升抽象等级并且提供一个更好的平台去构建正确的并发和可扩展的应用。对于容错它采用了Let it crash模型,这是很成功的应用在电信行业构建一个永不停止的自愈系统。Actor提供了对于transparent distribution和基本的真正可拓展和容错的应用。

The application actor system

Akka可以使用一些称作actor systems的容器。一个actor system 管理它的配置资源用于运行他所包含的actors.

一个Play应用定义了一个特殊的actor system被自身使用。这个actor system跟随着这个应用的生命周期并且会自动的重启当应用重启的时候。

Writing actors

想要使用Akka,你需要写一个actor.下面是一个简单的actor.

package actors;

import akka.actor.*;
import actors.HelloActorProtocol.*;

public class HelloActor extends UntypedActor {

    public static Props props = Props.create(HelloActor.class);

    public void onReceive(Object msg) throws Exception {
        if (msg instanceof SayHello) {
            sender().tell("Hello, " + ((SayHello) msg).name, self());
        }
    }
}

注意这里的HelloActor定义了一个static field叫做props,它返回一个Props对象用于描述如何创建这个actor。这是一个非常好的Akka惯例,用来分离实例化的逻辑从创建actor的代码中。

这里有一个best practice。把HelloActor发送和接收定义为static inner classes叫做HelloActorProtocol:

package actors;

public class HelloActorProtocol {

    public static class SayHello {
        public final String name;

        public SayHello(String name) {
            this.name = name;
        }
    }
}

Creating and using actors

创建或是使用一个actor,你需要一个ActorSystem.可以通过申明一个依赖来获得,然后你可以使用actorOf方法去创建一个新的actor.

最基本的事情就是你可以给一个actor发送一个message.当你发送一个message给一个actor,没有响应, it`s fire and forget.这也被称作tell模式.

然而在一个web应用中,tell模式通常是没有用的,因为HTTP协议是一个request和responses.在这种情况下,你可能想要的是一个ask模式.这个ask模式返回一个Scala的Future,你可以通过使用scala.compat.java8.FutureConverts.toJava来转换为Java的CompletionStage.

下面是一个使用ask模式的HelloActor的例子:

import akka.actor.*;
import play.mvc.*;
import scala.compat.java8.FutureConverters;
import javax.inject.*;
import java.util.concurrent.CompletionStage;

import static akka.pattern.Patterns.ask;//need imported

@Singleton
public class Application extends Controller {

    final ActorRef helloActor;

    @Inject public Application(ActorSystem system) {
        helloActor = system.actorOf(HelloActor.props);
    }

    public CompletionStage<Result> sayHello(String name) {
        return FutureConverters.toJava(ask(helloActor, new SayHello(name), 1000))
                .thenApply(response -> ok((String) response));
    }
}

有一些需要注意的地方:

  • ask模式需要被导入,静态导入ask通常是最方便的方式。
  • 返回的future被转换为CompletionStage.这导致promise是一个CompletionStage<Object>,当你访问呢这个值的时候,你需要转换为你希望从actor返回的类型。
  • 这个ask模式需要一个timeout,我们提供了1000 milliseconds.如果actor花费的时间超过这个响应时间。返回的promise将成为一个timeout error。
  • 由于我们创建了一个actor在这个构造器中,我们需要我们的controller作为一个Singleton,这样每次controller被使用时不会创建一个性的actor。

Dependency injecting actors

如果你愿意,你可以让Guice实例化你的actors并绑定引用到你的controllers和components依赖。

比如,如果你想要一个actor依赖于 Play configuration,你可以这样做:

import akka.actor.UntypedActor;
import play.Configuration;

import javax.inject.Inject;

public class ConfiguredActor extends UntypedActor {

    private Configuration configuration;

    @Inject
    public ConfiguredActor(Configuration configuration) {
        this.configuration = configuration;
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof ConfiguredActorProtocol.GetConfig) {
            sender().tell(configuration.getString("my.config"), self());
        }
    }
}

Play提供了一些helpers用于帮助acotr bindings。他们允许actor自身依赖注入,并且允许actor引用自身被注入到其他的组件。绑定actor使用这些helpers,创建一个module参考dependency injection documentation,配合AkkaGuiceSupport接口和使用bindActor方法去绑定到actor:

import com.google.inject.AbstractModule;
import play.libs.akka.AkkaGuiceSupport;

public class MyModule extends AbstractModule implements AkkaGuiceSupport {
    @Override
    protected void configure() {
        bindActor(ConfiguredActor.class, "configured-actor");
    }
}

actor同时被命名为configured-actor,并且还将使用configured-actor被注入。现在你可以依赖actor在你的controllers和其他的components:

import akka.actor.ActorRef;
import play.mvc.*;
import scala.compat.java8.FutureConverters;

import javax.inject.Inject;
import javax.inject.Named;
import java.util.concurrent.CompletionStage;

import static akka.pattern.Patterns.ask;

public class Application extends Controller {

    private ActorRef configuredActor;

    @Inject
    public Application(@Named("configured-actor") ActorRef configuredActor) {
       this.configuredActor = configuredActor;
    }

    public CompletionStage<Result> getConfig() {
        return FutureConverters.toJava(ask(configuredActor,
                        new ConfiguredActorProtocol.GetConfig(), 1000)
        ).thenApply(response -> ok((String) response));
    }
}

Dependency injecting child actors

上面是关于root actors的注入,但是你创建的很多actor是没有被Play应用的生命周期束缚的,并且可能会有一些额外的状态传递。

为了帮助注入child actors,Play利用Guice的AssistedInject支持.

假设你有下面的actor,依赖configuration被注入,添加一个key:

import akka.actor.UntypedActor;
import com.google.inject.assistedinject.Assisted;
import play.Configuration;

import javax.inject.Inject;

public class ConfiguredChildActor extends UntypedActor {

    private final Configuration configuration;
    private final String key;

    @Inject
    public ConfiguredChildActor(Configuration configuration, @Assisted String key) {
        this.configuration = configuration;
        this.key = key;
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof ConfiguredChildActorProtocol.GetConfig) {
            sender().tell(configuration.getString(key), self());
        }
    }
}

在这种情况下我们使用构造函数注入,Guice的注入支持仅仅兼容构造器注入。参数key在创建时提供,不是通过容器,我们使用了@Assisted注解。

现在在child协议中,我们定义了一个Factory接口:

import akka.actor.Actor;

public class ConfiguredChildActorProtocol {

    public static class GetConfig {}

    public interface Factory {
        public Actor create(String key);
    }
}

我们不会去实现这个接口,Guice会为我们做这些,提供一个实现,不仅传递我们的key参数,而且还定位Configuration依赖并且注入他。由于只是返回一个Actor,当测试这个actor时,我们可以注入一个factor返回任意的actor,比如它允许我们注入一个家的child actor,来替代一个真实的actor。

现在actor依赖可以被InjectedActorSupport拓展,他可以依赖于我们创建的factory:

import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import play.libs.akka.InjectedActorSupport;

import javax.inject.Inject;

public class ParentActor extends UntypedActor implements InjectedActorSupport {

    private ConfiguredChildActorProtocol.Factory childFactory;

    @Inject
    public ParentActor(ConfiguredChildActorProtocol.Factory childFactory) {
        this.childFactory = childFactory;
    }

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof ParentActorProtocol.GetChild) {
            String key = ((ParentActorProtocol.GetChild) message).key;
            ActorRef child = injectedChild(() -> childFactory.create(key), key);
            sender().tell(child, self());
        }
    }
}

它使用injectedChild创建并获取child actor引用,通过key。第二个参数将会被用作child actor的name。

最终,我们需要绑定我们的actors。在我们的模块中。我们使用bindActorFactory去绑定parent actor并且绑定child factory到child实现:

import com.google.inject.AbstractModule;
import play.libs.akka.AkkaGuiceSupport;

public class MyModule extends AbstractModule implements AkkaGuiceSupport {
    @Override
    protected void configure() {
        bindActor(ParentActor.class, "parent-actor");
        bindActorFactory(ConfiguredChildActor.class,
            ConfiguredChildActorProtocol.Factory.class);
    }
}

这将是Guice自动绑定ConfiguredChildActorProtocol.Factory的实例,该实例将在配置为实例化时配置为ConfiguredChildActor。

Configuration

默认的actor系统配置是读取自Play application configuration文件。比如,配置默认的application actor system dispatcher,将这些行添加到conf/application.conf文件:

akka.actor.default-dispatcher.fork-join-executor.pool-size-max = 64
akka.actor.debug.receive = on

关于Akka的日志配置,参考configuring logging.

Changing configuration prefix

如果你想使用akka.*配置其他的Akka actor system, 你可以告诉Play加载他的配置从其他的位置。

play.akka.config = "my-akka"

现在配置将读取my-akka前缀替代akka前缀:

my-akka.actor.default-dispatcher.fork-join-executor.pool-size-max = 64
my-akka.actor.debug.receive = on

Built-in actor system name

默认的Play actor system 的name是application。你可以改变他通过conf/application.conf

play.akka.actor-system = "custom-name"

Note: This feature is useful if you want to put your play application ActorSystem in an akka cluster.

Executing a block of code asynchronously

一个常见的Akka用例是并发的计算,不需要···。如果你发现你创建了一个Actors pool仅仅只是为了执行一些并行的计算,这里有一些更简单更快的方式:

import play.mvc.*;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class Application extends Controller {
    public CompletionStage<Result> index() {
        return CompletableFuture.supplyAsync(this::longComputation)
                .thenApply((Integer i) -> ok("Got " + i));
    }
}

Scheduling asynchronous tasks

你可以定时发送消息给一个actor或是执行任务(functions or Runnable instances).你可以得到一个Cancellable,你可以调用cancel来取消任务的执行。

比如,你可以发送一个消息到testActorevery 30 minutes:

system.scheduler().schedule(
    Duration.create(0, TimeUnit.MILLISECONDS), //Initial delay 0 milliseconds
    Duration.create(30, TimeUnit.MINUTES),     //Frequency 30 minutes
    testActor,
    "tick",
    system.dispatcher(),
    null
);

或则运行一个代码块10 milliseconds立即:

system.scheduler().scheduleOnce(
    Duration.create(10, TimeUnit.MILLISECONDS),
    () -> file.delete(),
    system.dispatcher()
);
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342

推荐阅读更多精彩内容

  • 最近因为项目中有大量的消息分发需求,突然心血来潮决定挑战一下传说中的并发终极武器AKKA。 因为这个项目是spri...
    SamHxm阅读 4,822评论 0 11
  • maven SpringExtension akka config actor workerActor maste...
    go4it阅读 5,132评论 0 2
  • Actor系统的实体 在Actor系统中,actor之间具有树形的监管结构,并且actor可以跨多个网络节点进行透...
    JasonDing阅读 3,326评论 2 6
  • 引言 这篇文章主要是第一次学习Akka编程,先试试水,探探坑,对Akka和SBT的使用有一个直观的了解,以几个简单...
    JasonDing阅读 3,875评论 0 19
  • 与OpenGL ES1.x渲染管线相比,OpenGL ES 2.0渲染管线中“顶点着色器”取代了OpenGL ES...
    武小寺阅读 4,203评论 0 6