雁渡寒潭 风吹疏竹

教练,我想打篮球


  • Home

  • Tags

  • Categories

  • Archives

HTTP Session 攻击与防护

Posted on 2016-04-01 | In Web |

HTTP Session 攻击与防护

Cookie的基本概念


Cookie 是网站在浏览器中存放的資料,內容包括使用者在网站上的偏好設定、或者是登入的 Session ID。网站利用 Session ID 來辨认访客的身份。

当用户访问某一站点时,浏览器会根据用户访问的站点自动搜索可用的cookie,如果有可用的就随着请求一起发送到了服务端。每次接收到服务端的响应时又会更新本地的cookie信息。

客户端和服务端的这种联系必然是需要有时间的规定的,所以需要定期清除session。这个问题就需要在两方面考虑了,一个是清除服务端session文件,一个是清除客户端的cookie信息,因为两者都各保存着一半的信息。

Cookie 既然存放在 Client 端,那就有被窃取的風險。例如透過 Cross-Site Scripting(跨站腳本共计,又稱 XSS),攻击者可以輕易窃取受害者的 Cookie。如果 Cookie 被偷走了,你的身份就被窃取了。

Session 共计手法有三種:

  • 猜测 Session ID (Session Prediction)
  • 窃取 Session ID (Session Hijacking)
  • 固定 Session ID (Session Fixation)

Session Prediction (猜测 Session ID)

Session ID 如同我們前面所說的,就如同是會員卡的编号。只要知道 Session ID,就可以成為這個使用者。如果 Session ID 的长度、复杂度、杂乱度不夠,就能夠被攻击者猜测。攻击者只要寫程序不斷暴力計算 Session ID,就有機會得到有效的 Session ID 而窃取使用者帳號。

分析 Session ID 的工具可以用以下幾種

OWASP WebScarab
Stompy
Burp Suite

观察 Session ID 的亂數分布,可以了解是否能夠推出規律、猜测有效的 Session ID。

防护措施

使用 Session ID 分析程序進行分析,評估是否无法被預測。如果沒有 100% 的把握自己撰寫的 Session ID 產生機制是安全的,不妨使用內建的 Session ID 產生 function,通常都有一定程度的安全。

Session Hijacking (窃取 Session ID)

窃取 Session ID 是最常見的共计手法。攻击者可以利用多種方式窃取 Cookie 獲取 Session ID:

  • 跨站腳本攻击 (Cross-Site Scripting (XSS)):利用 XSS 漏洞窃取使用者 Cookie
  • 网路窃听:使用 ARP Spoofing 等手法窃听网路封包獲取 Cookie
    透過 Referer 取得:若网站允許 Session ID 使用 URL 传输,便可能從 Referer 取得 Session ID

窃取利用的方式如下圖:

受害者已經登入网站伺服器,並且取得 Session ID,在連線過程中攻击者用窃听的方式獲取受害者 Session ID。攻击者直接使用窃取到的 Session ID 送至伺服器,偽造受害者身分。若伺服器沒有檢查 Session ID 的使用者身分,則可以让攻击者得逞。
enter image description here

enter image description here

防护措施

  • 禁止將 Session ID 使用 URL (GET) 方式來传输
  • 設定加強安全性的 Cookie 属性:HttpOnly (无法被 JavaScript 存取)
  • 設定加強安全性的 Cookie 属性:Secure (只在 HTTPS 传输,若网站无 HTTPS 請勿設定)
    在需要權限的頁面請使用者重新輸入密碼

Session Fixation (固定 Session ID)

攻击者诱使受害者使用特定的 Session ID 登入网站,而攻击者就能取得受害者的身分。

  1. 攻击者從网站取得有效 Session ID
  2. 使用社交工程等手法诱使受害者點選連結,使用該 Session ID 登入网站
  3. 受害者輸入帳號密碼成功登入网站
  4. 攻击者使用該 Session ID,操作受害者的帳號

enter image description here

防护措施

在使用者登入成功後,立即更換 Session ID,防止攻击者操控 Session ID 給予受害者。
禁止將 Session ID 使用 URL (GET) 方式來传输

Session 防护

每個使用者在登入网站的時候,我們可以用每個人特有的識別資訊來確認身分:

  • 來源 IP 位址
  • 瀏覽器 User-Agent

如果在同一個 Session 中,使用者的 IP 或者 User-Agent 改變了,最安全的作法就是把這個 Session 清除,請使用者重新登入。雖然使用者可能因為 IP 更換、Proxy 等因素導致被強制登出,但為了安全性,便利性必須要與之取捨。

除了檢查個人識別資訊來確認是否盜用之外,也可以增加前述的 Session ID 的防护方式:

  • Cookie 設定 Secure Flag (HTTPS)
  • Cookie 設定 HTTP Only Flag
  • 成功登入後立即變更 Session ID
  • Session 的清除機制也非常重要。當伺服器偵測到可疑的使用者 Session 行為時,例如攻击者惡意嘗試偽造 Session ID、使用者 Session 可能遭窃、或者逾時等情況,都應該立刻清除該 Session ID 以免被攻击者利用。

Session 清除机制时机:

  • 侦测到恶意尝试 Session ID
  • 识别咨询无效时
  • 超时

Session和Cookie的区别与联系以及Session的实现原理

Posted on 2016-04-01 | In Web |

Session和Cookie的区别与联系以及Session的实现原理


HTTP链接状态

web应用技术的核心http协议是一个无状态的协议,web应用开发里就出现了保持http链接状态的技术:一个是cookie技术,另一种是session技术。

