雁渡寒潭 风吹疏竹

教练,我想打篮球


  • Home

  • Tags

  • Categories

  • Archives

Akka的监管和监控

Posted on 2016-02-04 | In Akka |

Akka的监管和监控

监管和监控

在 Actor 系统 中说过,监管描述的是actor之间的依赖关系:监管者将任务委托给下属,并相应地对下属的失败状况进行响应。当一个下属出现了失败(即抛出一个异常),它自己会将自己和自己所有的下属挂起,然后向自己的监管者发送一个提示失败的消息。基于所监管的工作的性质和失败的性质,监管者可以有4种基本选择:

  1. 恢复下属,保持下属当前积累的内部状态
  2. 重启下属,清除下属的内部状态
  3. 永久地停止下属
  4. 升级失败(沿监管树向上传递失败),由此失败自己

警告
监管相关的父-子沟通,使用了特殊的系统消息及其固有的邮箱,从而和用户消息隔离开来。这意味着,监管相关的事件相对于普通的消息没有确定的顺序关系。在一般情况下,用户不能影响正常消息和失败通知的顺序。

顶级监管者

enter image description here

一个actor系统在其创建过程中至少要启动三个actor,如上图所示。

  • /user: 守护Actor

这个名为”/user”的守护者,作为所有用户创建actor的父actor,可能是需要打交道最多的。使用system.actorOf()创建的actor都是其子actor。

  • /system: 系统守护者

这个特殊的守护者被引入,是为了实现正确的关闭顺序,即日志(logging)要保持可用直到所有普通actor终止,即使日志本身也是用actor实现的。

  • /: 根守护者

根守护者所谓“顶级”actor的祖父,它监督所有在Actor路径的顶级作用域中定义的特殊actor,使用发现任何Exception就终止子actor的SupervisorStrategy.stoppingStrategy策略。

重启

重启过程中所发生事件的精确次序是:

  1. actor被挂起(意味着它不会处理正常消息直到被恢复),并递归挂起其所有子actor
  2. 调用旧实例的 preRestart hook (缺省实现是向所有子actor发送终止请求并调用 postStop)
  3. 等待所有子actor终止(使用context.stop())直到 preRestart 最终结束;这里所有的actor操作都是非阻塞的,最后被杀掉的子actor的终止通知会影响下一步的执行
  4. 再次调用原来提供的工厂生成actor的新实例
  5. 调用新实例的postRestart方法(其默认实现是调用preStart方法)
  6. 对步骤3中没有被杀死的所有子actor发送重启请求;重启的actor会遵循相同的过程, 从步骤2开始
  7. 恢复这个actor

Actor引用, 路径与地址

什么是Actor引用?

Actor引用是 ActorRef 的子类,其最重要的目的是支持向它所代表的actor发送消息。每个actor通过self字段来访问自己的标准(本地)引用;在给其它actor发送的消息中也缺省包含这个引用。反过来,在消息处理过程中,actor可以通过sender()方法来访问到当前消息的发送者的引用。

什么是Actor路径?

由于actor是以一种严格的树形结构样式来创建的,所以沿着子actor到父actor的监管链,一直到actor系统的根存在一条唯一的actor名字序列。这个序列可以被看做是文件系统中的文件路径,所以我们称之为“路径”。就像在一些真正的文件系统中一样,也存在所谓的“符号链接”,即一个actor也许能通过不同的路径被访问到,除了原始路径外,其它的路径都涉及到对actor实际监管祖先链的某部分路径进行转换的方法。

如何获得Actor引用?

actor引用的获取方法分为两类:通过创建actor,或者通过查找actor。后一种功能又分两种:通过具体的actor路径来创建actor引用,和查询actor逻辑树。

创建Actor

一个actor系统通常是在根守护者上使用ActorSystem.actorOf创建actor来启动,然后在创建出的actor中使用ActorContext.actorOf来展开actor树。这些方法返回的是指向新创建的actor的引用。每个actor都拥有到它的父亲,它自己和它的子actor的引用(通过ActorContext访问)。这些引用可以与消息一起被发送给别的actor,以便接收方直接回复。

通过具体的路径来查找actor

另外,可以使用ActorSystem.actorSelection来查找actor引用。“选择”可在已有actor与被选择的actor进行通讯的时候用到,在投递每条消息的时候都会用到查找。

为了获得一个绑定到指定actor生命周期的ActorRef,你需要发送一个消息,如内置的Identify信息,向指定的actor,所获得的sender()即为所求。

val selection = context.actorSelection(“/user/serviceA”)
selection.tell(new Identify(identifyId), getSelf());

总结: actorOf vs. actorSelection vs. actorFor

