雁渡寒潭 风吹疏竹

教练,我想打篮球


  • Home

  • Tags

  • Categories

  • Archives

Yarn的基本结构和工作原理

Posted on 2018-01-16 | In Hadoop |

1.YARN 基本架构

YARN是Hadoop 2.0中的资源管理系统,它的基本设计思想是将MRv1中的JobTracker拆分成了两个独立的服务:一个全局的资源管理器 ResourceManager和每个应用程序特有的ApplicationMaster。其中ResourceManager负责整个系统的资源管理和分配,而ApplicationMaster负责单个应用程序的管理。

1.1 YARN基本组成结构

YARN总体上仍然是Master/Slave结构,在整个资源管理框架中,ResourceManager为Master,NodeManager为Slave,ResourceManager负责对各个NodeManager上的资源进行统一管理和调度。当用 户提交一个应用程序时,需要提供一个用以跟踪和管理这个程序的ApplicationMaster,它负责向ResourceManager申请资源,并要求NodeManger启动可以占用一定资源的任务。由于不同的ApplicationMaster被分布到不同的节点上,因此它们之间不会相互影响。

下图描述了YARN的基本组成结构,YARN主要由ResourceManager、NodeManager、 ApplicationMaster和Container等几个组件构成。

  1. ResourceManager(RM)

RM是一个全局的资源管理器,负责整个系统的资源管理和分配。它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager,ASM)。

(1) 调度器 调度器根据容量、队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等),将系统中的资源分配给各个正在运行的应用程序。需要注意的是, 该调度器是一个“纯调度器”,它不再从事任何与具体应用程序相关的工作,比如不负责监控或者跟踪应用的执行状态等,也不负责重新启动因应用执行失败或者硬 件故障而产生的失败任务,这些均交由应用程序相关的ApplicationMaster完成。调度器仅根据各个应用程序的资源需求进行资源分配,而资源分 配单位用一个抽象概念“资源容器”(Resource Container,简称Container)表示,Container是一个动态资源分配单位,它将内存、CPU、磁盘、网络等资源封装在一起,从而限 定每个任务使用的资源量。此外,该调度器是一个可插拔的组件,用户可根据自己的需要设计新的调度器,YARN提供了多种直接可用的调度器,比如Fair Scheduler和Capacity Scheduler等。

(2) 应用程序管理器应用程序管理器负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重新启动它等。

  1. ApplicationMaster(AM)

用户提交的每个应用程序均包含一个AM,主要功能包括:

①与RM调度器协商以获取资源(用Container表示);

②将得到的任务进一步分配给内部的任务;

③与NM通信以启动/停止任务;

④监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务。

当前YARN自带了两个AM实现,一个是用于演示AM编写方法的实例程序distributedshell,它可以申请一定数目的Container以并 行运行一个Shell命令或者Shell脚本;另一个是运行MapReduce应用程序的AM—MRAppMaster。此外,一些其他的计算框架对应的 AM正在开发中,比如Open MPI、Spark等。

  1. NodeManager(NM)

NM是每个节点上的资源和任务管理器,一方面,它会定时地向RM汇报本节点上的资源使用情况和各个Container的运行状态;另一方面,它接收并处理来自AM的Container启动/停止等各种请求。

  1. Container

Container是YARN中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,当AM向RM申请资源时,RM为AM返回的 资源便是用Container表示的。YARN会为每个任务分配一个Container,且该任务只能使用该Container中描述的资源。需要注意的 是,Container不同于MRv1中的slot,它是一个动态资源划分单位,是根据应用程序的需求动态生成的。截至本书完成时,YARN仅支持CPU 和内存两种资源,且使用了轻量级资源隔离机制Cgroups进行资源隔离。

2.Yarn的工作流程

运行在YARN上的应用程序主要分为两类:短应用程序和长应用程序,其中,短应用程序是指一定时间内(可能是秒级、分钟级或小时级,尽管天级别或者更长时 间的也存在,但非常少)可运行完成并正常退出的应用程序,比如MapReduce作业、Tez DAG作业等,长应用程序是指不出意外,永不终止运行的应用程序,通常是一些服务,比如Storm Service(主要包括Nimbus和Supervisor两类服务),HBase Service(包括Hmaster和RegionServer两类服务)等,而它们本身作为一个框架提供了编程接口供用户使用。尽管这两类应用程序作用 不同,一类直接运行数据处理程序,一类用于部署服务(服务之上再运行数据处理程序),但运行在YARN上的流程是相同的。