cookie技术是客户端的解决方案,Cookie就是由服务器发给客户端的特殊信息,而这些信息以文本文件的方式存放在客户端,然后客户端每次向服务器发送请求的时候都会带上这些特殊的信息。让我们说得更具体一些:当用户使用浏览器访问一个支持Cookie的网站的时候,用户会提供包括用户名在内的个人信息并且提交至服务器;接着,服务器在向客户端回传相应的超文本的同时也会发回这些个人信息,当然这些信息并不是存放在HTTP响应体(Response Body)中的,而是存放于HTTP响应头(Response Header);当客户端浏览器接收到来自服务器的响应之后,浏览器会将这些信息存放在一个统一的位置,对于Windows操作系统而言,我们可以从: [系统盘]:\Documents and Settings[用户名]\Cookies目录中找到存储的Cookie;自此,客户端再向服务器发送请求的时候,都会把相应的Cookie再次发回至服务器。而这次,Cookie信息则存放在HTTP请求头(Request Header)了。有了Cookie这样的技术实现,服务器在接收到来自客户端浏览器的请求之后,就能够通过分析存放于请求头的Cookie得到客户端特有的信息,从而动态生成与该客户端相对应的内容。通常,我们可以从很多网站的登录界面中看到“请记住我”这样的选项,如果你勾选了它之后再登录,那么在下一次访问该网站的时候就不需要进行重复而繁琐的登录动作了,而这个功能就是通过Cookie实现的。

session技术则是服务端的解决方案,它是通过服务器来保持状态的。由于Session这个词汇包含的语义很多,因此需要在这里明确一下 Session的含义。首先,我们通常都会把Session翻译成会话,因此我们可以把客户端浏览器与服务器之间一系列交互的动作称为一个 Session。从这个语义出发,我们会提到Session持续的时间,会提到在Session过程中进行了什么操作等等;其次,Session指的是服务器端为客户端所开辟的存储空间,在其中保存的信息就是用于保持状态。从这个语义出发,我们则会提到往Session中存放什么内容,如何根据键值从 Session中获取匹配的内容等。要使用Session,第一步当然是创建Session了。那么Session在何时创建呢?当然还是在服务器端程序运行的过程中创建的,不同语言实现的应用程序有不同创建Session的方法,而在Java中是通过调用HttpServletRequest的getSession方法(使用true作为参数)创建的。在创建了Session的同时,服务器会为该Session生成唯一的Session id,而这个Session id在随后的请求中会被用来重新获得已经创建的Session;在Session被创建之后,就可以调用Session相关的方法往Session中增加内容了,而这些内容只会保存在服务器中,发到客户端的只有Session id;当客户端再次发送请求的时候,会将这个Session id带上,服务器接受到请求之后就会依据Session id找到相应的Session,从而再次使用之。正式这样一个过程,用户的状态也就得以保持了。
由此我们可以得出,session是解决http协议无状态问题的服务端解决方案,它能让客户端和服务端一系列交互动作变成一个完整的事务,能使网站变成一个真正意义上的软件。

cookie与session的关系

cookie和session的方案虽然分别属于客户端和服务端,但是服务端的session的实现对客户端的cookie有依赖关系的,上面我讲到服务端执行session机制时候会生成session的id值,这个id值会发送给客户端,客户端每次请求都会把这个id值放到http请求的头部发送给服务端,而这个id值在客户端会保存下来,保存的容器就是cookie,因此当我们完全禁掉浏览器的cookie的时候,服务端的session也会不能正常使用

在实际运用中session所带来的问题

由上面所描述的session实现机制,我们会发现,为了弥补http协议的无状态的特点,服务端会占用一定的内存和cpu用来存储和处理session计算的开销,这也就是tomcat这个的web容器的并发连接那么低(tomcat官方文档里默认的连接数是200)原因之一。因此很多java语言编写的网站,在生产环境里web容器之前会加一个静态资源服务器,例如:apache服务器或nginx服务器,静态资源服务器没有解决http无状态问题的功能,因此部署静态资源的服务器也就不会让出内存或cpu计算资源专门去处理像session这样的功能,这些内存和cpu资源可以更有效的处理每个http请求,因此静态资源服务器的并发连接数更高,所以我们可以让那些没有状态保持要求的请求直接在静态服务器里处理,而要进行状态保持的请求则在java的web容器里进行处理,这样能更好的提升网站的效率。

当下的互联网网站为了提高网站安全性和并发量,服务端的部署的服务器的数量往往是大于或等于两台,多台服务器对外提供的服务是等价的,但是不同的服务器上面肯定会有不同的web容器,由上面的讲述我们知道session的实现机制都是web容器里内部机制,这就导致一个web容器里所生成的session的id值是不同的,因此当一个请求到了A服务器,浏览器得到响应后,客户端存下的是A服务器上所生成的session的id,当在另一个请求分发到了B服务器,B服务器上的web容器是不能识别这个session的id值,更不会有这个sessionID所对应记录下来的信息,这个时候就需要两个不同web容器之间进行session的同步。Tomcat容器有一个官方的解决方案就是使用apache+tomcat+mod_jk方案,当一个web容器里session的信息发生变化后,该web容器会向另一个web容器进行广播,另一个web收到广播后将session信息同步到自己的容器里,这个过程是十分消耗系统资源,当访问量增加会严重影响到网站的效率和稳定性。

通常会使用session Id进行负载均衡粘性控制,但是这种做法干扰了负载均衡服务器的负载均衡的计算,让请求的分发并不是公平的。

解决session相关问题的技术方案

由上所述,session一共有两个问题需要解决:
1) session的存储应该独立于web容器,也要独立于部署web容器的服务器;
2)如何进行高效的session同步。