Note

以上部分所描述的细节可以简要地总结和记忆成:

actorOf 永远都只会创建一个新的actor,这个新的actor是actorOf所调用上下文(可以是任意一个actor或actor系统本身)的直接子actor
actorSelection只会在消息送达后查找已经存在的actor集合,即不会创建actor,也不会在创建选择集合时验证actor是否存在。
actorFor(废弃,已经被actorSelection取代) 永远都只是查找到一个已存在的actor,不会创建新的actor。

Actor路径的顶级作用域

在路径树的根上是根监管者,所有其他actor都可以从通过它找到;它的名字是”/“。在第二个层次上是以下这些:

  • “/user” 是所有由用户创建的顶级actor的监管者;用 ActorSystem.actorOf创建的actor在其下。
  • “/system” 是所有由系统创建的顶级actor的监管者,如日志监听器,或由配置指定在actor系统启动时自动部署的actor。
  • “/deadLetters” 是死信actor,所有发往已经终止或不存在的actor的消息会被重定向到这里(以尽最大努力为基础:即使在本地JVM,消息也可能丢失)
  • “/temp”是所有系统创建的短时actor的监管者,例如那些在ActorRef.ask的实现中用到的actor。
  • “/remote” 是一个人造虚拟路径,用来存放所有其监管者是远程actor引用的actor。

Akka Actor的创建&引用&声明周期

Posted on 2016-02-04 | In Akka |

Actor的创建&引用&声明周期

1.创建actor

  • 定义一个Actor类

要定义自己的Actor类,需要继承Actor并实现receive方法。receive方法需要定义一系列case语句(类型为PartialFunction[Any, Unit])来描述你的Actor能够处理哪些消息(使用标准的Scala模式匹配),以及消息如何被处理。
如下例:

1
2
3
4
5
6
7
8
9
10
11
import akka.actor.Actor
import akka.actor.Props
import akka.event.Logging

class MyActor extends Actor {
val log = Logging(context.system, this)
def receive = {
case "test" => log.info("received test")
case _ => log.info("received unknown message")
}
}

  • Props

Props是一个用来在创建actor时指定选项的配置类,可以把它看作是不可变的,因此在创建包含相关部署信息的actor时(例如使用哪一个调度器(dispatcher),详见下文),是可以自由共享的。以下是如何创建Props实例的示例.

1
2
3
4
5
import akka.actor.Props

val props1 = Props[MyActor]
val props2 = Props(new ActorWithArgs("arg")) // careful, see below
val props3 = Props(classOf[ActorWithArgs], "arg")

警告
在另一个actor中声明一个actor是非常危险的,会打破actor的封装。永远不要将一个actor的this引用传进Props!
推荐做法

在每一个Actor的伴生对象中提供工厂方法是一个好主意,这有助于保持创建合适的Props,尽可能接近actor的定义。这也避免了使用Props.apply(…)方法将采用一个“按名”(by-name)参数的缺陷,因为伴生对象的给定代码块中将不会保留包含作用域的引用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
object DemoActor {
/**
* Create Props for an actor of this type.
* @param magciNumber The magic number to be passed to this actor’s constructor.
* @return a Props for creating this actor, which can then be further configured
* (e.g. calling `.withDispatcher()` on it)
*/
def props(magicNumber: Int): Props = Props(new DemoActor(magicNumber))
}

class DemoActor(magicNumber: Int) extends Actor {
def receive = {
case x: Int => sender() ! (x + magicNumber)
}
}

class SomeOtherActor extends Actor {
// Props(new DemoActor(42)) would not be safe
context.actorOf(DemoActor.props(42), "demo")
// ...
}

  • 使用Props创建Actor

Actor可以通过将Props实例传入actorOf工厂方法来创建,ActorSystem和ActorContext中都有该方法。

1
2
3
4
5
6
7
8
9
10
11
import akka.actor.ActorSystem

// ActorSystem is a heavy object: create only one per application
val system = ActorSystem("mySystem")
val myActor = system.actorOf(Props[MyActor], "myactor2")
使用ActorSystem将创建顶级actor,由actor系统提供的守护actor监管;如果使用的是actor的上下文,则创建一个该actor的子actor。

class FirstActor extends Actor {
val child = context.actorOf(Props[MyActor], name = "myChild")
// plus some behavior ...
}

推荐创建一个树形结构,包含子actor、孙子等等,使之符合应用的逻辑错误处理结构

  • 依赖注入

如果你的actor有带参数的构造函数,则这些参数也需要成为Props的一部分,如上文所述。但有些情况下必须使用工厂方法,例如,当实际构造函数的参数由依赖注入框架决定。