当用户向YARN中提交一个应用程序后,YARN将分两个阶段运行该应用程序:

第一个阶段是启动ApplicationMaster;

第二个阶段是由ApplicationMaster创建应用程序,为它申请资源,并监控它的整个运行过程,直到运行完成。如下图所示,YARN的工作流程分为以下几个步骤:

步骤1 用户向YARN中提交应用程序,其中包括ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等。

步骤2 ResourceManager为该应用程序分配第一个Container,并与对应的Node-Manager通信,要求它在这个Container中启动应用程序的ApplicationMaster。

步骤3 ApplicationMaster首先向ResourceManager注册,这样用户可以直接通过ResourceManage查看应用程序的运行状态,然后它将为各个任务申请资源,并监控它的运行状态,直到运行结束,即重复步骤4~7。

步骤4 ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。

步骤5 一旦ApplicationMaster申请到资源后,便与对应的NodeManager通信,要求它启动任务。

步骤6 NodeManager为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务。

步骤7 各个任务通过某个RPC协议向ApplicationMaster汇报自己的状态和进度,以让ApplicationMaster随时掌握各个任务 的运行状态,从而可以在任务失败时重新启动任务。在应用程序运行过程中,用户可随时通过RPC向 ApplicationMaster查询应用程序的当前运行状态。

步骤8 应用程序运行完成后,ApplicationMaster向ResourceManager注销并关闭自己。

Livy

Posted on 2018-01-09 | In Spark |

Livy:基于Apache Spark的REST服务

数据库与数据仓库的本质区别是什么

Posted on 2018-01-09 | In 数据仓库 |

整理自知乎: https://www.zhihu.com/question/20623931

逻辑层面/概念层面

数据库和数据仓库其实是一样的或者及其相似的,都是通过某个数据库软件,基于某种数据模型来组织、管理数据。

数据库通常更关注业务交易处理(OLTP),而数据仓库更关注数据分析层面(OLAP),由此产生的数据库模型上也会有很大的差异。数据库通常追求交易的速度,交易完整性,数据的一致性,等等,在数据库模型上主要遵从范式模型(1NF,2NF,3NF,等等),从而尽可能减少数据冗余,保证引用完整性;而数据仓库强调数据分析的效率,复杂查询的速度,数据之间的相关性分析,所以在数据库模型上,数据仓库喜欢使用多维模型,从而提高数据分析的效率。

数据库:传统的关系型数据库的主要应用,主要是基本的、日常的事务处理,例如银行交易。

数据仓库:数据仓库系统的主要应用主要是OLAP(On-Line Analytical Processing),支持复杂的分析操作,侧重决策支持,并且提供直观易懂的查询结果。

举个最常见的例子,拿电商行业来说好了。基本每家电商公司都会经历,从只需要业务数据库到要数据仓库的阶段。

  • 电商早期启动非常容易,入行门槛低。找个外包团队,做了一个可以下单的网页前端 + 几台服务器 + 一个MySQL,就能开门迎客了。这好比手工作坊时期。
  • 第二阶段,流量来了,客户和订单都多起来了,普通查询已经有压力了,这个时候就需要升级架构变成多台服务器和多个业务数据库(量大+分库分表),这个阶段的业务数字和指标还可以勉强从业务数据库里查询。初步进入工业化。
  • 第三个阶段,一般需要 3-5 年左右的时间,随着业务指数级的增长,数据量的会陡增,公司角色也开始多了起来,开始有了 CEO、CMO、CIO,大家需要面临的问题越来越复杂,越来越深入。高管们关心的问题,从最初非常粗放的:“昨天的收入是多少”、“上个月的 PV、UV 是多少”,逐渐演化到非常精细化和具体的用户的集群分析,特定用户在某种使用场景中,例如“20~30岁女性用户在过去五年的第一季度化妆品类商品的购买行为与公司进行的促销活动方案之间的关系”。

这类非常具体,且能够对公司决策起到关键性作用的问题,基本很难从业务数据库从调取出来。原因在于:

  • 业务数据库中的数据结构是为了完成交易而设计的,不是为了而查询和分析的便利设计的。
  • 业务数据库大多是读写优化的,即又要读(查看商品信息),也要写(产生订单,完成支付)。因此对于大量数据的读(查询指标,一般是复杂的只读类型查询)是支持不足的。

