雁渡寒潭 风吹疏竹

教练,我想打篮球


  • Home

  • Tags

  • Categories

  • Archives

HFTP文件系统解析

Posted on 2017-01-05 | In HDFS |

初探

抽象类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

| 文件系统 | URI前缀 | hadoop的具体实现类 |
| --- | --- | --- |
| Local | file | fs.LocalFileSystem |
| HDFS | hdfs | hdfs.DistributedFileSystem |
| HFTP | hftp | hdfs.HftpFileSystem |
| HSFTP | hsftp | hdfs.HsftpFileSystem |
| HAR | har | fs.HarFileSystem |
| KFS | kfs | fs.kfs.KosmosFileSystem |
| FTP | ftp | fs.ftp.FTPFileSystem |
| S3 (native) | s3n | s.s3native.NativeS3FileSystem |
| S3 (blockbased) | s3 | fs.s3.S3FileSystem |

Hadoop提供了很多接口来访问这些文件系统,最常用的是通过URI前缀来访问正确的文件系统。比如:
> hadoop fs -ls file:///.......
> hadoop fs -ls hdfs:///.......

虽然理论上MapReduce可以使用上面这些系统,但是如果我们处理海量数据的话还是要选用一个分布式文件系统hdfs或者kfs。


## 配置
hadoop-default.xml关于filesystem实现的配置```hadoop-default.xml```:
``` xml
<property>
<name>fs.file.impl</name>
<value>org.apache.hadoop.fs.LocalFileSystem</value>
<description>The FileSystem for file: uris.</description>
</property>

<property>
<name>fs.fms.impl</name>
<value>org.apache.hadoop.hdfs.FMSFileSystem</value>
<description>The FileSystem for hdfs: uris.</description>
</property>

<property>
<name>fs.hdfs.impl</name>
<value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
<description>The FileSystem for hdfs: uris.</description>
</property>

spark-client关于filesystem实现的配置

```:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
``` xml
<property>
<name>fs.file.impl</name>
<value>org.apache.hadoop.fs.LocalFileSystem</value>
<description>The FileSystem for file: uris.</description>
</property>

<property>
<name>fs.hdfs.impl</name>
<value>org.apache.hadoop.fs.DFileSystem</value>
</property>

<property>
<name>fs.webhdfs.impl</name>
<value>org.apache.hadoop.smw.hdfs.web.WebHdfsFileSystem</value>
</property>

HFTP介绍

HFTP 是hadoop文件系统用来让你从一个远程的hadoop HDFS集群中读取数据的组件。这个读取是通过HTTP,并且数据源是DataNodes。HFTP是一个只读的文件系统,当你试图用来写入数据或者修改文件系统状态时,会抛出异常。

HFTP 主要的帮助在有多个HDFS集群,并存在多个版本时,将数据从一个集群迁移到另一个。HFTP 在不同版本的HDFS中是兼容写的。你可以操作例如:

hadoop distcp -i hftp://sourceFS:50070/src hdfs://destFS:8020/dest

注意HFTP是只读的,所以目标端必须是HDFS文件系统。(在这个例子中,distcp会使用新文件系统的配置运行。)

另外,HSFTP,默认使用HTTPS。这意味着数据在传输的时候会被加密。
https://hadoop.apache.org/docs/r2.6.3/hadoop-project-dist/hadoop-hdfs/Hftp.html

实现

HFTP的代码在java 类org.apache.hadoop.hdfs.HftpFileSystem 中。同样的,HSFTP也在org.apache.hadoop.hdfs.HsftpFileSystem中实现.

SparkListener机制详解

Posted on 2017-01-02 |

Spark源码注释中有下面一句话:

Asynchronously passes SparkListenerEvents to registered SparkListeners

即所有spark消息SparkListenerEvents 被异步的发送给已经注册过的SparkListeners.
在SparkContext中, 首先会创建LiveListenerBus实例,这个类主要功能如下:

  • 保存所有消息队列,负责消息的缓存
  • 保存所有注册过的listener,负责消息的分发

listener链表保存在ListenerBus类中,为了保证并发访问的安全性,此处采用Java的CopyOnWriteArrayList类来存储listener. 当需要对listener链表进行更改时,CopyOnWriteArrayList的特性使得会先复制整个链表,然后在复制的链表上面进行修改.当一旦获得链表的迭代器,在迭代器的生命周期中,可以保证数据的一致性.

消息队列实际上是保存在类AsynchronousListenerBus中的:

1
2
private val EVENT_QUEUE_CAPACITY = 10000
private val eventQueue = new LinkedBlockingQueue[E](EVENT_QUEUE_CAPACITY)

事件队列的长度为10000,当缓存事件数量达到上限后,新来的事件会被丢弃,

在SparkContext中,会

  • 创建LiveListenerBus类类型的成员变量listenerBus
  • 创建各种listener,并加入到listenerBus中
  • post一些事件到listenerBus中
  • 调用listenerBus.start() 来启动事件处理程序

这里有一点需要注意的是, 在listenerBus.start() 调用之前, 可以向其中post消息, 这些消息会被缓存起来,等start函数调用之后, 消费者线程会分发这些缓存的消息.

使用RCU技术实现读写线程无锁

Posted on 2016-12-23 | In 多线程 |

在一个系统中有一个写线程和若干个读线程,读写线程通过一个指针共用了一个数据结构,写线程改写这个结构,读线程读取该结构。在写线程改写这个数据结构的过程中,加锁情况下读线程由于等待锁耗时会增加。

可以利用RCU (Read Copy Update What is rcu)的思想来去除这个锁。

RCU

RCU可以说是一种替代读写锁的方法。其基于一个事实:当写线程在改变一个指针时,读线程获取这个指针,要么获取到老的值,要么获取到新的值。RCU的基本思想其实很简单,参考What is RCU中Toy implementation可以很容易理解。一种简单的RCU流程可以描述为:

写线程:

1
2
3
4
5
old_ptr = _ptr
tmp_ptr = copy(_ptr) // copy
change(tmp_ptr) // change
_ptr = tmp_ptr // update
synchroize(tmp_ptr)

写线程要更新_ptr指向的内容时,先复制一份新的,基于新的进行改变,更新_ptr指针,最后同步释放老的内存。

读线程:

1
2
3
tmp_ptr = _ptr
use(tmp_ptr)
dereference(tmp_ptr)

读线程直接使用_ptr,使用完后需要告诉写线程自己不再使用_ptr。读线程获取_ptr时,可能会获取到老的也可能获取到新的,无论哪种RCU都需要保证这块内存是有效的。重点在synchroize和dereference。synchroize会等待所有使用老的_ptr的线程dereference,对于新的_ptr使用者其不需要等待。这个问题说白了就是写线程如何知道old_ptr没有任何读线程在使用,可以安全地释放。

这个问题实际上在wait-free的各种实现中有好些解法,how-when-to-release-memory-in-wait-free-algorithms这里有人总结了几种方法,例如Hazard pointers、Quiescence period based reclamation。

简单地使用引用计数智能指针是无法解决这个问题的,因为智能指针自己不是线程安全的,例如:

1
2
3
4
tmp_ptr = _ptr      // 1
tmp_ptr->addRef() // 2
use
tmp_ptr->release()

代码1/2行不是原子的,所以当取得tmp_ptr准备addRef时,tmp_ptr可能刚好被释放了。

Quiescence period based reclamation方法指的是读线程需要声明自己处于Quiescence period,也就是不使用_ptr的时候,当其使用_ptr的时候实际是进入了一个逻辑上的临界区,当所有读线程都不再使用_ptr的时候,写线程就可以对内存进行安全地释放。

本文正是描述了一种Quiescence period based reclamation实现。这个实现可以用于有一个写线程和多个读线程共用若干个数据的场景。

大数据下的DistinctCount

Posted on 2016-12-23 | In Spark |

在数据库中,常常会有Distinct Count的操作,比如,查看每一选修课程的人数:

1
2
3
select course, count(distinct sid)
from stu_table
group by course;

Hive

在大数据场景下,报表很重要一项是UV(Unique Visitor)统计,即某时间段内用户人数。例如,查看一周内app的用户分布情况,Hive中写HiveQL实现:

1
2
3
4
5
select app, count(distinct uid) as uv
from log_table
where week_cal = '2016-03-27'
order by uv desc
limit 20

Pig

大部分情况下,Hive的执行效率偏低,我更为偏爱Pig:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
-- all users define DISTINCT_COUNT(A, a) returns dist {
B = foreach $A generate $a;
unique_B = distinct B;
C = group unique_B all;
$dist = foreach C generate SIZE(unique_B);
}
A = load '/path/to/data' using PigStorage() as (app, uid);
B = DISTINCT_COUNT(A, uid);

-- <app, users>
A = load '/path/to/data' using PigStorage() as (app, uid);
B = distinct A;
C = group B by app;
D = foreach C generate group as app, COUNT($1) as uv;
-- or
D = foreach C generate group as app, SIZE($1) as uv;

DataFu 为pig提供基数估计的UDF

Count:
1
2
3
4
5
```sql
define HyperLogLogPlusPlus datafu.pig.stats.HyperLogLogPlusPlus();
A = load '/path/to/data' using PigStorage() as (app, uid);
B = group A by app;
C = foreach B generate group as app, HyperLogLogPlusPlus($1) as uv;