在讲到解决这些问题之前,首先考虑下session如何存储才是高效,是存在内存、文件还是数据库了?文件和数据库的存储方式都是将session的数据固化到硬盘上,操作硬盘的方式就是IO,IO操作的效率是远远低于操作内存的数据,因此文件和数据库存储方式是不可取的,所以将session数据存储到内存是最佳的选择。因此最好的解决方案就是使用分布式缓存技术,例如:memcached和redis,将session信息的存储独立出来也是解决session同步问题的方法。

参考:

  • session机制详解以及session的相关应用
  • Session原理和Tomcat实现分析

Java IO

Posted on 2016-03-23 | In Java |

Java IO

流

Java IO流是既可以从中读取,也可以写入到其中的数据流。正如这个系列教程之前提到过的,流通常会与数据源、数据流向目的地相关联,比如文件、网络等等。

流和数组不一样,不能通过索引读写数据。在流中,你也不能像数组那样前后移动读取数据,除非使用RandomAccessFile 处理文件。流仅仅只是一个连续的数据流。

某些类似PushbackInputStream 流的实现允许你将数据重新推回到流中,以便重新读取。然而你只能把有限的数据推回流中,并且你不能像操作数组那样随意读取数据。流中的数据只能够顺序访问。

Java IO流通常是基于字节或者基于字符的。字节流通常以“stream”命名,比如InputStream和OutputStream。除了DataInputStream 和DataOutputStream 还能够读写int, long, float和double类型的值以外,其他流在一个操作时间内只能读取或者写入一个原始字节。

字符流通常以“Reader”或者“Writer”命名。字符流能够读写字符(比如Latin1或者Unicode字符)。可以浏览Java Readers and Writers获取更多关于字符流输入输出的信息。

  • InputStream

java.io.InputStream类是所有Java IO输入流的基类。如果你正在开发一个从流中读取数据的组件,请尝试用InputStream替代任何它的子类(比如FileInputStream)进行开发。这么做能够让你的代码兼容任何类型而非某种确定类型的输入流。

然而仅仅依靠InputStream并不总是可行。如果你需要将读过的数据推回到流中,你必须使用PushbackInputStream,这意味着你的流变量只能是这个类型,否则在代码中就不能调用PushbackInputStream的unread()方法。

通常使用输入流中的read()方法读取数据。read()方法返回一个整数,代表了读取到的字节的内容(译者注:0 ~ 255)。当达到流末尾没有更多数据可以读取的时候,read()方法返回-1。

  • OutputStream

java.io.OutputStream是Java IO中所有输出流的基类。如果你正在开发一个能够将数据写入流中的组件,请尝试使用OutputStream替代它的所有子类。
-组合流

你可以将流整合起来以便实现更高级的输入和输出操作。比如,一次读取一个字节是很慢的,所以可以从磁盘中一次读取一大块数据,然后从读到的数据块中获取字节。为了实现缓冲,可以把InputStream包装到BufferedInputStream中。

字节和字符数组

Java中的字节和字符数组,经常被用于临时存储应用程序内部的数据,所以数组也是常见的数据来源以及数据流目的地。如果你在程序执行过程中需要频繁访问文件的内容,你可能会愿意将文件加载到数组中去。当然你可以通过索引直接访问这些数组。但是如果你有一个组件的设计初衷是从InputStream或者Reader而非数组中读取某些数据呢?

  • 通过InputStream或者Reader读取数组

为了让你的组件能够从数组中读取数据,你需要把字节或者字符数组包装到一个ByteArrayInputStream或者CharArrayReader中。这种方式允许通过包装好的stream或者reader读取数组中的字节或者字符数据。

这是一个简单的示例:

1
2
3
4
5
6
7
8
9
10
11
byte[] bytes = new byte[1024]; 
//write data into byte array...
InputStream input = new ByteArrayInputStream(bytes);
//read first byte
int data = input.read();

while(data != -1) {
//do something with data
//read next byte
data = input.read();
}

  • 通过InputStream或者Reader写入数组

同样可以将数据写入到ByteArrayOutputStream或者CharArrayWriter中。你所需要做的是创建一个ByteArrayOutputStream或者CharArrayWriter,然后写入数据,就像你操作其他类型的stream或者writer一样。当所有的数据都写入完毕,只需调用toByteArray()或者toCharArray(),即可得到写入数据的数组形式。

这是一个简单的示例:

1
2
3
OutputStream output = new ByteArrayOutputStream();
output.write("This text is converted to bytes".toBytes("UTF-8"));
byte[] bytes = output.toByteArray();

操作一个字符数组的代码也与本例类似,只需要将字符数组包装到CharArrayWriter中。

管道

Java IO中的管道为运行在同一个JVM中的两个线程提供了通信的能力。所以管道也可以作为数据源以及目标媒介。

你不能利用管道与不同的JVM中的线程通信(不同的进程)。在概念上,Java的管道不同于Unix/Linux系统中的管道。在Unix/Linux中,运行在不同地址空间的两个进程可以通过管道通信。在Java中,通信的双方应该是运行在同一进程中的不同线程。

  • 通过Java IO创建管道

可以通过Java IO中的PipedOutputStream和PipedInputStream创建管道。一个PipedInputStream流应该和一个PipedOutputStream流相关联。一个线程通过PipedOutputStream写入的数据可以被另一个线程通过相关联的PipedInputStream读取出来。
你也可以使用两个管道共有的connect()方法使之相关联。PipedInputStream和PipedOutputStream都拥有一个可以互相关联的connect()方法。

  • 管道和线程