而怎么解决这个问题,此时我们就需要建立一个数据仓库了,公司也算开始进入信息化阶段了。数据仓库的作用在于:

  • 数据结构为了分析和查询的便利;
  • 只读优化的数据库,即不需要它写入速度多么快,只要做大量数据的复杂查询的速度足够快就行了。

那么在这里前一种业务数据库(读写都优化)的是业务性数据库,后一种是分析性数据库,即数据仓库。

最后总结一下:数据库 比较流行的有:MySQL, Oracle, SqlServer等数据仓库 比较流行的有:AWS Redshift, Greenplum, Hive等这样把数据从业务性的数据库中提取、加工、导入分析性的数据库就是传统的 ETL 工作。

SpringBoot

Posted on 2018-01-09 | In Spring |

这一篇文章带你感受微服务的生和死,Spring Boot是生和死的主旋律

内容: 给出了微服务的一些概念文章,还有SpringBoot相关的文章

Python知识点

Posted on 2018-01-05 | In Python |

1. sys.path.append() 添加模块至模块扫描路径

当我们导入一个模块时:import xxx,默认情况下python解析器会搜索当前目录、已安装的内置模块和第三方模块,搜索路径存放在sys模块的path中:
sys.path 返回的是一个列表!
该路径已经添加到系统的环境变量了,当我们要添加自己的搜索目录时,可以通过列表的append()方法;

对于模块和自己写的脚本不在同一个目录下,在脚本开头加sys.path.append(‘xxx’):

1
2
3

import sys
sys.path.append(’引用模块的地址')

这种方法是运行时修改,脚本运行后就会失效的。

另外一种方法是:

把路径添加到系统的环境变量,或把该路径的文件夹放进已经添加到系统环境变量的路径内。环境变量的内容会自动添加到模块搜索路径中。

sys模块包含了与python解释器和它的环境有关的函数,这个你可以通过dir(sys)来查看他里面的方法和成员属性。
下面的两个方法可以将模块路径加到当前模块扫描的路径里:
sys.path.append(‘你的模块的名称’)。
sys.path.insert(0,’模块的名称’)
永久添加路径到sys.path中,方式有三,如下:
1)将写好的py文件放到 已经添加到系统环境变量的 目录下 ;
2) 在 /usr/lib/python2.6/site-packages 下面新建一个.pth 文件(以pth作为后缀名)
将模块的路径写进去,一行一个路径,如: vim pythonmodule.pth
/home/liu/shell/config
/home/liu/shell/base
3) 使用PYTHONPATH环境变量
export PYTHONPATH=$PYTHONPATH:/home/liu/shell/config

2. Python ConfigParser模块常用方法示例

在程序中使用配置文件来灵活的配置一些参数是一件很常见的事情,配置文件的解析并不复杂,在Python里更是如此,在官方发布的库中就包含有做这件事情的库,那就是ConfigParser,这里简单的做一些介绍。

Python ConfigParser模块解析的配置文件的格式比较象ini的配置文件格式,就是文件中由多个section构成,每个section下又有多个配置项,比如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
[db]    
db_host=192.168.1.1
db_port=3306
db_user=root
db_pass=password
[concurrent]
thread=200
processor=400
```
假设上面的配置文件的名字为test.conf。里面包含两个section,一个是db, 另一个是concurrent, db里面还包含有4项,concurrent里面有两项。这里来做做解析:
``` python
#-*- encoding: gb2312 -*-
import ConfigParser,string,os,sys
cf = ConfigParser.ConfigParser()
cf.read("test.conf")

# 返回所有的section
s = cf.sections()
print 'section:', s
o = cf.options("db")
print 'options:', o
v = cf.items("db")
print 'db:', v
print '-'*60

#可以按照类型读取出来
db_host = cf.get("db", "db_host")
db_port = cf.getint("db", "db_port")
db_user = cf.get("db", "db_user")
db_pass = cf.get("db", "db_pass")

# 返回的是整型的
threads = cf.getint("concurrent", "thread")
processors = cf.getint("concurrent", "processor")
print "db_host:", db_host
print "db_port:", db_port
print "db_user:", db_user
print "db_pass:", db_pass
print "thread:", threads
print "processor:", processors

#修改一个值,再写回去
cf.set("db", "db_pass", "zhaowei")
cf.write(open("test.conf", "w"))

#添加一个section。(同样要写回)
cf.add_section('liuqing')
cf.set('liuqing', 'int', '15')
cf.set('liuqing', 'bool', 'true')
cf.set('liuqing', 'float', '3.1415')
cf.set('liuqing', 'baz', 'fun')
cf.set('liuqing', 'bar', 'Python')
cf.set('liuqing', 'foo', '%(bar)s is %(baz)s!')
cf.write(open("test.conf", "w"))