1
2
3
4
5
6
7
8
9
10
11
12
13
import akka.actor.IndirectActorProducer

class DependencyInjector(applicationContext: AnyRef, beanName: String)
extends IndirectActorProducer {

override def actorClass = classOf[Actor]
override def produce =
// obtain fresh Actor instance from DI framework ...
}

val actorRef = system.actorOf(
Props(classOf[DependencyInjector], applicationContext, "hello"),
"helloBean")

2.Actor API

Actor trait只定义了一个抽象方法,就是上面提到的receive,用来实现actor的行为。

如果当前actor的行为与收到的消息不匹配,则会调用 unhandled,其缺省实现是向actor系统的事件流中发布一条akka.actor.UnhandledMessage(message, sender, recipient)(将配置项akka.actor.debug.unhandled设置为on来将它们转换为实际的调试消息)。

另外,它还包括:

  • self引用代表本actor的ActorRef
  • sender引用代表最近收到消息的发送actor,通常用于下面将讲到的消息回应中
  • supervisorStrategy 用户可重写它来定义对子actor的监管策略

该策略通常在actor内声明,这样决定函数就可以访问actor的内部状态:因为失败通知作为消息发送给监管者,并像普通消息一样被处理(尽管不是正常行为),所有的值和actor变量都是可用的,以及sender引用 (报告失败的将是直接子actor;如果原始失败发生在遥远的后裔,它仍然是一次向上报告一层)。

  • context暴露actor和当前消息的上下文信息,如:
    • 用于创建子actor的工厂方法(actorOf)
    • actor所属的系统
    • 父监管者
    • 所监管的子actor
    • 生命周期监控
    • hotswap行为栈,见Become/Unbecome

3.Actor生命周期

enter image description here

actor系统中的路径代表一个”地方”,这里可能会被活着的actor占据。最初(除了系统初始化actor)路径都是空的。在调用actorOf()时它将为指定路径分配根据传入Props创建的一个actor化身。actor化身是由路径和一个UID标识的。重新启动只会替换有Props定义的Actor实例,但不会替换化身,因此UID保持不变。

当actor停止时,其化身的生命周期结束。在这一时间点上相关的生命周期事件被调用,监视该actor的actor都会获得终止通知。当化身停止后,路径可以重复使用,通过actorOf()创建一个actor。在这种情况下,除了UID不同外,新化身与老化身是相同的。

ActorRef始终表示化身(路径和UID)而不只是一个给定的路径。因此如果actor停止,并且创建一个新的具有相同名称的actor,则指向老化身的ActorRef将不会指向新的化身。

相对地,ActorSelection指向路径(或多个路径,如果使用了通配符),且完全不关注有没有化身占据它。因此ActorSelection 不能被监视。获取某路径下的当前化身ActorRef是可能的,只要向该ActorSelection发送Identify,如果收到ActorIdentity回应,则正确的引用就包含其中(详见通过Actor Selection确定Actor)。也可以使用ActorSelection的resolveOne方法,它会返回一个包含匹配ActorRef的Future。

  • 使用DeathWatch进行生命周期监控

为了在其它actor终止时 (即永久停止,而不是临时的失败和重启)收到通知,actor可以将自己注册为其它actor在终止时所发布的Terminated消息的接收者(见停止 Actor)。这个服务是由actor系统的DeathWatch组件提供的。

注册一个监视器很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
import akka.actor.{ Actor, Props, Terminated }

class WatchActor extends Actor {
val child = context.actorOf(Props.empty, "child")
context.watch(child) // <-- this is the only call needed for registration
var lastSender = system.deadLetters

def receive = {
case "kill" =>
context.stop(child); lastSender = sender()
case Terminated(`child`) => lastSender ! "finished"
}
}

要注意Terminated消息的产生与注册和终止行为所发生的顺序无关。特别地,即使在注册时,被观察的actor已经终止了,监视actor仍然会受到一个Terminated消息。

多次注册并不表示会有多个消息产生,也不保证有且只有一个这样的消息被接收到:如果被监控的actor已经生成了消息并且已经进入了队列,在这个消息被处理之前又发生了另一次注册,则会有第二个消息进入队列,因为对一个已经终止的actor的监控注册操作会立刻导致Terminated消息的产生。

可以使用context.unwatch(target)来停止对另一个actor生存状态的监控。即使Terminated已经加入邮箱,该操作仍有效;一旦调用unwatch,则被观察的actor的Terminated消息就都不会再被处理。

  • 启动Hook

actor启动后,它的preStart方法会被立即执行。