请记得,当使用两个相关联的管道流时,务必将它们分配给不同的线程。read()方法和write()方法调用时会导致流阻塞,这意味着如果你尝试在一个线程中同时进行读和写,可能会导致线程死锁。

  • 管道的替代

除了管道之外,一个JVM中不同线程之间还有许多通信的方式。实际上,线程在大多数情况下会传递完整的对象信息而非原始的字节数据。但是,如果你需要在线程之间传递字节数据,Java IO的管道是一个不错的选择。

Java中一个线程可以多次start吗?

Posted on 2016-03-23 | In Java |
通过Thread实例的start(),一个Thread的实例只能产生一个线程。
Read more »

zookeeper的功能列表

Posted on 2016-02-11 | In ZooKeeper |

学习资料为:http://ifeve.com/zookeeper-sharedcount/

zookeeper的功能列表

ZooKeeper官方给出了使用zookeeper的几种用途。

  • Leader Election
  • Barriers
  • Queues
  • Locks
  • Two-phased Commit
  • 其它应用如Name Service, Configuration, Group Membership

在实际使用ZooKeeper开发中,我们最常用的是Apache Curator。 它由Netflix公司贡献给Apache。
Curator的主要组件为:

  • Recipes, ZooKeeper的系列recipe实现, 基于 Curator Framework.
  • Framework, 封装了大量ZooKeeper常用API操作,降低了使用难度, 基于Zookeeper增加了一些新特性,对ZooKeeper链接的管理,对链接丢失自动重新链接。
  • Utilities,一些ZooKeeper操作的工具类包括ZK的集群测试工具路径生成等非常有用,在Curator-Client包下org.apache.curator.utils。
  • Client,ZooKeeper的客户端API封装,替代官方 ZooKeeper class,解决了一些繁琐低级的处理,提供一些工具类。
  • Errors,异常处理, 连接异常等
  • Extensions,对curator-recipes的扩展实现,拆分为 curator-:stuck_out_tongue_closed_eyes:iscovery和 curator-:stuck_out_tongue_closed_eyes:iscovery-server提供基于RESTful的Recipes WEB服务.

leader选举

  • Leader latch:必须启动LeaderLatch: leaderLatch.start(); 一旦启动, LeaderLatch会和其它使用相同latch path的其它LeaderLatch交涉,然后随机的选择其中一个作为leader。 你可以随时查看一个给定的实例是否是leader。一旦不使用LeaderLatch了,必须调用close方法。 如果它是leader,会释放leadership, 其它的参与者将会选举一个leader。异常处理 LeaderLatch实例可以增加ConnectionStateListener来监听网络连接问题。 当 SUSPENDED 或 LOST 时, leader不再认为自己还是leader.当LOST 连接重连后 RECONNECTED,LeaderLatch会删除先前的ZNode然后重新创建一个. LeaderLatch用户必须考虑导致leadershi丢失的连接问题。 强烈推荐你使用ConnectionStateListener。
  • Leader Election

    • LeaderSelector
    • LeaderSelectorListener
    • LeaderSelectorListenerAdapter
    • CancelLeadershipException

      与LeaderLatch, 通过LeaderSelectorListener可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。 而LeaderLatch一根筋到死, 除非调用close方法,否则它不会释放领导权。

      计算器

  • SharedCount
  • SharedCountReader
  • SharedCountListener
    支持监听,强制set,trySet,leguansuo

    临时节点

    临时节点驻存在ZooKeeper中,当连接和session断掉时被删除。
    PersistentEphemeralNode 节点必须调用start方法启动。 不用时调用close方法。

    分布式Barrier

    分布式Barrier是这样一个类: 它会阻塞所有节点上的等待进程,知道某一个被满足, 然后所有的节点继续进行。
    • DistributedBarrier
    • DistributedDoubleBarrier 双栅栏允许客户端在计算的开始和结束时同步。当足够的进程加入到双栅栏时,进程开始计算, 当计算完成时,离开栅栏。 类似于百米赛跑

    首先你需要设置栅栏,它将阻塞在它上面等待的线程:
    1
    setBarrier();

然后需要阻塞的线程调用“方法等待放行条件:

1
public void waitOnBarrier()

当条件满足时,移除栅栏,所有等待的线程将继续执行:

1
removeBarrier();

异常处理 DistributedBarrier 会监控连接状态,当连接断掉时waitOnBarrier()方法会抛出异常。

队列

Curator也提供ZK Recipe的分布式队列实现。 利用ZK的 PERSISTENTSEQUENTIAL节点, 可以保证放入到队列中的项目是按照顺序排队的。 如果单一的消费者从队列中取数据, 那么它是先入先出的,这也是队列的特点。 如果你严格要求顺序,你就的使用单一的消费者,可以使用leader选举只让leader作为唯一的消费者。

但是, 根据Netflix的Curator作者所说, ZooKeeper真心不适合做Queue,或者说ZK没有实现一个好的Queue,详细内容可以看 Tech Note 4, 原因有五:

  • ZK有1MB 的传输限制。 实践中ZNode必须相对较小,而队列包含成千上万的消息,非常的大。
  • 如果有很多节点,ZK启动时相当的慢。 而使用queue会导致好多ZNode. 你需要显著增大 initLimit 和 syncLimit.
  • ZNode很大的时候很难清理。Netflix不得不创建了一个专门的程序做这事。
  • 当很大量的包含成千上万的子节点的ZNode时, ZK的性能变得不好
  • ZK的数据库完全放在内存中。 大量的Queue意味着会占用很多的内存空间。

  • DistributedQueue 队列是与path绑定的
    • QueueBuilder
    • QueueConsumer
    • QueueSerializer
    • DistributedQueue
  • DistributedIdQueue 可以为队列中的每一个元素设置一个ID。 可以通过ID把队列中任意的元素移除
  • DistributedPriorityQueue 优先级队列对队列中的元素按照优先级进行排序。 Priority越小, 元素月靠前, 越先被消费掉。
  • DistributedDelayQueue 元素有个delay值, 消费者隔一段时间才能收到元素。
  • SimpleDistributedQueue 实现类似JDK一样的接口