#移除section 或者option 。(只要进行了修改就要写回的哦)
cf.remove_option('liuqing','int')
cf.remove_section('liuqing')
cf.write(open("test.conf", "w"))

3. Decorator 的本质

1
2
3
4
5
6
7
8
9
10
11
12
13
def hello(fn):
def wrapper():
print "hello, %s" % fn.__name__
fn()
print "goodby, %s" % fn.__name__
return wrapper


@hello
def foo():
print "i am foo"

foo()

返回:

1
2
3
hello, foo
i am foo
goodby, foo

对于Python的这个@注解语法糖- Syntactic Sugar 来说,当你在用某个@decorator来修饰某个函数func时,如下所示:

1
2
3
@decorator
def func():
pass

其解释器会解释成下面这样的语句:

1
2

func = decorator(func)

尼玛,这不就是把一个函数当参数传到另一个函数中,然后再回调吗?是的,但是,我们需要注意,那里还有一个赋值语句,把decorator这个函数的返回值赋值回了原来的func。
根据《函数式编程》中的first class functions中的定义的,你可以把函数当成变量来使用,所以,decorator必需得返回了一个函数出来给func,这就是所谓的higher order function 高阶函数,不然,后面当func()调用的时候就会出错。 就我们上面那个hello.py里的例子来说,

1
2
3
@hello
def foo():
print "i am foo"

被解释成了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
foo = hello(foo)
```
### class式的 Decorator
首先,先得说一下,decorator的class方式,还是看个示例:
``` python
class myDecorator(object):

def __init__(self, fn):
print "inside myDecorator.__init__()"
self.fn = fn

def __call__(self):
self.fn()
print "inside myDecorator.__call__()"

@myDecorator
def aFunction():
print "inside aFunction()"

print "Finished decorating aFunction()"

aFunction()

# 输出:
# inside myDecorator.__init__()
# Finished decorating aFunction()
# inside aFunction()
# inside myDecorator.__call__()

一些decorator的示例

给函数调用做缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from functools import wraps
def memo(fn):
cache = {}
miss = object()

@wraps(fn)
def wrapper(*args):
result = cache.get(args, miss)
if result is miss:
result = fn(*args)
cache[args] = result
return result

return wrapper

@memo
def fib(n):
if n < 2:
return n
return fib(n - 1) + fib(n - 2)

上面这个例子中,是一个斐波拉契数例的递归算法。我们知道,这个递归是相当没有效率的,因为会重复调用。比如:我们要计算fib(5),于是其分解成fib(4) + fib(3),而fib(4)分解成fib(3)+fib(2),fib(3)又分解成fib(2)+fib(1)…… 你可看到,基本上来说,fib(3), fib(2), fib(1)在整个递归过程中被调用了两次。

而我们用decorator,在调用函数前查询一下缓存,如果没有才调用了,有了就从缓存中返回值。一下子,这个递归从二叉树式的递归成了线性的递归。

注册回调函数

下面这个示例展示了通过URL的路由来调用相关注册的函数示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class MyApp():
def __init__(self):
self.func_map = {}

def register(self, name):
def func_wrapper(func):
self.func_map[name] = func
return func
return func_wrapper

def call_method(self, name=None):
func = self.func_map.get(name, None)
if func is None:
raise Exception("No function registered against - " + str(name))
return func()

app = MyApp()

@app.register('/')
def main_page_func():
return "This is the main page."

@app.register('/next_page')
def next_page_func():
return "This is the next page."

print app.call_method('/')
print app.call_method('/next_page')

注意:
1)上面这个示例中,用类的实例来做decorator。
2)decorator类中没有call(),但是wrapper返回了原函数。所以,原函数没有发生任何变化。

线程异步

下面量个非常简单的异步执行的decorator,注意,异步处理并不简单,下面只是一个示例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from threading import Thread
from functools import wraps

def async(func):
@wraps(func)
def async_func(*args, **kwargs):
func_hl = Thread(target = func, args = args, kwargs = kwargs)
func_hl.start()
return func_hl

return async_func

if __name__ == '__main__':
from time import sleep

@async
def print_somedata():
print 'starting print_somedata'
sleep(2)
print 'print_somedata: 2 sec passed'
sleep(2)
print 'print_somedata: 2 sec passed'
sleep(2)
print 'finished print_somedata'

