Akka邮箱&路由器&调度器关键点说明

Akkay邮箱&路由器&调度器关键点说明

Akka 无法保证消息将被传送到目的地。这种无保证传送背后的哲学原理是 Akka 的核心原理之一。
Akka 可以 保证消息最多传送一次,而且绝不会无序地收到从一个 actor 实例发送到另一个 actor 实例的消息。

调度器

Akka MessageDispatcher是维持 Akka Actor “运作”的部分, 可以说它是整个机器的引擎。所有的MessageDispatcher实现也同时也是一个ExecutionContext

缺省派发器

在没有为Actor作配置的情况下,每一个ActorSystem将有一个缺省的派发器。该缺省派发器可以被配置,默认是使用指定的default-executor的一个Dispatcher。如果一个ActorSystem是使用传入的ExecutionContext创建的,则该ExecutionContext将被用作所有派发器的默认执行器(“executor”)。如果没有给定ExecutionContext,则会回退使用akka.actor.default-dispatcher.default-executor.fallback指定的执行器。缺省情况下是使用“fork-join-executor”,它在大多数情况下拥有非常好的性能。

邮箱

一个Akka Mailbox保存发往某个Actor的消息。通常每个Actor都拥有自己的邮箱,但也有例外,例如使用BalancingPool的所有路由子(routee)共享同一个邮箱实例。

为actor指定一个消息队列类型

为某个特定类型的actor指定一个特定类型的消息队列是有可能的,只要通过actor扩展RequiresMessageQueue参数化特质即可。下面是一个示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import akka.dispatch.RequiresMessageQueue
import akka.dispatch.BoundedMessageQueueSemantics

class MyBoundedActor extends MyActor
with RequiresMessageQueue[BoundedMessageQueueSemantics]
RequiresMessageQueue特质的类型参数需要映射到配置中的邮箱,像这样:

bounded-mailbox {
mailbox-type = "akka.dispatch.BoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 10s
}

akka.actor.mailbox.requirements {
"akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
}

如何选择邮箱类型

当一个actor创建时,ActorRefProvider首先确定将执行它的调度器。然后,邮箱确定如下:

  1. 如果actor的部署配置节包含mailbox键,则其描述邮箱类型将被使用。
  2. 如果actor的Props包含邮箱选择——即它调用了withMailbox——则其描述邮箱类型将被使用。
  3. 如果调度器的配置节包含mailbox-type键,则该节内容将用于配置邮箱类型。
  4. 如果该actor需要邮箱类型,如上文所述,然后该约束的映射将用于确定使用的邮箱类型;如果不能满足调度器的约束——如果有的话——将继续替换尝试。
  5. 如果调度器需要一个邮箱类型,如上文所述,则该约束的映射将被用来确定要使用的邮箱类型。
  6. 将使用默认邮箱akka.actor.default-mailbox。

路由

消息可以通过路由器发送,以便有效地将它们路由到目的actor,称为其routee。一个Router可以在actor内部或外部使用,并且你可以自己管理routee或使用有配置功能的自我包含的路由actor。

为了创建路由器,设置指定数量的rountee,需要以下信息
-rounter类型和routee实例的数量

1
ActorRef masterAct = system.actorOf(Props.create(MasterActor.class).withRouter(new RoundRobinPool(5)),"RounterActor");

定义了一个Actor,通过实例化一个路由器实例,这里是RoundRobin,它的构造函数接受一个参数,表示创建rountees的数量.

一个简单的路由器

下面的示例阐释如何使用Router和在actor内管理routee。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import akka.routing.ActorRefRoutee
import akka.routing.Router
import akka.routing.RoundRobinRoutingLogic

class Master extends Actor {
var router = {
val routees = Vector.fill(5) {
val r = context.actorOf(Props[Worker])
context watch r
ActorRefRoutee(r)
}
Router(RoundRobinRoutingLogic(), routees)
}

def receive = {
case w: Work =>
router.route(w, sender())
case Terminated(a) =>
router = router.removeRoutee(a)
val r = context.actorOf(Props[Worker])
context watch r
router = router.addRoutee(r)
}
}

Akka自带的路由逻辑如下:

  • akka.routing.RoundRobinRoutingLogic
  • akka.routing.RandomRoutingLogic
  • akka.routing.SmallestMailboxRoutingLogic
  • akka.routing.BroadcastRoutingLogic
  • akka.routing.ScatterGatherFirstCompletedRoutingLogic
  • akka.routing.TailChoppingRoutingLogic
  • akka.routing.ConsistentHashingRoutingLogic

Router是不可变的,而RoutingLogic是线程安全的;意味着他们也可以在actor外部使用。
路由actor是一种特殊的类型-RounterActorRef。RounterActorRef不是利用存储转发的机制,他直接路由消息到Rountee的邮箱,而不是到Router的邮箱,当routee答复路由消息时,回复将发送到原始发件人,而不是路由actor。

注意
一般情况下,任何发送到路由器的消息将被向前发送到它的routee,但有一个例外。特别地广播消息将发送到路由器下所有的routee

一个路由actor

一个路由器也可以被创建为一个自包含的actor,来管理routee,载入路由逻辑和其他配置设置。

注意: 路由器actor相当于是透明的
路由器也是子actor的监管者。路由器的默认策略是总是上溯,所以错误会传递给路由器的监管者处理。
注意路由器的监管者会将错误当做路由器的错位,因此会重启路由器,并将导致他的孩子全部重启。
如果路由器池的子actor终止,池路由器不会自动产生一个新的actor。在池路由器所有子actor都终止的事件中,路由器将终止本身,除非它是一个动态的路由器,例如使用了大小调整。