override def preStart() {
// registering with other actors
someService ! Register(self)
}
在actor第一次创建时,将调用此方法。在重新启动期间,它被postRestart的默认实现调用,这意味着通过重写该方法,你可以选择是仅仅在初始化该actor时调用一次,还是为每次重新启动都调用。actor构造函数中的初始化代码将在每个actor实例创建的时候被调用,这也发生在每次重启时。

  • 重启Hook

所有的actor都是被监管的,即与另一个使用某种失败处理策略的actor绑定在一起。如果在处理一个消息的时候抛出了异常,Actor将被重启(详见监管与监控)。这个重启过程包括上面提到的Hook:

要被重启的actor被通知是通过调用preRestart,包含着导致重启的异常以及触发异常的消息;如果重启并不是因为消息处理而发生的,则所携带的消息为None,例如,当一个监管者没有处理某个异常继而被其监管者重启时,或者因其兄弟节点的失败导致的重启。如果消息可用,则消息的发送者通常也可用(即通过调用sender)。

这个方法是用来完成清理、准备移交给新actor实例等操作的最佳位置。其缺省实现是终止所有子actor并调用postStop。

最初调用actorOf的工厂将被用来创建新的实例。
新的actor的postRestart方法被调用时,将携带着导致重启的异常信息。默认实现中,preStart被调用时,就像一个正常的启动一样。
actor的重启只会替换掉原来的actor对象;重启不影响邮箱的内容,所以对消息的处理将在postRestart hook返回后继续。触发异常的消息不会被重新接收。在actor重启过程中,所有发送到该actor的消息将象平常一样被放进邮箱队列中。

警告
要知道失败通知与用户消息的相关顺序不是决定性的。尤其是,在失败以前收到的最后一条消息被处理之前,父节点可能已经重启其子节点了。详细信息请参见“讨论:消息顺序”。

  • 终止 Hook

一个Actor终止后,其postStop hook将被调用,它可以用来,例如取消该actor在其它服务中的注册。这个hook保证在该actor的消息队列被禁止后才运行,即之后发给该actor的消息将被重定向到ActorSystem的deadLetters中。

3. 通过Actor Selection定位Actor

如Actor引用, 路径与地址中所述,每个actor都拥有一个唯一的逻辑路径,此路径是由从actor系统的根开始的父子链构成;它还拥有一个物理路径,如果监管链包含有远程监管者,此路径可能会与逻辑路径不同。这些路径用来在系统中查找actor,例如,当收到一个远程消息时查找收件者,但是它们更直接的用处在于:actor可以通过指定绝对或相对路径(逻辑的或物理的)来查找其它的actor,并随结果获取一个ActorSelection:

1
2
3
4
// will look up this absolute path
context.actorSelection("/user/serviceA/aggregator")
// will look up sibling beneath same supervisor
context.actorSelection("../joe")

其中指定的路径被解析为一个java.net.URI,它以/分隔成路径段。如果路径以/开始,表示一个绝对路径,且从根监管者(”/user”的父亲)开始查找;否则是从当前actor开始。如果某一个路径段为..,会找到当前所遍历到的actor的上一级,否则则会向下一级寻找具有该名字的子actor。 必须注意的是actor路径中的..总是表示逻辑结构,即其监管者。

一个actor selection的路径元素中可能包含通配符,从而允许向匹配模式的集合广播该条消息:

1
2
3
4
// will look all children to serviceB with names starting with worker
context.actorSelection("/user/serviceB/worker*")
// will look up all siblings beneath same supervisor
context.actorSelection("../*")

消息可以通过ActorSelection发送,并且在投递每条消息时 ActorSelection的路径都会被查找。如果selection不匹配任何actor,则消息将被丢弃。

要获得ActorSelection的ActorRef,你需要发送一条消息到selection,然后使用答复消息的sender()引用即可。有一个内置的Identify消息,所有actor会理解它并自动返回一个包含ActorRef的ActorIdentity消息。此消息被遍历到的actor特殊处理为,如果一个具体的名称查找失败(即一个不含通配符的路径没有对应的活动actor),则会生成一个否定结果。请注意这并不意味着应答消息有到达保证,它仍然是一个普通的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import akka.actor.{ Actor, Props, Identify, ActorIdentity, Terminated }

class Follower extends Actor {
val identifyId = 1
context.actorSelection("/user/another") ! Identify(identifyId)

def receive = {
case ActorIdentity(`identifyId`, Some(ref)) =>
context.watch(ref)
context.become(active(ref))
case ActorIdentity(`identifyId`, None) => context.stop(self)

}

def active(another: ActorRef): Actor.Receive = {
case Terminated(`another`) => context.stop(self)
}
}