Spark

在Spark中,Load数据后通过RDD一系列的转换——map、distinct、reduceByKey进行Distinct Count:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
rdd.map { row => (row.app, row.uid) }
.distinct()
.map { line => (line._1, 1) }
.reduceByKey(_ + _)

// or
rdd.map { row => (row.app, row.uid) }
.distinct()
.mapValues{ _ => 1 }
.reduceByKey(_ + _)

// or
rdd.map { row => (row.app, row.uid) }
.distinct()
.map(_._1)
.countByValue()

同时,Spark提供近似Distinct Count的API:

1
2
rdd.map { row => (row.app, row.uid) }
.countApproxDistinctByKey(0.001)

实现是基于HyperLogLog算法:

The algorithm used is based on streamlib’s implementation of “HyperLogLog in Practice: Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm”, available here.

或者,将Schema化的RDD转成DataFrame后,registerTempTable然后执行sql命令亦可:

1
2
3
4
5
val sqlContext = new SQLContext(sc)
val df = rdd.toDF()
df.registerTempTable("app_table")

val appUsers = sqlContext.sql("select app, count(distinct uid) as uv from app_table group by app")

Bitmap

Bitmap介绍

《编程珠玑》上是这样介绍bitmap的:

Bitmap是一个十分有用的数据结构。所谓的Bitmap就是用一个bit位来标记某个元素对应的Value,而Key即是该元素。由于采用了Bit为单位来存储数据,因此在内存占用方面,可以大大节省。

简而言之——用一个bit(0或1)表示某元素是否出现过,其在bitmap的位置对应于其index。《编程珠玑》给出了一个用bitmap做排序的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/* Copyright (C) 1999 Lucent Technologies */
/* From 'Programming Pearls' by Jon Bentley */
/* bitsort.c -- bitmap sort from Column 1 * Sort distinct integers in the range [0..N-1] */
#include <stdio.h>