缓存

利用ZooKeeper在集群的各个节点之间缓存数据。 每个节点都可以得到最新的缓存的数据。 Curator提供了三种类型的缓存方式:Path Cache,Node Cache 和Tree Cache。

  • Path Cache
    Path Cache用来监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态。 这也正如它的名字表示的那样, 那监控path。
  • Node Cache
    Path Cache用来监控一个ZNode. 当节点的数据修改或者删除时,Node Cache能更新它的状态包含最新的改变。
  • Tree Node
    这种类型的即可以监控节点的状态,还监控节点的子节点的状态, 类似上面两种cache的组合。 这也就是Tree的概念。 它监控整个树中节点的状态。

##分布式锁

  • 可重入锁Shared Reentrant Lock
    首先我们先看一个全局可重入的锁。 Shared意味着锁是全局可见的, 客户端都可以请求锁。 Reentrant和JDK的ReentrantLock类似, 意味着同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞。
    • InterProcessMutex
  • 不可重入锁Shared Lock
  • 可重入读写锁Shared Reentrant Read Write Lock 一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。 这也意味着写锁可以降级成读锁, 比如请求写锁 —>读锁 —->释放写锁。 从读锁升级成写锁是不成的。
  • 信号量Shared Semaphore
    上面说讲的锁都是公平锁(fair)。 总ZooKeeper的角度看, 每个客户端都按照请求的顺序获得锁。 相当公平。
  • 多锁对象 Multi Shared Lock

Multi Shared Lock是一个锁的容器。 当调用acquire, 所有的锁都会被acquire,如果请求失败,所有的锁都会被release。 同样调用release时所有的锁都被release(失败被忽略)。 基本上,它就是组锁的代表,在它上面的请求释放操作都会传递给它包含的所有的锁。

#Curator
Framework
事务

Akka学习笔记

Posted on 2016-02-04 | In Akka |

Akka学习笔记

IBM JVM 并发性: 使用 Akka 执行异步操作
IBM JVM 并发性: 使用 Akka 构建 actor 应用程序

Remote Actor介绍
Akka入门编程实践
Akka框架基本要点介绍

Concurrency and Fault Tolerance Made Easy: An Akka Tutorial with Examples 简单介绍Akka的使用,适合入门,包括actor模型,actor通信,容错等

Akka Cluster原理与应用
Akka开发的常用模式
Akka开发指南
Akka2.0官方文档中文版

使用Akka构建集群(一)
使用Akka构建集群(二)

AkkaCluster

Posted on 2016-02-04 | In Akka |

Akka集群原理

Akka集群支持去中心化的基于P2P的集群服务,没有单点故障(SPOF)问题,它主要是通过Gossip协议来实现。对于集群成员的状态,Akka提供了一种故障检测机制,能够自动发现出现故障而离开集群的成员节点,通过事件驱动的方式,将状态传播到整个集群的其它成员节点。

1. 状态转移与故障检测

Akka内部为集群成员定义了一组有限状态(6种状态),并给出了一个状态转移矩阵,代码如下所示:

1
2
3
4
5
6
7
8
9
  private[cluster] val allowedTransitions: Map[MemberStatus, Set[MemberStatus]] =
Map(
Joining -> Set(Up, Down, Removed),
Up -> Set(Leaving, Down, Removed),
Leaving -> Set(Exiting, Down, Removed),
Down -> Set(Removed),
Exiting -> Set(Removed, Down),
Removed -> Set.empty[MemberStatus])
}

Akka集群中的每个成员节点,都有可能处于上面的一种状态,在发生某些事件以后,会发生状态转移。需要注意的是,除了Down和Removed状态以外,节点处于其它任何一个状态时都有可能变成Down状态,即节点故障而无法提供服务,而在变成Down状态之前有一个虚拟的Unreachable状态,因为在Gossip收敛过程中,是无法到达或者经由Unreachable状态的节点,这个状态是由Akka实现的故障探测器(Failure Detector)来检测到的。处于Down状态的节点如果想要再次加入Akka集群,需要重新启动,并进入Joining状态,然后才能进行后续状态的转移变化。Akka集群成员节点状态及其转移情况,如下图所示:

在Akka中,集群中每一个成员节点M会被集群中的其他另一组节点(默认是5个)G监控,这一组节点G并不是整个集群中的其他所有节点,只是整个集群全部节点的一个子集,组G中的节点会检测节点M是否处于Unreachable状态,这是通过发送心跳来确认节点M是否可达,如果不可达则组G中的节点会将节点M的Unreachable状态向集群中组G之外的其它节点传播,最终使得集群中的每个成员节点都知道节点M故障。

2.Akka事件集合

节点状态发生转移会触发某个事件,我们可以根据不同类型的事件来进行相应的处理,为了能够详细捕获到各种事件,我们先看一下Akka定义的事件集合,如图所示:

3.Akka成员角色(Node Role)

Akka支持在每个成员节点加入集群的时候,设置成员自己的角色。通过角色划分,可以将使用Akka集群处理业务的系统划分为多个处理逻辑独立的子系统,每个子系统处理自己的业务逻辑,而且,划分得到的多个子系统都处于一个统一的Akka集群中。因此,每个子系统也具备了Akka集群所具有的特性,如故障检测、状态转移、状态传播等等。