你也可以通过ActorSelection的resolveOne方法获取ActorSelection的一个ActorRef。如果存在这样的actor,它将返回一个包含匹配的ActorRef的Future。如果没有这样的actor 存在或识别没有在指定的时间内完成,它将以失败告终——akka.actor.ActorNotFound。

如果开启了远程调用,则远程actor地址也可以被查找:

1
context.actorSelection("akka.tcp://app@otherhost:1234/user/serviceB")

4.发送消息

向actor发送消息需使用下列方法之一。

  • !意思是“fire-and-forget”,即异步发送一个消息并立即返回。也称为tell。
  • ?异步发送一条消息并返回一个Future代表一个可能的回应。也称为ask。
    对每一个消息发送者,分别有消息顺序保证。

注意
使用ask有一些性能内涵,因为需要跟踪超时,需要有桥梁将Promise转为ActorRef,并且需要在远程情况下可访问。所以为了性能应该总选择tell,除非只能选择ask。

Tell: Fire-forget

这是发送消息的推荐方式。 不会阻塞地等待消息。它拥有最好的并发性和可扩展性。

1
actorRef ! message

如果是在一个Actor中调用 ,那么发送方的actor引用会被隐式地作为消息的sender(): ActorRef成员一起发送。目的actor可以使用它来向源actor发送回应, 使用sender() ! replyMsg。

如果不是从Actor实例发送的,sender成员缺省为 deadLetters actor引用。

Ask: Send-And-Receive-Future

ask模式既包含actor也包含future,所以它是一种使用模式,而不是ActorRef的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import akka.pattern.{ ask, pipe }
import system.dispatcher // The ExecutionContext that will be used
case class Result(x: Int, s: String, d: Double)
case object Request

implicit val timeout = Timeout(5 seconds) // needed for `?` below

val f: Future[Result] =
for {
x <- ask(actorA, Request).mapTo[Int] // call pattern directly
s <- (actorB ask Request).mapTo[String] // call by implicit conversion
d <- (actorC ? Request).mapTo[Double] // call by symbolic name
} yield Result(x, s, d)

f pipeTo actorD // .. or ..
pipe(f) to actorD

5.接收消息

Actor必须实现receive方法来接收消息:

protected def receive: PartialFunction[Any, Unit]
这个方法应返回一个PartialFunction,例如一个“match/case”子句,消息可以与其中的不同分支进行scala模式匹配。如下例:

1
2
3
4
5
6
7
8
9
10
11
import akka.actor.Actor
import akka.actor.Props
import akka.event.Logging

class MyActor extends Actor {
val log = Logging(context.system, this)
def receive = {
case "test" => log.info("received test")
case _ => log.info("received unknown message")
}
}

6.终止Actor

通过调用ActorRefFactory(即ActorContext或ActorSystem)的stop方法来终止一个actor。通常context用来终止子actor,而 system用来终止顶级actor。实际的终止操作是异步执行的,即stop可能在actor被终止之前返回。

actor的终止分两步: 第一步actor将挂起对邮箱的处理,并向所有子actor发送终止命令,然后处理来自子actor的终止消息直到所有的子actor都完成终止,最后终止自己(调用postStop,清空邮箱,向DeathWatch发布Terminated,通知其监管者)。这个过程保证actor系统中的子树以一种有序的方式终止,将终止命令传播到叶子结点并收集它们回送的确认消息给被终止的监管者。如果其中某个actor没有响应(即由于处理消息用了太长时间以至于没有收到终止命令),整个过程将会被阻塞。

在ActorSystem.shutdown()被调用时,系统根监管actor会被终止,以上的过程将保证整个系统的正确终止。

postStop() hook 是在actor被完全终止以后调用的。这是为了清理资源:

1
2
3
override def postStop() {
// clean up some resources ...
}

注意
由于actor的终止是异步的,你不能马上使用你刚刚终止的子actor的名字;这会导致InvalidActorNameException。你应该 监视watch()正在终止的actor,并在Terminated最终到达后作为回应创建它的替代者。

优雅地终止

如果你需要等待终止过程的结束,或者组合若干actor的终止次序,可以使用gracefulStop:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import akka.pattern.gracefulStop
import scala.concurrent.Await

try {
val stopped: Future[Boolean] = gracefulStop(actorRef, 5 seconds, Manager.Shutdown)
Await.result(stopped, 6 seconds)
// the actor has been stopped
} catch {
// the actor wasn't stopped within 5 seconds
case e: akka.pattern.AskTimeoutException =>
}
object Manager {
case object Shutdown
}