#define BITSPERWORD 32
#define SHIFT 5
#define MASK 0x1F
#define N 10000000
int a[1 + N / BITSPERWORD];

void set(int i) { a[i >> SHIFT] |= (1 << (i & MASK)); }

void clr(int i) { a[i >> SHIFT] &= ~(1 << (i & MASK)); }

int test(int i) { return a[i >> SHIFT] & (1 << (i & MASK)); }

int main() {
int i;
for (i = 0; i < N; i++)
clr(i);
/* Replace above 2 lines with below 3 for word-parallel init int top = 1 + N/BITSPERWORD; for (i = 0; i < top; i++) a[i] = 0; */
while (scanf("%d", &i) != EOF)
set(i);
for (i = 0; i < N; i++)
if (test(i))
printf("%d\n", i);
return 0;
}

上面代码中,用int的数组存储bitmap,对于每一个待排序的int数,其对应的index为其int值。

Distinct Count优化

  1. index生成

为了使用bitmap做Distinct Count,首先需得到每个用户(uid)对应(在bitmap中)的index。有两种办法可以得到从1开始编号index表(与uid一一对应):

  • hash,但是要找到无碰撞且hash值均匀分布[1, +∞)区间的hash函数是非常困难的;
    维护一张uid与index之间的映射表,并增量更新
  • 比较两种方法,第二种方法更为简单可行。
  1. UV计算

在index生成完成后,RDD[(uid, V)]与RDD[(uid, index)]join得到index化的RDD。bitmap的开源实现有EWAH,采用RLE(Run Length Encoding)压缩,很好地解决了存储空间的浪费。Distinct Count计算转变成了求bitmap中1的个数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// distinct count for rdd(not pair) and the rdd must be sorted in each partition
// 计算独立值的count
def distinctCount(rdd: RDD[Int]): Int = {
val bitmap = rdd.aggregate[EWAHCompressedBitmap](new EWAHCompressedBitmap())(
(u: EWAHCompressedBitmap, v: Int) => {
u.set(v)
u
},
(u1: EWAHCompressedBitmap, u2: EWAHCompressedBitmap) => u1.or(u2)
)
bitmap.cardinality()
}

// the tuple_2 is the index
// 计算每个值对应的count
def groupCount[K: ClassTag](rdd: RDD[(K, Int)]): RDD[(K, Int)] = {
val grouped: RDD[(K, EWAHCompressedBitmap)] = rdd.combineByKey[EWAHCompressedBitmap](
(v: Int) => EWAHCompressedBitmap.bitmapOf(v),
(c: EWAHCompressedBitmap, v: Int) => {
c.set(v)
c
},
(c1: EWAHCompressedBitmap, c2: EWAHCompressedBitmap) => c1.or(c2))
grouped.map(t => (t._1, t._2.cardinality()))
}

但是,在上述计算中,由于EWAHCompressedBitmap的set方法要求int值是升序的,也就是说RDD的每一个partition的index应是升序排列:

1
2
3
4
5
6
// sort pair RDD by value
def sortPairRDD[K](rdd: RDD[(K, Int)]): RDD[(K, Int)] = {
rdd.mapPartitions(iter => {
iter.toArray.sortWith((x, y) => x._2.compare(y._2) < 0).iterator
})
}

为了避免排序,可以为每一个uid生成一个bitmap,然后在Distinct Count时将bitmap进行or运算亦可:

1
2
rdd.reduceByKey(_ or _)
.mapValues(_._2.cardinality())

Scala的类型和反射机制

Posted on 2016-11-29 | In Scala |

Scala的反射机制

  1. Manifest & ClassManifest
    Manifest是在编译时捕捉的,编码了“捕捉时”所致的类型信息。然后就可以在运行时检查和使用类型信息,但是manifest只能捕捉当Manifest被查找时在隐式作用域里的类型。

    1
    def first[A : ClassManifest](x:Array[A]) = Array(x(0))
  2. ClassTag & TypeTag

  • ClassTag[T]保存了被泛型擦除后的原始类型T,提供给运行时的

    1
    2
    scala> def mkArray[T : ClassTag](elems: T*) = Array[T](elems: _*)
    mkArray: [T](elems: T*)(implicit evidence$1: scala.reflect.ClassTag[T])Array[T]
  • TypeTag则保存所有具体的类型

    1
    2
    3
    4
    5
    6
    7
    8
    9
    import scala.reflect.runtime.universe._
    def paramInfo[T](x: T)(implicit tag: TypeTag[T]): Unit = {
    val targs = tag.tpe match { case TypeRef(_, _, args) => args }
    println(s"type of $x has type arguments $targs")
    }
    scala> paramInfo(42)
    type of 42 has type arguments List()
    scala> paramInfo(List(1, 2))
    type of List(1, 2) has type arguments List(Int)

可以看到,获取到的类型是具体的类型,而不是被擦除后的类型List(Any)

scala在2.10里却用TypeTag替代了Manifest,用ClassTag替代了ClassManifest,原因是在路径依赖类型中,Manifest存在问题:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scala> class Foo{class Bar}

scala> def m(f: Foo)(b: f.Bar)(implicit ev: Manifest[f.Bar]) = ev

scala> val f1 = new Foo;val b1 = new f1.Bar
scala> val f2 = new Foo;val b2 = new f2.Bar

