有类型Actor

有类型Actor

  • 通过接口明确actor可以接收的消息目标和行为。

TypedActor是Akka基于Active对象(Active Object)设计模式的一个实现,关于Active对象模式,可以看维基百科的定义:
Active对象模式解耦了在一个对象上执行方法和调用方法的逻辑,执行方法和调用方法分别在各自的线程执行上下文中。该模式的目标是通过使用异步方法调用和一个调度器来处理请求,从而实现并行计算处理,该模式由6个元素组成:

  • 一个Proxy对象,提供一个面向客户端的接口和一组公共的方法
  • 一个接口,定义了请求一个Active对象上的方法的集合
  • 一个来自客户端请求的列表
  • 一个调度器,确定下一次处理哪一个请求
  • Active对象上方法的实现
  • 一个回调或者变量,供客户端接收请求被处理后的结果
    通过前面对Actor的了解,我们知道Actor更适用于在Akka的Actor系统之间来实现并行计算处理,而TypedActor适用于桥接Actor系统和非Actor系统。TypedActor是基于JDK的Proxy来实现的,与Actor不同的是,Actor一次处理一个消息,而TypedActor一次处理一个调用(Call)。关于更多关于TypedActor,可以查看Akka文档。

有类型Actor由两 “部分” 组成, 一个公开的接口和一个实现, 对普通actor来说,你拥有一个外部API(公开接口的实例)来将方法调用异步地委托给其实现的私有实例。

有类型Actor相对于普通Actor的优势在于有类型Actor拥有静态的契约,你不需要定义你自己的消息;它的劣势在于对你能做什么和不能做什么进行了一些限制,即你不能使用become/unbecome。

有类型Actor是使用JDK Proxies实现的,JDK Proxies提供了非常简单的api来拦截方法调用。

注意
和普通Akka actor一样,有类型actor一次也只处理一个消息。

工具箱

在创建第一个有类型Actor之前,我们先了解一下我们手上可供使用的工具,它位于akka.actor.TypedActor中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import akka.actor.TypedActor

//返回有类型actor扩展
val extension = TypedActor(system) //system是一个Actor系统实例

//判断一个引用是否是有类型actor代理
TypedActor(system).isTypedActor(someReference)

//返回一个外部有类型actor代理所代表的Akka actor
TypedActor(system).getActorRefFor(someReference)

//返回当前的ActorContext,
// 此方法仅在一个TypedActor 实现的方法中有效
val c: ActorContext = TypedActor.context

//返回当前有类型actor的外部代理,
// 此方法仅在一个TypedActor 实现的方法中有效
val s: Squarer = TypedActor.self[Squarer]

//返回一个有类型Actor扩展的上下文实例
//这意味着如果你用它创建其它的有类型actor,它们会成为当前有类型actor的子actor
TypedActor(TypedActor.context)

创建有类型Actor

要创建有类型Actor,需要一个或多个接口,和一个实现。
创建我们的Squarer的有类型actor实例的最简单方法是:

1
2
val mySquarer: Squarer =
TypedActor(system).typedActorOf(TypedProps[SquarerImpl]())

第一个类型是代理的类型,第二个类型是实现的类型。如果要调用某特定的构造方法要这样做:

1
2
3
val otherSquarer: Squarer =
TypedActor(system).typedActorOf(TypedProps(classOf[Squarer],
new SquarerImpl("foo")), "name")

由于你提供了一个 Props, 你可以指定使用哪个派发器, 缺省的超时时间等。

方法派发语义

方法返回:

  • Unit 会以 fire-and-forget语义进行派发,与ActorRef.tell完全一致。
  • akka.dispatch.Future[_] 会以 send-request-reply语义进行派发,与 ActorRef.ask完全一致。
  • scala.Option[]会以send-request-reply语义派发,但是会阻塞等待应答, 如果在超时时限内没有应答则返回scala.None,否则返回包含结果的scala.Some[]。在这个调用中发生的异常将被重新抛出。
  • 任何其它类型的值将以send-request-reply语义进行派发,但会阻塞地等待应答, 如果超时会抛出java.util.concurrent.TimeoutException,如果发生异常则将异常重新抛出。

终止有类型Actor

由于有类型actor底层还是Akka actor,所以在不需要的时候要终止它。

1
2
3
4
5
TypedActor(system).stop(mySquarer)
这将会尽快地异步终止与指定的代理关联的有类型Actor

TypedActor(system).poisonPill(otherSquarer)
这将会在有类型actor完成所有入队的调用后异步地终止它。

有类型Actor监管树

你可以通过传入一个ActorContext来获得有类型Actor上下文,所以你可以对它调用typedActorOf(..)来创建有类型子actor。

1
2
3
4
5
//Inside your Typed Actor
val childSquarer: Squarer =
TypedActor(TypedActor.context).typedActorOf(TypedProps[SquarerImpl]())
//Use "childSquarer" as a Squarer
通过将ActorContext作为参数传给TypedActor.get(…),也可以为普通的Akka actor创建有类型子actor。

监管策略

通过让你的有类型Actor的具体实现类实现TypedActor.Supervisor方法,你可以定义用来监管子actor的策略,就像监管与监控 和容错(Scala)所描述的。

生命周期回调

通过使你的有类型actor实现类实现以下方法:

  • TypedActor.PreStart
  • TypedActor.PostStop
  • TypedActor.PreRestart
  • TypedActor.PostRestart
    你可以hook进有类型actor的整个生命周期。

接收任意消息

如果你的有类型actor的实现类扩展了akka.actor.TypedActor.Receiver,所有非方法调用MethodCall的消息会被传给onReceive方法.

这使你能够对DeathWatch的Terminated消息及其它类型的消息进行处理,例如,与无类型actor进行交互的场合。