有类型Actor

Posted on 2016-02-04 | In Akka |

有类型Actor

  • 通过接口明确actor可以接收的消息目标和行为。

TypedActor是Akka基于Active对象(Active Object)设计模式的一个实现,关于Active对象模式,可以看维基百科的定义:
Active对象模式解耦了在一个对象上执行方法和调用方法的逻辑,执行方法和调用方法分别在各自的线程执行上下文中。该模式的目标是通过使用异步方法调用和一个调度器来处理请求,从而实现并行计算处理,该模式由6个元素组成:

  • 一个Proxy对象,提供一个面向客户端的接口和一组公共的方法
  • 一个接口,定义了请求一个Active对象上的方法的集合
  • 一个来自客户端请求的列表
  • 一个调度器,确定下一次处理哪一个请求
  • Active对象上方法的实现
  • 一个回调或者变量,供客户端接收请求被处理后的结果
    通过前面对Actor的了解,我们知道Actor更适用于在Akka的Actor系统之间来实现并行计算处理,而TypedActor适用于桥接Actor系统和非Actor系统。TypedActor是基于JDK的Proxy来实现的,与Actor不同的是,Actor一次处理一个消息,而TypedActor一次处理一个调用(Call)。关于更多关于TypedActor,可以查看Akka文档。

有类型Actor由两 “部分” 组成, 一个公开的接口和一个实现, 对普通actor来说,你拥有一个外部API(公开接口的实例)来将方法调用异步地委托给其实现的私有实例。

有类型Actor相对于普通Actor的优势在于有类型Actor拥有静态的契约,你不需要定义你自己的消息;它的劣势在于对你能做什么和不能做什么进行了一些限制,即你不能使用become/unbecome。

有类型Actor是使用JDK Proxies实现的,JDK Proxies提供了非常简单的api来拦截方法调用。

注意
和普通Akka actor一样,有类型actor一次也只处理一个消息。

工具箱

在创建第一个有类型Actor之前,我们先了解一下我们手上可供使用的工具,它位于akka.actor.TypedActor中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import akka.actor.TypedActor

//返回有类型actor扩展
val extension = TypedActor(system) //system是一个Actor系统实例

//判断一个引用是否是有类型actor代理
TypedActor(system).isTypedActor(someReference)

//返回一个外部有类型actor代理所代表的Akka actor
TypedActor(system).getActorRefFor(someReference)

//返回当前的ActorContext,
// 此方法仅在一个TypedActor 实现的方法中有效
val c: ActorContext = TypedActor.context

//返回当前有类型actor的外部代理,
// 此方法仅在一个TypedActor 实现的方法中有效
val s: Squarer = TypedActor.self[Squarer]

//返回一个有类型Actor扩展的上下文实例
//这意味着如果你用它创建其它的有类型actor,它们会成为当前有类型actor的子actor
TypedActor(TypedActor.context)

创建有类型Actor

要创建有类型Actor,需要一个或多个接口,和一个实现。
创建我们的Squarer的有类型actor实例的最简单方法是:

1
2
val mySquarer: Squarer =
TypedActor(system).typedActorOf(TypedProps[SquarerImpl]())

第一个类型是代理的类型,第二个类型是实现的类型。如果要调用某特定的构造方法要这样做:

1
2
3
val otherSquarer: Squarer =
TypedActor(system).typedActorOf(TypedProps(classOf[Squarer],
new SquarerImpl("foo")), "name")

由于你提供了一个 Props, 你可以指定使用哪个派发器, 缺省的超时时间等。

方法派发语义

方法返回:

  • Unit 会以 fire-and-forget语义进行派发,与ActorRef.tell完全一致。
  • akka.dispatch.Future[_] 会以 send-request-reply语义进行派发,与 ActorRef.ask完全一致。
  • scala.Option[]会以send-request-reply语义派发,但是会阻塞等待应答, 如果在超时时限内没有应答则返回scala.None,否则返回包含结果的scala.Some[]。在这个调用中发生的异常将被重新抛出。
  • 任何其它类型的值将以send-request-reply语义进行派发,但会阻塞地等待应答, 如果超时会抛出java.util.concurrent.TimeoutException,如果发生异常则将异常重新抛出。

终止有类型Actor

由于有类型actor底层还是Akka actor,所以在不需要的时候要终止它。

1
2
3
4
5
TypedActor(system).stop(mySquarer)
这将会尽快地异步终止与指定的代理关联的有类型Actor。

TypedActor(system).poisonPill(otherSquarer)
这将会在有类型actor完成所有入队的调用后异步地终止它。

有类型Actor监管树

你可以通过传入一个ActorContext来获得有类型Actor上下文,所以你可以对它调用typedActorOf(..)来创建有类型子actor。

1
2
3
4
5
//Inside your Typed Actor
val childSquarer: Squarer =
TypedActor(TypedActor.context).typedActorOf(TypedProps[SquarerImpl]())
//Use "childSquarer" as a Squarer
通过将ActorContext作为参数传给TypedActor.get(…),也可以为普通的Akka actor创建有类型子actor。

监管策略

通过让你的有类型Actor的具体实现类实现TypedActor.Supervisor方法,你可以定义用来监管子actor的策略,就像监管与监控 和容错(Scala)所描述的。

生命周期回调

通过使你的有类型actor实现类实现以下方法:

  • TypedActor.PreStart
  • TypedActor.PostStop
  • TypedActor.PreRestart
  • TypedActor.PostRestart
    你可以hook进有类型actor的整个生命周期。

接收任意消息