scala> val ev1 = m(f1)(b1)
ev1: Manifest[f1.Bar] = Foo@681e731c.type#Foo$Bar

scala> val ev2 = m(f2)(b2)
ev2: Manifest[f2.Bar] = Foo@3e50039c.type#Foo$Bar

scala> ev1 == ev2 // they should be different, thus the result is wrong
res28: Boolean = true

ev1 不应该等于 ev2 的,因为其依赖路径(外部实例)是不一样的。
还有其他因素,所以在2.10版本里,使用 TypeTag 替代了 Manifest
TypeTag:由编辑器生成,只能通过隐式参数或者上下文绑定获取
可以有两种方式获取:

1
2
3
4
5
6
7
8
9
10
11
12
scala> import scala.reflect.runtime.universe._
import scala.reflect.runtime.universe._

//使用typeTag
scala> def getTypeTag[T:TypeTag](a:T) = typeTag[T]
getTypeTag: [T](a: T)(implicit evidence$1: reflect.runtime.universe.TypeTag[T])reflect.runtime.universe.TypeTag[T]

//使用implicitly 等价的
//scala>def getTypeTag[T:TypeTag](a:T) = implicitly[TypeTag[T]]

scala> getTypeTag(List(1,2,3))
res0: reflect.runtime.universe.TypeTag[List[Int]] = TypeTag[List[Int]]

通过TypeTag的tpe方法获得需要的Type(如果不是从对象换取Type 而是从class中获得 可以直接用 typeOf[类名])

  1. 反射获取TypeTag和ClassTag
    JavaCodeExample
    String———->Calss————–>Manifest——–>TypeTag
    Class.forName ManifestFactory.classType scala.reflect.runtime.universe.manifestToTypeTag

    1
    2
    3
    4
    5
    6
    7
    import scala.reflect.runtime.universe
    import scala.reflect.ManifestFactory

    val className = "java.lang.String"
    val mirror = universe.runtimeMirror(getClass.getClassLoader)
    val cls = Class.forName(className)
    val t = universe.manifestToTypeTag(mirror, ManifestFactory.classType(cls))
  2. classOf与getClass方法的差异

getClass 方法得到的是 Class[A]的某个子类,而 classOf[A] 得到是正确的 Class[A],但是去比较的话,这两个类型是equals为true的

classOf获取运行时的类型。classOf[T] 相当于 java中的T.class; 而getClass:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
val listClass = classOf[List[_]]
* // listClass is java.lang.Class[List[_]] = class scala.collection.immutable.List
val mapIntString = classOf[Map[Int,String]]
* // mapIntString is java.lang.Class[Map[Int,String]] = interface scala.collection.immutable.Map
* }}}

scala> class A
scala> val a = new A

scala> a.getClass
res2: Class[_ <: A] = class A

scala> classOf[A]
res3: Class[A] = class A

scala> a.getClass == classOf[A]
res13: Boolean = true

这种细微的差别,体现在类型赋值时,因为java里的 Class[T]是不支持协变的,所以无法把一个 Class[_ < : A] 赋值给一个 Class[A]

1
2
scala> val c:Class[A] = a.getClass
<console>:9: error: type mismatch;

类(class)与类型(type)是两个不一样的概念

(在java里因为早期一直使用class表达type,并且现在也延续这样的习惯);类型(type)比类(class)更”具体”,任何数据都有类型。类是面向对象系统里对同一类数据的抽象,在没有泛型之前,类型系统不存在高阶概念,直接与类一一映射,而泛型出现之后,就不在一一映射了。比如定义class List[T] {}, 可以有List[Int] 和 List[String]等具体类型,它们的类是同一个List,但类型则根据不同的构造参数类型而不同。

类型一致的对象它们的类也是一致的,反过来,类一致的,其类型不一定一致。

1
2
3
4
5
scala> classOf[List[Int]] == classOf[List[String]]
res16: Boolean = true

scala> typeOf[List[Int]] == typeOf[List[String]]
res17: Boolean = false

JVM源码分析系列

Posted on 2016-11-29 | In Java |

JVM源码分析系列

  1. 不保证顺序的Class.getMethods
    JVM源码分析之不保证顺序的Class.getMethods

  2. Metaspace
    JVM源码分析之Metaspace解密

metaspace,顾名思义,元数据空间,专门用来存元数据的,它是jdk8里特有的数据结构用来替代perm

  • 为什么会有metaspace
    如果perm设置太小了,系统运行过程中就容易出现内存溢出,设置大了又总感觉浪费,尽管不会实质分配这么大的物理内存。基于这么一个可能的原因,于是metaspace出现了,希望内存的管理不再受到限制,也不要怎么关注元数据这块的OOM问题
  • metaspace的组成
    • Klass Metaspace Klass Metaspace就是用来存klass的,klass是我们熟知的class文件在jvm里的运行时数据结构,不过有点要提的是我们看到的类似A.class其实是存在heap里的,是java.lang.Class的一个对象实例。
    • NoKlass Metaspace NoKlass Metaspace专门来存klass相关的其他的内容,比如method,constantPool等,这块内存是由多块内存组合起来的,所以可以认为是不连续的内存块组成的。

Klass Metaspace和NoKlass Mestaspace都是所有classloader共享的,所以类加载器们要分配内存,但是每个类加载器都有一个SpaceManager,来管理属于这个类加载的内存小块。

  1. JVM源码分析之String.intern()导致的YGC不断变长
    JVM源码分析之String.intern()导致的YGC不断变长

