Getting Started with Graph Computing: Dato

Dato evolved from PowerGraph and GraphLab Create, and the company has now been renamed Dato. One of its co-founders appears to be Chinese (http://weibo.com/u/1863703874).

Gallery: https://dato.com/learn/gallery/
How-to: https://dato.com/learn/how-to/ and https://github.com/dato-code/how-to
User guide: https://dato.com/learn/userguide/ and https://github.com/dato-code/userguide
Tutorials: https://github.com/dato-code/tutorials

Dato Quickstart

Dato requires registration before use and comes with a 30-day trial. Register at https://dato.com/download, fill in some basic information, and you will receive an email.
After registering you land on an installation guide page; on Mac there is a Dato Launcher with a GUI, or you can install with pip.

Below, a Python virtual environment is used to set up a clean Dato test environment:

# Create a virtual environment named dato-env
virtualenv dato-env

# Activate the virtual environment
source dato-env/bin/activate

# Make sure pip is up to date
pip install --upgrade pip

# Install IPython Notebook (optional)
pip install "ipython[notebook]"

# Install your licensed copy of GraphLab Create
pip install --upgrade --no-cache-dir https://get.dato.com/GraphLab-Create/1.5.2/<your-registration-email>/<your-license-key>/GraphLab-Create-License.tar.gz

To upgrade from an older version, run the following inside dato-env: bin/pip install graphlab-create==1.5.2

Verify that Dato is usable:

➜  dato-env  bin/python
Python 2.7.8 (default, Oct 20 2014, 15:05:19)
[GCC 4.9.1] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import graphlab as gl

If no error is raised, the graphlab Python package is ready to use.
If the path is wrong, for example you are not inside dato-env or you launch the system python directly, the graphlab module will not be found. The error looks like this:

>>> import graphlab as gl
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ImportError: No module named graphlab

That happens because the system has its own Python, which knows nothing about the virtual environment, so you must use the Python inside the virtual environment!
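
A quick way to confirm which interpreter is actually running is to print sys.executable from the REPL; inside the virtual environment it should point at dato-env/bin/python (the path in the comment below is only illustrative):

>>> import sys
>>> print sys.executable    # e.g. /path/to/dato-env/bin/python
>>> import graphlab as gl   # should now import without an ImportError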

Getting Started with GraphLab Create

This section mainly follows: https://dato.com/learn/gallery/notebooks/getting_started_with_graphlab_create.html

1. Loading data into an SFrame

SFrame: a tabular data structure, ideal for data wrangling and feature engineering
Graph: a structure that is ideal for sparse data
This is similar to loading a text file into an RDD in Spark: in short, an external data source is parsed into objects the system understands.

vertices = gl.SFrame.read_csv('http://s3.amazonaws.com/dato-datasets/bond/bond_vertices.csv')
edges = gl.SFrame.read_csv('http://s3.amazonaws.com/dato-datasets/bond/bond_edges.csv')

When reading the csv files, gl infers the type of each column from the file contents:

bond_vertices: [str,str,int,int]  
bond_edges: [str,str,str]

To inspect the vertices and edges, just evaluate the variable:

>>> vertices
+----------------+--------+-----------------+---------+
| name | gender | license_to_kill | villian |
+----------------+--------+-----------------+---------+
| James Bond | M | 1 | 0 |
| M | M | 1 | 0 |
| Moneypenny | F | 1 | 0 |
| Q | M | 1 | 0 |
| Wai Lin | F | 1 | 0 |
| Inga Bergstorm | F | 0 | 0 |
| Elliot Carver | M | 0 | 1 |
| Paris Carver | F | 0 | 1 |
| Gotz Otto | M | 0 | 1 |
| Henry Gupta | M | 0 | 1 |
+----------------+--------+-----------------+---------+

>>> edges
+----------------+------------+------------+
| src | dst | relation |
+----------------+------------+------------+
| Wai Lin | James Bond | friend |
| M | James Bond | worksfor |
| Inga Bergstorm | James Bond | friend |
| Elliot Carver | James Bond | killed_by |
| Gotz Otto | James Bond | killed_by |
| James Bond | M | managed_by |
| Q | M | managed_by |
| Moneypenny | M | managed_by |
| Q | Moneypenny | colleague |
| M | Moneypenny | worksfor |
+----------------+------------+------------+

2. Creating a Graph object and adding vertices and edges

First create the graph object, then add vertices and edges to it. In a typical graph database you first create the vertices and then connect them with edges.
Here, because the edges above already reference the source and destination vertices, the vertex and edge objects can simply be added to the graph.
You can also view vertices and edges as two tables, where the edges table holds foreign keys into the vertices table. A graph database is a database after all!

g = gl.SGraph()
g = g.add_vertices(vertices=vertices, vid_field='name')
g = g.add_edges(edges=edges, src_field='src', dst_field='dst')

Looking at the graph structure, note that the vertex column name was renamed to __id, and the edge columns src and dst were renamed to __src_id and __dst_id.

>>> g
SGraph({'num_edges': 20, 'num_vertices': 10})
Vertex Fields:['__id', 'gender', 'license_to_kill', 'villian']
Edge Fields:['__src_id', '__dst_id', 'relation']

The graph object provides methods to retrieve its vertices and edges. The output is similar to the original vertices and edges variables; after all, putting vertices and edges into a graph and reading them back should leave them unchanged.

g.get_vertices()
g.get_edges()

3. Computing PageRank on the graph

>>> pr = gl.pagerank.create(g)
PROGRESS: Counting out degree
PROGRESS: Done counting out degree
PROGRESS: +-----------+-----------------------+
PROGRESS: | Iteration | L1 change in pagerank |
PROGRESS: +-----------+-----------------------+
PROGRESS: | 1 | 6.65833 |
PROGRESS: | 2 | 4.65611 |
PROGRESS: | 3 | 3.46298 |
PROGRESS: | 4 | 2.55686 |
PROGRESS: | 5 | 1.95422 |
PROGRESS: | 6 | 1.42139 |
PROGRESS: | 7 | 1.10464 |
PROGRESS: | 8 | 0.806704 |
PROGRESS: | 9 | 0.631771 |
PROGRESS: | 10 | 0.465388 |
PROGRESS: | 11 | 0.364898 |
PROGRESS: | 12 | 0.271257 |
PROGRESS: | 13 | 0.212255 |
PROGRESS: | 14 | 0.159062 |
PROGRESS: | 15 | 0.124071 |
PROGRESS: | 16 | 0.0935911 |
PROGRESS: | 17 | 0.0727674 |
PROGRESS: | 18 | 0.0551714 |
PROGRESS: | 19 | 0.0427744 |
PROGRESS: | 20 | 0.0325555 |
PROGRESS: +-----------+-----------------------+

As shown above, calling gl.pagerank.create with the constructed Graph object returns the pr model object.

>>> pr
Class : PagerankModel

Graph
-----
num_edges : 20
num_vertices : 10

Results
-------
graph : SGraph. See m['graph']
change in last iteration (L1 norm) : 0.0326
vertex pagerank : SFrame. See m['pagerank']

Settings
--------
maximun number of iterations : 20
convergence threshold (L1 norm) : 0.01
probablity of random jumps to any node in the graph: 0.15

Metrics
-------
training time (secs) : 1.0853
number of iterations : 20

Queryable Fields
----------------
training_time : Total training time of the model
graph : A new SGraph with the pagerank as a vertex property
delta : Change in pagerank for the last iteration in L1 norm
reset_probability : The probablity of randomly jumps to any node in the graph
pagerank : An SFrame with each vertex's pagerank
num_iterations : Number of iterations
threshold : The convergence threshold in L1 norm
max_iterations : The maximun number of iterations to run

All of the queryable fields listed above can be retrieved with pr.get():

>>> pr.get('pagerank')
+----------------+----------------+-------------------+
| __id | pagerank | delta |
+----------------+----------------+-------------------+
| Moneypenny | 1.18363921275 | 0.00143637385736 |
| Inga Bergstorm | 0.869872717136 | 0.00477951418076 |
| Henry Gupta | 0.284762885673 | 1.89255522874e-05 |
| Paris Carver | 0.284762885673 | 1.89255522874e-05 |
| Q | 1.18363921275 | 0.00143637385736 |
| Wai Lin | 0.869872717136 | 0.00477951418076 |
| M | 1.87718696576 | 0.00666194771763 |
| James Bond | 2.52743578524 | 0.0132914517076 |
| Elliot Carver | 0.634064732205 | 0.000113553313724 |
| Gotz Otto | 0.284762885673 | 1.89255522874e-05 |
+----------------+----------------+-------------------+

The output above is unsorted; sorting by the pagerank column with topk gives the most important person: James Bond!

>>> pr.get('pagerank').topk(column_name='pagerank')
+----------------+----------------+-------------------+
| __id | pagerank | delta |
+----------------+----------------+-------------------+
| James Bond | 2.52743578524 | 0.0132914517076 |
| M | 1.87718696576 | 0.00666194771763 |
| Moneypenny | 1.18363921275 | 0.00143637385736 |
| Q | 1.18363921275 | 0.00143637385736 |
| Inga Bergstorm | 0.869872717136 | 0.00477951418076 |
| Wai Lin | 0.869872717136 | 0.00477951418076 |
| Elliot Carver | 0.634064732205 | 0.000113553313724 |
| Henry Gupta | 0.284762885673 | 1.89255522874e-05 |
| Paris Carver | 0.284762885673 | 1.89255522874e-05 |
| Gotz Otto | 0.284762885673 | 1.89255522874e-05 |
+----------------+----------------+-------------------+

ipython notebook

After installing with Dato Launcher on Mac, GraphLab Create can be started in two ways: from the command line or via IPython Notebook.
If you pick the IPython option, a browser opens automatically; create a Python Notebook there and you can write code interactively.

[Figure: graphlab ipython]

Calling g.show() also opens a page:

[Figure: graphlab web]

Click an edge or a vertex; taking a vertex as an example:

[Figure: graphlab vertices]

SFrame (tabular data)

Loading and saving

# https://dato.com/learn/userguide/sframe/sframe-intro.html
# Read the csv files
songs = gl.SFrame.read_csv("http://s3.amazonaws.com/dato-datasets/millionsong/song_data.csv")
usage_data = gl.SFrame.read_csv("http://s3.amazonaws.com/dato-datasets/millionsong/10000.txt",
                                header=False,
                                delimiter='\t',
                                column_type_hints={'X3':int})
# Rename the columns
usage_data.rename({'X1':'user_id', 'X2':'song_id', 'X3':'listen_count'})
# Save in SFrame's binary format, so it can later be loaded directly without re-parsing
usage_data.save('./music_usage_data')
same_usage_data = gl.load_sframe('./music_usage_data')

# https://dato.com/learn/userguide/sframe/data-manipulation.html
# If year is 0, replace it with None so that missing years do not skew the summary statistics
# (replace those zeroes with missing values with a Python lambda function)
songs['year'] = songs['year'].apply(lambda x: None if x == 0 else x)

Sample rows from songs:

+--------------------+--------------------------------+--------------------------------+--------------------------------+------+
| song_id | title | release | artist_name | year |
+--------------------+--------------------------------+--------------------------------+--------------------------------+------+
| SOQMMHC12AB0180CB8 | Silent Night | Monster Ballads X-Mas | Faster Pussy cat | 2003 |
| SOVFVAK12A8C1350D9 | Tanssi vaan | Karkuteill\xc3\xa4 | Karkkiautomaatti | 1995 |
| SOGTUKN12AB017F4F1 | No One Could Ever | Butter | Hudson Mohawke | 2006 |
| SOBNYVR12A8C13558C | Si Vos Quer\xc3\xa9s | De Culo | Yerba Brava | 2003 |
| SOHSBXH12A8C13B0DF | Tangle Of Aspens | Rene Ablaze Presents Winte ... | Der Mystic | 0 |
| SOZVAPQ12A8C13B63C | Symphony No. 1 G minor "Si ... | Berwald: Symphonies Nos. 1 ... | David Montgomery | 0 |
| SOQVRHI12A6D4FB2D7 | We Have Got Love | Strictly The Best Vol. 34 | Sasha / Turbulence | 0 |
| SOEYRFT12AB018936C | 2 Da Beat Ch'yall | Da Bomb | Kris Kross | 1993 |
| SOPMIYT12A6D4F851E | Goodbye | Danny Boy | Joseph Locke | 0 |
| SOJCFMH12A8C13B0C2 | Mama_ mama can't you see ? | March to cadence with the ... | The Sun Harbor's Chorus-Do ... | 0 |
+--------------------+--------------------------------+--------------------------------+--------------------------------+------+

Sample rows from usage_data:

+--------------------------------+--------------------+---------------+
| user_id | song_id | listen_count |
+--------------------------------+--------------------+---------------+
| b80344d063b5ccb3212f76538f ... | SOAKIMP12A8C130995 | 1 |
| b80344d063b5ccb3212f76538f ... | SOBBMDR12A8C13253B | 2 |
| b80344d063b5ccb3212f76538f ... | SOBXHDL12A81C204C0 | 1 |
| b80344d063b5ccb3212f76538f ... | SOBYHAJ12A6701BF1D | 1 |
| b80344d063b5ccb3212f76538f ... | SODACBL12A8C13C273 | 1 |
| b80344d063b5ccb3212f76538f ... | SODDNQT12A6D4F5F7E | 5 |
| b80344d063b5ccb3212f76538f ... | SODXRTY12AB0180F3B | 1 |
| b80344d063b5ccb3212f76538f ... | SOFGUAY12AB017B0A8 | 1 |
| b80344d063b5ccb3212f76538f ... | SOFRQTD12A81C233C0 | 1 |
| b80344d063b5ccb3212f76538f ... | SOHQWYZ12A6D4FA701 | 1 |
+--------------------------------+--------------------+---------------+

Data transformation

# Apply a function to multiple columns: add a love_count column with the number of
# times "love" appears in the title and artist_name columns.
# When apply is called on an SFrame (rather than an SArray, as shown here), the input
# of the lambda is a dictionary whose keys are the column names and whose values are
# that row's values.
songs['love_count'] = songs[['title', 'artist_name']].apply(
    lambda row: sum(x.lower().split(' ').count('love') for x in row.values()))
songs.topk('love_count', k=5)

# Select columns by type
songs[str]

# Summary information for a single column
songs['year'].sketch_summary()
songs['year'].show()

# Filter out unwanted rows: keep only songs with a recorded year
dated_songs = songs[songs['year'] != None]
# len counts only the rows that have a year; together with the missing-year rows it
# adds up to the Length reported by sketch_summary
len(dated_songs)

# Multiple filter conditions
reasonable_usage = usage_data[(usage_data['listen_count'] >= 10) & (usage_data['listen_count'] <= 500)]
len(reasonable_usage)

# Visualize an SFrame
songs.show()

# song_id is not unique in songs, because the same song can appear on different releases.
# If we do not care about the release we can drop the duplicates: e.g. if song_id=1 has
# two rows, one on release 1 and one on release 2, we keep just one of them instead of
# deleting both.
other_cols = songs.column_names()
other_cols.remove('song_id')
agg_list = [gl.aggregate.SELECT_ONE(i) for i in other_cols]
# Group by song_id and pick one row per group, so every song_id ends up unique
unique_songs = songs.groupby('song_id', dict(zip(other_cols, agg_list)))

# Most-played songs. usage_data has user_id, song_id and listen_count columns, so the
# same song can be played by many users.
# select sum(listen_count) as total_listens, count(user_id) as num_unique_users from usage_data group by song_id
tmp = usage_data.groupby(['song_id'], {'total_listens': gl.aggregate.SUM('listen_count'),
                                       'num_unique_users': gl.aggregate.COUNT('user_id')})
# usage_data only has song ids, not song details, so join with the songs table and sort
# by the total_listens computed above.
# select songs.*, tmp.* from songs join tmp on songs.song_id = tmp.song_id order by tmp.total_listens desc
tmp.join(songs, ['song_id']).topk('total_listens')

# usage_data contains users, songs and play counts, which can be viewed as users rating
# songs, i.e. the necessary ingredients of a recommender system.
# The raw play count should not be used as a rating directly; normalize it, e.g. map it
# into a 1-5 rating scale.
s = usage_data['listen_count'].sketch_summary()
import numpy
buckets = numpy.linspace(s.quantile(.005), s.quantile(.995), 5)
def bucketize(x):
    cur_bucket = 0
    for i in range(0, 5):
        cur_bucket += 1
        if x <= buckets[i]:
            break
    return cur_bucket
usage_data['rating'] = usage_data['listen_count'].apply(bucketize)
usage_data

Converting complex types

The list type

# Converting complex types: an SArray column can also hold list and dict values.
# Group by artist name and release; all songs on the same release go into one list.
# select list(title), list(year) from songs group by release, artist_name
albums = songs.groupby(['release','artist_name'], {'tracks': gl.aggregate.CONCAT('title'),
                                                   'years': gl.aggregate.CONCAT('year')})
# year is included for debugging; e.g. it shows that the songs on one release were
# not necessarily all released in the same year
albums[0]

Sample albums output:

+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| artist_name | release | tracks | years |
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| Veruca Salt | Eight Arms To Hold You | [\'One Last Time\', \'With ... | array('d', [1997.0, 1997.0 ... |
| Les Compagnons De La Chanson | Heritage - Le Chant De Mal ... | ['I Commedianti', 'Il Est ... | None |
| Nelly / Fat Joe / Young Tr ... | Sweat | ['Grand Hang Out'] | array('d', [2004.0]) |
| The Grouch | My Baddest B*tches | ['Silly Putty (Zion I Feat ... | array('d', [1999.0]) |
| Ozzy Osbourne | Diary of a madman / Bark a ... | ["You Can\'t Kill Rock And ... | array('d', [1981.0, 1983.0 ... |
| Peter Hunnigale | Reggae Hits Vol. 32 | ['Weeks Go By'] | None |
| Burning Spear | Studio One Classics | ['Rocking Time'] | array('d', [1973.0]) |
| Bond | New Classix 2008 | ['Allegretto', 'Kashmir'] | None |
| Lee Coombs feat. Katherine ... | Control | ['Control (10Rapid Remix)' ... | None |
| Stevie Wonder | Songs In The Key Of Life | ['Ngiculela-Es Una Histori ... | array('d', [1976.0]) |
+--------------------------------+--------------------------------+--------------------------------+--------------------------------+

Converting a list to a dict

# The CONCAT aggregate collects all values of the column within each group into a list;
# now turn that into a dict keyed by year, with the track list as the value.
# Note the extra grouping column year: we effectively group by the first two columns,
# then group each group's tracks again by year.
albums = songs.groupby(['release','artist_name','year'], {'tracks':gl.aggregate.CONCAT('title')})
albums = albums.unstack(column=['year','tracks'], new_column_name='track_dict')
albums

Sample albums output with the dict column:

+--------------------------------+--------------------------------+--------------------------------+
| artist_name | release | track_dict |
+--------------------------------+--------------------------------+--------------------------------+
| Veruca Salt | Eight Arms To Hold You | {1994: [\'Straight\'], 199 ... |
| Les Compagnons De La Chanson | Heritage - Le Chant De Mal ... | None |
| Nelly / Fat Joe / Young Tr ... | Sweat | {2004: ['Grand Hang Out']} |
| The Grouch | My Baddest B*tches | {1999: ['Simple Man']} |
| Ozzy Osbourne | Diary of a madman / Bark a ... | {1986: [\'Secret Loser\'], ... |
| Peter Hunnigale | Reggae Hits Vol. 32 | None |
| Burning Spear | Studio One Classics | {1973: ['Rocking Time']} |
| Bond | New Classix 2008 | None |
| Lee Coombs feat. Katherine ... | Control | None |
| Stevie Wonder | Songs In The Key Of Life | {1976: ['Have A Talk With ... |
+--------------------------------+--------------------------------+--------------------------------+

Converting a dict to a list

# The number of distinct years among an album's tracks; length=1 means every track
# was released in the same year
albums['num_years'] = albums['track_dict'].item_length()
albums['num_years'].show()

# albums['track_dict'] is a dict and num_years was added above; now turn the dict
# into a list (the dict column itself stays).
# dict_values returns all values of the dict, i.e. the track list for each year
albums['track_list'] = albums['track_dict'].dict_values()
albums

Below, track_dict is the dict and track_list is the list of its values (artist_name and release are omitted here).
Note that every value of the map is itself a list: track_dict = Map[Int, List[String]].
Extracting each List[String] into track_list means every element of track_list is a List[String],
i.e. track_list = List[List[String]].

+--------------------------------+-----------+--------------------------------+
| track_dict | num_years | track_list |
+--------------------------------+-----------+--------------------------------+
| {1994: [\'Straight\'], 199 ... | 2 | [[\'Straight\'], [\'With D ... |
| None | None | None |
| {2004: ['Grand Hang Out']} | 1 | [['Grand Hang Out']] |
| {1999: ['Simple Man']} | 1 | [['Simple Man']] |
| {1986: [\'Secret Loser\'], ... | 3 | [["You Can\'t Kill Rock An ... |
| None | None | None |
| {1973: ['Rocking Time']} | 1 | [['Rocking Time']] |
| None | None | None |
| None | None | None |
| {1976: ['Have A Talk With ... | 1 | [['Have A Talk With God']] |
| ... | ... | ... |
+--------------------------------+-----------+--------------------------------+

We want to flatten track_list into a List[String], much like flatten in Scala. In other words, the goal is to
change a list of lists into a single list.

import itertools
albums['track_list'] = albums['track_list'].apply(lambda x: list(itertools.chain(*x)))
albums

After flattening, the sample output looks much like the original songs data again (except that the original data was never grouped).

+--------------------------------+-----------+--------------------------------+
| track_dict | num_years | track_list |
+--------------------------------+-----------+--------------------------------+
| {1994: [\'Straight\'], 199 ... | 2 | [\'Straight\', \'With Davi ... |
| None | None | None |
| {2004: ['Grand Hang Out']} | 1 | ['Grand Hang Out'] |
| {1999: ['Simple Man']} | 1 | ['Simple Man'] |
| {1986: [\'Secret Loser\'], ... | 3 | ["You Can\'t Kill Rock And ... |
| None | None | None |
| {1973: ['Rocking Time']} | 1 | ['Rocking Time'] |
| None | None | None |
| None | None | None |
| {1976: ['Have A Talk With ... | 1 | ['Have A Talk With God'] |
| ... | ... | ... |
+--------------------------------+-----------+--------------------------------+

You can filter directly on list or dict columns. For example, the following removes all songs from 1994 or 1999:

albums['track_dict'] = albums['track_dict'].dict_trim_by_keys([1994, 1999])

Comparing the first row below with the first row above, the 1994 track Straight is no longer in track_dict, and Simple Man (1999) in the fourth row above has been removed as well.

+--------------------------------+-----------+--------------------------------+
| track_dict | num_years | track_list |
+--------------------------------+-----------+--------------------------------+
| {1997: [\'With David Bowie ... | 2 | [\'Straight\', \'With Davi ... |
| None | None | None |
| {2004: ['Grand Hang Out']} | 1 | ['Grand Hang Out'] |
| {} | 1 | ['Simple Man'] |
| {1986: [\'Secret Loser\'], ... | 3 | ["You Can\'t Kill Rock And ... |
| None | None | None |
| {1973: ['Rocking Time']} | 1 | ['Rocking Time'] |
| None | None | None |
| None | None | None |
| {1976: ['Have A Talk With ... | 1 | ['Have A Talk With God'] |
| ... | ... | ... |
+--------------------------------+-----------+--------------------------------+

A dict is well suited to lookups, and you can filter on its keys. The code below returns every row whose
track_dict has at least one entry for 1965, similar to a contains check.

albums[albums['track_dict'].dict_has_any_keys(1965)]

All of the columns above can be packed into a single column:

big_list = albums.pack_columns(albums.column_names())
big_list

In the first row below, the first element of the list is the artist_name column and the second is the release column; all columns are combined into one list.
To go the other way and split the list back into separate columns, use the unpack function (see the sketch after the table below).

+--------------------------------+
| X1 |
+--------------------------------+
| [\'Veruca Salt\', \'Eight ... |
| ['Les Compagnons De La Cha ... |
| ['Nelly / Fat Joe / Young ... |
| ['The Grouch', 'My Baddest ... |
| [\'Ozzy Osbourne\', \'Diar ... |
| ['Peter Hunnigale', 'Regga ... |
| ['Burning Spear', 'Studio ... |
| ['Bond', 'New Classix 2008 ... |
| ['Lee Coombs feat. Katheri ... |
| ['Stevie Wonder', 'Songs I ... |
+--------------------------------+
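
Going back from the packed list to separate columns is roughly the reverse operation. The sketch below assumes the packed column keeps GraphLab's default name X1 and that unpack produces columns named X.0, X.1, ... in the original column order; adjust the names for your own data.

# Rough sketch of the reverse of pack_columns: split the packed list column
# back into one column per element (default names X.0, X.1, ...)
unpacked = big_list['X1'].unpack()
# Optionally restore the original column names (assumes the original order is preserved)
unpacked = unpacked.rename(dict(zip(unpacked.column_names(), albums.column_names())))
unpacked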

SparkRDD

A Spark RDD can be converted to and from a GraphLab SFrame. Spark also has a DataFrame concept, so Spark's DataFrame and GraphLab's SFrame can be regarded as analogous (both borrow the DataFrame idea from Python pandas).

➜  spark-sframe git:(master) build/mvn package
[INFO] Replacing /Users/zhengqh/Github/_graph/spark-sframe/target/uber-spark_unity-0.1.jar
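
Once the integration jar is built, the conversion itself is driven from the Python side. The snippet below is only a sketch: it assumes this GraphLab Create release exposes SFrame.from_rdd and SFrame.to_rdd as described in the Dato userguide (the exact signatures vary between releases, so check your version's API docs).

# Sketch only: converting between a Spark RDD and an SFrame.
# Assumes pyspark is configured and that SFrame.from_rdd / to_rdd exist in this
# GraphLab Create release (signatures differ across versions).
from pyspark import SparkContext
import graphlab as gl

sc = SparkContext('local', 'sframe-demo')
rdd = sc.parallelize([(1, 'a'), (2, 'b'), (3, 'c')])

sf = gl.SFrame.from_rdd(rdd, sc)   # RDD -> SFrame
rdd2 = sf.to_rdd(sc)               # SFrame -> RDD
print sf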

SGraph (graph data)

Creating a graph

SGraph is a data structure built on top of SFrame that supports graph operations. Note that an SGraph is immutable: after adding a vertex or an edge you must reassign the result, otherwise the original graph stays empty.

from graphlab import SGraph, Vertex, Edge
g = SGraph()
verts = [Vertex(0, attr={'breed': 'labrador'}),
         Vertex(1, attr={'breed': 'labrador'}),
         Vertex(2, attr={'breed': 'vizsla'})]
# Reassign: add_vertices/add_edges return a new graph
g = g.add_vertices(verts)
g = g.add_edges(Edge(1, 2))
print g

SGraph({'num_edges': 1, 'num_vertices': 3})
Vertex Fields:['__id', 'breed']
Edge Fields:['__src_id', '__dst_id']

Because the methods for adding vertices and edges return a new graph object, a graph can also be built with chained calls:

from graphlab import SGraph, Vertex, Edge
g = SGraph().add_vertices([Vertex(i) for i in range(10)]).add_edges(
    [Edge(i, i+1) for i in range(9)])

A graph can also be built from an SFrame edge list alone, without vertices: the edges already contain the vertex ids, so the vertices are added to the graph automatically.
Edge columns other than the source and destination become edge attributes.

from graphlab import SFrame
edge_data = SFrame.read_csv('http://s3.amazonaws.com/dato-datasets/bond/bond_edges.csv')

g = SGraph()
g = g.add_edges(edge_data, src_field='src', dst_field='dst')
print g

SGraph({'num_edges': 20, 'num_vertices': 10})

A graph can of course also be constructed from vertices and edges together:

from graphlab import SFrame
edge_data = SFrame.read_csv('http://s3.amazonaws.com/dato-datasets/bond/bond_edges.csv')
vertex_data = SFrame.read_csv('http://s3.amazonaws.com/dato-datasets/bond/bond_vertices.csv')

g = SGraph(vertices=vertex_data, edges=edge_data, vid_field='name', src_field='src', dst_field='dst')

All of the construction styles above go through the constructor: with an empty constructor you add vertices and edges through methods afterwards; when only edges, or both edges and vertices, are passed to the constructor, no further method calls are needed because the data is already in the graph.

Inspecting a graph

A small graph can be displayed with the show method. The vertex label can be the id or any vertex attribute; below, the label is set to the id.

g.show(vlabel='id', highlight=['James Bond', 'Moneypenny'], arrows=True)

A large graph obviously cannot be displayed in full with show, but g.summary() prints the basics: the number of vertices and edges.
The get_vertices and get_edges methods return the graph's contents as SFrames, which can likewise be filtered by id or by other attributes.

print g.summary()

# Look up the vertex whose id is 'James Bond'
sub_verts = g.get_vertices(ids=['James Bond'])
print sub_verts

# Find all edges whose relation attribute is 'worksfor'
sub_edges = g.get_edges(fields={'relation': 'worksfor'})
print sub_edges

get_neighborhood returns a subgraph around the given target vertices. The radius is the maximum distance between a target vertex and its neighbors: 1 means first-degree neighbors, 2 means second-degree neighbors (which also includes the first-degree ones).
full_subgraph=True means that edges between the target vertices' first-degree neighbors are included as well. In the figure below, the green vertices are the first-degree neighbors of the target vertices (James Bond and Moneypenny).
M and Q, as well as Otto and Carver, are each one hop from a target vertex; the edges between them only appear because full_subgraph is set.
With full_subgraph=False, neighbors connect only to the target vertices, not to each other.

[Figure: full_subgraph]

targets = ['James Bond', 'Moneypenny']
subgraph = g.get_neighborhood(ids=targets, radius=1, full_subgraph=True)
subgraph.show(vlabel='id', highlight=['James Bond', 'Moneypenny'], arrows=True)

Modifying a graph

The structure of an SGraph is immutable, but the data stored on its edges and vertices is mutable. SGraph.vertices and SGraph.edges return SFrames, so SFrames come in two flavors:

  1. SFrames bound to a graph, called special SFrames
  2. Ordinary SFrames, called normal SFrames

Although the two calls below print the same content, g.get_edges() returns a normal SFrame (completely independent of the SGraph),
while g.edges returns an SFrame bound to the graph g. To modify the edge or vertex data of the graph, use the special SFrame.

g.edges.print_rows(5)
g.get_edges().print_rows(5)

# The special SFrame can be used to update edge data (attributes)
g.edges['relation'] = g.edges['relation'].apply(lambda x: x[0].upper())
g.get_edges().print_rows(5)

# A normal SFrame cannot update edge attributes: if it could, the output below
# would be lower-case, but in fact it is still upper-case
e = g.get_edges() # e is a normal SFrame independent of g.
e['relation'] = e['relation'].apply(lambda x: x[0].lower())
g.get_edges().print_rows(5)

# head, tail, append, etc. on a special SFrame return a normal/regular SFrame,
# so they cannot be used to update the graph either
e = g.edges.head(5)
e['is_friend'] = e['relation'].apply(lambda x: x[0] == 'F')

# The special SFrame also supports the usual SFrame operations, such as adding
# or deleting an edge attribute, which does modify the graph
g.edges['weight'] = 1.0
del g.edges['weight']

The triple_apply method provides another way to modify the vertex and edge attributes of an SGraph. It applies a user-defined function to every edge asynchronously,
letting you update edge and vertex data based on the edge and its source and destination vertices (the fields to be changed must be listed in mutated_fields; in the example below a new vertex attribute is created first and then updated while the edges are visited).

  1. Define a function whose inputs are the source vertex, the edge and the destination vertex
  2. Add a new attribute to the SGraph's vertices, e.g. degree, initialized to 0
  3. The user-defined function is applied to every edge in the graph

# While visiting each edge, add 1 to the degree of its source and destination vertices
def increment_degree(src, edge, dst):
    src['degree'] += 1
    dst['degree'] += 1
    return (src, edge, dst)

# Add a new degree attribute to every vertex, initialized to 0
g.vertices['degree'] = 0

g = g.triple_apply(increment_degree, mutated_fields=['degree'])
print g.vertices.sort('degree', ascending=False)

The figure below illustrates how each vertex's degree is computed:

[Figure: degree]

The result is shown below; James Bond is the most popular person.

+----------------+--------+--------+-----------------+---------+
| __id | degree | gender | license_to_kill | villian |
+----------------+--------+--------+-----------------+---------+
| James Bond | 8 | M | 1 | 0 |
| Elliot Carver | 7 | M | 0 | 1 |
| M | 6 | M | 1 | 0 |
| Moneypenny | 4 | F | 1 | 0 |
| Q | 4 | M | 1 | 0 |
| Paris Carver | 3 | F | 0 | 1 |
| Inga Bergstorm | 2 | F | 0 | 0 |
| Henry Gupta | 2 | M | 0 | 1 |
| Wai Lin | 2 | F | 1 | 0 |
| Gotz Otto | 2 | M | 0 | 1 |
+----------------+--------+--------+-----------------+---------+

Time series data

Time series data usually uses the timestamp as its index (much like a table's primary key). With the timestamp index you can:

  1. Group the data by different time intervals
  2. Aggregate data over different time ranges
  3. Resample the data onto regular intervals, e.g. from second-level to hour-level granularity, dropping the finer precision
  4. Apply window operations

A TimeSeries object is also backed by an SFrame; passing an SFrame and the index column to the TimeSeries constructor converts it into a TimeSeries object.

import graphlab as gl

household_data = gl.SFrame("http://s3.amazonaws.com/dato-datasets/household_electric_sample.sf")
household_ts = gl.TimeSeries(household_data, index="DateTime")

# A TimeSeries can be converted back to an SFrame
sf = household_ts.to_sframe()

# Each column of a TimeSeries is an SArray; select a subset of the columns:
ts_power = household_ts[['Global_active_power', 'Global_reactive_power']]

Once the timestamp index is built, the data is sorted by timestamp. Sample data:

+---------------------+---------------------+-----------------------+---------+
| DateTime | Global_active_power | Global_reactive_power | Voltage |
+---------------------+---------------------+-----------------------+---------+
| 2006-12-16 17:24:00 | 4.216 | 0.418 | 234.84 |
| 2006-12-16 17:26:00 | 5.374 | 0.498 | 233.29 |
| 2006-12-16 17:28:00 | 3.666 | 0.528 | 235.68 |
| 2006-12-16 17:29:00 | 3.52 | 0.522 | 235.02 |
| 2006-12-16 17:31:00 | 3.7 | 0.52 | 235.22 |
| 2006-12-16 17:32:00 | 3.668 | 0.51 | 233.99 |
| 2006-12-16 17:40:00 | 3.27 | 0.152 | 236.73 |
| 2006-12-16 17:43:00 | 3.728 | 0.0 | 235.84 |
| 2006-12-16 17:44:00 | 5.894 | 0.0 | 232.69 |
| 2006-12-16 17:46:00 | 7.026 | 0.0 | 232.21 |
+---------------------+---------------------+-----------------------+---------+

Resampling

  • Mapping – The operation that determines which time slice a specific observation belongs to.
  • Interpolation/Upsampling – The operation used to fill in the missing values when there are no observations that map to a particular time slice.
  • Aggregation/Downsampling – The operation used to aggregate multiple observations that belong to the same time slice.

In the example below, all records belonging to the same day are aggregated by taking the maximum (of every column). You can think of it as:

  1. Truncating each record's timestamp from yyyyMMdd hh:mm:ss to yyyyMMdd
  2. For all records sharing the same yyyyMMdd, taking the maximum to represent that day

import datetime as dt

day = dt.timedelta(days = 1)
daily_ts = household_ts.resample(day, downsample_method='max', upsample_method=None)

The sample result shows that where there used to be one record per minute, there is now only one record per day:

+---------------------+---------------------+-----------------------+---------+
| DateTime | Global_active_power | Global_reactive_power | Voltage |
+---------------------+---------------------+-----------------------+---------+
| 2006-12-16 00:00:00 | 7.026 | 0.528 | 243.73 |
| 2006-12-17 00:00:00 | 6.58 | 0.582 | 249.07 |
| 2006-12-18 00:00:00 | 5.436 | 0.646 | 248.48 |
| 2006-12-19 00:00:00 | 7.84 | 0.606 | 248.89 |
| 2006-12-20 00:00:00 | 5.988 | 0.482 | 249.48 |
| 2006-12-21 00:00:00 | 5.614 | 0.688 | 247.08 |
| 2006-12-22 00:00:00 | 7.884 | 0.622 | 248.82 |
| 2006-12-23 00:00:00 | 8.698 | 0.724 | 246.77 |
| 2006-12-24 00:00:00 | 6.498 | 0.494 | 249.27 |
| 2006-12-25 00:00:00 | 6.702 | 0.7 | 250.62 |
+---------------------+---------------------+-----------------------+---------+

Multiple TimeSeries can be joined on their indexed timestamp column:

sf_other = gl.SFrame('http://s3.amazonaws.com/dato-datasets/household_electric_sample_2.sf')
ts_other = gl.TimeSeries(sf_other, index = 'DateTime')
household_ts.index_join(ts_other, how='inner')

A time range of the series can be selected with slice:

import datetime as dt

start = dt.datetime(2006, 12, 16, 17, 24)
end = dt.datetime(2007, 11, 26, 21, 2)
sliced_ts = household_ts.slice(start, end)

start = dt.datetime(2010,1,1)
end = dt.datetime(2011,1,1)
ts_2010 = household_ts.slice(start, end)

A large time series can be split into groups of smaller ones, e.g. grouping by weekday so that all Monday data goes into one group, and so on.
The result is a GroupedTimeSeries, in which each group is a separate time series.

household_ts_groups = household_ts.group(gl.TimeSeries.date_part.WEEKDAY)
print household_ts_groups.groups()

# Get a single group
household_ts_monday = household_ts_groups.get_group(0)

# Iterate over all groups
for name, group in household_ts_groups:
    print name, group

The grouped TimeSeries can also be combined back into one larger TimeSeries with union:

household_ts_combined = household_ts_groups.get_group(0)
for i in range(1, 7):
    group = household_ts_groups.get_group(i)
    household_ts_combined = household_ts_combined.union(group)

[Figure: group union ts]

Rolling

Rolling statistics are computed over a window of records before or after the current record, for example:

  1. window_start = -5 and window_end = 0: the previous 5 records, including the current one
  2. window_start = 0 and window_end = 5: the next 5 records, including the current one
  3. window_start = -5 and window_end = -1: the previous 5 records, excluding the current one
  4. window_start = 1 and window_end = 5: the next 5 records, excluding the current one

For the first record there is nothing before it to aggregate over, and likewise nothing comes after the last record.
The available rolling aggregates include mean, sum, count, variance, min, max and standard deviation.

daily_ts['Global_reactive_power_rolling_mean'] = daily_ts['Global_reactive_power'].rolling_mean(-20, 0)
daily_ts[['Global_reactive_power_rolling_mean', 'Global_reactive_power']].print_rows(50)

In contrast to rolling aggregates, cumulative aggregates use all preceding records (including the current one), so there are no start or end parameters.

daily_ts['Global_reactive_power_cumulative_mean'] = daily_ts['Global_reactive_power'].cumulative_mean()
daily_ts[['Global_reactive_power_cumulative_mean', 'Global_reactive_power']]

The two kinds of aggregate highlight different things:

  • rolling means: capture recent trends in the data
  • cumulative means: spot global trends in the data

Data modeling

Classification

There are several classification models, but all of them provide the four methods below (not every model needs every method: a classification model calls classify but not predict, while a regression model calls predict but not classify). GraphLab can also pick the best model automatically.

  1. create: build the model
  2. predict: make predictions with the model
  3. classify: classify the data, with detailed statistics for each prediction (classification models only)
  4. evaluate: measure the quality of the predictions from step 2

import graphlab as gl

# Load the data
data = gl.SFrame('http://s3.amazonaws.com/dato-datasets/regression/yelp-data.csv')

# Restaurants with rating >= 3 are considered good
data['is_good'] = data['stars'] >= 3

# Make a train-test split
train_data, test_data = data.random_split(0.8)

# Automatically picks the right model based on your data
model = gl.classifier.create(train_data, target='is_good',
                             features=['user_avg_stars',
                                       'business_avg_stars',
                                       'user_review_count',
                                       'business_review_count'])

# Generate predictions (class/probabilities etc.), contained in an SFrame.
predictions = model.classify(test_data)

# Evaluate the model, with the results stored in a dictionary
results = model.evaluate(test_data)

Regression

The difference between regression and classification is that classification predicts a discrete value (one of a fixed set of classes), while regression predicts a continuous value.

Simply put, a regression model learns a function from the training data and uses it for prediction:
the input features pass through the learned function to produce the output target.
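
As a rough sketch, a regression model can be trained on the same yelp data with linear_regression, treating the raw stars column as the continuous target; the feature list simply reuses the columns from the classification example above and is only illustrative:

import graphlab as gl

# Sketch: predict the raw star rating (continuous) instead of the is_good label
data = gl.SFrame('http://s3.amazonaws.com/dato-datasets/regression/yelp-data.csv')
train_data, test_data = data.random_split(0.8)

model = gl.linear_regression.create(train_data, target='stars',
                                    features=['user_avg_stars',
                                              'business_avg_stars',
                                              'user_review_count',
                                              'business_review_count'])

predictions = model.predict(test_data)   # continuous predictions
results = model.evaluate(test_data)      # e.g. RMSE and max error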

Graph analytics

import os
import graphlab

data_file = 'US_business_links'
if os.path.exists(data_file):
    sg = graphlab.load_sgraph(data_file)
else:
    url = 'http://s3.amazonaws.com/dato-datasets/' + data_file
    sg = graphlab.load_sgraph(url)
    sg.save(data_file)

print sg.summary()

PageRank

pr = graphlab.pagerank.create(sg, max_iterations=10)
print pr.summary()

pr_out = pr['pagerank']
print pr_out.topk('pagerank', k=10)

Triangle counting

tri = graphlab.triangle_counting.create(sg)
print tri.summary()

tri_out = tri['triangle_count']
print tri_out.topk('triangle_count', k=10)

Shortest path

sssp = graphlab.shortest_path.create(sg, source_vid='Microsoft')
sssp.get_path(vid='Weyerhaeuser', show=True,
              highlight=['Microsoft', 'Weyerhaeuser'], arrows=True, ewidth=1.5)

Clustering
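
As a rough sketch, GraphLab Create's kmeans toolkit clusters the rows of an SFrame by their numeric features; the columns below simply reuse the yelp data from the classification example for illustration:

import graphlab as gl

# Sketch only: k-means clustering on a few numeric columns of the yelp data
data = gl.SFrame('http://s3.amazonaws.com/dato-datasets/regression/yelp-data.csv')
features = ['user_avg_stars', 'business_avg_stars',
            'user_review_count', 'business_review_count']

kmeans_model = gl.kmeans.create(data[features], num_clusters=5)
print kmeans_model.summary()

# Cluster assignment for each row
print kmeans_model['cluster_id'].head(5)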

Application

Recommender system

Recommendation scenarios: things you might be interested in (given a user_id, return items) and similar items (given an item_id, return items).
Required data: user_id, item_id, rating

actions = gl.SFrame.read_csv('./dataset/ml-20m/ratings.csv')
items = gl.SFrame.read_csv('./dataset/ml-20m/movies.csv')

training_data, validation_data = gl.recommender.util.random_split_by_user(actions, 'userId', 'movieId')
model = gl.recommender.create(training_data, 'userId', 'movieId')

# You can now make recommendations for all the users you've just trained on
results = model.recommend()

Ref

http://cloga.info/graphlab%20create/2014/07/26/graphlab_create/