如果你的有类型actor的实现类扩展了akka.actor.TypedActor.Receiver,所有非方法调用MethodCall的消息会被传给onReceive方法.

这使你能够对DeathWatch的Terminated消息及其它类型的消息进行处理,例如,与无类型actor进行交互的场合。

Akka邮箱&路由器&调度器关键点说明

Posted on 2016-02-04 | In Akka |

Akkay邮箱&路由器&调度器关键点说明

Akka 无法保证消息将被传送到目的地。这种无保证传送背后的哲学原理是 Akka 的核心原理之一。
Akka 可以 保证消息最多传送一次,而且绝不会无序地收到从一个 actor 实例发送到另一个 actor 实例的消息。

调度器

Akka MessageDispatcher是维持 Akka Actor “运作”的部分, 可以说它是整个机器的引擎。所有的MessageDispatcher实现也同时也是一个ExecutionContext

缺省派发器

在没有为Actor作配置的情况下,每一个ActorSystem将有一个缺省的派发器。该缺省派发器可以被配置,默认是使用指定的default-executor的一个Dispatcher。如果一个ActorSystem是使用传入的ExecutionContext创建的,则该ExecutionContext将被用作所有派发器的默认执行器(“executor”)。如果没有给定ExecutionContext,则会回退使用akka.actor.default-dispatcher.default-executor.fallback指定的执行器。缺省情况下是使用“fork-join-executor”,它在大多数情况下拥有非常好的性能。

邮箱

一个Akka Mailbox保存发往某个Actor的消息。通常每个Actor都拥有自己的邮箱,但也有例外,例如使用BalancingPool的所有路由子(routee)共享同一个邮箱实例。

为actor指定一个消息队列类型

为某个特定类型的actor指定一个特定类型的消息队列是有可能的,只要通过actor扩展RequiresMessageQueue参数化特质即可。下面是一个示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import akka.dispatch.RequiresMessageQueue
import akka.dispatch.BoundedMessageQueueSemantics

class MyBoundedActor extends MyActor
with RequiresMessageQueue[BoundedMessageQueueSemantics]
RequiresMessageQueue特质的类型参数需要映射到配置中的邮箱,像这样:

bounded-mailbox {
mailbox-type = "akka.dispatch.BoundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 10s
}

akka.actor.mailbox.requirements {
"akka.dispatch.BoundedMessageQueueSemantics" = bounded-mailbox
}

如何选择邮箱类型

当一个actor创建时,ActorRefProvider首先确定将执行它的调度器。然后,邮箱确定如下:

  1. 如果actor的部署配置节包含mailbox键,则其描述邮箱类型将被使用。
  2. 如果actor的Props包含邮箱选择——即它调用了withMailbox——则其描述邮箱类型将被使用。
  3. 如果调度器的配置节包含mailbox-type键,则该节内容将用于配置邮箱类型。
  4. 如果该actor需要邮箱类型,如上文所述,然后该约束的映射将用于确定使用的邮箱类型;如果不能满足调度器的约束——如果有的话——将继续替换尝试。
  5. 如果调度器需要一个邮箱类型,如上文所述,则该约束的映射将被用来确定要使用的邮箱类型。
  6. 将使用默认邮箱akka.actor.default-mailbox。

路由

消息可以通过路由器发送,以便有效地将它们路由到目的actor,称为其routee。一个Router可以在actor内部或外部使用,并且你可以自己管理routee或使用有配置功能的自我包含的路由actor。

为了创建路由器,设置指定数量的rountee,需要以下信息
-rounter类型和routee实例的数量

1
ActorRef masterAct = system.actorOf(Props.create(MasterActor.class).withRouter(new RoundRobinPool(5)),"RounterActor");

定义了一个Actor,通过实例化一个路由器实例,这里是RoundRobin,它的构造函数接受一个参数,表示创建rountees的数量.

一个简单的路由器

下面的示例阐释如何使用Router和在actor内管理routee。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import akka.routing.ActorRefRoutee
import akka.routing.Router
import akka.routing.RoundRobinRoutingLogic

class Master extends Actor {
var router = {
val routees = Vector.fill(5) {
val r = context.actorOf(Props[Worker])
context watch r
ActorRefRoutee(r)
}
Router(RoundRobinRoutingLogic(), routees)
}

def receive = {
case w: Work =>
router.route(w, sender())
case Terminated(a) =>
router = router.removeRoutee(a)
val r = context.actorOf(Props[Worker])
context watch r
router = router.addRoutee(r)
}
}

Akka自带的路由逻辑如下:

  • akka.routing.RoundRobinRoutingLogic
  • akka.routing.RandomRoutingLogic
  • akka.routing.SmallestMailboxRoutingLogic
  • akka.routing.BroadcastRoutingLogic
  • akka.routing.ScatterGatherFirstCompletedRoutingLogic
  • akka.routing.TailChoppingRoutingLogic
  • akka.routing.ConsistentHashingRoutingLogic

Router是不可变的,而RoutingLogic是线程安全的;意味着他们也可以在actor外部使用。
路由actor是一种特殊的类型-RounterActorRef。RounterActorRef不是利用存储转发的机制,他直接路由消息到Rountee的邮箱,而不是到Router的邮箱,当routee答复路由消息时,回复将发送到原始发件人,而不是路由actor。

注意
一般情况下,任何发送到路由器的消息将被向前发送到它的routee,但有一个例外。特别地广播消息将发送到路由器下所有的routee

一个路由actor

一个路由器也可以被创建为一个自包含的actor,来管理routee,载入路由逻辑和其他配置设置。