class Manager extends Actor {
import Manager._
val worker = context.watch(context.actorOf(Props[Cruncher], "worker"))

def receive = {
case "job" => worker ! "crunch"
case Shutdown =>
worker ! PoisonPill
context become shuttingDown
}

def shuttingDown: Receive = {
case "job" => sender() ! "service unavailable, shutting down"
case Terminated(`worker`) =>
context stop self
}
}

当gracefulStop()成功返回时,actor的postStop() hook将会被执行:在postStop()结束和gracefulStop()返回之间存在happens-before边界。

在上面的示例中自定义的Manager.Shutdown消息是发送到目标actor来启动actor的终止过程。你可以使用PoisonPill,但之后在停止目标actor之前,你与其他actor的互动的机会有限。在postStop中,可以处理简单的清理任务。

警告
请记住,actor停止和其名称被注销是彼此异步发生的独立事件。因此,在gracefulStop()返回后。你会发现其名称仍可能在使用中。为了保证正确注销,只在你控制的监管者内,并且只在响应Terminated消息时重用名称,即不是用于顶级actor。

7.Become/Unbecome

升级

Akka支持在运行时对Actor消息循环(即其实现)进行实时替换:在actor中调用context.become方法。become要求一个PartialFunction[Any, Unit]参数作为新的消息处理实现。 被替换的代码被保存在一个栈中,可以被push和pop。

警告
请注意actor被其监管者重启后将恢复其最初的行为。

8.使用PartialFunction链来扩展actor

有时在一些actor中分享共同的行为,或通过若干小的函数构成一个actor的行为是很有用的。这由于actor的receive方法返回一个Actor.Receive(PartialFunction[Any,Unit]的类型别名)而使之成为可能,多个偏函数可以使用PartialFunction#orElse链接在一起。你可以根据需要链接尽可能多的功能,但是你要牢记”第一个匹配”获胜——这在组合可以处理同一类型的消息的功能时会很重要。

例如,假设你有一组actor是生产者Producers或消费者Consumers,然而有时候需要actor分享这两种行为。这可以很容易实现而无需重复代码,通过提取行为的特质和并将actor的receive实现为这些偏函数的组合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
trait ProducerBehavior {
this: Actor =>

val producerBehavior: Receive = {
case GiveMeThings =>
sender() ! Give("thing")
}
}

trait ConsumerBehavior {
this: Actor with ActorLogging =>

val consumerBehavior: Receive = {
case ref: ActorRef =>
ref ! GiveMeThings

case Give(thing) =>
log.info("Got a thing! It's {}", thing)
}
}

class Producer extends Actor with ProducerBehavior {
def receive = producerBehavior
}

class Consumer extends Actor with ActorLogging with ConsumerBehavior {
def receive = consumerBehavior
}

class ProducerConsumer extends Actor with ActorLogging
with ProducerBehavior with ConsumerBehavior {

def receive = producerBehavior orElse consumerBehavior
}

// protocol
case object GiveMeThings
case class Give(thing: Any)

不同于继承,相同的模式可以通过组合实现——可以简单地通过委托的偏函数组合成receive方法。

Hystrix学习笔记

Posted on 2016-02-03 | In Java |

Hystrix

学习资料:https://github.com/Netflix/Hystrix/wiki

What Is Hystrix For?

Hystrix is designed to do the following:

  • Give protection from and control over latency and failure from dependencies accessed (typically over the network) via third-party client libraries. 让程序具有处理调用外部服务失败的能力
  • Stop cascading failures in a complex distributed system.
  • Fail fast and rapidly recover.
  • Fallback and gracefully degrade when possible.
  • Enable near real-time monitoring, alerting, and operational control.

    Hystrix works by:

  • Preventing any single dependency from using up all container (such as Tomcat) user threads.
  • Shedding load and failing fast instead of queueing.
  • Providing fallbacks wherever feasible to protect users from failure.
  • Using isolation techniques (such as bulkhead, swimlane, and circuit breaker patterns) to limit the impact of any one dependency.
  • Optimizing for time-to-discovery through near real-time metrics, monitoring, and alerting
  • Optimizing for time-to-recovery by means of low latency propagation of configuration changes and support for dynamic property changes in most aspects of Hystrix, which allows you to make real-time operational modifications with low latency feedback loops.
  • Protecting against failures in the entire dependency client execution, not just in the network traffic.

Hystrix使用命令模式HystrixCommand(Command)包装依赖调用逻辑,每个命令在单独线程中/信号授权下执行

Hello World

下面是一个HystrixCommand的简单的“hello world”实现