def main():
print_somedata()
print 'back in main'
print_somedata()
print 'back in main'

main()

4.Python的getattr(),setattr(),delattr(),hasattr()

获取对象引用getattr

getattr()函数是Python自省的核心函数,具体使用大体如下:

Getattr用于返回一个对象属性,或者方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class A:   
def __init__(self):
self.name = 'zhangjing'
   #self.age='24'
def method(self):
print"method print"

Instance = A()
print getattr(Instance , 'name, 'not find') #如果Instance 对象中有属性name则打印self.name的值,否则打印'not find'
print getattr(Instance , 'age', 'not find') #如果Instance 对象中有属性age则打印self.age的值,否则打印'not find'
print getattr(a, 'method', 'default')
#如果有方法method,否则打印其地址,否则打印default
print getattr(a, 'method', 'default')()
#如果有方法method,运行函数并打印None否则打印default

注:使用getattr可以轻松实现工厂模式。
例:一个模块支持html、text、xml等格式的打印,根据传入的formate参数的不同,调用不同的函数实现几种格式的输出

1
2
3
4
import statsout   
def output(data, format="text"):
output_function = getattr(statsout, "output_%s" % format)
return output_function(data)

setattr

这是相对应的getattr()。参数是一个对象,一个字符串和一个任意值。字符串可能会列出一个现有的属性或一个新的属性。这个函数将值赋给属性的。该对象允许它提供。例如,setattr(x,“foobar”,123)相当于x.foobar = 123。

delattr

与setattr()相关的一组函数。参数是由一个对象(记住python中一切皆是对象)和一个字符串组成的。string参数必须是对象属性名之一。该函数删除该obj的一个由string指定的属性。delattr(x, ‘foobar’)=del x.foobar

hasattr

hasattr用于确定一个对象是否具有某个属性。

语法:
hasattr(object, name) -> bool
判断object中是否有name属性,返回一个布尔值。

5. 使用@property

在绑定属性时,如果我们直接把属性暴露出去,虽然写起来很简单,但是,没办法检查参数,导致可以把成绩随便改:

1
2
s = Student()
s.score = 9999

这显然不合逻辑。为了限制score的范围,可以通过一个set_score()方法来设置成绩,再通过一个get_score()来获取成绩,这样,在set_score()方法里,就可以检查参数:

1
2
3
4
5
6
7
8
9
10
11
class Student(object):

def get_score(self):
return self._score

def set_score(self, value):
if not isinstance(value, int):
raise ValueError('score must be an integer!')
if value < 0 or value > 100:
raise ValueError('score must between 0 ~ 100!')
self._score = value

现在,对任意的Student实例进行操作,就不能随心所欲地设置score了:

1
2
3
4
5
6
7
8
>>> s = Student()
>>> s.set_score(60) # ok!
>>> s.get_score()
60
>>> s.set_score(9999)
Traceback (most recent call last):
...
ValueError: score must between 0 ~ 100!

但是,上面的调用方法又略显复杂,没有直接用属性这么直接简单。

有没有既能检查参数,又可以用类似属性这样简单的方式来访问类的变量呢?对于追求完美的Python程序员来说,这是必须要做到的!

还记得装饰器(decorator)可以给函数动态加上功能吗?对于类的方法,装饰器一样起作用。Python内置的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
``` python 
class Student(object):

@property
def score(self):
return self._score

@score.setter
def score(self, value):
if not isinstance(value, int):
raise ValueError('score must be an integer!')
if value < 0 or value > 100:
raise ValueError('score must between 0 ~ 100!')
self._score = value

@property的实现比较复杂,我们先考察如何使用。把一个getter方法变成属性,只需要加上

1
2
3
4
5
6
7
8
9
``` python 
>>> s = Student()
>>> s.score = 60 # OK,实际转化为s.set_score(60)
>>> s.score # OK,实际转化为s.get_score()
60
>>> s.score = 9999
Traceback (most recent call last):
...
ValueError: score must between 0 ~ 100!

注意到这个神奇的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

还可以定义只读属性,只定义getter方法,不定义setter方法就是一个只读属性:
``` python
class Student(object):

@property
def birth(self):
return self._birth

@birth.setter
def birth(self, value):
self._birth = value

@property
def age(self):
return 2014 - self._birth

上面的birth是可读写属性,而age就是一个只读属性,因为age可以根据birth和当前时间计算出来。

###小结

@property广泛应用在类的定义中,可以让调用者写出简短的代码,同时保证对参数进行必要的检查,这样,程序运行时就减少了出错的可能性。

6. Python 中的 classmethod 和 staticmethod 有什么具体用途?

@classmethod means: when this method is called, we pass the class as the first argument instead of the instance of that class (as we normally do with methods). This means you can use the class and its properties inside that method rather than a particular instance.

@staticmethod means: when this method is called, we don’t pass an instance of the class to it (as we normally do with methods). This means you can put a function inside a class but you can’t access the instance of that class (this is useful when your method does not use the instance).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

class Kls(object):
def __init__(self, data):
self.data = data
def printd(self):
print(self.data)
@staticmethod
def smethod(*arg):
print('Static:', arg)
@classmethod
def cmethod(*arg):
print('Class:', arg)

>>> ik = Kls(23)
>>> ik.printd()
23
>>> ik.smethod()
Static: ()
>>> ik.cmethod()
Class: (<class '__main__.Kls'>,)
>>> Kls.printd()
TypeError: unbound method printd() must be called with Kls instance as first argument (got nothing instead)
>>> Kls.smethod()
Static: ()
>>> Kls.cmethod()
Class: (<class '__main__.Kls'>,)

Spark写GBK文件

Posted on 2017-12-12 | In Spark |
1
2
3
4
5
6
7
8
val result: RDD[(NullWritable, Text)] = totalData.map {
item =>
val line = s"${item.query}"
(NullWritable.get(), new Text(line.getBytes("GBK")))
}
//设置输出格式,以GBK存储
result.saveAsNewAPIHadoopFile(path, classOf[NullWritable],
classOf[Text], classOf[TextOutputFormat[NullWritable, Text]])

参考:

RDD行动Action操作(6)–saveAsHadoopFile

Spark多文件输出(MultipleOutputFormat)

Hadoop多文件输出:MultipleOutputFormat和MultipleOutputs深究(一)

Hadoop多文件输出:MultipleOutputFormat和MultipleOutputs深究(二)

Hadoop 中文编码相关问题 – mapreduce程序处理GBK编码数据并输出GBK编码数据

如何有效地记录JavaSQL日志

Posted on 2017-12-05 | In Java |

参考: http://blog.oneapm.com/apm-tech/178.html

JUnit测试中文乱码问题解决

Posted on 2017-12-05 | In Java |

pom.xml中添加plugin: 指定测试时的编码类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.5</version>
<configuration>
<forkMode>once</forkMode>
<argLine>-Dfile.encoding=UTF-8</argLine>
<includes>
<include>**/*Test.java</include>
</includes>
<excludes>
</excludes>
</configuration>
</plugin>

参考: http://yong3773.iteye.com/blog/1971454

Spark写Redis实践总结

Posted on 2017-10-30 | In Spark |

Spark写Redis实践总结

Redis是一个高性能键值数据库,最近几年非常流行。笔者所在的团队也在大规模的使用Redis作为后台数据存储解决方案。Redis作为机器学习算法与后台服务器的媒介,算法计算用户数据并写入Redis;后台服务器读取Redis,并为前端提供实时接口。本文主要介绍Spark写如Redis的实践,同时记录一些坑,方便后面回顾。

笔者使用Spark 2.0的scala API,使用jedis客户端API,dependency如下

1
2
3
4
5
6
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
<type>jar</type>
</dependency>

写Redis的代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
 // 写Redis
sampleData.repartition(500).foreachPartition(rows => {
val rc = new Jedis(redisHost, redisPort)
rc.auth(redisPassword)
val pipe = rc.pipelined

rows.foreach(r => {
val redisKey = r.getAs[String]("key")
val redisValue = r.getAs[String]("value")
pipe.set(redisKey, redisValue)
pipe.expire(redisKey, expireDays * 3600 * 24)
})

pipe.sync()
})

实践1:控制客户端对象数量

sampleData是一个DataSet,每一行有两个数据:key和value。由于构建Jedis客户端会有一定开销,所以一定不要用map,而是mapPartition或foreachPartition。这样,这个开销只会与parition数量相关,与数据总量无关。试想如果sampleData有1亿行,在map中将会构建1亿个Jedis对象。

实践2:批量插入数据

笔者使用了pipe进行批量插入,而不是逐条插入,批量插入效率与逐条插入效率差异参考这里。但是批量插入有个非常大的坑。上面的代码中,一次性批量插入了整个partition的数据,所以如果单个partition的数据量太多,会导致Redis内存溢出,导致服务不可用!

解决方法是在foreachPartition之前,repartition整个DateSet,确保每个分区的数据不要太大。推荐控制在1千左右。正如上面的列子,笔者将sampleData分为500个分区,每个分区1000条,那么sampleData的总数为50万左右。但是,如果数据总量太大,单个分区过小,会导致分区数过大,这样需要提高driver的内存,否则会导致driver内存溢出。

实践3:控制在线更新并发

Redis一般提供在线服务,在更新Redis的同时,它可能在前端提供服务。所以在写Redis时,不能使用太多executor。否则会使得QPS过高,影响在线服务响应,甚至导致Redis瘫痪。推荐的实践方法是提高数据的分区数量,确保每个partition的数量较小,然后逐步提高并发数量(executor数量)。观察在不同数量executor下,并发写入Redis的QPS,直到QPS达到一个可以接受的范围。

进一步改进

上述方案在5千万行数据,每行value小于100Byte的情况下,在我们的redis环境,写入时qps稳定在15000左右,不影响线上服务。这个方案被woliwang同学质疑,主要观点是如果数据量达到亿,十亿级别,如果还是设置每个partition 1000个数据量,会有上百万个partition,容易导致driver OOM。的确,上面的方案还有改进空间,可以对每个partition进一步分组,比将partition控制在10万数据量,然后每次批量写入1000数据,按顺序写100次 就可以写完,这样driver就不需要维护上百万的partition信息,同时也可以控制写入速率。

最后

实践是检验真理的唯一标准。上面的几点实践在不同Redis集群下,具体数值可能不一样,但原理不变。希望这些总结对你有用。如果自己想搭建spark+Redis环境,推荐VPS供应商Vultr,无需购买服务器,随时随地可用,物美价廉。

基于用户投票的排名算法

Posted on 2017-10-12 | In 算法 |

需求

基于用户投票的排名算法(一):Delicious和Hacker News
基于用户投票的排名算法(二):Reddit
基于用户投票的排名算法(三):Stack Overflow
基于用户投票的排名算法(四):牛顿冷却定律

对信息进行排名,意味着将信息按照重要性依次排列,并且及时进行更新。排列的依据,可以基于信息本身的特征,也可以基于用户的投票,即让用户决定,什么样的信息可以排在第一位。
根据用户的投票,决定最近一段时间内的”热文排名”。

最直觉、最简单的算法,莫过于按照单位时间内用户的投票数进行排名。得票最多的项目,自然就排在第一位。这个算法的优点是比较简单、容易部署、内容更新相当快;缺点是,一方面,排名变化不够平滑,前一个小时还排名靠前的内容,往往第二个小时就一落千丈,另一方面,缺乏自动淘汰旧项目的机制,某些热门内容可能会长期占据排行榜前列。
所以根据得票数,系统自动统计出热门文章排行榜。但是,并非得票最多的文章排在第一位,还要考虑时间因素,新文章应该比旧文章更容易得到好的排名。

公式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

其中,  P表示帖子的得票数,减去1是为了忽略发帖人的投票。   T表示距离发帖的时间(单位为小时),加上2是为了防止最新的帖子导致分母过小(之所以选择2,可能是因为从原始文章出现在其他网站,到转贴至Hacker News,平均需要两个小时)。  G表示"重力因子"(gravityth power),即将帖子排名往下拉的力量,默认值为1.8,后文会详细讨论这个值。

从这个公式来看,决定帖子排名有三个因素:
第一个因素是得票数P。
在其他条件不变的情况下,得票越多,排名越高。 如果你不想让"高票帖子"与"低票帖子"的差距过大,可以在得票数上加一个小于1的指数,比如(P-1)^0.8。
第二个因素是距离发帖的时间T。
在其他条件不变的情况下,越是新发表的帖子,排名越高。或者说,一个帖子的排名,会随着时间不断下降。
第三个因素是重力因子G。
它的数值大小决定了排名随时间下降的速度。G值越大,曲线越陡峭,排名下降得越快,意味着排行榜的更新速度越快。

## 所有时段的排名

[基于用户投票的排名算法(五):威尔逊区间](http://www.ruanyifeng.com/blog/2012/03/ranking_algorithm_wilson_score_interval.html)

一种常见的错误算法是:  ``` 得分 = 赞成票 - 反对票

假定有两个项目,项目A是60张赞成票,40张反对票,项目B是550张赞成票,450张反对票。请问,谁应该排在前面?按照上面的公式,B会排在前面,因为它的得分(550 - 450 = 100)高于A(60 - 40 = 20)。但是实际上,B的好评率只有55%(550 / 1000),而A为60%(60 / 100),所以正确的结果应该是A排在前面。
另一种常见的错误算法是: 得分 = 赞成票 / 总票数 如果”总票数”很大,这种算法其实是对的。问题出在如果”总票数”很少,这时就会出错。假定A有2张赞成票、0张反对票,B有100张赞成票、1张反对票。这种算法会使得A排在B前面。这显然错误。

我们先做如下设定:
  (1)每个用户的投票都是独立事件。
  (2)用户只有两个选择,要么投赞成票,要么投反对票。
  (3)如果投票总人数为n,其中赞成票为k,那么赞成票的比例p就等于k/n。
如果你熟悉统计学,可能已经看出来了,这是一种统计分布,叫做”二项分布”(binomial distribution)。这很重要,下面马上要用到。
们的思路是,p越大,就代表这个项目的好评比例越高,越应该排在前面。但是,p的可信性,取决于有多少人投票,如果样本太小,p就不可信。好在我们已经知道,p是”二项分布”中某个事件的发生概率,因此我们可以计算出p的置信区间。所谓”置信区间”,就是说,以某个概率而言,p会落在的那个区间。比如,某个产品的好评率是80%,但是这个值不一定可信。根据统计学,我们只能说,有95%的把握可以断定,好评率在75%到85%之间,即置信区间是[75%, 85%]。
这样一来,排名算法就比较清晰了:
  第一步,计算每个项目的”好评率”(即赞成票的比例)。
  第二步,计算每个”好评率”的置信区间(以95%的概率)。
  第三步,根据置信区间的下限值,进行排名。这个值越大,排名就越高。

置信区间的实质,就是进行可信度的修正,弥补样本量过小的影响。如果样本多,就说明比较可信,不需要很大的修正,所以置信区间会比较窄,下限值会比较大;如果样本少,就说明不一定可信,必须进行较大的修正,所以置信区间会比较宽,下限值会比较小。

二项分布的置信区间有多种计算公式,最常见的是”正态区间”(Normal approximation interval),教科书里几乎都是这种方法。但是,它只适用于样本较多的情况(np > 5 且 n(1 − p) > 5),对于小样本,它的准确性很差。
1927年,美国数学家 Edwin Bidwell Wilson提出了一个修正公式,被称为”威尔逊区间”,很好地解决了小样本的准确性问题。

贝叶斯平均

基于用户投票的排名算法(六):贝叶斯平均

介绍了”威尔逊区间”,它解决了投票人数过少、导致结果不可信的问题。
举例来说,如果只有2个人投票,”威尔逊区间”的下限值会将赞成票的比例大幅拉低。这样做固然保证了排名的可信性,但也带来了另一个问题:排行榜前列总是那些票数最多的项目,新项目或者冷门的项目,很难有出头机会,排名可能会长期靠后。

在排名页面的底部,IMDB给出了它的计算方法。WR = v/(v+m) * R + m/(v+m) *C

  • WR, 加权得分(weighted rating)。
  • R,该电影的用户投票的平均得分(Rating)。
  • v,该电影的投票人数(votes)。
  • m,排名前250名的电影的最低投票数(现在为3000)。
  • C, 所有电影的平均得分(现在为6.9)。

这种算法被称为”贝叶斯平均”(Bayesian average)。因为某种程度上,它借鉴了”贝叶斯推断”(Bayesian inference)的思想:既然不知道投票结果,那就先估计一个值,然后不断用新的信息修正,使得它越来越接近正确的值。
在这个公式中,m(总体平均分)是”先验概率”,每一次新的投票都是一个调整因子,使总体平均分不断向该项目的真实投票结果靠近。投票人数越多,该项目的”贝叶斯平均”就越接近算术平均,对排名的影响就越小。因此,这种方法可以给一些投票人数较少的项目,以相对公平的排名。
“贝叶斯平均”也有缺点,主要问题是它假设用户的投票是正态分布。比如,电影A有10个观众评分,5个为五星,5个为一星;电影B也有10个观众评分,都给了三星。这两部电影的平均得分(无论是算术平均,还是贝叶斯平均)都是三星,但是电影A可能比电影B更值得看。

123…11

Sun Ke

104 posts
21 categories
61 tags
© 2018 Sun Ke
Powered by Hexo
|
Theme — NexT.Pisces v5.1.4