注意: 路由器actor相当于是透明的
路由器也是子actor的监管者。路由器的默认策略是总是上溯,所以错误会传递给路由器的监管者处理。
注意路由器的监管者会将错误当做路由器的错位,因此会重启路由器,并将导致他的孩子全部重启。
如果路由器池的子actor终止,池路由器不会自动产生一个新的actor。在池路由器所有子actor都终止的事件中,路由器将终止本身,除非它是一个动态的路由器,例如使用了大小调整。

AkkaRemote

Posted on 2016-02-04 | In Akka |

Akka远程

Akka remoting是按照端到端(peer-to-peer)对等通信的方式设计的

远程交互的类型

Akka 远程调用有两种方式:

  • 查找 : 使用actorSelection(path)在远程主机上查找一个actor
  • 创建 : 使用actorOf(Props(…), actorName)在远程主机上创建一个actor

查找远程 Actors

actorSelection(path)会获得远程结点上一个Actor的ActorSelection, 例如:

1
2
3
4
5
6
7
8
val selection =
context.actorSelection("akka.tcp://actorSystemName@10.0.0.1:2552/user/actorName")
可以看到以下模式被用来在远程结点上查找一个actor:

akka.<protocol>://<actor system>@<hostname>:<port>/<actor path>
一旦得到了actor的selection,你就可以像与本地actor通讯一样与它进行通迅, 例如:

selection ! "Pretty awesome feature"

用代码进行远程部署

要允许动态部署系统,也可以在用来创建actor的Props中包含deployment配置 : 这一部分信息与配置文件中的deployment部分是等价的, 如果两者都有,则外部配置拥有更高的优先级.

1
2
3
4
5
6
7
8
9
10
11
12
加入这些import:

import akka.actor.{ Props, Deploy, Address, AddressFromURIString }
import akka.remote.RemoteScope
和一个像这样的远程地址:

val one = AddressFromURIString("akka.tcp://sys@host:1234")
val two = Address("akka.tcp", "sys", "host", 1234) // this gives the same
你可以像这样建议系统在此远程结点上创建一个子actor:

val ref = system.actorOf(Props[SampleActor].
withDeploy(Deploy(scope = RemoteScope(address))))

序列化

对actor使用远程调用时,你必须保证这些actor所使用的props和messages是可序列化的. 如果不能保证会导致系统产生意料之外的行为.
配置

  1. 为了让 Akka 知道对什么任务使用哪个Serializer, 你需要编辑你的 配置文件, 在 “akka.actor.serializers”一节将名称绑定为akka.serialization.Serializer的实现

  2. 在将名称与Serializer的不同实现绑定后,你需要指定哪些类的序列化使用哪种Serializer, 这部分配置写在“akka.actor.serialization-bindings”一节中:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    akka {
    actor {
    serializers {
    java = "akka.serialization.JavaSerializer"
    proto = "akka.remote.serialization.ProtobufSerializer"
    myown = "docs.serialization.MyOwnSerializer"
    }

    serialization-bindings {
    "java.lang.String" = java
    "docs.serialization.Customer" = java
    "com.google.protobuf.Message" = proto
    "docs.serialization.MyOwnSerializable" = myown
    "java.lang.Boolean" = myown
    }
    }
    }

自定义 创建新的 Serializer

首先你需要为你的 Serializer 写一个类定义,像这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import akka.actor.{ ActorRef, ActorSystem }
import akka.serialization._
import com.typesafe.config.ConfigFactory

class MyOwnSerializer extends Serializer {

// This is whether "fromBinary" requires a "clazz" or not
def includeManifest: Boolean = false

// Pick a unique identifier for your Serializer,
// you've got a couple of billions to choose from,
// 0 - 16 is reserved by Akka itself
def identifier = 1234567

// "toBinary" serializes the given object to an Array of Bytes
def toBinary(obj: AnyRef): Array[Byte] = {
// Put the code that serializes the object here
// ... ...
}

// "fromBinary" deserializes the given array,
// using the type hint (if any, see "includeManifest" above)
// into the optionally provided classLoader.
def fromBinary(bytes: Array[Byte],
clazz: Option[Class[_]]): AnyRef = {
// Put your code that deserializes here
// ... ...
}
}

远程事件

可以监听Akka远程调用中发生的事件,也可以订阅/取消订阅这些事情,你只需要在ActorSystem.eventStream中为下面所列出类型的事件注册监听器.

注意
若要订阅任意远程事件,订阅RemotingLifecycleEvent。若要订阅只涉及链接的生命周期的事件,请订阅akka.remote.AssociationEvent。

注意
使用”链接”而不是”连接”一词,反映了远程处理子系统可能使用无连接传输,但链接类似于运输层连接,来维持点到点之间的Akka协议。

远程安全

Akka提供了几种方式来加强远程节点(客户端/服务器)之间的安全:

  • 不受信任的模式
  • 安全 Cookie 握手
  1. 配置为不受信任模式的系统通过远程处理层传入的以下操作将被忽略:
    • 远程部署 (这也意味着没有远程监控)
    • 远程DeathWatch
    • system.stop(), PoisonPill, Kill
    • 发送任何继承自PossiblyHarmful标记接口的消息,包括Terminated
    • 通过actor selection发送的消息,除非目标定义在trusted-selection-paths中。
  2. 安全 Cookie 握手
    Akka远程处理还允许你指定一个安全cookie,它将被交换并确保在客户端和服务器之间的连接握手中是相同的。如果他们不相同,则客户端将被拒绝连接到服务器。
1…91011

Sun Ke

104 posts
21 categories
61 tags
© 2018 Sun Ke
Powered by Hexo
|
Theme — NexT.Pisces v5.1.4