1
2
3
4
5
6
7
8
9
10
11
12
public class CommandHelloWorld extends HystrixCommand<String> {
private final String name;
public CommandHelloWorld(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
@Override
protected String run() {
// a real example would do work like a network call here
return "Hello " + name + "!";
}
}

###同步执行
Hystrix commands能通过execute()方法调用被同步的执行

1
String s = new CommandHelloWorld("World").execute();

###异步执行
异步执行通过调用queue()方法实现

1
Future<String> fs = new CommandHelloWorld("World").queue();

###响应式执行
响应式执行(异步回调)通过使用observe() 执行

1
Observable<String> fs = new CommandHelloWorld("World").observe();

返回值可以通过订阅Observable获得

1
2
3
4
5
6
fs.subscribe(new Action1<String>() {
@Override
public void call(String s) {
// value emitted here
}
});

Fallback

优美的降级可以通过增加一个getFallback()实现来达到。该方法在各种类型的失败后执行。如: run()方
法调用失败,超时,线程池,信号丢弃以及熔断器短路。

1
2
3
4
@Override
protected String getFallback() {
return "Hello Failure " + name + "!";
}

错误传播

从run()方法中抛出的所以异常(除了HystrixBadRequestException)都被计为异常。将触发getFallback()
和熔断逻辑。在HystrixBadRequestException中抛出的例外,你可以根据你的喜好进行包装,然后通过
getCause()获取。
HystrixBadRequestException设计的使用场景为,报告不合法的参数或非系统性错误。这些都不能计入失
败次数的度量,也不应当触发回退逻辑

Command Name

Command Group

group进行统一管理
command组键名被用于将command分组,如报表,警告,面板或者组包的所以者。
默认情况下,它被用于command的线程池的命名,除非有单独的定义

1
2
3
4
5
6
7
8
private static final Setter cachedSetter = 
Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"));

public CommandHelloWorld(String name) {
super(cachedSetter);
this.name = name;
}

Command线程池

线程池的键被用于监控HystrixThreadPool时的呈现,度量的发布,缓存等其它应用。一个
HystrixCommand 是和一个单个的HystrixThreadPool相关联,通过注入它的HystrixThreadPoolKey可以取
得HystrixThreadPool 或者它默认情况下用HystrixCommandGroupKey创建一个。

请求缓存

请求缓存通过实现

```或者```HystrixObservableCommand```中的``` getCacheKey() ```方法完成:依赖于request context 的某些东西,必须实例化HystrixRequestContext
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
``` java
public class CommandUsingRequestCache extends HystrixCommand<Boolean> {

private final int value;

protected CommandUsingRequestCache(int value) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.value = value;
}

@Override
protected Boolean run() {
return value == 0 || value % 2 == 0;
}

@Override
protected String getCacheKey() {
return String.valueOf(value);
}
}

请求合并

请求合并是一个特性,它能自动将一批请求合并到单一的HystrixCommand实例中执行。
可以设定批次的大小和时间作为促发器来执行一个批次
两种style的请求合并

  • request-scoped
  • globally-scoped.
    This is configured at collapser construction, and defaulted to request-scoped.

    Request Context Setup

    为了能使用request的scoped特性(请求缓存,请求折叠,请求日记)HystrixRequestContext 的生命周期
    必须被管理起来。(或者一个替代的HystrixConcurrencyStrategy 实现)
    这就意味着下面代码必须在一个请求之前执行
    1
    HystrixRequestContext context = HystrixRequestContext.initializeContext();

然后在请求的最后调用

1
context.shutdown();

常见模式

快速失败 Fail Fast

无声失败 Fail Silent

无声的失败等同于返回一个空的响应或者删除功能,它通过返回null,空的map对象,空的list或者其他类似
的响应实现。 通常通过HystrixCommand实例中的getFallback() 方法实现

回退:静态的 Fallback:Static

一些回退能返回在代码中硬编码的值。它不能引起特性或将被移除服务(如同无声失败经常处理的方
法),但是执行默认的行为逻辑。

Fallback: Stubbed

一个存根回退典型的被用于包含多个字段的一个组合对象被返回时。它们其中的一部分能被其它请求状态
来决定。当其它字段被设置为默认值。

Fallback: Cache via Network

由于回退如果重掉网络可能导致另外的失败,因此需要通过另外的HystrixCommand转换。
另外重要的是,回退command应当在独立的线程池中执行。如果两个command共享相同的线程池,会导
致主command将变的延迟并且占用整个的线程池,从而阻止回退。

主从都失效

enter image description here
通过两个Command进行隔离

##Client Doesn’t Perform Network Access

迁移

enter image description here
to
enter image description here

动态分区说明

Posted on 2015-02-03 | In Hive |

Hive分区

Hive的动态分区