在JVM里存在一个叫做StringTable的数据结构,这个数据结构是一个Hashtable,在我们调用String.intern的时候其实就是先去这个StringTable里查找是否存在一个同名的项,如果存在就直接返回对应的对象,否则就往这个table里插入一项,指向这个String对象,那么再下次通过intern再来访问同名的String对象的时候,就会返回上次插入的这一项指向的String对象

YGC的时间长短和扫描StringTable有关,如果StringTable非常庞大,那YGC过程扫描的时间也会变长

YGC过程不会对StringTable做清理,这也就是我们demo里的情况会让Stringtable越来越大,因为到目前为止还只看到YGC过程,但是在Full GC或者CMS GC过程会对StringTable做清理

  1. 如何定位消耗CPU最多的线程

步骤

- 使用top -Hp <pid> 查看进程所有线程的CPU消耗情况

- 使用jstack <pid> 查看各个线程栈

5.JVM源码分析之Object.wait/notify(All)完全解读

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
new Thread(){
public void run(){
synchronized(lock) {
try {
lock.wait();
} catch(InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
new Thread(){
public void run(){
synchronized(lock) {
try {
lock.notify();
}
}
}
}.start();
  • 为何要加synchronized锁
    从实现上来说,这个锁至关重要,正因为这把锁,才能让整个wait/notify玩转起来
  • wait方法执行后未退出同步块,其他线程如何进入同步块
    因为在wait处理过程中会临时释放同步锁,不过需要注意的是当某个线程调用notify唤起了这个线程的时候,在wait方法退出之前会重新获取这把锁,只有获取了这把锁才会继续执行
  • 为什么wait方法可能抛出InterruptedException异常
    这个异常大家应该都知道,当我们调用了某个线程的interrupt方法时,对应的线程会抛出这个异常,wait方法也不希望破坏这种规则,因此就算当前线程因为wait一直在阻塞,当某个线程希望它起来继续执行的时候,它还是得从阻塞态恢复过来,因此wait方法被唤醒起来的时候会去检测这个状态,当有线程interrupt了它的时候,它就会抛出这个异常从阻塞状态恢复过来。
  • 被notify(All)的线程有规律吗
    这里要分情况: 如果是通过notify来唤起的线程,那先进入wait的线程会先被唤起来 如果是通过nootifyAll唤起的线程,默认情况是最后进入的会先被唤起来,即LIFO的策略
  • notify执行之后立马唤醒线程吗
    hotspot里真正的实现是退出同步块的时候才会去真正唤醒对应的线程,
  • notifyAll是怎么实现全唤起的
    以notifyAll的实现是调用notify的线程在退出其同步块的时候唤醒起最后一个进入wait状态的线程,然后这个线程退出同步块的时候继续唤醒其倒数第二个进入wait状态的线程,依次类推
  • wait的线程是否会影响load
    当线程进入到wait状态的时候其实是会放弃cpu的,也就是说这类线程是不会占用cpu资源
  1. JVM源码分析之FinalReference完全解读

override finalizer的类对象称为f对象。

Finalizer的客观评价

  • Finalizer其实是实现了析构函数的概念,我们在对象被回收前可以执行一些『收拾性』的逻辑,应该说是一个特殊场景的补充,但是这种概念的实现给我们的f对象生命周期以及gc等带来了一些影响:
  • f对象因为Finalizer的引用而变成了一个临时的强引用,即使没有其他的强引用了,还是无法立即被回收
    f对象至少经历两次GC才能被回收,因为只有在FinalizerThread执行完了f对象的finalize方法的情况下才有可能被下次gc回收,而有可能期间已经经历过多次gc了,但是一直还没执行f对象的finalize方法
  • cpu资源比较稀缺的情况下FinalizerThread线程有可能因为优先级比较低而延迟执行f对象的finalize方法
  • 因为f对象的finalize方法迟迟没有执行,有可能会导致大部分f对象进入到old分代,此时容易引发old分代的gc,甚至fullgc,gc暂停时间明显变长
  • f对象的finalize方法被调用了,但是这个对象其实还并没有被回收,虽然可能在不久的将来会被回收
  1. 不可逆的类初始化过程
    定义一个类的时候,可能有静态变量,可能有静态代码块,这些逻辑编译之后会封装到一个叫做clinit的方法里。
    clinit方法在我们第一次主动使用这个类的时候会触发执行,比如我们访问这个类的静态方法或者静态字段就会触发执行clinit,但是这个过程是不可逆的,也就是说当我们执行一遍之后再也不会执行了,如果在执行这个方法过程中出现了异常没有被捕获,那这个类将永远不可用。即使抛出异常,被catch住,也依然会被JVM标记为error标记。
    如果clinit执行失败了,抛了一个未被捕获的异常,那将这个类的状态设置为initialization_error,并且无法再恢复,因为jvm会认为你这次初始化失败了,下次肯定也是失败的,为了防止不断抛这种异常,所以做了一个缓存处理,不是每次都再去执行clinit,因此大家要特别注意,类的初始化过程可千万不能出错,出错就可能只能重启了哦。

8.JVM源码分析之jstat工具原理完全解读

  • jstat如何获取到这些变量的值

变量值显然是从目标进程里获取来的,但是是怎样来的?local socket还是memory share?其实是从一个共享文件里来的,这个文件叫PerfData,主要指的是/tmp/hsperfdata_/这个文件

  • PerfData文件
    • 文件创建
      这个文件是否存在取决于两个参数,一个UsePerfData,另一个是PerfDisableSharedMem,如果设置了-XX:+PerfDisableSharedMem或者-XX:-UsePerfData,那这个文件是不会存在的,默认情况下PerfDisableSharedMem是关闭的,UsePerfData是打开的,所以默认情况下PerfData文件是存在的。
    • 文件删除
      那这个文件什么时候删除?正常情况下当进程退出的时候会自动删除,但是某些极端情况下,比如kill -9,这种信号jvm是不能捕获的,所以导致进程直接退出了,而没有做一些收尾性的工作,这个时候你会发现进程虽然没了,但是这个文件其实还是存在的。在当前用户接下来的任何一个java进程(比如说我们执行jps)起来的时候会去做一个判断,遍历/tmp/hsperfdata_下的进程文件,挨个看进程是不是还存在,如果不存在了就直接删除该文件,判断是否存在的具体操作其实就是发一个kill -0的信号看是否有异常。
    • 文件更新
      由于这个文件是通过mmap的方式映射到了内存里,而jstat是直接通过DirectByteBuffer的方式从PerfData里读取的,所以只要内存里的值变了,那我们从jstat看到的值就会发生变化,内存里的值什么时候变,取决于-XX:PerfDataSamplingInterval这个参数,默认是50ms,也就是说50ms更新一次值,基本上可以认为是实时的了。

SparkStreaming数据产生与导入相关的内存分析

Posted on 2016-11-27 | In Spark |

转自Spark Streaming 数据产生与导入相关的内存分析

前言

我这篇文章会分几个点来描述Spark Streaming 的Receiver在内存方面的表现。

  • 一个大致的数据接受流程
  • 一些存储结构的介绍
  • 哪些点可能导致内存问题,以及相关的配置参数
    另外,有位大牛写了Spark Streaming 源码解析系列,我觉得写的不错,这里也推荐下。

我在部门尽力推荐使用Spark Streaming做数据处理,目前已经应用在日志处理,机器学习等领域。这期间也遇到不少问题,尤其是Kafka在接受到的数据量非常大的情况下,会有一些内存相关的问题。

另外特别说明下,我们仅仅讨论的是High Level的Kafka Stream,也就是输入流通过如下方式创建:

1
KafkaUtils.createStream

并且不开启WAL的情况下。

数据接受流程

启动Spark Streaming(后续缩写为SS)后,SS 会选择一台Executor 启动ReceiverSupervisor,并且标记为Active状态。接着按如下步骤处理:

  1. ReceiverSupervisor会启动对应的Receiver(这里是KafkaReceiver)

  2. KafkaReceiver 会根据配置启动新的线程接受数据,在该线程中调用 ReceiverSupervisor.store 方法填充数据,注意,这里是一条一条填充的。

  3. ReceiverSupervisor 会调用 BlockGenerator.addData 进行数据填充。

到目前为止,整个过程不会有太多内存消耗,正常的一个线性调用。所有复杂的数据结构都隐含在 BlockGenerator 中。

BlockGenerator 存储结构

BlockGenerator 会复杂些,这里有几个点,

  1. 维护了一个缓存 currentBuffer ,就是一个无限长度的ArrayBuffer。currentBuffer 并不会被复用,而是每次都会新建,然后把老的对象直接封装成Block,BlockGenerator会负责保证currentBuffer 只有一个。currentBuffer 填充的速度是可以被限制的,以秒为单位,配置参数为 spark.streaming.receiver.maxRate。这个是Spark内存控制的第一道防线,填充currentBuffer 是阻塞的,消费Kafka的线程直接做填充。

  2. 维护了一个 blocksForPushing 队列, size 默认为10个(1.5.1版本),可通过 spark.streaming.blockQueueSize 进行配置。该队列主要用来实现生产-消费模式。每个元素其实是一个currentBuffer形成的block。

  3. blockIntervalTimer 是一个定时器。其实是一个生产者,负责将currentBuffer 的数据放到 blocksForPushing 中。通过参数 spark.streaming.blockInterval 设置,默认为200ms。放的方式很简单,直接把currentBuffer做为Block的数据源。这就是为什么currentBuffer不会被复用。

  4. blockPushingThread 也是一个定时器,负责将Block从blocksForPushing取出来,然后交给BlockManagerBasedBlockHandler.storeBlock 方法。10毫秒会取一次,不可配置。到这一步,才真的将数据放到了Spark的BlockManager中。

步骤描述完了,我们看看有哪些值得注意的地方。

currentBuffer

首先自然要说下currentBuffer,如果200ms期间你从Kafka接受的数据足够大,则足以把内存承包了。而且currentBuffer使用的并不是spark的storage内存,而是有限的用于运算存储的内存。 默认应该是 heap*0.4。除了把内存搞爆掉了,还有一个是GC。导致receiver所在的Executor 极容易挂掉,处理速度也巨慢。 如果你在SparkUI发现Receiver挂掉了,考虑有没有可能是这个问题。

blocksForPushing

blocksForPushing 这个是作为currentBuffer 和BlockManager之间的中转站。默认存储的数据最大可以达到 10*currentBuffer 大小。一般不打可能,除非你的 spark.streaming.blockInterval 设置的比10ms 还小,官方推荐最小也要设置成 50ms,你就不要搞对抗了。所以这块不用太担心。

blockPushingThread

blockPushingThread 负责从 blocksForPushing 获取数据,并且写入 BlockManager 。这里很蛋疼的事情是,blockPushingThread只写他自己所在的Executor的 blockManager,也就是每个batch周期的数据都会被 一个Executor给扛住了。 这是导致内存被撑爆的最大风险。 也就是说,每个batch周期接受到的数据最好不要超过接受Executor的内存(Storage)的一半。否则有你受的。我发现在数据量很大的情况下,最容易挂掉的就是Receiver所在的Executor了。 建议Spark-Streaming团队最好是能将数据写入到多个BlockManager上。

StorageLevel的配置问题

另外还有几个值得注意的问题:

如果你配置成Memory_Disk ,如果Receiver所在的Executor一旦挂掉,你也歇菜了,整个Spark Streaming作业会失败。失败的原因是一部分block找不到了。

如果你配置成Memory_Disk_2,数据会被replication到不同的节点。一般而言不会出现作业失败或者丢数据。但解决不了Receiver也容易挂的问题,当然还是主要还是内存引起的。

最好是采用默认设置 MEMORY_AND_DISK_SER_2 比较靠谱些。

这里面还有一个风险点就是,如果某个batch processing延迟了,那么对应的BlockManager的数据不会被释放,然后下一个batch的数据还在进,也会加重内存问题。

动态控制消费速率以及相关论文

另外,spark的消费速度可以设置上限以外,亦可以根据processing time 来动态调整。通过 spark.streaming.backpressure.enabled 设置为true 可以打开。算法的论文可参考: Socc 2014: Adaptive Stream Processing using Dynamic Batch Sizing ,还是有用的,我现在也都开启着。

Spark里除了这个 Dynamic,还有一个就是Dynamic Allocation,也就是Executor数量会根据资源使用情况,自动伸缩。我其实蛮喜欢Spark这个特色的。具体的可以查找下相关设计文档。

Spark SQL 学习笔记

Posted on 2016-10-26 | In Spark |

Spark SQL 学习笔记

Spark SQL中实现Hive MapJoin
重点参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

## 定义
- DataFrames
A DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.

- Datasets
A Dataset is a new experimental interface added in Spark 1.6 that tries to provide the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.).


## 概述
入口: SQLContext
``` scala
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

Hive支持: HiveContext
功能: more complete HiveQL parser, access to Hive UDFs, and the ability to read data from Hive tables

Creating DataFrames

With a SQLContext, applications can create DataFrames from an existing RDD, from a Hive table, or from data sources.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()

// Print the schema in a tree format
df.printSchema()

// Select only the "name" column
df.select("name").show()

// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()

// Select people older than 21
df.filter(df("age") > 21).show()

// Count people by age
df.groupBy("age").count().show()

运行sql

1
val df = sqlContext.sql("SELECT * FROM table")

Creating Datasets

1
2
3
4
5
6
7
8
// Encoders for most common types are automatically provided by importing sqlContext.implicits._
val ds = Seq(1, 2, 3).toDS()
ds.map(_ + 1).collect() // Returns: Array(2, 3, 4)


/ DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name.
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path).as[Person]

Interoperating with RDDs

Inferring the Schema Using Reflection

The Scala interface for Spark SQL supports automatically converting an RDD containing case classes to a DataFrame.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.
// 可以使用索引访问每一行中的列
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)

Programmatically Specifying the Schema

  1. Create an RDD of Rows from the original RDD;
  2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.
  3. Apply the schema to the RDD of Rows via createDataFrame method provided by SQLContext.
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    // sc is an existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    // Create an RDD
    val people = sc.textFile("examples/src/main/resources/people.txt")

    // The schema is encoded in a string
    val schemaString = "name age"

    // Import Row.
    import org.apache.spark.sql.Row;

    // Import Spark SQL data types
    import org.apache.spark.sql.types.{StructType,StructField,StringType};

    // Generate the schema based on the string of schema
    val schema =
    StructType(
    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

    // Convert records of the RDD (people) to Rows.
    val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

    // Apply the schema to the RDD.
    val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

    // Register the DataFrames as a table.
    peopleDataFrame.registerTempTable("people")

    // SQL statements can be run by using the sql methods provided by sqlContext.
    val results = sqlContext.sql("SELECT name FROM people")

    // The results of SQL queries are DataFrames and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by field index or by field name.
    results.map(t => "Name: " + t(0)).collect().foreach(println)

Data Sources

Generic Load/Save Functions

1
2
3
4
5
6
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

// 显式指定格式
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

Run SQL on files directly

1
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

Saving to Persistent Tables

When working with a HiveContext, DataFrames can also be saved as persistent tables using the saveAsTable command. Unlike the registerTempTable command, saveAsTable will materialize the contents of the dataframe and create a pointer to the data in the HiveMetastore.
将会真的使用hive创建一张内表

Parquet

Partition Discovery 分区

Currently, numeric data types and string type are supported.
参数设置: spark.sql.sources.partitionColumnTypeInference.enabled true.
文件路径 path/to/table/gender=male

Schema Merging

支持Parquet属性变更
该属性默认被关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._

// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")

// Read the partitioned table
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)

hive支持

1
2
3
4
5
6
7
8
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

JDBC To Other Databases

支持JDBC协议连接其他数据源

1
2
3
val jdbcDF = sqlContext.read.format("jdbc").options(
Map("url" -> "jdbc:postgresql:dbserver",
"dbtable" -> "schema.tablename")).load()

性能优化

Caching Data In Memory

  1. sqlContext.cacheTable(“tableName”) / sqlContext.uncacheTable(“tableName”)
  2. dataFrame.cache()
参数 默认值 说明
spark.sql.inMemoryColumnarStorage.compressed true 是否进行
spark.sql.inMemoryColumnarStorage.batchSize 10000 cache时的batch的大小
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) join时进行广播的数据量
spark.sql.tungsten.enabled true 是否开启tungsten支持
spark.sql.shuffle.partitions 200 join和聚合时shuffle的分区数量

Distributed SQL Engine

启动ThriftServer 使用beeline或者JDBC连接使用

  1. Running the Thrift JDBC/ODBC server
    The Thrift JDBC/ODBC server implemented here corresponds to the HiveServer2 in Hive 1.2.1 You can test the JDBC server with the beeline script that comes with either Spark or Hive 1.2.1.
    To start the JDBC/ODBC server, run the following in the Spark directory:

    ./sbin/start-thriftserver.sh

or

./sbin/start-thriftserver.sh \
–hiveconf hive.server2.thrift.port= \
–hiveconf hive.server2.thrift.bind.host= \
–master

  1. use beeline to test the Thrift JDBC/ODBC server:

    ./bin/beeline
    beeline> !connect jdbc:hive2://localhost:10000

其他特性

Caching

NOTE: CACHE TABLE tbl is now eager by default not lazy. Don’t need to trigger cache materialization manually anymore.

1
2
3
CACHE [LAZY] TABLE [AS SELECT] ...
CACHE TABLE logs_last_month;
UNCACHE TABLE logs_last_month;

动态分区

1
2
3
df.write.partitionBy('year', 'month').saveAsTable(...)
or
df.write.partitionBy('year', 'month').insertInto(...)

Spark Streaming Backpressure分析

Posted on 2016-09-26 | In Spark |

1. 为什么引入Backpressure

默认情况下,Spark Streaming通过Receiver以生产者生产数据的速率接收数据,计算过程中会出现batch processing time > batch interval的情况,其中batch processing time 为实际计算一个批次花费时间, batch interval为Streaming应用设置的批处理间隔。这意味着Spark Streaming的数据接收速率高于Spark从队列中移除数据的速率,也就是数据处理能力低,在设置间隔内不能完全处理当前接收速率接收的数据。如果这种情况持续过长的时间,会造成数据在内存中堆积,导致Receiver所在Executor内存溢出等问题(如果设置StorageLevel包含disk, 则内存存放不下的数据会溢写至disk, 加大延迟)。Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。为了更好的协调数据接收速率与资源处理能力,Spark Streaming 从v1.5开始引入反压机制(back-pressure),通过动态控制数据接收速率来适配集群数据处理能力。

2. Backpressure

Spark Streaming Backpressure: 根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。通过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用。

3. 流量控制点

当Receiver开始接收数据时,会通过supervisor.pushSingle()方法将接收的数据存入currentBuffer等待BlockGenerator定时将数据取走,包装成block. 在将数据存放入currentBuffer之时,要获取许可(令牌)。如果获取到许可就可以将数据存入buffer, 否则将被阻塞,进而阻塞Receiver从数据源拉取数据。
其令牌投放采用令牌桶机制进行, 原理如下图所示:

牌桶机制: 大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。如果令牌不被消耗,或者被消耗的速度小于产生的速度,令牌就会不断地增多,直到把桶填满。后面再产生的令牌就会从桶中溢出。最后桶中可以保存的最大令牌数永远不会超过桶的大小。当进行某操作时需要令牌时会从令牌桶中取出相应的令牌数,如果获取到则继续操作,否则阻塞。用完之后不用放回。

Streaming 数据流被Receiver接收后,按行解析后存入iterator中。然后逐个存入Buffer,在存入buffer时会先获取token,如果没有token存在,则阻塞;如果获取到则将数据存入buffer. 然后等价后续生成block操作。

Java8 Optional用法

Posted on 2016-09-19 | In Java |

Optional源码

Optional 的三种构造方式: Optional.of(obj), Optional.ofNullable(obj) 和明确的 Optional.empty()

  • Optional.of(obj): 它要求传入的 obj 不能是 null 值的, 否则还没开始进入角色就倒在了 NullPointerException 异常上了.
  • Optional.ofNullable(obj): 它以一种智能的, 宽容的方式来构造一个 Optional 实例. 来者不拒, 传 null 进到就得到 Optional.empty(), 非 null 就调用 Optional.of(obj).

观点:

  1. 当我们非常非常的明确将要传给 Optional.of(obj) 的 obj 参数不可能为 null 时, 比如它是一个刚 new 出来的对象(Optional.of(new User(…))), 或者是一个非 null 常量时;
  2. 当想为 obj 断言不为 null 时, 即我们想在万一 obj 为 null 立即报告 NullPointException 异常, 立即修改, 而不是隐藏空指针异常时, 我们就应该果断的用 Optional.of(obj) 来构造 Optional 实例, 而不让任何不可预计的 null 值有可乘之机隐身于 Optional 中.

使用:

  • 存在即返回, 无则提供默认值

    1
    2
    return user.orElse(null);  //而不是 return user.isPresent() ? user.get() : null;
    return user.orElse(UNKNOWN_USER);
  • 存在即返回, 无则由函数来产生

    1
    return user.orElseGet(() -> fetchAUserFromDatabase()); //而不要 return user.isPresent() ? user: fetchAUserFromDatabase();
  • 存在才对它做点什么

    1
    2
    user.ifPresent(System.out::println);
    return user.map(u -> u.getOrders()).orElse(Collections.emptyList())
  • map 是可能无限级联的, 比如再深一层, 获得用户名的大写形式

    1
    2
    3
    return user.map(u -> u.getUsername())
    .map(name -> name.toUpperCase())
    .orElse(null);
1…567…11

Sun Ke

104 posts
21 categories
61 tags
© 2018 Sun Ke
Powered by Hexo
|
Theme — NexT.Pisces v5.1.4