概述

hive中支持两种类型的分区:

  • 静态分区SP(static partition)
  • 动态分区DP(dynamic partition)

静态分区与动态分区的主要区别在于静态分区是手动指定,而动态分区是通过数据来进行判断。详细来说,静态分区的列实在编译时期,通过用户传递来决定的;动态分区只有在SQL执行时才能决定。

动态分区说明

关系型数据库(如Oracle)中,对分区表Insert数据时候,数据库自动会根据分区字段的值,将数据插入到相应的分区中,Hive中也提供了类似的机制,即动态分区(Dynamic Partition),只不过,使用Hive的动态分区,需要进行相应的配置。

按照常规的方法向分区表中插入数据,如果源数据量很大,那么针对一个分区就要写一个insert,非常麻烦,你必须先要知道源数据中都有什么样的数据才能创建分区。
例如:

1
hive> insert overwrite table partition_test partition(stat_date='20110728',province='henan') select member_id,name from partition_test_input where stat_date='20110728' and province='henan';

使用动态分区可以很好的解决上述问题。动态分区可以根据查询得到的数据自动匹配到相应的分区中去。
使用动态分区要先设置hive.exec.dynamic.partition参数值为true,默认值为false,即不允许使用:

1
2
3
4
5
hive> set hive.exec.dynamic.partition;
hive.exec.dynamic.partition=false
hive> set hive.exec.dynamic.partition=true;
hive> set hive.exec.dynamic.partition;
hive.exec.dynamic.partition=true

动态分区的使用方法很简单,假设我想向stat_date=’20110728’这个分区下面插入数据,至于province插入到哪个子分区下面让数据库自己来判断,那可以这样写:

1
2
3
4
5
6
hive> insert overwrite table partition_test partition(stat_date='20110728',province)
> select member_id,name,province from partition_test_input where stat_date='20110728';
Total MapReduce jobs = 2
...
3 Rows loaded to partition_test
OK

stat_date叫做静态分区列,province叫做动态分区列。select子句中需要把动态分区列按照分区的顺序写出来,静态分区列不用写出来。这样stat_date=’20110728’的所有数据,会根据province的不同分别插入到/user/hive/warehouse/partition_test/stat_date=20110728/下面的不同的子文件夹下,如果源数据对应的province子分区不存在,则会自动创建,非常方便,而且避免了人工控制插入数据与分区的映射关系存在的潜在风险。

  • 注意,动态分区不允许主分区采用动态列而副分区采用静态列,这样将导致所有的主分区都要创建副分区静态列所定义的分区
  • 动态分区可以允许所有的分区列都是动态分区列,但是要首先设置一个参数hive.exec.dynamic.partition.mode :
    1
    2
    hive> set hive.exec.dynamic.partition.mode;
    hive.exec.dynamic.partition.mode=strict

动态分区demo

1
2
3
4
5
6
7
8
SET hive.exec.dynamic.partition=true;  
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.exec.max.dynamic.partitions.pernode = 1000;
SET hive.exec.max.dynamic.partitions=1000;

INSERT overwrite TABLE t_lxw1234_partitioned PARTITION (month,day)
SELECT url,substr(day,1,7) AS month,day
FROM t_lxw1234;

动态分区参数

使用动态分区需要注意设定以下参数:

  • hive.exec.dynamic.partition
    默认值:false
    是否开启动态分区功能,默认false关闭。
    使用动态分区时候,该参数必须设置成true;

  • hive.exec.dynamic.partition.mode
    默认值:strict
    动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示允许所有的分区字段都可以使用动态分区。
    一般需要设置为nonstrict

  • hive.exec.max.dynamic.partitions.pernode
    默认值:100
    在每个执行MR的节点上,最大可以创建多少个动态分区。
    该参数需要根据实际的数据来设定。
    比如:源数据中包含了一年的数据,即day字段有365个值,那么该参数就需要设置成大于365,如果使用默认值100,则会报错。

  • hive.exec.max.dynamic.partitions
    默认值:1000
    在所有执行MR的节点上,最大一共可以创建多少个动态分区。
    同上参数解释。

  • hive.exec.max.created.files
    默认值:100000
    整个MR Job中,最大可以创建多少个HDFS文件。
    一般默认值足够了,除非你的数据量非常大,需要创建的文件数大于100000,可根据实际情况加以调整。

  • hive.error.on.empty.partition
    默认值:false
    当有空分区生成时,是否抛出异常。
    一般不需要设置。

1…1011

Sun Ke

104 posts
21 categories
61 tags
© 2018 Sun Ke
Powered by Hexo
|
Theme — NexT.Pisces v5.1.4