R 高性能编程(全)
原文:
zh.annas-archive.org/md5/6fbdeff1b7c6634baa0ee71fe31e9dc1译者:飞龙
协议:CC BY-NC-SA 4.0
前言
在数据变得越来越重要的世界里,商人和科学家需要工具来高效地分析和处理大量数据。R 是近年来在数据处理、统计分析和数据科学中越来越受欢迎的工具之一。虽然 R 的根源在学术界,但现在它被广泛应用于各个行业和地理区域的组织中。
但 R 的设计对它能够高效管理的数据大小和计算复杂度施加了一些固有的限制。这对需要处理组织中不断增长的数据量的 R 用户来说可能是一个巨大的障碍。
本书,《R 高性能编程》,将帮助你理解在 R 中经常导致性能困难的情况,例如内存和计算限制。它还将展示一系列克服这些性能限制的技术。你可以选择单独使用这些技术,或者根据你的需求和计算环境选择最佳组合。
本书旨在成为一本实用的指南,介绍如何提高 R 程序的性能,同时提供足够的解释来说明为什么,以便你理解每个解决方案背后的推理。因此,我们将为本书中涵盖的每个技术提供代码示例,以及我们在自己的机器上生成的性能分析结果,以展示性能提升。我们鼓励你通过在自己的环境中输入和运行代码来跟随,以亲自看到性能提升。
如果你想要了解 R 的设计以及为什么它有性能限制,R 内部文档(cran.r-project.org/doc/manuals/r-release/R-ints.html)将提供有用的线索。
本书基于开源 R 编写,因为它是最广泛使用的 R 版本,并且对任何人都是免费可用的。如果你使用的是商业版本的 R,请咨询你的软件供应商,看看他们可能为你提供了哪些性能改进。
R 社区已经创建了众多新包来提升 R 的性能,这些包可以在综合 R 存档网络(CRAN)(cran.r-project.org/)上找到。我们无法分析 CRAN 上的每一个包——它们有成千上万个——以查看它们是否为特定操作提供了性能提升。相反,本书专注于 R 程序员最常见的工作任务,并介绍你可以用于任何 R 项目的技巧。
本书涵盖的内容
第一章, 理解 R 的性能 – 为什么 R 程序有时会慢?,通过一瞥 R 的内部结构来开启我们的旅程,探索 R 程序可能遇到性能限制的各种方式。我们将探讨 R 的设计如何在计算(CPU)、内存(RAM)和磁盘输入/输出(I/O)方面在 R 程序中创建性能瓶颈。
第二章, 性能分析 – 测量代码的性能,介绍了我们将在整本书中使用的几种技术来测量 R 代码的性能,以便我们能够理解我们的性能问题的本质。
第三章, 简单调整以使 R 运行更快,描述了如何提高 R 代码的计算速度。这些是您可以在任何 R 程序中使用的基技术。
第四章, 使用编译代码以获得更高的速度,探讨了在另一种编程语言(如 C)中使用编译代码来最大化我们计算的性能。我们将看到编译代码如何比 R 运行得更快,并探讨如何将编译代码集成到我们的 R 程序中。
第五章, 使用 GPU 使 R 运行更快,通过利用图形处理单元(GPU)来以高速运行复杂计算,将我们带入现代加速器的领域。
第六章, 简单调整以减少使用 RAM,描述了管理并优化 R 程序 RAM 利用的基本技术,以便您能够处理更大的数据集。
第七章, 使用有限 RAM 处理大型数据集,解释了如何使用内存高效的数据结构和磁盘驻留数据格式来处理比可用 RAM 更大的数据集。
第八章, 使用并行计算提高性能,介绍了 R 中的并行性。我们将探索如何在单台机器和多台机器上并行运行 R 代码。我们还将探讨在设计我们的并行代码时需要考虑的因素。
第九章, 将数据处理任务卸载到数据库系统中,描述了某些计算可以卸载到外部数据库系统。这有助于最小化大数据在数据库中的进出移动,尤其是在您已经可以访问一个具有计算能力和速度的强大数据库系统时。
第十章, R 和大数据,通过探索使用大数据技术将 R 的性能提升到极限来结束本书。
如果你急于阅读,我们建议你首先阅读以下章节,然后根据你的情况补充阅读其他相关章节:
-
第一章, 理解 R 的性能 – 为什么 R 程序有时运行缓慢?
-
第二章, 性能分析 – 测量代码的性能
-
第三章, 简单调整以使 R 运行更快
-
第六章, 简单调整以使用更少的 RAM
你需要为这本书准备
本书中的所有代码都是在 Mac OS X 10.9 上的 R 3.1.1 64 位下开发的。 wherever possible,它们也已在 Ubuntu 桌面 14.04 LTS 和 Windows 8.1 上进行了测试。所有代码示例都可以从 github.com/r-high-performance-programming/rhpp-2015 下载。
要跟随代码示例,我们建议你在你的环境中安装 R 3.1.1 64 位或更高版本。
我们还建议你在 Unix 环境中运行 R(这包括 Linux 和 Mac OS X)。虽然 R 也可以在 Windows 上运行,但我们将要使用的某些包,例如 "bigmemory",只能在 Unix 环境中运行。在我们的代码示例中,如果 Unix 和 Windows 之间存在差异,我们将指出它们。
你需要 R 的 64 位版本,因为某些操作(例如,创建包含 2³¹ 或更多元素的向量)在 32 位版本中是不可能的。此外,R 的 64 位版本可以利用系统上可用的所有内存,而 32 位版本的限制为不超过 4 GB 的内存(在某些操作系统中,限制可能低至 2 GB)。
你还需要在你的 R 环境中安装包,因为几章中的示例将依赖于额外的包。
一些章节中的示例需要其他软件或包才能运行。这些将在各自的章节中列出,包括安装说明。
如果你无法访问示例中所需的一些软件和工具,你可以在亚马逊网络服务(AWS)上运行它们。特别是,第五章中的示例,使用 GPU 使 R 运行更快,需要一个具有 CUDA 功能的 NVIDIA GPU 的计算机;第九章中的示例,将数据处理卸载到数据库系统,需要各种数据库系统;第十章中的示例,R 和大数据,需要 Hadoop。
要使用 AWS,请使用您的亚马逊账户登录到aws.amazon.com/。如果您还没有账户,请创建一个。创建账户是免费的,但使用服务器、存储和其他资源会产生费用。请咨询 AWS 网站了解您首选区域的最新价格。
AWS 服务在全球的不同区域提供。在撰写本书时,有八个区域——美国有三个,欧洲有一个,亚太地区有三个,南美洲有一个。选择您喜欢的任何区域,例如您所在位置附近或价格最低的区域。要选择区域,请转到 AWS 控制台(console.aws.amazon.com)并选择右上角的位置。一旦选择了区域,请为本书中所有示例所需的 AWS 资源使用相同的区域。
在设置任何计算资源(如服务器或 Hadoop 集群)之前,您需要一个密钥对来登录服务器。如果您还没有 AWS 弹性计算云(EC2)密钥对,请按照以下步骤生成一个:
-
前往 AWS 控制台并点击 EC2。
-
点击左侧菜单中的密钥对。
-
点击创建密钥对。
-
为新的密钥对输入一个名称(例如,mykey)。
-
一旦点击创建,私钥(例如,mykey.pem)将下载到您的计算机上。
在 Linux 和 Mac OS X 上,使用 chmod 400 mykey.pem 在终端窗口中更改私钥文件的权限,以允许所有者仅读取访问。
本书面向的对象
如果您已经是 R 程序员并想找到提高代码效率的方法,那么这本书适合您。虽然您需要熟悉并舒适地使用 R,但您不需要对该语言有深入的专业知识。您需要从本书中受益的技能包括:
-
在您的计算机上安装、升级和运行 R
-
在您的 R 环境中安装和升级 CRAN 软件包
-
创建和操作基本数据结构,如向量、矩阵、列表和数据框
-
使用和在不同 R 数据类型之间转换
-
执行算术、逻辑和其他基本 R 操作
-
使用 R 控制语句,如 if、for、while 和 repeat
-
编写 R 函数
-
使用 R Graphics 绘制图表
如果你是 R 语言的新手,想学习如何编写 R 程序,有许多书籍、在线课程、教程和其他资源可供选择。只需使用您喜欢的搜索引擎搜索即可。
习惯用法
在本书中,您将找到多种文本样式,用于区分不同类型的信息。以下是一些这些样式的示例及其含义的解释。
文本中的代码单词、数据库表名、文件夹名、文件名、文件扩展名、路径名、虚拟 URL、用户输入和 Twitter 处理方式如下:“为了编译函数,我们将使用编译器包中的 cmpfun() 函数。”
代码块设置如下:
fibonacci_rec <- function(n) {if (n <= 1) {return(n)}return(fibonacci_rec(n - 1) + fibonacci_rec(n - 2))
}
新术语和重要词汇以粗体显示。屏幕上显示的词汇,例如在菜单或对话框中,将以如下方式显示:“请确保在安装向导中选择包作者安装和编辑系统 PATH选项。”
注意
警告或重要注意事项以如下方式显示在框中。
小贴士
小技巧和窍门看起来像这样。
读者反馈
我们始终欢迎读者的反馈。让我们知道您对这本书的看法——您喜欢什么或可能不喜欢什么。读者反馈对我们开发您真正从中获得最大收益的标题非常重要。
要发送一般反馈,只需发送电子邮件到 <feedback@packtpub.com>,并在邮件主题中提及书籍标题。
如果您在某个主题上具有专业知识,并且您对撰写或为书籍做出贡献感兴趣,请参阅我们的作者指南www.packtpub.com/authors。
客户支持
现在您是 Packt 书籍的骄傲拥有者,我们有一些事情可以帮助您从购买中获得最大收益。
下载示例代码
您可以从www.packtpub.com的账户下载您购买的所有 Packt 书籍的示例代码文件。如果您在其他地方购买了这本书,您可以访问www.packtpub.com/support并注册,以便将文件直接通过电子邮件发送给您。
错误列表
尽管我们已经尽一切努力确保内容的准确性,但错误仍然可能发生。如果您在我们的书中发现错误——可能是文本或代码中的错误——如果您能向我们报告这一点,我们将不胜感激。通过这样做,您可以节省其他读者的挫败感,并帮助我们改进本书的后续版本。如果您发现任何错误,请通过访问www.packtpub.com/submit-errata,选择您的书籍,点击错误提交表单链接,并输入您的错误详情。一旦您的错误得到验证,您的提交将被接受,错误将被上传到我们的网站,或添加到该标题的错误列表中。任何现有的错误都可以通过从www.packtpub.com/support选择您的标题来查看。
侵权
互联网上版权材料的侵权是一个持续存在的问题,所有媒体都存在。在 Packt,我们非常重视我们版权和许可证的保护。如果您在互联网上发现我们作品的任何非法副本,无论形式如何,请立即提供位置地址或网站名称,以便我们可以寻求补救措施。
请通过 <copyright@packtpub.com> 联系我们,并提供涉嫌侵权材料的链接。
我们感谢您在保护我们作者以及为我们提供有价值内容方面的帮助。
问题
如果你在本书的任何方面遇到问题,可以通过 <questions@packtpub.com> 联系我们,我们将尽力解决。
第一章. 理解 R 的性能——为什么 R 程序有时运行缓慢?
R 是一个用于统计分析与数据处理的优秀工具。当它在 1993 年首次开发时,它被设计为一个用于教授数据分析课程的工具。由于其使用简便,在接下来的 20 年里,它越来越受欢迎,不仅在学术界,也在政府和企业界。R 还是一个开源工具,因此其用户可以免费使用它,并向 R 公共仓库(称为 CRAN Comprehensive R Archive Network)贡献新的统计包。在撰写本书时,随着 CRAN 库的丰富,拥有超过 6,000 个经过良好文档记录且可立即使用的包,R 的吸引力进一步增加。在这 20 年里,组织和个人创造、传输、存储和分析的数据量也呈指数级增长。需要处理和分析不断增长的数据量的 R 程序员有时会发现,在如此重的负载下,R 的性能受到影响。为什么 R 有时表现不佳,我们如何克服其性能限制?本书探讨了 R 性能背后的因素,并提供了各种技术来提高 R 程序的性能,例如优化内存使用、并行计算或甚至利用外部数据处理系统的计算能力。
在我们找到 R 性能问题的解决方案之前,我们需要了解是什么使得 R 在某些情况下表现不佳。本章通过揭开 R 的设计面纱,了解其设计如何限制 R 程序的性能,从而开启了我们对高性能 R 编程的探索。
我们将考察任何计算任务面临的主要限制——CPU、RAM 和磁盘 输入/输出(I/O)——然后看看这些限制在 R 程序中具体是如何体现的。到本章结束时,你将对你的 R 程序可能遇到的瓶颈有所了解。
本章涵盖了以下主题:
-
计算性能的三个限制因素——CPU、RAM 和磁盘 I/O
-
R 是即时解释的
-
R 是单线程的
-
R 需要将所有数据加载到内存中
-
算法设计影响时间和空间复杂度
计算性能的三个限制因素——CPU、RAM 和磁盘 I/O
首先,让我们看看 R 程序在计算机中的执行方式。这是一个非常简化的实际发生情况的版本,但它足以让我们了解 R 的性能限制。以下图展示了执行一个 R 程序所需的步骤。

执行 R 程序的步骤
以这个简单的 R 程序为例,它从 CSV 文件中加载数据,计算列总和,并将结果写入另一个 CSV 文件:
data <- read.csv("mydata.csv")
totals <- colSums(data)
write.csv(totals, "totals.csv")
我们使用编号来理解前面的图:
-
当我们加载和运行一个 R 程序时,R 代码首先被加载到 RAM 中。
-
然后,R 解释器将 R 代码翻译成机器码,并将机器码加载到 CPU 中。
-
CPU 执行程序。
-
程序将从硬盘将待处理的数据加载到 RAM 中(例如示例中的
read.csv())。 -
数据被分小块加载到 CPU 中进行处理。
-
CPU 逐块处理数据,并与 RAM 交换数据块,直到所有数据都被处理(在示例中,CPU 执行
colSums()函数的指令来计算数据集的列总和)。 -
有时,处理过的数据会被存储回硬盘上(例如示例中的
write.csv())。
从这个计算过程的描述中,我们可以看到几个可能发生性能瓶颈的地方:
-
CPU 的速度和性能决定了计算指令,如示例中的
colSums(),的执行速度。这包括将 R 代码解释成机器码以及实际执行机器码来处理数据。 -
计算机上可用的 RAM 大小限制了任何给定时间可以处理的数据量。在这个例子中,如果
mydata.csv文件包含的数据量超过了 RAM 的容量,read.csv()的调用将会失败。 -
数据从硬盘读取或写入的速度(例如示例中的
read.csv()和write.csv()),即硬盘输入/输出(I/O)的速度,影响数据加载到内存和存储回硬盘的速度。
有时,你可能会一次遇到这些限制因素中的一个。例如,当数据集足够小,可以快速从磁盘读取并完全存储在 RAM 中,但对其进行的计算很复杂时,那么只会遇到 CPU 限制。在其他时候,你可能会发现它们以各种组合同时出现。例如,当数据集非常大时,从磁盘加载它需要很长时间,在任何给定时间只能将一小部分加载到内存中,并且对它的任何计算都需要很长时间。在两种情况下,这些都是性能问题的症状。为了诊断问题并找到解决方案,我们需要查看幕后可能引起这些限制发生的情况。
现在我们来看看 R 的设计和工作方式,以及它的性能意味着什么。
R 是即时解释的
在计算机科学术语中,R 被称为一种解释型语言。这意味着每次你执行一个 R 程序时,R 解释器都会即时解释并执行 R 代码。以下图示说明了当你运行任何 R 代码时会发生什么:

解释型语言与编译型语言
R 首先将你的源代码解析成 R 内部对象表示,其中包括 R 代码中的所有语句和表达式。然后 R 评估这个内部 R 对象以执行代码。
这就是为什么 R 是一种如此动态和交互式的编程语言。你可以将 R 语句输入到 R 控制台中并立即得到结果,因为 R 解释器会立即解析和评估代码。这种方法的不利之处在于,R 代码运行相对较慢,因为每次运行它时都需要重新解释,即使它没有改变。
与 C 或 Fortran 这样的编译型语言相比,当你使用编译型语言时,你需要在执行之前将源代码编译成机器代码。这使得编译型语言交互性较差,因为编译步骤可能需要几分钟,即使是对于大型程序,即使你只是对代码进行了微小的更改。另一方面,一旦代码被编译,它会在 CPU 上非常快速地运行,因为它已经是计算机的本地语言。
由于 R 是一种解释型语言,每次你运行一个 R 程序时,CPU 都忙于做两件事:解释你的代码和执行其中的指令。因此,CPU 的速度可以限制 R 程序的性能。我们将在第三章到第五章中学习如何克服 CPU 的限制。
R 是单线程的。
R 在 CPU 上的限制还有另一个方面,即默认情况下,它只在 CPU 的一个线程上运行。无论你在具有 64 个 CPU 核心的强大服务器上安装 R,R 都只会使用其中一个。例如,求一个数值向量的和是一个可以在 CPU 上非常容易并行运行的运算。如果有四个 CPU 核心可用,每个核心可以处理大约四分之一的数据。每个核心计算它所分配的数据块的小计,然后四个小计相加以找到整个数据集的总和。然而,在 R 中,sum() 函数是串行运行的,在一个 CPU 核心上处理整个数据集。实际上,许多大数据操作与这里的求和示例具有相似的性质,相同的任务在许多数据子集上独立运行。在这种情况下,顺序执行操作将是今天大多数并行计算架构的浪费。在第八章《通过并行计算提高性能》中,我们将学习如何在 R 中编写并行程序以克服这一限制。
R 需要将所有数据加载到内存中。
在 R 中处理的所有数据都必须完全加载到 RAM 中。这意味着一旦数据被加载,CPU 就可以对其进行处理,这对于性能来说是非常好的。另一方面,这也意味着你可以处理的数据的最大大小取决于系统上可用的空闲 RAM 量。记住,不是你电脑上的所有 RAM 都可以供 R 使用。操作系统、后台进程以及任何在 CPU 上运行的其他应用程序也会竞争使用 RAM。R 可用的 RAM 可能只是系统上安装的总 RAM 的一部分。
此外,R 还需要足够的 RAM 来存储其计算结果。根据你执行的计算类型,你可能需要的可用 RAM 是数据大小的两倍甚至更多。
R 的 32 位版本也受其可访问的 RAM 量所限制。根据操作系统不同,即使实际上有更多的 RAM 可用,它们可能也仅限于 2 GB 到 4 GB 的 RAM。此外,由于内存地址限制,R 的 32 位版本中的数据结构最多只能包含 2³¹-1 = 2,147,483,647 个元素。因为这些限制,你应该尽可能使用 R 的 64 位版本。
注意
在 R 3.0 版本之前的所有版本中,即使是 64 位版本,向量和其它数据结构都面临这个 2,147,483,647 个元素的限制。如果你的数据超过了这个大小,你需要使用 R 3.0 或其后续版本的 64 位版本。
当我们尝试加载一个比可用 RAM 大的数据集时会发生什么?有时,数据加载成功,但一旦可用 RAM 被用尽,操作系统开始将 RAM 中的数据交换到硬盘上的交换文件中。这不是 R 的特性;这取决于操作系统。当这种情况发生时,R 认为所有数据都已加载到 RAM 中,而实际上操作系统正在后台努力在 RAM 和硬盘上的交换文件之间交换数据。在这种情况下,我们除了内存瓶颈之外,还遇到了磁盘 I/O 瓶颈。由于磁盘 I/O 非常慢(硬盘的速度通常以毫秒计算,而 RAM 的速度以纳秒计算),这可能导致 R 看起来像是冻结了或者变得无响应。在我们讨论的三个性能限制中,磁盘 I/O 通常对 R 的性能影响最大。
第六章, 减少 RAM 使用的简单技巧 和 第七章, 在有限 RAM 下处理大型数据集 将讨论如何优化内存使用以及如何处理无法装入内存的大型数据集。
算法设计影响时间和空间复杂度
我们还没有讨论的一个性能因素是你的代码。你运行的计算类型和算法可以对性能产生巨大影响。计算机科学家用复杂度来描述程序的性能特征。特别是,我们关注两种类型的复杂度:
-
时间复杂度:这指的是运行 R 程序所需的计算时间与处理的数据大小之间的关系
-
空间复杂度:这指的是运行 R 程序所需的内存与处理的数据大小之间的关系
让我们看看时间复杂性的一个例子。假设我们需要编写一个函数来计算第n个斐波那契数,即序列 0, 1, 1, 2, 3, 5, 8, 13, …中的数,其中每个数是前两个数的和。一个简单的方法是编写一个递归函数,如下所示:
fibonacci_rec <- function(n) {if (n <= 1) {return(n)}return(fibonacci_rec(n - 1) + fibonacci_rec(n - 2))
}
由于第n个斐波那契数是第(n-1)个和第(n-2)个斐波那契数的和,这个函数只是调用自身来计算前两个数,然后将它们相加。让我们看看使用microbenchmark()函数计算第 25 个斐波那契数需要多长时间,这个函数可以从 CRAN 下载和安装(我们将在第二章中更详细地了解如何使用这个函数,测量代码的性能):
microbenchmark(fibonacci_rec(25), unit = "ms")
## Unit: milliseconds
## expr min lq mean median uq
## fibonacci_rec(25) 170.1014 179.8 191.4213 183.5275 197.5833
## max neval
## 253.1433 100
中位时间是 184 毫秒。由于递归的工作方式,存在大量的不必要的重复。例如,为了计算第 25 个斐波那契数,我们需要计算序列中的第 23 个和第 24 个数。但是,计算第 24 个数也涉及到计算第 23 个数,所以第 23 个数被计算了两次。而第 22 个数是计算第 23 个和第 24 个数所必需的,以此类推。
我们可以通过只计算每个数字一次来减少这种重复。以下代码展示了斐波那契函数的另一种实现方式,它正是这样做的。它按顺序从最小到最大计算斐波那契数,并在数值向量fib中记住已计算的数字。因此,每个斐波那契数只计算一次:
fibonacci_seq <- function(n) {if (n <= 1) {return(n)}# (n+1)th element of this vector is the nth Fibonacci numberfib <- rep.int(NA_real_, n + 1)fib[1] <- 0fib[2] <- 1for (i in 2:n) {fib[i + 1] <- fib[i] + fib[i - 1]}return(fib[n + 1])
}
提示
下载示例代码
你可以从你购买的所有 Packt 书籍的账户中下载示例代码文件。www.packtpub.com。如果你在其他地方购买了这本书,你可以访问www.packtpub.com/support并注册,以便将文件直接通过电子邮件发送给你。
通过基准测试这个顺序函数,我们发现它运行的中位时间是 0.04 毫秒,比递归版本减少了 99.98%!
microbenchmark(fibonacci_seq(25), unit = "ms")
## Unit: milliseconds
## expr min lq mean median uq
## fibonacci_seq(25) 0.03171 0.036133 0.0446416 0.0405555 0.04459
## max neval
## 0.114714 100
为了演示时间复杂度的概念,我们对n从 0 到 50 的不同值进行了基准测试。中位执行时间如下所示:

斐波那契函数递归版本与顺序版本的执行时间对比
随着我们增加n的值,斐波那契函数递归版本的执行时间呈指数增长。它大约与1.6^n成比例——每次n增加 1,它大约乘以 1.6 倍。执行时间增长如此之快,以至于在计算第 50 个斐波那契数之后,计算时间过长。另一方面,尽管从图表上看不出,但顺序版本的执行时间呈线性增长——每次n的增加会使执行时间增加 1.3 微秒。由于顺序版本的计算复杂度远低于递归版本,随着n的增加,它的性能将表现得更好。作为一个例子,当n=50时,顺序版本仅用了几分之一毫秒就完成了计算,而递归版本则用了超过八个小时!
虽然我们在这里不会这样做,但可以进行类似的练习来比较不同算法的空间复杂度。给定一定量的计算资源,您选择的算法和代码的设计可以对您的 R 程序实现所需性能水平的能力产生重大影响。
摘要
在本章中,我们了解了 R 程序有时会遇到计算性能面临的三个约束——CPU、RAM 和磁盘 I/O。我们探讨了 R 的设计,并学习了其解释性和单线程特性如何导致其运行缓慢,以及当数据太大而无法适应 RAM 时,它如何遇到内存和磁盘 I/O 限制。最后,我们通过比较两个具有非常不同性能特性的斐波那契函数实现来了解 R 代码的设计在确定性能方面的重要作用。
这些性能问题并非不可克服。本书的其余部分将向您展示不同的方法来克服或绕过这些问题,并释放 R 的潜在能力。
第二章. 分析 – 测量代码的性能
提高 R 程序性能的第一步是确定性能瓶颈发生的位置。为此,我们分析或测量 R 程序在运行时的性能,相对于各种指标,如执行时间、内存利用率、CPU 利用率和磁盘 I/O。这让我们对程序及其各部分的表现有一个很好的了解,因此我们可以首先解决最大的瓶颈。本章将向您展示如何使用一些简单的工具来测量 R 程序的性能。
这里应用了 80/20 法则。通常,通过解决 20%的最大性能问题,可以实现 80%的可能性能提升。我们将探讨如何确定先解决哪些问题,以便在最少的时间和精力下获得最大的改进。
本章涵盖以下主题:
-
测量总执行时间
-
分析执行时间
-
分析内存利用率
-
使用操作系统工具监控内存利用率、CPU 利用率和磁盘 I/O
-
识别和解决瓶颈
测量总执行时间
当人们说他们的程序表现不佳时,他们通常指的是执行时间或程序完成执行所需的时间。在许多情况下,执行时间可能是最重要的性能指标,因为它对人和过程有直接影响。较短的执行时间意味着 R 程序员可以更快地完成他的或她的分析,从而更快地得出见解。
结果表明,执行时间也是可以准确且详细测量的最易测量的性能特征(尽管不一定最容易解决)。因此,我们将通过学习如何测量 R 程序的执行时间来学习如何分析 R 代码。我们将学习三种不同的工具来完成这项任务:system.time()、benchmark() 和 microbenchmark()。
使用 system.time() 测量执行时间
我们将要了解的第一个分析工具是 system.time()。这是一个非常有用的工具,我们可以用它来测量任何 R 表达式的执行时间。
假设我们想知道生成 1000 万个均匀随机变量需要多长时间。看看以下语句及其在 R 控制台运行时的输出:
system.time(runif(1e8))
## user system elapsed
## 2.969 0.166 3.138
runif(1e8) 表达式生成 0 到 1 之间的 1000 万个随机值。为了测量运行此命令所需的时间,我们只需将此表达式传递给 system.time()。
输出包含三个元素,所有元素均以秒为单位:
-
用户时间:此元素是指给定表达式执行用户指令所花费的 CPU 时间,例如,遍历一个数组。它不包括其他进程使用的 CPU 时间(例如,如果计算机在后台运行病毒扫描,它所消耗的 CPU 时间不计入)。
-
系统时间:系统时间是指为给定表达式执行系统指令所收取的 CPU 时间,例如,打开和关闭文件,或分配和释放内存。这不包括其他进程使用的 CPU 时间。
-
耗时:耗时是指执行给定表达式的总时钟时间。它包括 CPU 在处理其他进程上花费的时间以及等待时间(例如,等待文件被打开以供读取)。有时,耗时可能长于用户时间和系统时间的总和,因为 CPU 正在处理其他进程的多任务,或者它必须等待资源(如文件和网络连接)可用。在其他时候,耗时可能短于用户时间和系统时间的总和。这可能会发生在使用多个线程或 CPU 执行表达式时。例如,一个需要 10 秒用户时间的任务,如果有两个 CPU 分担负载,可以在 5 秒内完成。
大多数时候,我们感兴趣的是执行给定表达式的总耗时。当表达式在单个线程上执行(R 的默认设置)时,耗时通常非常接近用户时间和系统时间的总和。如果不是这种情况,则表达式可能花费时间等待资源可用,或者系统上存在许多其他进程正在竞争 CPU 时间。
小贴士
在运行 system.time() 之前,最好关闭系统上所有不必要的程序和进程,以减少对 CPU 时间的竞争,并获得准确的测量结果。当然,不应关闭防病毒软件或其他任何关键系统软件。
注意
system.time() 声明实际上返回一个包含五个元素的向量,但它的 print() 函数只显示前三个。要查看所有五个元素,我们可以调用 print(unclass(system.time(expr)))。其他两个元素是 expr 所产生的任何子进程的执行的系统时间和用户时间。在 Windows 机器上,这些不可用,始终会显示为 NA。
这是我们多次运行 system.time() 并使用相同表达式时发生的情况:
system.time(runif(1e8))
## user system elapsed
## 2.963 0.160 3.128
system.time(runif(1e8))
## user system elapsed
## 2.971 0.162 3.136
system.time(runif(1e8))
## user system elapsed
## 2.944 0.161 3.106
通过重复运行 system.time(),我们每次都会得到略微不同的结果,因为 R 的开销、操作系统缓存机制、其他正在运行的进程以及许多其他因素可能会对执行时间产生轻微影响。
使用 rbenchmark 重复时间测量
有时多次运行相同的表达式并获取平均执行时间,甚至获取多次运行中执行时间的分布,这很有帮助。rbenchmark CRAN 软件包让我们可以轻松地做到这一点。
首先,安装并加载 rbenchmark 软件包:
install.packages("rbenchmark")
library(rbenchmark)
接下来,使用 benchmark() 运行相同的随机数生成任务 10 次,通过指定 replications=10:
bench1 <- benchmark(runif(1e8), replications=10)
bench1
## test replications elapsed relative user.self
## 1 runif(1e+08) 10 32.38 1 29.781
## sys.self user.child sys.child
## 1 2.565 0 0
结果显示了在 10 次重复中生成 1 亿个均匀随机变量所花费的总系统时间和用户时间。我们可以使用 within() 来将时间测量值除以重复次数,从而找到每次重复的平均时间:
within(bench1, {elapsed.mean <- elapsed/replicationsuser.self.mean <- user.self/replicationssys.self.mean <- sys.self/replications})
## test replications elapsed relative user.self
## 1 runif(1e+08) 10 32.38 1 29.781
## sys.self user.child sys.child sys.self.mean user.self.mean
## 1 2.565 0 0 0.2565 2.9781
## elapsed.mean
## 1 3.238
如果我们想知道每次重复的执行时间,或者执行时间在重复中的分布情况呢?我们可以将一个向量而不是单个数字作为 replications 参数传递。对于这个向量的每个元素,benchmark() 将执行指定的表达式。因此,我们可以得到随机数生成执行一次的 10 个样本,如下面的代码所示。除了用户和系统时间外,benchmark() 还返回一个额外的列,relative,它表示每次重复的经过时间与最快的一次相比如何。例如,第一次重复花费了最快重复(第四次)1.011 倍的时间,或者运行时间长了 1.1%:
benchmark(runif(1e8), replications=rep.int(1, 10))
## test replications elapsed relative user.self
## 1 runif(1e+08) 1 3.162 1.011 2.971
## 2 runif(1e+08) 1 3.145 1.005 2.951
## 3 runif(1e+08) 1 3.141 1.004 2.949
## 4 runif(1e+08) 1 3.128 1.000 2.937
## 5 runif(1e+08) 1 3.261 1.043 3.021
## 6 runif(1e+08) 1 3.207 1.025 2.993
## 7 runif(1e+08) 1 3.274 1.047 3.035
## 8 runif(1e+08) 1 3.174 1.015 2.966
## 9 runif(1e+08) 1 3.172 1.014 2.970
## 10 runif(1e+08) 1 3.230 1.033 3.004
## sys.self user.child sys.child
## 1 0.187 0 0
## 2 0.191 0 0
## 3 0.189 0 0
## 4 0.190 0 0
## 5 0.228 0 0
## 6 0.210 0 0
## 7 0.230 0 0
## 8 0.207 0 0
## 9 0.201 0 0
## 10 0.224 0 0
使用 microbenchmark 测量执行时间的分布
CRAN 包 microbenchmark 提供了另一种测量 R 表达式执行时间的方法。尽管它的 microbenchmark() 函数只测量经过时间,而不是用户时间或系统时间,但它可以给出重复运行中执行时间分布的概览。它还自动纠正了与执行时间测试相关的开销。如果你不需要测量用户或系统时间,microbenchmark() 函数非常方便,可以用来测量多次重复的短运行任务。我们将在本书中多次使用这个工具。
安装并加载 microbenchmark 包:
install.packages("microbenchmark")
library(microbenchmark)
现在,使用 microbenchmark() 运行相同的随机数生成任务 10 次:
microbenchmark(runif(1e8), times=10)
## Unit: seconds
## expr min lq median uq max
## runif(1e+08) 3.170571 3.193331 3.25089 3.299966 3.314355
## neval
## 10
统计数据显示了 10 次重复中经过时间的最小值、下四分位数、中位数、上四分位数和最大值。这让我们对相同表达式的不同重复中的经过时间分布有了概念。
分析执行时间
到目前为止,我们已经看到了如何测量整个 R 表达式的执行时间。对于包含对其他函数调用等多个部分的更复杂表达式呢?是否有方法可以深入挖掘并分析构成表达式的每个部分的执行时间?R 内置了 Rprof() 分析工具,允许我们做到这一点。让我们看看它是如何工作的。
使用 Rprof() 分析函数
在这个例子中,我们编写以下 sampvar() 函数来计算数值向量的无偏样本方差。这显然不是编写此函数的最佳方式(实际上 R 提供了 var() 函数来完成此操作),但它有助于说明代码分析是如何工作的:
# Compute sample variance of numeric vector x
sampvar <- function(x) {# Compute sum of vector xmy.sum <- function(x) {sum <- 0for (i in x) {sum <- sum + i}sum}# Compute sum of squared variances of the elements of x from# the mean musq.var <- function(x, mu) {sum <- 0for (i in x) {sum <- sum + (i - mu) ^ 2}sum}mu <- my.sum(x) / length(x)sq <- sq.var(x, mu)sq / (length(x) - 1)
}
在 sampvar() 函数中,我们定义了两个实用函数:
-
my.sum():通过遍历向量的元素来计算向量的和。 -
sq.var():通过遍历向量的元素,计算向量与给定均值平方偏差的总和。
sampvar() 函数首先计算样本均值,然后计算从该均值到平方偏差的总和,最后通过除以 n-1 来计算样本方差。
我们可以这样分析 sampvar() 函数:
x <- runif(1e7)
Rprof("Rprof.out")
y <- sampvar(x)
Rprof(NULL)
summaryRprof("Rprof.out")
## $by.self
## self.time self.pct total.time total.pct
## "sq.var" 4.38 58.24 5.28 70.21
## "my.sum" 1.88 25.00 2.24 29.79
## "^" 0.46 6.12 0.46 6.12
## "+" 0.44 5.85 0.44 5.85
## "-" 0.28 3.72 0.28 3.72
## "(" 0.08 1.06 0.08 1.06
##
## $by.total
## total.time total.pct self.time self.pct
## "sampvar" 7.52 100.00 0.00 0.00
## "sq.var" 5.28 70.21 4.38 58.24
## "my.sum" 2.24 29.79 1.88 25.00
## "^" 0.46 6.12 0.46 6.12
## "+" 0.44 5.85 0.44 5.85
## "-" 0.28 3.72 0.28 3.72
## "(" 0.08 1.06 0.08 1.06
##
## $sample.interval
## [1] 0.02
##
## $sampling.time
## [1] 7.52
这就是代码的工作方式:
-
runif(1e7)表达式生成一个包含 1000 万个随机数的随机样本。 -
Rprof("Rprof.out")表达式告诉 R 开始分析。Rprof.out是一个文件的名称,其中存储了分析数据。除非指定了另一个文件路径,否则它将存储在 R 的当前工作目录中。 -
sampvar(x)表达式调用我们刚刚创建的函数。 -
Rprof(NULL)表达式告诉 R 停止分析。否则,它将继续分析我们运行的其他 R 语句,但这些语句我们并不打算分析。 -
summaryRprof("Rprof.out")表达式打印了分析结果。
分析结果
结果被分解为几个度量:
-
self.time和self.pct列表示每个函数的耗时,不包括被函数调用的其他函数的耗时。 -
total.time和total.pct列表示每个函数的总耗时,包括函数调用内部花费的时间。
从分析数据中,我们得到一些有趣的观察:
-
sampvar()函数的self.time可以忽略不计(报告为零),这表明运行sampvar所花费的大部分时间是由它调用的函数贡献的。 -
虽然
sampvar()总共花费了 7.52 秒,但其中 5.28 秒是由sq.var()贡献的,2.24 秒是由my.sum()贡献的(参见sq.var()和my.sum()的total.time)。 -
sq.var()函数执行所需时间最长(70.21%),看起来是一个开始提高性能的好地方。 -
R 操作符
-、+和*非常快,每个操作的总耗时不超过 0.46 秒,尽管它们被执行了数百万次。
Rprof() 通过观察 R 表达式运行时的调用栈来工作,并在固定的时间间隔(默认为每 0.02 秒)对调用栈进行快照,以查看当前正在执行哪个函数。从这些快照中,summaryRprof() 可以计算出每个函数花费了多少时间。
为了更直观地查看分析数据,我们可以使用 proftools 包。我们还需要从 Bioconductor 仓库安装 graph 和 Rgraphviz 包:
install.packages("proftools")
source("http://bioconductor.org/biocLite.R")
biocLite(c("graph", "Rgraphviz"))
library(proftools)
p <- readProfileData(filename="Rprof.out")
plotProfileCallGraph(p, style=google.style, score="total")
plotProfileCallGraph() 函数生成一个直观的视觉分析图。我们使用 google.style 模板,该模板使用更大的框来显示 self.time 较长的函数。我们还指定 score="total" 以根据 total.time 为框着色。以下图显示了相同分析数据的输出:

由 plotProfileCallGrah() 生成的 sampvar() 的分析数据
从 sampvar() 我们可以看到,它有最长的 total.time 为 100%。这是预期的,因为它是被分析的功能。运行时间第二长的函数是 sq.var(),占用了 70.21% 的运行时间。sq.var() 也恰好有最长的 self.time,这可以从其框的大小中看出。因此,sq.var() 似乎是在解决性能问题时的一个很好的候选者。
Rprof() 函数是一个有用的工具,可以帮助我们了解 R 程序不同部分的表现,并快速找到我们可以解决以改进 R 代码整体性能的瓶颈。
分析内存利用率
接下来,让我们考虑如何分析 R 代码的内存使用情况。
一种方法是使用 Rprof(),通过设置 memory.profiling 参数和相应的 memory 参数到 summaryRprof():
Rprof("Rprof-mem.out", memory.profiling=TRUE)
y <- sampvar(x)
Rprof(NULL)
summaryRprof("Rprof-mem.out", memory="both")
## $by.self
## self.time self.pct total.time total.pct mem.total
## "sq.var" 4.16 54.88 5.40 71.24 1129.4
## "my.sum" 1.82 24.01 2.18 28.76 526.9
## "^" 0.56 7.39 0.56 7.39 171.0
## "+" 0.44 5.80 0.44 5.80 129.2
## "-" 0.40 5.28 0.40 5.28 140.2
## "(" 0.20 2.64 0.20 2.64 49.7
##
## $by.total
## total.time total.pct mem.total self.time self.pct
## "sampvar" 7.58 100.00 1656.2 0.00 0.00
## "sq.var" 5.40 71.24 1129.4 4.16 54.88
## "my.sum" 2.18 28.76 526.9 1.82 24.01
## "^" 0.56 7.39 171.0 0.56 7.39
## "+" 0.44 5.80 129.2 0.44 5.80
## "-" 0.40 5.28 140.2 0.40 5.28
## "(" 0.20 2.64 49.7 0.20 2.64
##
## $sample.interval
## [1] 0.02
##
## $sampling.time
## [1] 7.58
现在的输出显示了一个额外的列 mem.total,报告每个函数的内存利用率。对于这个例子,似乎运行 sampvar() 需要 1,656 MB 的内存!这对于对包含 1000 万个元素的数值向量进行计算来说似乎异常高,这将在内存中测量为只有 76.3 MB(你可以通过运行 print(object.size(x), units="auto") 来检查这一点)。
不幸的是,mem.total 是一个误导性的度量,因为 Rprof() 将内存使用量归因于在它进行快照时恰好正在运行的函数,但内存可能已经被其他函数使用且尚未释放。此外,R 的垃圾回收器定期将未使用的内存释放给操作系统,因此任何给定时间实际使用的内存可能与 Rprof() 报告的内存大相径庭。换句话说,Rprof() 提供了在运行 R 代码时分配的总内存量的指示,但没有考虑到垃圾回收器释放的内存。
要查看垃圾回收如何影响内存利用率,我们可以运行以下代码:
> gcinfo(TRUE)
y <- sampvar(x)
## Garbage collection 945 = 886+43+16 (level 0) ...
## 31.1 Mbytes of cons cells used (59%)
## 82.8 Mbytes of vectors used (66%)
## Garbage collection 946 = 887+43+16 (level 0) ...
## 31.1 Mbytes of cons cells used (59%)
## 82.8 Mbytes of vectors used (66%)
##... (truncated for brevity) ...
gcinfo(FALSE)
gcinfo(TRUE) 表达式告诉 R 在垃圾回收器释放内存时通知我们。在我们的机器上,垃圾回收器在运行 sampvar() 时被激活了 272 次!尽管 Rprof() 报告说总共分配了 1.7 GB 的内存,但垃圾回收器一直在努力释放未使用的内存,以便 R 的总内存消耗保持在可管理的 113.9 MB 左右(31.1 MB + 82.8 MB)。
由于 Rprof() 测量的是累积分配的内存,而不考虑垃圾回收,因此它不适合确定 R 程序是否会超出系统上的可用内存。gcinfo() 通过在每次垃圾回收间隔提供内存消耗的快照,提供了一个更清晰的画面,尽管仍然是一个近似值。
注意
在这种情况下,gcinfo()和gc()函数给出了相当好的内存利用率估计,因为我们的代码只使用标准的 R 操作。一些 R 包使用自定义内存分配器,而gcinfo()和gc()无法跟踪,因此内存利用率可能会被低估。
使用操作系统工具监控内存利用率、CPU 利用率和磁盘 I/O
与执行时间不同,R 没有提供任何好的工具来分析 CPU 利用率和磁盘 I/O。即使是 R 中的内存分析工具也可能无法提供完整或准确的图像。这就是我们转向操作系统提供的系统监控工具来监控 R 程序运行时的计算资源的原因。在 Windows 中是任务管理器或资源监视器,在 Mac OS X 中是活动监视器,在 Linux 中是top。运行这些工具时,寻找代表 R 的进程(通常称为R或rsession)。
我们得到的信息取决于操作系统,但以下是关注 R 资源利用率的几个关键指标:
-
% CPU 或 CPU 使用率:R 占用的系统 CPU 时间的百分比
-
% 内存,驻留内存或工作集:R 占用的系统物理内存的百分比
-
交换空间大小或页面输出:存储在操作系统交换空间中的 R 使用的内存大小
-
每秒读取或写入的字节数:R 从/向磁盘读取或写入数据的速率
此外,我们还可能想要监控以下系统级资源利用率指标:
-
% 可用内存:系统物理内存中可用于使用的百分比
-
交换空间大小或页面输出:存储在操作系统交换空间中的内存总大小
前述指标有助于解决 R 的性能问题:
-
高 CPU 利用率:CPU 很可能是 R 性能的主要瓶颈。使用本章中介绍的分析技术来识别代码中占用 CPU 时间最多的部分。
-
低 CPU 利用率,低可用系统内存,但交换空间大小大,以及高磁盘 I/O:系统可能正在耗尽物理内存,因此将内存交换到磁盘上。使用第六章中介绍的内存管理技术,减少 RAM 使用的简单调整和第七章中介绍的使用有限 RAM 处理大型数据集,以减少 R 程序所需的内存。
-
足够的可用系统内存和高磁盘 I/O:程序非常频繁地写入/读取磁盘。检查是否有不必要的 I/O 操作,并在有足够可用内存的情况下将中间数据存储在内存中。
识别和解决瓶颈
现在我们已经介绍了分析 R 代码的基本技术,我们应该首先尝试解决哪些性能瓶颈?
作为一项经验法则,我们首先尝试改进导致最大性能瓶颈的代码片段,无论是执行时间、内存利用率还是其他指标。这些可以通过前面提到的分析技术来识别。然后我们逐步解决最大的瓶颈,直到程序的整体性能足够好。
如您所回忆的,我们使用Rprof()对varsamp()函数进行了性能分析。具有最高self.time的函数是sq.var()。我们如何使这个函数运行得更快呢?我们可以将其写成向量操作的形式my.sum((x - mu) ^ 2),而不是通过遍历x的每个元素。正如我们将在下一章中看到的,将循环转换为向量操作是加快许多 R 操作的好方法。实际上,我们甚至可以完全删除该函数,因为新的向量表达式只需要一行:
# Compute sample variance of numeric vector x
sampvar <- function(x) {# Compute sum of vector xmy.sum <- function(x) {sum <- 0for (i in x) {sum <- sum + i}sum}mu <- my.sum(x) / length(x)sq <- my.sum((x - mu) ^ 2)sq / (length(x) - 1)
}x <- runif(1e7)
Rprof("Rprof-mem.out", memory.profiling=TRUE)
y <- sampvar(x)
Rprof(NULL)
summaryRprof("Rprof-mem.out", memory="both")
## $by.self
## self.time self.pct total.time total.pct mem.total
## "my.sum" 3.92 85.22 4.60 100.00 1180.6
## "+" 0.66 14.35 0.66 14.35 104.2
## "-" 0.02 0.43 0.02 0.43 83.1
##
## $by.total
## total.time total.pct mem.total self.time self.pct
## "my.sum" 4.60 100.00 1180.6 3.92 85.22
## "eval" 4.60 100.00 1180.6 0.00 0.00
## "sampvar" 4.60 100.00 1180.6 0.00 0.00
## "source" 4.60 100.00 1180.6 0.00 0.00
## "withVisible" 4.60 100.00 1180.6 0.00 0.00
## "+" 0.66 14.35 104.2 0.66 14.35
## "-" 0.02 0.43 83.1 0.02 0.43
##
## $sample.interval
## [1] 0.02
##
## $sampling.time
## [1] 4.6
这个改动将运行时间减少了 2.98 秒,并将函数运行时分配的总内存减少了 477 MB。
现在,my.sum()函数对总运行时间的贡献达到了显著的 85%。让我们用 R 中的sum()函数替换它,该函数运行得更快:
# Compute sample variance of numeric vector x
sampvar <- function(x) {mu <- sum(x) / length(x)sq <- sum((x - mu) ^ 2)sq / (length(x) - 1)
}x <- runif(1e7)
Rprof("Rprof-mem.out", memory.profiling=TRUE)
y <- sampvar(x)
Rprof(NULL)
summaryRprof("Rprof-mem.out", memory="both")
## $by.self
## self.time self.pct total.time total.pct mem.total
## "-" 0.08 100 0.08 100 76.2
##
## $by.total
## total.time total.pct mem.total self.time self.pct
## "-" 0.08 100 76.2 0.08 100
## "sampvar" 0.08 100 76.2 0.00 0
##
## $sample.interval
## [1] 0.02
##
## $sampling.time
## [1] 0.08
哇!通过两个简单的步骤,我们将sampvar()的运行时间从 7.58 秒减少到 0.08 秒(减少了 99%)。此外,Rprof()报告的内存利用率也从超过 1.6 GB 减少到仅有 76.2 MB(减少了 95.4%)。这种内存分配和垃圾回收的减少也在加快我们的代码中发挥了重要作用。
让我们比较我们的代码与 R 函数var()的运行速度,后者是用 C 编写的以实现最佳性能(我们将在第四章,使用编译代码以获得更高的速度):
library(microbenchmark)
microbenchmark(sampvar(x), var(x))
## Unit: milliseconds
## expr min lq median uq max neval
## sampvar(x) 44.31072 44.90836 50.38668 62.14281 74.93704 100
## var(x) 35.62815 36.60720 37.04430 37.88039 42.85260 100
我们的函数的中位运行时间为 50 毫秒,比具有中位数为 37 毫秒的优化 C 版本多出 36%的时间。
前面的练习说明了如何将代码分析作为工作流程的一部分,用于识别、优先排序和修复 R 程序中的性能问题。本书的其余部分将介绍我们可以用来解决特定性能问题的技术。
摘要
在本章中,我们学习了如何使用system.time()、benchmark()(来自rbenchmark包)和microbenchmark()(来自microbenchmark包)来测量 R 表达式的执行时间。我们探讨了如何使用Rprof()和summaryRprof()来分析 R 程序不同部分的执行时间和内存使用情况,并使用proftools包以直观的视觉形式显示结果。
我们还看到了操作系统提供的监控工具在理解 R 程序整体性能以及这些系统度量如何提供关于我们 R 程序可能遇到性能瓶颈的线索方面的作用。
最后,我们学习了如何在实践中应用配置文件技术,通过迭代工作流程来识别、优先排序和解决 R 代码中与性能相关的问题。
在下一章中,我们将学习一些简单的调整来提高 R 代码的运行速度。
第三章:简单调整以使 R 运行更快
提高 R 代码的速度并不一定涉及高级优化技术,如并行化代码或使其在数据库中运行。实际上,有一些简单的调整,虽然并不总是显而易见,但可以使 R 运行得更快。在本章中,描述了其中的一些调整。这些调整绝对不能涵盖所有可能的简单优化方法。然而,它们构成了获得一些速度提升的最基本、也最常遇到的机会。
本章按照递减的普遍性顺序介绍了这些调整——更普遍的调整几乎可以在所有 R 代码中找到,无论其应用如何。每个调整都伴随着一个示例代码,这些代码故意保持简单,以免不必要的应用特定知识掩盖了对预期概念的说明。在这些所有示例中,都是使用 R 中的随机函数生成人工数据集。
本章涵盖了以下主题:
-
向量化
-
使用内置函数
-
预分配内存
-
使用更简单的数据结构
-
在大型数据上使用哈希表进行频繁查找
-
在 CRAN 中寻找快速替代包
向量化
大多数 R 用户都应该遇到过这个第一个调整。本质上,向量化允许 R 运算符将向量作为参数进行快速处理多个值。这与 C、C++和 Java 等一些其他编程语言不同,在这些语言中,多个值的处理通常是通过遍历并应用运算符到向量(或数组)的每个元素来完成的。R 是一种灵活的语言,允许用户使用迭代或向量化进行编程。然而,大多数时候,迭代会带来显著且不必要的计算成本,因为 R 是一种解释型语言,而不是编译型语言。
以以下简单的代码为例。其目的是简单地计算随机向量data中每个元素的平方。第一种方法是设置一个for循环遍历data中的每个元素并单独平方。许多人可能会倾向于采取这种方法,因为这是在其他编程语言中通常的做法。然而,在 R 中,一个更优化的方法是直接在data向量上应用平方运算符。这会得到与for循环完全相同的结果,但速度要快得多:
N <- 1E5
data <- sample(1:30, size=N, replace=T)
system.time({ data_sq1 <- numeric(N)for(j in 1:N) {data_sq1[j] <- data[j]²}
})
## user system elapsed
## 0.144 0.011 0.156
system.time(data_sq2 <- data²)
## user system elapsed
## 0 0 0
以下表格显示了随着向量大小(以对数尺度)从 100,000 增加到 100,000,000 时的性能提升。请注意,非向量化方法的计算时间大约是向量化方法的 200 倍,无论向量大小如何。
| 向量大小 | 100,000 | 1,000,000 | 10,000,000 | 100,000,000 |
|---|---|---|---|---|
| 非向量化 | 120 ms | 1.19 s | 11.9 s | 117 s |
| 向量化 | 508 μs | 5.67 ms | 52.5 ms | 583 ms |
当 R 执行代码时,它需要在幕后执行许多步骤。一个例子是类型检查。R 对象,如向量,不需要严格定义为特定类型,如整数或字符。可以在整数向量中添加一个字符而不会触发任何错误——R 会自动将向量转换为字符向量。每次在向量上应用运算符时,R 只需要检查一次向量的类型,但使用迭代方法,这种类型检查会像迭代次数一样多次发生,这会产生一些计算成本。
内置函数的使用
作为一种编程语言,R 包含底层运算符,例如基本算术运算符,可以用来构建更复杂的运算符或函数。虽然 R 提供了定义函数的灵活性,但与编译语言中的等效函数相比,性能比较几乎总是偏向后者。然而,R 和一些 CRAN 软件包提供了一组丰富的函数,这些函数是用 C/C++ 等编译语言实现的。通常,使用这些函数而不是编写自定义 R 函数来完成相同任务更为可取。
考虑以下随机矩阵 data 行求和的简单示例。可以通过调用 apply() 函数并设置边距为 1(表示行操作)以及将 FUN(或函数)参数设置为 sum 来构建执行这些功能的代码。或者,R 提供了一个用于此目的的内置函数,称为 rowSums。通过 system.time 测量的前者方法的计算时间比后者方法长 11 倍,后者是一个优化并预编译的 C 函数:
data <- rnorm(1E4*1000)
dim(data) <- c(1E4,1000)
system.time(data_sum1 <- apply(data, 1, sum))
## user system elapsed
## 0.241 0.053 0.294
system.time(data_sum2 <- rowSums(data))
## user system elapsed
## 0.026 0.000 0.026
说到优化函数,我们提高 R 代码速度的努力不应仅限于 R 伴随的预编译函数。多年来,开源社区已经开发了一系列优化库,R 可以利用这些库。以基本线性代数子程序(BLAS)为例(更多信息请参阅 www.netlib.org/blas/)。它在 20 世纪 70 年代为 Fortran 开发,此后由于矩阵运算构成了许多领域算法的构建块,因此被其他语言(包括 R)广泛使用。现在有许多 BLAS 的实现,其中一些包括以多线程方式执行矩阵运算的能力。
例如,Mac OS X 版本的 R 默认启用了 BLAS。使用的 BLAS 实现是 R 中称为 libRblas.0.dylib 的参考 BLAS。然而,Mac OS X 自带其自己的 BLAS 版本,libBLAS.dylib,该版本针对其硬件进行了优化。R 可以配置为使用优化的 BLAS,通过在终端执行以下命令:
$ cd /Library/Frameworks/R.framework/Resources/lib
$ ln -sf /System/Library/Frameworks/Accelerate.framework/Frameworks/vecLib.framework/Versions/Current/libBLAS.dylib libRblas.dylib
为了测试使用不同 BLAS 库的效果,下面的 R 代码在一个大随机矩阵上执行简单的矩阵乘法。使用 R 的默认 BLAS 库,我们完成这项任务大约需要 7 秒钟。在将 R 指向优化的 BLAS 之后,同样的任务在约十分之一的时间内完成:
data <- rnorm(1E7)
dim(data) <- c(1E4, 1E3)
system.time(data_mul <- t(data) %*% data)
## user system elapsed
## 7.123 0.015 7.136
system.time(data_mul <- t(data) %*% data) # with optimized BLAS
## user system elapsed
## 1.304 0.005 0.726
可供您下载的 BLAS 版本有 Windows 和 Linux。如果 R 是用启用了 BLAS 编译的,即通过在从 R 的源代码编译 R 时设置配置选项为--enable-BLAS-shlib,那么在 BLAS 版本之间切换的方式与 Mac OS X 类似:通过用新的库文件替换默认的 BLAS 库文件。在 Windows 中,默认库位于R_HOME\bin\x64\Rblas.dll;而在 Linux 中,它位于R_HOME/lib/libRblas.so。
预分配内存
大多数强类型编程语言,如 C、C++和 Java,通常要求在向其应用任何操作之前声明一个向量(或数组)。这种声明实际上预分配了向量所需的内存空间。在某些特殊情况下会使用动态内存分配,但这很少是首选,主要是因为动态内存分配会减慢程序的速度。每次调整向量大小时,程序都需要执行额外的步骤,包括将向量复制到更大的或更小的内存块中,并删除旧向量。如果预分配了内存,这些步骤就不需要了。
当涉及到预分配内存时,R 与其他编程语言没有区别。然而,作为一个解释型语言,它施加的控制较少,因此用户很容易忽略这一点——如果向量的内存没有预分配,R 不会抛出任何编译错误。尽管如此,在 R 中没有预分配内存可能会导致执行时间显著变长,尤其是在向量很大时。
为了演示这一点,让我们看一下下面的 R 代码。它展示了两种生成一系列随机数的方法,其中每个向量元素定义为前一个元素的值加减一个介于-5 到 5 之间的随机整数。第一种方法(将结果存储在data_series1中)绕过了向量内存的预分配,即它从一个只有一个元素的向量开始,并在每次迭代中添加一个新元素。第二种方法(结果在data_series2中)通过声明一个大小为N的数值向量来预分配内存。预分配的空间,由向量的索引表示,在每次迭代中填充。通过预分配内存,一个包含 10,000 个元素的向量的计算时间比动态分配快 10 倍。通过改变向量大小进行的基准测试,如即将到来的表格所示,显示当预分配内存时,计算时间呈线性增长,而当动态分配内存时,增长呈超线性。因此,避免在 R 中不必要的动态内存分配对于性能至关重要:
N <- 1E4
data_series1 <- 1
system.time({for (j in 2:N) {data_series1 <- c(data_series1,data_series1[j-1]+sample(-5:5, size=1))}
})
## user system elapsed
## 0.254 0.004 0.257
data_series2 <- numeric(N)
data_series2[1] <- 1
system.time({for (j in 2:N) {data_series2[j] <- data_series2[j-1]+sample(-5:5, size=1)}
})
## user system elapsed
## 0.066 0.003 0.068
| 向量大小 | 10 | 100 | 1000 | 10,000 |
|---|---|---|---|---|
| 动态分配 | 0 | 0.006 | 0.288 | 25.373 |
| 预分配 | 0.001 | 0.006 | 0.062 | 0.577 |
在这一点上,比较 R 中的 apply 函数族与循环很有趣。大多数 R 用户都熟悉 apply() 函数及其变体,包括 lapply()、sapply() 和 tapply()。它们提供了对集合(例如 data.frame、list 或 vector/matrix)中的单个元素重复执行相同操作的手段。实际上,apply 函数族可以作为 R 中循环的可能替代品,前提是迭代之间没有依赖关系。除了简化表达式(通常可以将多行 for 循环表达为单行的 apply() 调用)之外,apply 函数族还提供了自动处理内存预分配和其他家务活动(如删除循环索引)的好处。
但 apply 是否比循环有性能优势?以下代码提供了一个答案。使用了两种不同的方法来生成一个大小在 1 到 30 之间随机设定的正态分布随机向量的列表。第一种方法使用 for 循环,而第二种使用 lapply()。对这两种方法应用 system.time() 显示,lapply() 比循环快得多:
N <- 1E5
data <- sample(1:30, size=N, replace=T)
data_rand1 <- list()
system.time(for(i in 1:N) data_rand1[[i]] <- rnorm(data[i]))
## user system elapsed
## 33.891 1.241 35.120
system.time(data_rand2 <- lapply(data, rnorm))
## user system elapsed
## 0.597 0.037 0.633
但请注意,for 循环是天真地实现的,没有预先分配内存。以下代码现在使用预分配的内存对其进行修改。其计算时间显著减少,比 lapply() 慢不到十分之一秒:
data_rand3 <- vector("list", N)
system.time(for(i in 1:N) data_rand3[[i]] <- rnorm(data[i]))
## user system elapsed
## 0.737 0.036 0.773
为了更有说服力地建立这一点,使用 microbenchmark() 重复进行了比较,每次表达式运行 100 次。结果表明,lapply() 比循环有轻微的性能优势:
microbenchmark(data_rand2 <- lapply(data, rnorm),for(i in 1:N) data_rand3[[i]] <- rnorm(data[i]))
## Unit: milliseconds
## expr min
## data_rand2 <- lapply(data, rnorm) 441.1108
## for (i in 1:N) data_rand3[[i]] <- rnorm(data[i]) 531.1212
## lq mean median uq max neval
## 459.9666 498.1296 477.4583 517.4329 634.7849 100
## 555.8512 603.7997 581.5236 662.2536 745.4247 100
基于此,在 R 中尽可能用 apply 替换 for 循环的一般观点是有效的,但性能提升可能不会非常显著。在第六章“减少 RAM 使用量的简单技巧”中,将讨论 apply 的另一个好处——它揭示了 R 代码中可以并行化的部分。
简单数据结构的使用
许多 R 用户会同意,data.frame 作为一种数据结构是 R 中数据分析的得力工具。它提供了一种直观的方式来表示典型的结构化数据集,其中行和列分别代表观测值和变量。data.frame 对象也比矩阵提供了更多的灵活性,因为它允许不同类型的变量(例如,单个 data.frame 中的字符和数值变量)。此外,在 data.frame 仅存储相同类型变量的情况下,基本的矩阵运算可以方便地应用于它,而无需任何显式的强制转换。然而,这种便利性可能会带来性能下降。
在data.frame上执行矩阵运算比在矩阵上慢。其中一个原因是大多数矩阵运算首先将data.frame强制转换为matrix,然后再进行计算。因此,在可能的情况下,应该使用matrix代替data.frame。下面的代码演示了这一点。目标是简单地在一个矩阵及其等效的data.frame表示上执行行求和。使用matrix表示比使用data.frame表示快约 3 倍:
data <- rnorm(1E4*1000)
dim(data) <- c(1E4,1000)
system.time(data_rs1 <- rowSums(data))
## user system elapsed
## 0.026 0.000 0.026
data_df <- data.frame(data)
system.time(data_rs2 <- rowSums(data_df))
## user system elapsed
## 0.060 0.015 0.076
然而,在许多 R 的案例中,使用data.frame是不可避免的,例如,当数据集包含混合变量类型时。在这种情况下,也有一个简单的调整可以改善data.frame上最常用的操作之一,即子集操作的速度。通常通过以下代码通过逻辑测试对data.frame的行(或列)进行条件化来实现子集操作:
data <- rnorm(1E5*1000)
dim(data) <- c(1E5,1000)
data_df <- data.frame(data)
system.time(data_df[data_df$X100>0 & data_df$X200<0,])
## user system elapsed
## 2.436 0.221 2.656
一种替代方法是使用which函数包装条件。如以下所示,速度显著提高:
system.time(data_df[which(data_df$X100>0 & data_df$X200<0),])
## user system elapsed
## 0.245 0.086 0.331
在大数据上频繁查找时使用哈希表
数据分析中的一个常见任务是数据查找,这通常通过 R 中的列表实现。例如,为了查找客户的年龄,我们可以定义一个列表,比如cust_age,将值设置为客户的年龄,将名称设置为相应的客户名称(或 ID),即names(cust_age) <- cust_name。在这种情况下,要查找 John Doe 的年龄,可以调用以下内容:cust_age[["John_Doe"]]。然而,R 中列表的实现并没有针对查找进行优化;在包含N个元素的列表上进行查找需要O(N)的时间复杂度。这意味着列表中索引较晚的值需要更多的时间来查找。随着N的增长,这种影响会变得更强。当程序需要频繁查找时,累积效应可能会非常显著。提供更优化数据查找的列表的替代方案是哈希表。在 R 中,这可以通过 CRAN 包hash获得。哈希表的查找需要O(1)的时间复杂度。
下面的代码演示了在哈希表中进行查找比在列表中查找的优势。它模拟了从随机列表及其等效哈希表表示中进行的 1,000 次查找。列表所需的总计算时间为 6.14 秒,而哈希表为 0.31 秒。一个权衡是生成哈希表比列表需要更多的时间。但对于需要在大数据上频繁查找的程序来说,这种开销可以忽略不计:
data <- rnorm(1E6)
data_ls <- as.list(data)
names(data_ls) <- paste("V", c(1:1E6), sep="")
index_rand <- sample(1:1E6, size=1000, replace=T)
index <- paste("V", index_rand, sep="")
list_comptime <- sapply(index, FUN=function(x){system.time(data_ls[[x]])[3]})
sum(list_comptime)
## [1] 6.144
library(hash)
data_h <- hash(names(data_ls), data)
hash_comptime <- sapply(index, FUN=function(x){system.time(data_h[[x]])[3]})
sum(hash_comptime)
## [1] 0.308
在 CRAN 中寻找快速替代包
R 的一个关键优势是其丰富且活跃的开源社区,CRAN。截至编写本文时,CRAN 上有超过 6,000 个 R 包。鉴于这一点,多个包提供相同的功能是很常见的。其中一些替代方案专门设计用来提高基础包或现有 CRAN 包的性能。其他替代方案虽然没有明确针对性能提升,但作为副产品实现了这一点。
为了实现性能提升而开发的替代快速包的例子是 fastcluster 包。它是为了通过 hclust 函数提高基础包提供的层次聚类速度而开发的。根据层次聚类过程中每次分支合并后距离矩阵的更新方式,其时间复杂度可能会有显著变化。fastcluster 包是使用优化的 C++ 代码开发的,与 hclust 中实现的例程相比,速度显著提高。以下 R 代码比较了两个函数在具有 10,000 行和 100 列的随机矩阵上的性能:
data <- rnorm(1E4*100)
dim(data) <- c(1E4,100)
dist_data <- dist(data)
system.time(hc_data <- hclust(dist_data))
## user system elapsed
## 3.488 0.200 4.081
library(fastcluster)
system.time(hc_data <- hclust(dist_data))
## user system elapsed
## 1.972 0.123 2.127
一个具有多个实现且其中一个实现比其他实现更快的函数的例子是主成分分析(PCA)。PCA 是一种降维技术,通过将数据集投影到正交轴(称为主成分)上以最大化数据集的方差来实现其目标。PCA 最常见的方法是通过数据集协方差矩阵的特征值分解。但还有其他方法。在 R 中,这些替代方法体现在两个 PCA 函数 prcomp 和 princomp 中(都是 stats 包的一部分)。以下代码在具有 100,000 行和 100 列的随机矩阵上的快速比较表明,princomp 比 prcomp 快近 2 倍:
data <- rnorm(1E5*100)
dim(data) <- c(1E5,100)
system.time(prcomp_data <- prcomp(data))
## user system elapsed
## 4.101 0.091 4.190
system.time(princomp_data <- princomp(data))
## user system elapsed
## 2.505 0.071 2.576
还有其他快速包的例子,既有明确的也有隐含的。它们包括:
-
fastmatch: 这提供了 R 的基础match函数的更快版本 -
RcppEigen: 这包括线性建模lm的更快版本 -
data.table: 这提供了比标准data.frame操作更快的数据处理操作 -
dplyr: 这提供了一套高效操作数据框对象(data frame-like objects)的工具
摘要
本章描述了一些简单的调整来提高 R 代码的速度。其中一些调整是众所周知的,但在实践中经常被忽视;其他一些则不那么明显。无论它们的性质如何,尽管它们很简单,这些低垂的果实可以提供显著的性能提升,有时甚至比后续章节中讨论的高级优化还要多。因此,这些调整应被视为优化 R 代码的第一步。
在下一章中,我们将看到如何通过使用编译代码来进一步提升 R 的性能。
第四章:使用编译型代码以获得更高的速度
到目前为止,我们已经探讨了如何优化 R 代码的计算性能。如果在优化代码之后,它仍然运行得太慢怎么办?在本章中,我们将探讨如何通过使用编译型代码来克服由 R 代码即时解释引起的性能限制。许多 CRAN 包使用编译型代码以提供最佳性能,因此利用编译型代码的一个简单方法就是使用这些包。然而,有时需要执行一个特定的任务,而这个任务没有现成的包。了解如何为 R 编写编译型代码,以便使 R 程序运行得更快是有用的。
我们将首先了解如何在执行之前编译 R 代码,然后我们将探讨如何将 C/C++等编译型语言集成到 R 中,以便我们可以以本地 CPU 速度运行 R 程序。
本章涵盖了以下主题:
-
在执行之前编译 R 代码
-
在 R 中使用编译型语言
在执行之前编译 R 代码
在第一章中,理解 R 的性能 – 为什么 R 程序有时运行得慢? 我们看到了 R 作为一个解释型语言,每次运行 R 程序时都必须解析和评估代码。这需要大量的 CPU 时间,并减慢了 R 程序的执行速度。R 提供了compiler包来在一定程度上减少这个问题。该包中的函数允许我们在执行代码之前预先编译 R 代码,并在执行代码时为 R 节省一步或两步。让我们看看这是如何工作的。
编译函数
让我们定义一个mov.avg()函数,该函数用于计算数值序列的移动平均:
# Compute the n-period moving average of x
mov.avg <- function(x, n=20) {total <- numeric(length(x) - n + 1)for (i in 1:n) {total <- total + x[i:(length(x) - n + i)]}total / n
}
给定一个数值向量x和周期n,我们首先计算x中n个元素的窗口总和。例如,如果x是[1, 2, 1, 3, 5]且n是2,那么我们计算total为[1+2, 2+1, 1+3, 3+5] = [3, 3, 4, 8]。我们通过在x上循环n次,选择x的元素移动窗口,并将这些元素加到total上来实现这一点。最后,我们通过将total除以n来计算移动平均。
为了编译函数,我们将使用compiler包中的cmpfun()函数。compiler包提供的编译函数在四个不同的优化级别上操作,编号为 0 到 3;数字越高,编译型代码对性能的优化程度就越高。
让我们编译不同级别的mov.avg()函数,以查看执行时间的差异。在这里,我们通过将optimize参数传递给cmpfun()函数,创建了四个在不同优化级别编译的mov.avg()函数副本:
library(compiler)
mov.avg.compiled0 <- cmpfun(mov.avg, options=list(optimize=0))
mov.avg.compiled1 <- cmpfun(mov.avg, options=list(optimize=1))
mov.avg.compiled2 <- cmpfun(mov.avg, options=list(optimize=2))
mov.avg.compiled3 <- cmpfun(mov.avg, options=list(optimize=3))
接下来,我们将通过计算包含100个元素的数值向量的 20 期移动平均,来基准测试原始的mov.avg()函数和四个编译版本的性能:
library(microbenchmark)
x <- runif(100)
bench <- microbenchmark(mov.avg(x),mov.avg.compiled0(x),mov.avg.compiled1(x),mov.avg.compiled2(x),mov.avg.compiled3(x))
bench
## Unit: microseconds
## expr min lq median uq max
## mov.avg(x) 34.257 37.6865 41.3630 72.3015 131.101
## mov.avg.compiled0(x) 33.500 36.9065 41.9995 72.8770 2605.917
## mov.avg.compiled1(x) 34.643 36.8615 41.0650 71.8480 117.632
## mov.avg.compiled2(x) 24.050 25.9040 28.3060 51.8685 3693.741
## mov.avg.compiled3(x) 23.399 24.6540 27.7670 49.6385 89.595
## neval
## 100
## 100
## 100
## 100
## 100
从中位执行时间来看,原始函数需要 41.4 μs。优化级别 0 和 1 的编译函数耗时大致相同,分别为 42.0 μs 和 41.1 μs。然而,优化级别 2 和 3 的表现良好,分别为 28.3 μs 和 27.8 μs。它们分别将执行时间减少了 32% 和 33%。
最小下四分位数和最大上四分位数的统计量显示与 mov.avg.compiled2() 和 mov.avg.compiled3() 执行时间少于 mov.avg()、mov.avg.compiled0() 和 mov.avg.compiled1() 相似。
注意
我们不应该依赖于最大统计量,因为它可能是不稳定的,每次运行 microbenchmark() 时都会产生广泛的价值范围。这是由于 R 的垃圾回收时的异常值,或者当函数的执行因与其他进程竞争 CPU 时间而减慢时。
下面的图表以直观的视觉形式显示了基准测试结果的分布。
小贴士
使用 autoplot() 函数生成基准测试结果的直观可视化。为此需要 ggplot2 包:
library(ggplot2)
autoplot(bench)

移动平均函数的微基准测试结果的自动绘图
当 R 代码编译时的性能提升取决于代码中包含的 R 表达式的类型。在我们的例子中,我们实现了适度的性能提升,因为 mov.avg() 函数中的 for 循环和算术运算可以被优化。然而,编译主要调用已经为性能优化过的其他函数(如 sum())的代码不会导致显著的性能提升。
小贴士
compiler 包提供了不同的函数来编译不同类型的 R 代码:
-
cmpfun()编译 R 函数。 -
compile()编译 R 表达式。 -
cmpfile()编译存储在文件中的 R 表达式。
R 代码的即时(JIT)编译
R 也支持即时(JIT)编译。当启用 JIT 编译时,R 将自动编译任何执行而未显式调用 compile 函数之一的代码。这是方便的,因为任何现有的 R 代码都可以享受代码编译的性能提升,而无需任何修改。
要激活 JIT 编译,请使用 compiler 包中的 enableJIT() 函数:
library(compiler)
enableJIT(level=3)
level 参数告诉 R 在执行前编译多少代码。level 的有效值包括:
-
0:它禁用 JIT。 -
1:它在首次使用之前编译函数。 -
2:此外,它还在函数被复制之前编译函数。这对于像 lattice 这样的存储函数在列表中的包很有用。 -
3:它在执行之前编译循环。
让我们用即时编译(JIT)来基准测试(未编译的)mov.avg() 函数:
microbenchmark(mov.avg(x))
## Unit: microseconds
## expr min lq median uq max neval
## mov.avg(x) 23.164 24.009 24.519 25.128 6097.067 100
JIT 编译将 mov.avg() 的中位执行时间从 41.4 μs 降低到 24.5 μs——提高了 41%!
小贴士
通过在启动 R 之前在操作系统中设置R_ENABLE_JIT环境变量,也可以启用 JIT 编译。R_ENABLE_JIT的值应设置为level参数的值。
在 R 中使用编译型语言
代码编译可以在计算性能上提供一定的提升,但这些提升是有限的,因为编译后的代码仍然需要以动态方式由 R 进行评估。例如,我们在第三章中解释了简化 R 运行速度的方法,说明了 R 作为一种动态类型的语言,在应用任何操作之前需要检查对象类型。在mov.avg()的情况下,每次 R 遇到+运算符时,都需要检查x是否为数值向量,因为它可能在for循环的每次迭代中都被修改。相比之下,一种静态类型的语言在编译时执行这些检查,从而实现更快的运行时性能。
由于 R 的动态特性以及许多其他原因,R 的动态特性对计算性能构成了障碍。突破这些障碍的唯一方法是转向 C 等编译型语言,并在 R 中使用它们。本节假设您对 C/C++等编译型语言有一些基本了解,包括指针和数组。
先决条件
为了编译本章中的示例,需要一套开发工具,包括 C/C++ 编译器。
Windows 用户应从cran.r-project.org/bin/windows/Rtools/下载并安装Rtools。请选择与您的 R 版本相对应的Rtools版本。在安装向导中,务必选择包作者安装和编辑系统 PATH选项。
在 Mac OS X 上,下载并安装 Xcode 命令行工具。如果您使用的是 Mac OS X 10.9 Mavericks 或更高版本,只需在终端中运行xcode-select –install即可。对于更早版本的 Mac OS X,请在 developer.apple.com/ 创建一个开发者账户。然后登录,转到 developer.apple.com/downloads/index.action 并搜索适用于您操作系统版本的 Xcode 命令行工具。
大多数 Linux 发行版都提供了一种简单的方法来安装标准开发工具;请查阅您发行版的文档以获取说明。如果您使用的是 Debian 或 Ubuntu,只需安装r-base-dev即可获得您需要的所有工具。
内联包含编译代码
inline CRAN 包允许我们在 R 中嵌入 C、C++、Objective-C、Objective-C++ 和 Fortran 代码。这对于使用少量编译代码加速小的 R 函数非常有用。
下面是一个使用inline包在 C 中实现mov.avg()的示例:
library(inline)
mov.avg.inline <- cfunction(sig=signature(x="numeric", n="integer"),body="/* Coerce arguments to the correct types needed.x needs to be a numeric vector (type REALSXP), and nneeds to be an integer vector (type INTSXP). */SEXP x2 = PROTECT(coerceVector(x, REALSXP));SEXP n2 = PROTECT(coerceVector(n, INTSXP));/* Create accessors to the actual data being pointed to bythe two SEXP's. */double *x_p = REAL(x2);int n_val = asInteger(n2);// Vector lengthsint x_len = length(x2);int res_len = x_len - n_val + 1;/* Create and initialize a numeric vector (type REALSXP)of length res_len, using allocVector().Since memory is allocated, use PROTECT to protect theobject from R's garbage collection. */SEXP res = PROTECT(allocVector(REALSXP, res_len));double *res_p = REAL(res);for (int i = 0; i < res_len; i++) {res_p[i] = 0;}// Compute window sumfor (int j = 0; j < n_val; j++) {for (int k = 0; k < res_len; k++) {res_p[k] += x_p[j + k];}}// Compute moving averagefor (int l = 0; l < res_len; l++) {res_p[l] /= n_val;}// Unprotect allocated memory and return resultsUNPROTECT(3);return res;',language="C")
我们加载 inline 包,并使用 cfunction() 定义 mov.avg.inline()。cfunction() 接受许多参数(更多细节请查阅文档),但这里我们只需要三个:
-
sig=signature(x="numeric", n="integer"): 这定义了函数的签名。在这种情况下,它将看起来像mov.avg.inline(x, n),其中x具有数值类,而n具有整数字类。 -
body:body参数包含函数主体的代码,使用你选择的编程语言。 -
language="C": 这指定了代码主体中的编程语言。有效的值有C,C++,Fortran,F95,ObjectiveC, 和ObjectiveC++。
函数的第一步是通过调用 coerceVector() 确保提供给函数的参数是正确的类型。这个函数返回一个 SEXP(s 表达式)指针,这是在 C 和 C++ 中表示所有 R 对象的方式。这些指针指向存储数据的 SEXPREC(s 表达式)记录结构,并包含一些头部信息。代码的前两行定义了两个新的 SEXP 变量 x2 和 n2,它们存储了通过强制转换函数参数创建的新 R 对象的指针。
因为 coerceVector() 在内存中创建新的数据结构来存储指定类型的数据,所以我们用宏函数 PROTECT() 包裹对 coerceVector() 的调用,以保护新创建的数据结构免受 R 的垃圾回收机制的影响。这是必要的,因为 R 不会知道在 C 中何时变量不再需要,并且可能会过于积极地为仍然需要的对象释放内存。每次为新的 R 对象分配内存时都需要调用 PROTECT()。
现在 x2 和 n2 包含了 R 对象的 SEXP 指针,这些对象代表了强制转换后的参数。因为 x2 和 n2 指向 SEXPREC 结构,所以我们仍然无法直接访问存储数据的 C 数组。有几种方法可以获取对数据的访问权限。在 double *x_p = REAL(x2); 中,REAL() 宏返回一个指向双精度浮点数组第一个元素的 double* 指针。int n_val = asInteger(n2); 声明通过调用 asInteger() 便利函数以不同的方式返回由 n2 指向的数组中的第一个整数值。注意这里的区别;x_p 是指向双精度数组的指针,而 n_val 是包含参数实际值的整数。根据哪种方式更方便,可以使用这两种方式访问 R 数据。
接下来,我们计算将要存储结果的数值向量的长度 res_len,并使用 allocVector() 创建该向量。同样,这也被 PROTECT() 包裹起来,因为正在为新的对象分配内存。这个表达式的结果是指向新 R 数值向量的 SEXP。REAL(res) 提供了对之前提到的底层 C 双精度浮点数组的访问。
下一个嵌套的for循环对n_val周期内的窗口和进行计算。然后,通过将结果数组中的每个元素除以n_val来计算移动平均。
在返回结果之前,需要进行一些清理工作。UNPROTECT(3)告诉 R,为 C 中分配内存的三个对象不再需要垃圾收集保护。UNPROTECT()的参数必须与函数中PROTECT()调用的次数相匹配。在这种情况下,垃圾收集器可能会释放x2和n2的内存。然而,res对象被返回到 R,其中适用正常的垃圾收集机制。
让我们通过调用原始的mov.avg()函数和mov.avg.inline(),并确保值匹配来进行一个简单的测试,以确保我们的代码工作正常:
x <- runif(100)
all(mov.avg(x, 20) == mov.avg.inline(x, 20))
## [1] TRUE
C 代码将比原始未编译的 R 函数快多少?这如下所示:
microbenchmark(mov.avg(x, 20), mov.avg.inline(x, 20))
## Unit: microseconds
## expr min lq median uq max
## mov.avg(x, 20) 32.909 34.113 34.8240 35.6975 130.155
## mov.avg.inline(x, 20) 1.347 1.423 1.5535 1.7015 14.169
## neval
## 100
## 100
C 代码的平均执行时间仅为 1.55 μs,而 R 代码为 34.8 μs——执行时间减少了 96!即使是 C 代码的最大执行时间(14.2 μs)也小于 R 代码的最小执行时间(32.9 μs)的一半。这些微秒的节省可能看起来不多,但当我们需要处理更大的数据集或计算较大周期的移动平均时,这种差异变得非常显著:
y <- runif(1e7)
microbenchmark(mov.avg(y, 20), mov.avg.inline(y, 20))
## Unit: milliseconds
## expr min lq median uq
## mov.avg(y, 20) 2046.4608 2198.6103 2252.7003 2318.721
## mov.avg.inline(y, 20) 272.8686 280.2837 283.3647 292.587
## max neval
## 3606.3911 100
## 374.0193 100
当数据包含 1000 万个数字时,差异更为明显;在 R 中超过 2 秒,而在 C 中仅为 0.28 秒。在某些商业环境中,每一毫秒都很重要,2 秒的延迟是不可接受的。在这种情况下,将关键的数据处理代码编写为 C 或 Fortran 等编译语言,并通过inline将其嵌入到 R 中,将极大地提高计算性能。
调用外部编译代码
我们已经看到了如何使用编译语言在 R 中定义函数。当我们想要使用编译代码实现更复杂的功能时,例如创建整个 R 包或链接到外部库,从外部开发代码并从 R 中调用它可能更容易。
R 提供了一些接口来调用外部编译代码:
-
.C(): 这将调用最多 65 个参数的 C 或 C++代码。在调用 C 函数之前,必须在 R 中进行类型检查和强制转换。由.C()调用的函数不应返回任何值;相反,当函数被调用时,应该将数据结构中存储的结果提供给函数。例如,如果我们使用.C()接口实现mov.avg(),函数调用可能看起来像.C("mov_avg_C", as.numeric(x), as.integer(n), numeric(length(x) - n + 1))。 -
.Fortran(): 这与.C()类似,但它调用 Fortran 代码。 -
.Call():这也调用 C 或 C++代码,最多 65 个参数。类型检查和强制转换可以在 R 或 C/C++(如mov.avg.inline()示例中)中进行。由.Call()调用的函数可以返回 R 对象。如果需要多个返回值,可以返回 R 列表。例如,ma <- .Call("mov_avg_C", x, n)。 -
.External():这与.Call()类似,除了所有参数都通过单个SEXP传递。因此,使用.External()调用的函数可以接受可变数量的参数和几乎无限数量的参数。
inline包提供的函数实际上是某些低级接口的包装器,这使得开发人员更容易将编译代码嵌入 R 中。
本书范围之外,无法详细解释如何使用这些接口。要了解更多信息,请阅读《编写 R 扩展》手册中的“系统与外语接口”和“R API”部分(更多信息请访问cran.r-project.org/doc/manuals/r-release/R-exts.html)。
注意
对于 Java 程序员,CRAN 上的rJava包提供了对 Java 代码的接口。
相反,我们想介绍Rcpp包,它提供了一个方便的、高级的 API 来访问.Call()接口,用于 C++代码。以下是使用Rcpp实现的移动平均函数:
#include <Rcpp.h>// [[Rcpp::export]]
Rcpp::NumericVector mov_avg_Rcpp(Rcpp::NumericVector x,int n=20) {// Vector lengthsint x_len = x.size();int res_len = x_len - n + 1;// Create and initialize vector for resultsRcpp::NumericVector res(res_len);// Compute window sumfor (int j = 0; j < n; j++) {for (int k = 0; k < res_len; k++) {res[k] += x[j + k];}}// Compute moving averagefor (int l = 0; l < res_len; l++) {res[l] /= n;}// Return resultsreturn res;
}
第一行#include <Rcpp.h>导入使用Rcpp类和函数所需的头文件。注释// [[Rcpp::export]]是Rcpp属性。它告诉Rcpp以下函数应该导出到 R。
在mov_avg_Rcpp()中不使用SEXP指针。相反,Rcpp提供了表示标准 R 类的类。我们甚至可以指定n是一个单个整数而不是整数向量。每当从 R 调用mov_avg_Rcpp()时,Rcpp将自动检查提供的参数是否为正确的类型。
注意,这里没有调用PROTECT()或UNPROTECT()。当Rcpp::NumericVector res(res_len);创建一个新的结果数值向量时,Rcpp负责内存分配和防止垃圾回收。它甚至将新向量的值初始化为零。
Rcpp还提供了直接访问x参数中的数据和结果向量res的功能,而无需请求数据的指针。
使用Rcpp,我们可以编写比使用本地的.C()或.Call()接口更简洁、更易读的代码。
现在,让我们看看如何在 R 中调用此函数。除了加载Rcpp库之外,还需要做的一件事是调用sourceCpp(),这将编译 C++代码并将函数导出到 R:
library(Rcpp)
sourceCpp('mov_avg_Rcpp.cpp")
现在,我们可以调用mov_avg_Rcpp()并对其进行基准测试,与之前的版本进行比较:
x <- runif(100)
microbenchmark(mov.avg(x, 20),mov.avg.inline(x, 20),mov_avg_Rcpp(x, 20))
## Unit: microseconds
## expr min lq median uq max
## mov.avg(x, 20) 33.902 35.779 37.472 49.7340 101.325
## mov.avg.inline(x, 20) 1.327 1.513 1.718 1.9655 14.129
## mov_avg_Rcpp(x, 20) 2.382 2.727 2.874 3.9705 11.424
## neval
## 100
## 100
## 100
Rcpp 版本比内联版本运行得稍慢,但仍然比我们的纯 R 代码快得多。它提供了一个良好的性能水平,API 比其他 R 提供的接口简单得多。
Rcpp 提供了许多本书无法涵盖的功能,例如包作者工具、常见操作(如向量操作)的 sugar 函数等。更多详情、代码示例和资源,请查阅 Rcpp 网站 www.rcpp.org/。Rcpp 的创作者之一,Dirk Eddelbuettel,还撰写了 Seamless R and C++ Integration with Rcpp (use R!) 一书,提供了全面的指南。
使用编译代码时,有一些注意事项需要记住。我们将在下面解释一些常见的;Writing R Extensions 手册对这些主题提供了全面的处理。
在 R 中使用编译代码时,有一些事情需要记住。我们将在下面解释一些常见的;Writing R Extensions 手册对这些主题提供了全面的处理。
R API
到目前为止使用的 C 函数和宏来自头文件 Rinternals.h,该文件位于 R_INCLUDE_DIR,在标准 R 安装中默认为 R_HOME/include。此文件,连同 R.h 和 R_INCLUDE_DIR 中的其他头文件,为 C/C++ 代码提供了与 R 交互的各种 API。它们一起提供了一组丰富的函数,用于:
-
操作 R 对象(例如,排序向量)
-
管理内存分配和释放
-
数学(例如,三角函数)
-
数学常数
-
随机数生成
-
统计分布(例如,
rnorm和punif) -
BLAS、LAPACK 和 LINPACK 线性代数例程
-
以及更多
值得探索这些文件,以了解哪些功能可用于 C/C++ 代码。其中一些也可以从 Fortran 调用。Writing R Extensions 中的 Organization of header files 部分描述了每个头文件。
R 数据类型与本地数据类型
在使用编译语言工作时,了解 R 类型如何映射到不同的本地数据类型是有用的,如下表所示:
| R 存储模式 | C 类型 | Fortran 类型 |
|---|---|---|
logical |
int * |
INTEGER |
integer |
int * |
INTEGER |
double |
double * |
DOUBLE PRECISION |
complex |
Rcomplex * |
DOUBLE COMPLEX |
character |
char ** |
CHARACTER*255 |
raw |
unsigned char * |
none |
在处理 C/C++ 中的 SEXP 指针或在 Rcpp 中的类型类时,以下是最常用的类型(查阅 R 或 Rcpp 的文档以获取完整列表):
| R 类型 | SEXP 类型 | Rcpp 类型 |
|---|---|---|
numeric |
REALSXP |
NumericVector / NumericMatrix |
integer |
INTSXP |
IntegerVector / IntegerMatrix |
complex |
CPLXSXP |
ComplexVector / ComplexMatrix |
logical |
LGLSXP |
LogicalVector / LogicalMatrix |
character |
STRSXP |
CharacterVector / CharacterMatrix |
list |
VECSXP |
List |
data.frame |
none |
DataFrame |
创建 R 对象和垃圾回收
我们已经看到,可以通过调用allocVector()和coerceVector()来创建 R 对象并为它们分配内存。Rinternals.h定义了其他内存分配函数,例如allocList()和allocArray()。对alloc*()函数或coerceVector()的任何调用都需要用PROTECT()包装。
在mov.avg.inline()示例中,UNPROTECT()用于在返回结果之前立即移除垃圾收集保护。UNPROTECT()也可以在任何函数点调用,以允许垃圾收集器释放不再需要的 R 对象。保护机制是基于堆栈的,因此UNPROTECT(n)移除了最后被保护的n个对象的保护。或者,可以使用UNPROTECT_PTR(p)来解除SEXP p指向的特定对象的保护,即使它不在堆栈的顶部。
在创建许多 R 对象的复杂 C/C++代码中,一旦它们不再需要,就解除保护它们是一种良好的实践,这样垃圾收集器就可以高效地工作。然而,确保那些未受保护的对象在代码中不再被使用是程序员的职责,以防止任何内存错误。
最后,始终记得将PROTECT()调用的数量与UNPROTECT()或UNPROTECT_PTR()未解除保护的对象总数相匹配。
为非 R 对象分配内存
有时,需要内存来存储中间计算的结果,这些结果不需要从 R 中访问。R 提供了两种在 C/C++中分配这种内存的方法。
第一种方法,瞬态存储分配,允许您分配在.C()、.Call()或.External()调用结束时由 R 自动回收的内存。为此,使用char *R_alloc(size_t n, int size)函数,该函数分配每个size字节的n个单元,并返回指向分配内存的指针。典型的用法可能如下所示:
int *x = (int *) R_alloc(100, sizeof(int));
在 C/C++函数内不需要释放分配的内存,因为当函数执行结束时,R 会负责处理。
用户控制的内存机制提供了对内存分配和释放的更多控制。这允许在 C/C++代码的不同部分之间释放内存。例如,在一个迭代算法中,每个计算阶段都会产生大量中间数据,可以释放之前迭代的内存,以确保有足够的空闲内存来完成未来的迭代。此接口中有三个函数:
-
type* Calloc(size_t n, type): 这会分配指定大小和类型的内存 -
type* Realloc(any *p, size_t n, type): 这会将*p处分配的内存的大小更改为指定的大小和类型 -
void Free(any *p): 这会释放*p处的内存
这些函数类似于 C 函数calloc()、realloc()和free(),但 R 增加了额外的错误处理。如果它们返回,则表示内存已成功分配或释放。
摘要
在本章中,我们介绍了一系列技术,用于在编译型语言中利用代码以实现接近原生 CPU 的性能。虽然示例主要集中在 C 和 C++上,但类似的策略也可以用于 Fortran 或 Java。
我们首先看到,通过使用compile包在执行 R 代码之前对其进行编译,可以提供适度的性能提升,特别是对于包含许多循环和基本操作的代码。JIT 编译对执行任何 R 代码也自动执行相同的操作。然而,由于 R 在核心上是动态语言,R 代码的优化程度是有限的。
超越 R 语言,我们使用了 C 和 C++来实现显著的性能提升。我们学习了如何在 R 中使用inline包定义 C 函数,以及如何使用Rcpp从 R 中调用外部 C++函数。
在这个过程中,我们了解了 R 如何使用SEXP指针和SEXPREC记录结构在 C/C++中表示不同类型的数据,以及如何使用这些结构来操作 R 对象。我们还学习了在 C/C++中工作时的内存分配、释放和垃圾回收的复杂性。
最后,我们简要介绍了 R API,这些 API 可以从 C、C++或 Fortran 中提供丰富的 R 功能。
本章介绍了在 R 中使用编译型语言实现优化计算性能的高级技术。这些技术允许 R 程序员利用编译型语言的力量和速度,同时享受 R 作为数据处理环境的简单性和灵活性。使用编译型语言带来的巨大性能提升伴随着同样巨大的责任,即详细了解这些技术的工作原理,以确保它们可以安全有效地使用。关于这个主题,可以写出整本书;我们鼓励您查阅其他资源,包括编写 R 扩展手册,以获得更深入和更全面的处理。
在下一章中,我们将探讨如何利用图形处理单元(GPU)的计算能力来处理某些类型的计算。
第五章:使用 GPU 使 R 运行更快
在本章中,我们将探讨另一种方法来加速 R 代码的执行,这种方法通常未被充分利用,尽管它是大多数计算机的一部分——图形处理单元(GPU),也称为显卡。当我们想到 GPU 时,我们常常想到它能够产生的惊人图形。实际上,GPU 由具有高度并行处理能力的科技驱动,这些科技类似于世界上顶级的超级计算机。在过去,使用 GPU 进行编程非常困难。然而,在过去的几年里,随着 CUDA 和 OpenCL 等 GPU 编程平台的推出,编程 GPU 对许多程序员来说变得容易多了。更好的是,R 社区为 R 用户开发了一些包,以便利用 GPU 的计算能力。
要运行本章中的示例,您需要一个具有 CUDA 功能的 NVIDIA GPU。
本章涵盖:
-
GPU 上的通用计算
-
R 和 GPU
-
使用
gputools在 R 中进行快速统计建模
GPU 上的通用计算
从历史上看,GPU 是设计和用于渲染高分辨率图形的,例如用于视频游戏。为了能够每秒渲染数百万个像素,GPU 利用高度并行的架构,专门用于渲染图形所需的计算类型。从高层次来看,GPU 的架构类似于 CPU 的架构——它有自己的多核处理器和内存。然而,由于 GPU 不是为通用计算设计的,与 CPU 相比,单个核心要简单得多,时钟速度较慢,对复杂指令的支持有限。此外,它们通常比 CPU 少有 RAM。为了实现实时渲染,大多数 GPU 计算都是以高度并行的方式进行的,拥有比 CPU 多得多的核心——现代 GPU 可能拥有超过 2,000 个核心。鉴于一个核心可以运行多个线程,在 GPU 上可以运行数万个并行线程。
在 20 世纪 90 年代,程序员开始意识到,某些图形渲染之外的计算可以从 GPU 的高度并行架构中受益。还记得 R 中 第三章 中向量操作的尴尬并行性吗,通过简单调整使 R 运行更快;想象一下,如果它们由数千个 GPU 核心同时完成,会有多大的加速潜力。这种认识催生了 GPU 上的通用计算(GPGPU)。
但编程 GPU 具有挑战性。使用由 DirectX 和 OpenGL 等标准提供的低级接口,程序员必须欺骗 GPU 以计算数字,就像它们在渲染图形一样。意识到这一挑战,人们开始努力开发适合 GPGPU 的适当编程语言和支持架构。这些努力的成果是两种称为 CUDA 和 OpenCL 的技术,分别由 NVIDIA 和 Apple 开发,现在由 Khronos 维护。虽然 CUDA 是专有的,并且仅在 NVIDIA GPU 上工作,但 OpenCL 是品牌无关的,甚至能够支持其他加速器,如现场可编程门阵列(FPGAs)。
R 和 GPU
R 社区已经开发了一些包,供 R 程序员利用 GPU。R 的矢量化特性使得使用 GPU 成为一种自然的选择。这里列出了用于 GPU 编程的一些 R 包:
-
gputools: 这提供了围绕基于 GPU 的算法的 R 函数,用于常见操作,如线性模型和矩阵代数。它需要 CUDA,因此需要一个 NVIDIA GPU。 -
gmatrix: 这提供了gmatrix和gvector类,分别用于在 NVIDIA GPU 上表示矩阵和向量。它还提供了用于常见矩阵操作(如矩阵代数、随机数生成和排序)的函数。 -
RCUDA: 这为从 R 会话中加载和调用 CUDA 内核提供了一个低级接口。使用 RCUDA 需要很好地理解 CUDA 语言,但允许更多的灵活性和代码优化。更多关于它的信息可以在www.omegahat.org/RCUDA/找到。 -
OpenCL: 这与 RCUDA 类似,但与 OpenCL 接口。它适用于拥有非 NVIDIA GPU(如 ATI、Intel 或 AMD GPU)的用户。
其他 CRAN 包可用于 GPU 上的更多专用功能,例如线性回归。有关这些包的列表,请参阅由 Dirk Eddelbuettel 在 CRAN 网站上维护的CRAN 任务视图:使用 R 进行高性能和并行计算的 GPU 部分,网址为cran.r-project.org/web/views/HighPerformanceComputing.html。
在本章中,我们将仅关注gputools,并使用该包的一些示例来说明 GPU 如何加快 R 中的计算速度。
安装 gputools
安装gputools的步骤如下:
-
确保您的计算机具有 CUDA 功能的 GPU 卡。有关 CUDA 功能 GPU 卡的列表,请参阅
developer.nvidia.com/cuda-gpus。 -
从
developer.nvidia.com/cuda-downloads下载并安装 CUDA 工具包。 -
根据指定的
gputools安装说明设置一些环境变量cran.r-project.org/web/packages/gputools/INSTALL。 -
打开一个 R 会话并运行
install.packages("gputools")。
如果你没有具有 CUDA 功能的 NVIDIA GPU,亚马逊网络服务(AWS)提供名为 g2.2xlarge 的 GPU 实例,在撰写本文时,这些实例配备了 NVIDIA GRID K520 GPU,具有 1,536 个 CUDA 核心和 4 GB 的视频内存。你可以使用这些实例以及 NVIDIA 提供的预装了 CUDA 工具包和驱动程序的 亚马逊机器镜像(AMIs)。Windows 和 Linux AMIs 都可在 aws.amazon.com/marketplace/seller-profile/ref=sp_mpg_product_vendor?ie=UTF8&id=c568fe05-e33b-411c-b0ab-047218431da9 找到。对于本章,我们使用了 Linux AMI 版本 2014.03.2。
使用 gputools 在 R 中进行快速统计建模
gputools 提供了一种在 GPU 上执行统计函数的便捷方式,无需 CUDA 编程。所有繁重的工作,包括将数据从 RAM 复制到 GPU 内存以及设置要使用的核心数,都已封装在函数中(实际上,gputools 依赖于封装良好的 CUBLAS 库,该库为 GPU 提供线性代数函数)。例如,要在 CPU 上对 mtcars 数据集执行线性建模,我们使用 lm() 函数:lm(mpg~cyl+disp+hp, data=mtcars)。要在 GPU 上运行它,我们调用 gputools 中的 gpuLm() 函数:gpuLm(mpg~cyl+disp+hp, data=mtcars)。gpuLm() 的输出格式与 lm() 相同。
为了展示我们可以从 GPU 获得的加速效果,我们将对具有 100 个变量的随机数据集计算肯德尔相关系数。我们将使用从 100、200、… 到 500 个记录的不同观察数,以便观察与 CPU 版本相比的加速效果。代码如下:
library(gputools)
A <- lapply(c(1:5), function(x) {matrix(rnorm((x*1e2) * 1e2), 1e2, (x*1e2))})
cpu_k_time <- sapply(A, function(x) {system.time(cor(x=x, method="kendall"))[[3]]})
gpu_k_time <- sapply(A, function(x) {system.time(gpuCor(x=x, method="kendall"))[[3]]})
K <- data.frame(cpu=cpu_k_time, gpu=gpu_k_time)
我们在 AWS 上的 NVIDIA GRID K520 GPU 上测试了此代码;你获得的效果取决于你的 GPU。计算时间在下面的图中进行了展示。我们看到,cor() 相关函数的 CPU 版本随着记录数的增加呈超线性增长。另一方面,GPU 版本随着记录数的增加,计算时间几乎没有增加,这从几乎平坦的红色线条中可以看出。

GPU 与 CPU 计算肯德尔相关系数的计算时间
接下来,我们将对gputools中提供的其他几个函数进行计时比较:线性模型(gpuLm())、广义线性模型(gpuGlm())、距离矩阵计算(gpuDist())和矩阵乘法(gpuMatMult())。这些测试所使用的数据集有 1,000 个观测值和 1,000 个变量,除了gpuLm,它使用的是包含 10,000 个观测值和 1,000 个变量的数据集。使用microbenchmark()函数来比较这些算法的 CPU 和 GPU 版本的执行时间:
library(microbenchmark)
A <- matrix(rnorm(1e7), 1e4, 1e3)
A_df <- data.frame(A)
A_df$label <- rnorm(1e4)
microbenchmark(lm(label~., data=A_df),gpuLm(label~., data=A_df), times=30L)
## Unit: seconds
## expr min lq median uq
## lm(formu,...) 18.153458 18.228136 18.264231 18.274046
## gpuLm(formu,...) 9.310136 9.424152 9.467559 9.507548
## max
## 18.32938
## 10.25019A <- matrix(rnorm(1e6), 1e3, 1e3)
A_df <- data.frame(A)
A_df$label <- rbinom(1e3, size=1, prob=0.5)
microbenchmark(glm(label~., data=A_df, family=binomial), gpuGlm(label~., data=A_df, family=binomial),times=30L)
## Unit: seconds
## expr min lq median uq max
## glm(formu,...) 23.64777 23.68571 23.73135 23.82055 24.07102
## gpuGlm(formu,...) 15.14166 15.30302 15.42091 15.50876 15.71143 microbenchmark(dist(A), gpuDist(A), times=30L)
## Unit: milliseconds
## expr min lq median uq max
## dist(A) 11113.4842 11141.2138 11167.81 11194.852 11287.2603
## gpuDist(A) 191.1447 203.6862 222.79 229.408 239.9834 B <- matrix(rnorm(1E6), 1E3, 1E3)
microbenchmark(A%*%B, gpuMatMult(A, B), times=30L)
## Unit: milliseconds
## expr min lq median uq
## A%*%B 921.68863 934.64234 945.74926 955.33485
## gpuMatMult(A,B) 33.28274 33.59875 33.70138 37.35431
## max
## 1029.75887
## 38.29123
测试结果展示了在 R 中使用 GPU 计算的力量。然而,就像任何其他并行程序一样,并不是所有函数在 GPU 上执行时都会获得更快的性能。例如,当运行皮尔逊相关性的比较(通过将method参数从kendall改为pearson)时,如图所示,GPU 的性能比 CPU 慢。由于肯德尔相关性的额外排序操作,它已知比皮尔逊相关性计算量更大(我们的基准测试显示计算肯德尔相关性比计算皮尔逊相关性慢数百倍)。然而,似乎这种肯德尔相关性算法的实现非常适合 GPU 的高度并行架构,从而导致了我们在本章第一个例子中看到的性能提升。另一方面,计算皮尔逊相关性的算法在从 CPU 切换到 GPU 时表现不佳,这表明它不适合 GPU 的架构。没有研究底层 CUDA 代码和 GPU 架构的细节,很难确切指出两种算法性能差异的确切原因。在决定为特定任务使用 GPU 之前,最好是像我们在这里所做的那样,对 GPU 与 CPU 的相对性能进行基准测试:

在 GPU 与 CPU 中计算皮尔逊相关性的计算时间
通常,以下因素会影响 GPU 的性能:
-
GPU 最适合处理数据并行问题(参见第八章 Chapter 8,“通过并行计算提高性能”,以了解数据并行的定义)。它们不适合需要大量线程间同步的任务。
-
GPU 的性能取决于主内存(RAM)和 GPU 内存之间传输的数据量,因为 RAM 和 GPU 内存之间的连接带宽较低。良好的 GPU 编程应该尽量减少这种数据传输。
解决这些因素需要使用 RCUDA 或 OpenCL 提供的低级 GPU 接口进行编程。其他努力正在被做出以最小化程序员优化 CUDA 或 OpenCL 代码所需的工作量。例如,为了解决 RAM-GPU 内存瓶颈,AMD 发布了一款将 RAM 和 GPU 内存结合在一张卡上的 GPU。
摘要
在本章中,我们学习了如何通过利用 GPU 来加速 R 中的某些计算。鉴于今天的大多数计算机都配备了 GPU,这为提高 R 程序的性能提供了一个快速的机会。这对于与 GPU 接口的 R 包数量不断增长的情况尤其如此。其中一些,如gputools,甚至不需要了解 CUDA 或 OpenCL。GPU 并不保证所有任务都能提高性能。
在下一章中,我们将关注解决 R 程序中的 RAM 相关问题。
第六章。减少 RAM 使用量的简单调整
到目前为止,我们已经学习了克服 CPU 限制并提高 R 程序速度的技术。如您从 第一章 回忆的那样,理解 R 的性能 – 为什么 R 程序有时运行缓慢? 另一个 R 的关键约束是内存。R 程序执行任务所需的所有数据都必须加载到计算机的内存或 RAM 中。RAM 也用于任何中间计算,因此处理给定数据集所需的 RAM 量可能是数据集大小的多倍,这取决于正在执行的任务或算法的类型。当需要处理大型数据集或可用 RAM 很少时,这可能会成为一个问题。
在本章和下一章中,我们将学习如何优化 R 程序的 RAM 利用率,以便成功执行内存密集型任务。
本章涵盖了:
-
重复使用对象而不占用更多内存
-
当不再需要时删除中间数据
-
在线计算值而不是持久存储它们
-
交换活跃和非活跃数据
重复使用对象而不占用更多内存
第一个调整利用了 R 如何使用 copy-on-modification 模型来管理对象的内存。在这个模型中,当创建一个对象 x 的副本时,例如使用 y <- x,实际上在内存中并没有进行复制。相反,新变量 y 简单地指向包含 x 的同一块内存。当 y 第一次被修改时,R 会将数据复制到一个新的内存块中,这样 x 和 y 就有了它们自己的数据副本。这就是为什么这种内存管理模型被称为 copy-on-modification。这意味着有时可以从现有对象中创建新对象,而无需占用额外的内存。为了识别潜在的内存瓶颈并管理 R 程序的内存利用率,了解 R 在何时复制数据以及何时不复制数据是有帮助的。
以以下代码为例,它生成一个包含 100 万个元素的数值向量 x 并创建一个包含 x 两个副本的列表 y。我们可以使用 object.size() 函数检查对象的大小:
x <- runif(1e6)
print(object.size(x), units = "auto")
## 7.6 Mb
y <- list(x, x)
print(object.size(y), units = "auto")
## 15.3 Mb
乍一看,似乎有两个对象:x,它占用了 7.6 MB 的内存,而 y,它占用了 15.3 MB。然而,内存利用率可以通过不同的方式来衡量,并且结果令人惊讶:
library(pryr)
object_size(x)
## 8 MB
object_size(y)
## 8 MB
来自 CRAN 包 pryr 的 object_size() 函数比基础 R 的 object.size() 函数稍微不同且更准确地测量了 x 和 y 的大小。它报告说 y,包含两个数值向量,只占用了 8 MB 的内存——与 x 相同,x 是一个长度相同的单个数值向量。这怎么可能呢?来自 pryr 包的 address() 函数揭示了每个对象指向的实际内存块:
address(x)
## [1] "0x10f992000"
address(y)
## [1] "0x7ff18b30e478"
address(y[[1]])
## [1] "0x10f992000"
address(y[[2]])
## [1] "0x10f992000"
如预期,列表 y 指向的内存位置与数值向量 x 不同,表明它是一个不同的对象。但是,y 的两个元素在内存中指向原始对象 x。R 在不必要的情况下不会复制对象。在这种情况下,它只是在 y 中创建了两个指向 x 的指针。这种方式非常高效,实际上,x 和 y 合并在一起只占用 8 MB,这是 x 的大小!
object_size(x, y)
## 8 MB
注意
实际上,需要一点额外的内存来存储 y 和它对 x 的指针,但这可以忽略不计,并且不会出现在这个测量中。
当 y 中的某个向量被修改时,R 会创建一个新的副本,因为这个向量现在与 x 不同:
y[[1]][1] <- 0
address(x)
## [1] "0x10f992000"
address(y[[1]])
## [1] "0x110134000"
address(y[[2]])
## [1] "0x10f992000"
object_size(y)
## 16 MB
object_size(x, y)
## 16 MB
y[[1]] 向量现在指向内存中与 x 和 y[[2]] 不同的向量。因此,y 占用 16 MB 的 RAM,而 x 和 y 合并在一起仍然只占用 16 MB(因为 y[[2]] 仍然指向 x)。另一种跟踪这个情况的方法是在对象被复制到使用 tracemem() 时,它会随时给出警告,表明正在跟踪的对象被复制。看看当 y[[2]] 被修改时会发生什么:
tracemem(y[[2]])
## [1] "<0x10f992000>"
y[[2]][1] <- 0
## tracemem[0x10f992000 -> 0x1108d6000]:
untracemem(y[[2]])
tracemem[0x10f992000 -> 0x1108d6000] 这一行表示在修改向量 y[[2]] 时创建了一个副本,并给出了新副本的内存地址。现在,x、y[[1]] 和 y[[2]] 在内存中是不同的对象,因此 x 和 y 所使用的总内存为 24 MB:
address(x)
## [1] "0x10f992000"
address(y[[1]])
## [1] "0x110134000"
address(y[[2]])
## [1] "0x1108d6000"
object_size(y)
## 16 MB
object_size(x, y)
## 24 MB
当修改 y 的一个元素时,需要创建 x 的副本以确保原始对象 x 保持不变。否则,修改一个对象可能会无意中修改另一个对象,导致程序中可能出现难以发现的错误。
R 判断一个对象是否应该被复制的方式是通过跟踪是否有其他对象引用它。当 y 被创建时,R 知道 x 正在别处被使用,并且在修改时需要创建一个副本。
注意
R 只计算到两个引用,这对于它判断是否复制对象是足够的。只要两个或更多变量引用同一个对象,R 在修改时会复制它。
现在,当我们第一次修改 x 时,R 会创建它的一个副本,因为 x 之前已经被 y 引用。尽管 y 现在有自己的数据副本,但 R 为了谨慎起见,仍然创建 x 的副本以避免潜在的冲突。然而,随后的对 x 的修改不会导致不必要的复制,因为新的 x 副本没有在其他地方使用,正如这个例子所示:
tracemem(x)
## [1] "<0x10f992000>"
x[1]<- 1
## tracemem[0x10f992000 -> 0x111078000]:
x[1]<- 1
x[1]<- 0.5
x[2] <- 0.3
untracemem(x)
通常,只要一个向量没有被其他任何对象引用,R 允许它原地修改,从而避免复制向量的 CPU 和 RAM 开销。
注意
这个例子在 RStudio 中不起作用:
tracemem(x)
## [1] "<0x10e73c000>"
x[1] <- 0
## tracemem[0x10e73c000 -> 0x110d66000]:
x[2] <- 1
## tracemem[0x110d66000 -> 0x115478000]:
x[3] <- 0.5
## tracemem[0x115478000 -> 0x115c79000]:
untracemem(x)
这是因为 RStudio 在其自己的环境中保留每个对象的引用,所以 R 认为在别处有对 x 的引用。每次修改时,它都会创建 x 的一个副本以确保安全。
现在我们已经了解了 R 何时复制数据,我们可以优化 R 程序以避免不必要地复制数据。例如,假设我们有两个包含一百万客户年龄和性别的向量:
customer.age <- sample(18:100, 1e6, replace=TRUE)
customer.gender <- sample(c("Male", "Female"), 1e6, TRUE)
零售商使用"cust #"作为客户 ID。我们希望为每个向量标记客户 ID,这样我们就可以通过客户 ID 轻松查找信息,使用像customer.age["cust 1"]这样的表达式。一种方法是为每个向量单独构造名称。然后,这两个向量组合将占用 84 MB 的内存:
names(customer.age) <- paste("cust", 1:1e6)
names(customer.gender) <- paste("cust", 1:1e6)
object_size(customer.age, customer.gender)
## 84 MB
或者,名称可以存储在一个单独的向量中,年龄和性别向量随后可以引用:
customer.names <- paste("cust", 1:1e6)
names(customer.age) <- customer.names
names(customer.gender) <- customer.names
object_size(customer.age, customer.gender, customer.names)
## 76 MB
这个简单的更改节省了 8 MB 的内存。在更大的、更复杂的数据结构上,这些从避免不必要地复制数据中节省下来的内存可以非常显著。
相同的按修改复制行为也适用于函数参数。当一个对象传递给函数时,它不会被复制;R 只是提供了一个指向该对象的指针。然而,如果对象在函数内部被修改,R 会在函数的环境中创建该对象的副本,以确保原始对象在函数外部不会被以任何方式修改。在编程语言术语中,这被称为按值传递,因为函数被赋予了它们的参数值。这是 R 作为函数式编程语言设计的一部分。与此相对的是按引用传递,这在其他编程语言中有时被使用,例如 Java 和 C/C++,在这些语言中,函数可以接收对内存地址的引用或指针。在这种情况下,函数可以修改它们的参数,而无需在内存中创建额外的副本,并且修改在函数退出后仍然持续。
R 函数按值传递模型的一个后果是,许多函数需要复制它们接收到的数据。例如,调用sort(x)返回一个包含x排序值的新的向量,而不是就地排序值(这在 Java 和 C/C++中通常是做法)。调用像sort()这样的函数通常需要额外的内存,至少与原始数据一样大,有时更大。
当不再需要中间数据时移除它
在大型 R 程序中,对象通常在多个地方创建。通常,在程序早期部分创建的对象在程序的后期部分不再需要。当面临内存限制时,释放不再需要的对象占用的内存是有用的,这样程序的后续部分可以成功运行。
用于此的主要工具是rm()函数,它从当前 R 环境中移除给定列表的对象。
在下面的示例中,我们有一个包含来自零售店的 50 万个交易的 DataFrame 以及每个交易中的商品。DataFrame 的每一行代表在销售数据库中发生的唯一交易项对。尽管我们不得不在真实业务环境中生成这个示例的数据,但这些数据可以从零售商的销售数据库中提取:
trans.lengths <- rpois(5e5, 3) + 1L
trans <- rep.int(1:5e5, trans.lengths)
items <- unlist(lapply(trans.lengths, sample.int, n = 1000))
sales.data <- data.frame(trans = trans, item = items)
数据看起来像这样,例如,前九行表明交易 1 包含商品 680、846、196 等(你生成数据可能看起来不同):
head(sales.data, 15)
## trans item
## 1 1 680
## 2 1 846
## 3 1 196
## 4 1 191
## 5 1 20
## 6 1 852
## 7 1 623
## 8 1 206
## 9 1 775
## 10 2 624
## 11 2 31
## 12 2 718
## 13 2 190
## 14 3 482
## 15 3 946
我们的任务是找到常见的商品篮子,即在同一交易中频繁一起出现的商品,或者频繁项集。arules CRAN 包中的apriori()函数可以用来找到这些频繁项集。但它不接受从销售数据库中提取的交易项对形式的数据。相反,arules定义了它接受的输入transactions类。我们需要将数据框的商品列拆分为不同的交易,然后将生成的列表强制转换为transactions对象:
library(arules)
trans.list <- split(sales.data$item, sales.data$trans)
trans.arules <- as(trans.list, "transactions")
我们现在可以调用apriori()函数来找到频繁项集。在这个例子中,我们想要支持度至少为 0.3 的项集,即至少在 30%的交易中出现的商品集合。
freq.itemsets <- apriori(trans.arules, list(support = 0.3))
当我们从交易项对的数据框开始时,我们必须将其转换为几种不同的格式,然后数据才能被apriori()使用。这些中间数据结构都占用了宝贵的内存:
object_size(sales.data)
## 16 MB
object_size(trans.list)
## 62.1 MB
object_size(trans.arules)
## 44 MB
当数据集很大或内存不足时,apriori()可能会因为内存不足而无法执行。在这种情况下,可以在调用apriori()之前或甚至在每个数据转换步骤之间使用rm()来释放内存,通过删除不需要的对象。以下代码说明了这一点:
trans.list <- split(sales.data$item, sales.data$trans)
rm(sales.data)
trans.arules <- as(trans.list, "transactions")
rm(trans.list)
freq.itemsets <- apriori(trans.arules, list(support = 0.3))
自动删除临时变量的另一种技术是将代码封装在函数中。这样,在函数中创建的任何变量在函数结束时都会自动删除。例如,如果我们只需要在调用apriori()之前删除临时变量,因为那时代码往往会遇到内存限制。我们可以将所有之前的代码行封装在一个函数中:
# Automatically remove temporary variables by encapsulating code
# in a function
prepare_data <- function(sales.data) {trans.list <- split(sales.data$item, sales.data$trans)trans.arules <- as(trans.list, "transactions")return(trans.arules)
}
trans.arules <- prepare_data(sales.data)
freq.itemsets <- apriori(trans.arules, list(support = 0.3))
在调用prepare_data()之后,不需要显式调用rm(),其中的任何临时变量都会被删除。在这种情况下,只有一个临时变量trans.list被删除。但同样的技术也可以在函数中声明更多临时变量时使用。这种方法不仅方便删除临时变量,而且使代码更易于阅读和维护。
在大型 R 程序中,定期删除大型数据结构可以帮助最小化整体内存使用。当调用 rm() 时,内存可能不会立即释放并返回给操作系统。相反,R 的 垃圾回收器会在需要时自动释放内存,或者当从移除的对象中释放的内存量超过阈值时。
在线计算值而不是持久存储
在执行 R 程序时,有时方便将程序所需的所有数据,包括中间计算结果,在执行前缓存到 RAM 中。在执行过程中,当程序需要访问数据的任何部分时,由于所有数据都已加载到 R 工作空间中,因此可以非常快速地进行。在 RAM 中缓存中间结果可以显著节省计算时间,尤其是在频繁访问时,因为避免了数据的重复计算。
当缓存的数据可以适应 RAM 时,这不是问题。然而,当没有足够的内存空间来容纳数据时,它就变成了问题。好消息是,在许多情况下,程序不需要同时访问数据的所有部分。一种解决方案是在 RAM 和硬盘之间交换数据的部分。因为磁盘 I/O 很慢,正如我们在第一章中确立的,理解 R 的性能 – 为什么 R 程序有时运行缓慢?,这种方法可能会导致执行缓慢。更好的解决方案是计算和重新计算当前需要的部分数据。是的,计算需要计算时间,但通常比磁盘 I/O 成本低。
让我们从一个数据科学中常见的任务举例:层次聚类。在一些常用的层次聚类变体中,例如单链接、完全链接和平均链接,算法中的一个重要步骤是计算数据集中每对观测值之间的距离矩阵,然后决定哪一对观测值彼此最接近。这一步骤可以通过以下代码实现,其中我们人为地创建了一个随机数据集 A,包含 10,000 个观测值(行)和 10 个特征(列)。代码首先计算 A 的距离矩阵,将距离矩阵的对角线元素设置为 NA,因为观测值总是最接近自身的,最后使用 which() 函数找到最近的观测值对。在这个例子中,发现观测值 6778 和 6737 是最近的一对。要执行此程序,大约需要 801 MB 的 RAM,如下代码中 object_size() 的输出所示。这是因为尽管数据集只占用 800 KB,但其距离矩阵需要大约原空间的平方量,因为它存储了所有成对距离:
A <- matrix(rnorm(1E5), 1E4, 10)
dist_mat <- as.matrix(dist(A))
diag(dist_mat) <- NA
res1 <- which(dist_mat == min(dist_mat, na.rm=T), arr.ind = T)[1,]
res1
## row col
## 6778 6737
object_size(A)
## 800 kB
object_size(dist_mat)
## 801 MB
仔细观察后,我们实际上并不需要一次性获取整个距离矩阵来找到最近的一对。可以计算第一个观测值与剩余观测值之间的成对距离,以获取该集合的最小对;对第二个观测值重复此过程,然后比较两个集合的最小值,依此类推。这样做会带来额外的步骤,因此计算时间会更长,但与前面的代码相比,它只需要占用很小一部分 RAM(因为一次只维护距离矩阵的一小部分)。下面的代码展示了如何执行此操作。它首先使用 pdist 包计算 A 中所有观测值与观测值 1 之间的距离,然后只为此块找到并保存最近的对到临时列表 output。使用 lapply 对 A 中的观测值 2、3、…、10,000 重复此过程。每个块的最接近对集合存储在 temp_res 列表中。最后一步是找到这个集合中的最小对并将其存储在变量 res2 中。评估 res2 的输出会显示与前面代码找到的结果相同。然而,这次我们只需要 temp_res 列表占用 2.7 MB 的内存。
library(pdist)
temp_res <- lapply(1:nrow(A), function(x) {temp <- as.matrix(pdist(X = A, Y = A[x,]));temp[x] <- NA;output_val <- min(temp, na.rm=T);output_ind <- c(x, which(temp == output_val));output <- list(val = output_val, ind = output_ind);
})
val_vec <- sapply(temp_res, FUN=function(x) x$val)
ind_vec <- sapply(temp_res, FUN=function(x) x$ind)
res2 <- ind_vec[, which.min(val_vec)]
res2
## [1] 6778 6737
object_size(temp_res)
## 2.72 MB
object_size(val_vec)
## 80 kB
object_size(ind_vec)
## 80.2 kB
确实,第二种方法所需的时间更长。我们可以通过并行化代码来加快速度,例如,用 parallel 包中的 parLApply() 替换 lapply()(参见第八章 Multiplying Performance with Parallel Computing,使用并行计算提高性能)。在实践中,对于寻找最接近的一对观测值且不存储完整距离矩阵的特定情况,我们可以利用优化的 k 近邻函数,如 FNN 包中的 knn() 函数。如果这样的替代优化包不可用,那么像前面代码中所示的计算即时值的做法对于减少内存使用是有用的。
交换活跃和非活跃数据
在某些情况下,为了在程序中稍后使用而移除以释放内存的大对象是必需的。R 提供了将数据保存到磁盘并在有足够内存时稍后重新加载的工具。回到零售销售数据示例,假设我们在挖掘频繁项集之后需要 sales.data 数据框进行进一步处理。我们可以使用 saveRDS() 将其保存到磁盘,然后使用 readRDS() 在稍后重新加载:
trans.list <- split(sales.data$item, sales.data$trans)
saveRDS(sales.data, "sales.data.rds")
rm(sales.data)
trans.arules <- as(trans.list, "transactions")
rm(trans.list)
freq.itemsets <- apriori(trans.arules, list(support = 0.3))
sales.data <- readRDS("sales.data.rds")
# Perform further processing with sales.data
saveRDS() 和 readRDS() 函数每次保存一个对象,而不保存对象名称。例如,名称 sales.data 不会被保存。然而,列名称 trans 和 items 被保存。作为替代,可以使用 save() 和 load() 函数来处理多个对象,甚至是一个环境中所有对象及其变量名称。
摘要
在本章中,我们学习了 R 内存管理的修改时复制语义。对这一机制的良好理解使我们能够找到减少 R 程序内存消耗的机会。
我们还看到了如何在不必要时从环境中移除临时变量和中间计算,以释放内存供后续计算使用。除了显式地移除临时变量外,我们还学习了两种自动管理临时变量的方法。首先,即时计算在内存中不创建持久变量的情况下产生中间数据。其次,函数是分组相关操作的有用方式,当退出函数时可以自动移除临时变量。
最后,我们看到了如何将数据保存到磁盘上以释放内存,并在需要时重新加载它们。
在下一章中,我们将探讨更多高级技术,以优化内存消耗,并允许 R 程序处理更大的数据集,甚至那些太大而无法装入内存的数据。
第七章. 使用有限 RAM 处理大数据集
在上一章中,我们学习了如何通过减少数据的复制和删除临时数据来优化 R 程序的内存消耗。有时,这仍然不够。我们可能有一些数据太大,甚至无法放入内存,更不用说对其进行任何计算了,或者即使数据可以放入内存,可供我们进行所需分析的空闲内存也不多。
在本章中,我们将学习克服内存限制并处理大数据集的高级技术。
本章涵盖:
-
使用内存高效的数据结构
-
使用内存映射文件和分块处理数据
使用内存高效的数据结构
当你处理大数据集时,首先要考虑的是是否可以使用更内存高效的数据结构来存储和处理相同的信息。但首先我们需要知道数据在 R 中是如何存储的。向量是 R 中几乎所有数据类型的基本构建块。R 提供了逻辑、整数、数值、复数、字符和原始类型的原子向量。许多其他数据结构也是从向量构建的。例如,列表在 R 的内部存储结构中本质上就是向量。它们与原子向量的不同之处在于它们存储指向其他 R 对象的指针,而不是原子值。这就是为什么列表可以包含不同类型的对象。
让我们来看看每种原子数据类型需要多少内存。为此,我们将创建每种类型的包含一百万个元素的向量,并使用object.size()(对于字符向量,我们将调用rep.int(NA_character_, 1e6),这将创建一个真正包含NA值的空字符向量,正如我们很快将看到的,它比包含空字符串的字符向量占用更少的内存)来测量它们的内存消耗:
object.size(logical(1e6))
## 4000040 bytes
object.size(integer(1e6))
## 4000040 bytes
object.size(numeric(1e6))
## 8000040 bytes
object.size(complex(1e6))
## 16000040 bytes
object.size(rep.int(NA_character_, 1e6))
## 8000040 bytes
object.size(raw(1e6))
## 1000040 bytes
object.size(vector("list", 1e6))
## 8000040 bytes
这些结果是从 R 的 64 位版本中获得的。请注意,所有这些向量都占用 1 百万字节以上的内存,额外还占用 40 字节。这 40 字节被向量头占用,R 使用这些头存储有关向量的信息,例如长度和数据类型。剩余的空间被向量中存储的数据占用。
通过将这些数字除以一百万,我们发现原始值每个占用 1 字节,逻辑和整数值占用 4 字节,数值占用 8 字节,复数值占用 16 字节。以下图显示了这些类型向量的结构和内存需求:

逻辑、整数、数值和复数向量的内部结构
字符向量和列表有一点不同,因为它们在其向量中不存储实际数据。相反,它们存储指向其他包含实际数据的向量的指针。在计算机的内存中,字符向量或列表的每个元素都是一个指针,在 32 位 R 中占用 4 字节,在 64 位 R 中占用 8 字节。这将在以下图中展示:

列表和字符向量的内部结构
让我们更仔细地检查字符向量,看看它们是如何存储的。为此,我们将生成三个不同的字符向量,每个向量都有 1,000,000 个由 10 个字符组成的字符串。第一个向量简单地包含 1,000,000 个 "0123456789" 字符串的副本,使用 formatC() 函数生成,以占用十个字符。第二个向量包含 1,000 个由 formatC() 生成的 1,000 个唯一字符串的副本,每个字符串占用 10 个字符。第三个向量包含 1,000,000 个具有 10 个字符的唯一字符串。因为这些向量包含相同数量的字符串,且长度相同,我们预计它们将占用相同数量的内存。让我们测试这个假设:
object.size(rep.int("0123456789", 1e6))
## 8000096 bytes
object.size(rep.int(formatC(seq_len(1e3), width = 10), 1e3))
## 8056040 bytes
object.size(formatC(seq_len(1e6), width = 10))
## 64000040 bytes
结果表明,三个字符向量占用的内存量差异很大,这取决于字符串的实际内容。这是因为 R 为了节省内存,只在其 CHARSXP 缓存中存储每个唯一字符串的一个副本。我们创建的字符向量实际上存储的是指向这个缓存中字符串的指针,而不是字符串本身。
此外,这个缓存中的每个字符串都是一个完整的 R 向量,具有 24 个或 40 字节的头信息(在 32 位和 64 位 R 中分别),以及一个字符串。空字符被附加到字符串的末尾,总向量长度向上取整到最接近的 8 的倍数。例如,字符串 0123456789 将被存储为 0123456789\0(其中 \0 是空字符)加上五个额外的字节,总共 16 字节。在 64 位 R 中,加上 40 字节的头信息,这个 10 个字符的字符串将占用 56 字节的内存。
回到结果,包含 1,000,000 个 0123456789 复制品的第一个向量,其字符向量本身需要 8,000,040 字节,用于存储指针,另外还需要 56 字节来存储字符串本身。这使得总字节数达到 8,000,096 字节,这是 object.size() 报告的。
第二个向量包含 1,000 个唯一字符串,因此总共使用 8,000,040 + 1,000 × 56 = 8,056,040 字节的内存。
第三个向量包含 1,000,000 个唯一字符串,因此总共使用 8,000,040 + 1,000,000 × 56 = 64,000,040 字节的内存。
显然,字符向量的内存消耗取决于向量中包含的唯一字符串数量。
较小的数据类型
在理解了原子向量在 R 中的存储方式之后,我们现在来看看一些简单的策略来减少大数据集的内存占用,以便它们可以适应内存进行分析。
一种方法是在可能的情况下将数据强制转换为较小的数据类型。例如,如果一个数据集只包含整数值,将它们存储在整数向量而不是数值向量中可以减少大约一半的内存消耗:
object.size(as.numeric(seq_len(1e6)))
## 8000040 bytes
object.size(as.integer(seq_len(1e6)))
## 4000040 bytes
这也适用于字符字符串。在一个字符向量中,如果有许多重复的字符串,将其转换为因子向量可以减少内存消耗,因为因子实际上是索引唯一字符串(因子的水平)的整数向量:
strings <- rep.int(formatC(seq_len(1e4), width = 1000), 100)
factors <- factor(strings)
object.size(strings)
## 18480040 bytes
object.size(factors)
## 14560400 bytes
这些相同的技巧也可以应用于其他数据结构的组件,例如基于原子向量的矩阵、数据框和列表。
稀疏矩阵
有时数据可能非常稀疏,即包含很多零或空值。与在内存中存储完整的矩阵向量相比,使用稀疏矩阵可以显著减少表示数据所需的内存量。R 中的Matrix包提供了稀疏矩阵。
假设我们有一个 1,000×1,000 的数值矩阵(总共有 1 百万个元素),其中大约有 70%的零。我们可以使用Matrix()函数从这个数据创建密集或稀疏矩阵,具体取决于稀疏参数:
library(Matrix)
n <- rnorm(1e6)
n[sample.int(1e6, 7e5)] <- 0
m.dense <- Matrix(n, 1e3, 1e3, sparse = FALSE)
m.sparse <- Matrix(n, 1e3, 1e3, sparse = TRUE)
object.size(n)
## 8000040 bytes
object.size(m.dense)
## 8001112 bytes
object.size(m.sparse)
## 3605424 bytes
稠密矩阵所需的内存量与原始数据的数值向量大致相同。稀疏矩阵通过减少 55%的数据大小来减少数据量。
稀疏矩阵对于二进制数据(TRUE/FALSE,0/1,"yes"/"no","hot"/"cold"等)也非常有用。只需将二进制数据转换为逻辑值,其中大多数类是FALSE(如果大多数类是TRUE,则反转数据)。然后我们可以创建只存储矩阵中TRUE值出现位置的稀疏矩阵。再次,让我们在一个 70%稀疏的 1 百万个逻辑值矩阵上测试这个方法:
l <- sample(c(FALSE, TRUE), 1e6, TRUE, c(0.7, 0.3))
m2.dense <- Matrix(l, 1e3, 1e3, sparse = FALSE)
m2.sparse <- Matrix(l, 1e3, 1e3, sparse = TRUE)
object.size(l)
## 4000040 bytes
object.size(m2.dense)
## 4001112 bytes
object.size(m2.sparse)
## 2404384 bytes
稀疏逻辑矩阵比稀疏数值矩阵更加紧凑,其大小减少了 33%。
对称矩阵
对称矩阵,即等于其转置的矩阵,在许多统计方法中都有应用。例如,距离矩阵、相关矩阵和图邻接矩阵。由于我们可以通过取一半矩阵的镜像来生成矩阵的另一半,包括对角线,因此只保留矩阵的一半可以节省内存。Matrix包提供了dspMatrix类来高效地存储对称矩阵:
library(Matrix)
data <- matrix(rnorm(1E5), 1E2, 1E3)
A <- cor(data)
isSymmetric(A)
## [1] TRUE
B <- as(A, "dspMatrix")
object.size(A)
## 8000200 bytes
object.size(B)
## 4005320 bytes
除了稀疏和对称矩阵之外,Matrix软件包还提供了几种其他高效的矩阵类型数据结构,包括三角矩阵和对角矩阵。根据数据类型的不同,这些数据结构中的一些可能比前面示例中使用的通用稀疏或对称矩阵更节省内存。此外,该软件包使得基本的矩阵运算,如矩阵乘法(%*%),适用于密集和稀疏矩阵。因此,在大多数情况下,我们不需要手动将矩阵运算从密集版本转换为稀疏版本。有关更多详细信息,请参阅Matrix软件包的文档。
位向量
二进制数据可以用位向量以更有效的方式表示。与 R 中的逻辑值不同,逻辑值占用四个字节或 32 位,位向量只使用一个位来存储每个逻辑值。这将逻辑值的内存消耗减少了 32 倍。然而,位向量不能存储NA值,因此它们不适用于包含NA值的数据。
在 R 中,CRAN 上的bit软件包提供了位向量。让我们比较逻辑向量和等效位向量的大小:
library(bit)
l <- sample(c(TRUE, FALSE), 1e6, TRUE)
b <- as.bit(l)
object.size(l)
## 4000040 bytes
object.size(b)
## 126344 bytes
如预期的那样,位向量的大小是 3.2%,即逻辑向量的 1/32。
位向量还允许进行更快的逻辑运算:
library(microbenchmark)
l2 <- sample(c(TRUE, FALSE), 1e6, TRUE)
b2 <- as.bit(l2)
microbenchmark(!l, !b)
## Unit: microseconds
## expr min lq median uq max neval
## !l 1201.993 1452.2925 1566.966 2951.0405 23045.003 100
## !b 51.145 64.7185 107.065 113.2045 461.624 100
microbenchmark(l & l2, b & b2)
## Unit: microseconds
## expr min lq median uq max neval
## l & l2 22808.696 23104.647 23309.7475 24473.137 38334.65 100
## b & b2 60.948 64.615 78.5025 135.126 13732.20 100
microbenchmark(l == l2, b == b2)
## Unit: microseconds
## expr min lq median uq max neval
## l == l2 1954.402 2208.3235 2227.8980 2320.104 16825.13 100
## b == b2 60.263 63.2235 87.7245 121.448 14184.91 100
当处理大量逻辑或二进制数据时,位向量不仅节省内存,而且在进行操作时还能提供速度提升。
使用内存映射文件和分块处理数据
有些数据集非常大,即使应用了所有内存优化技术并使用了最有效的数据类型,它们仍然太大,无法装入或处理在内存中。除了获取额外的 RAM 之外,处理此类大型数据的一种方法是将它们以内存映射文件的形式存储在磁盘上,并一次将一小部分数据加载到内存中进行处理。
例如,假设我们有一个数据集,如果完全加载到内存中,将需要 100 GB 的 RAM,并且还需要额外的 100 GB 空闲内存来处理数据。如果处理数据的计算机只有 64 GB 的 RAM,我们可能需要将数据分成四个 25 GB 的数据块。然后 R 程序将一次加载一个数据块到内存中,并对每个数据块进行必要的计算。在所有数据块都处理完毕后,将最终将每个数据块的计算结果合并,以计算最终结果。这能否轻松完成取决于正在数据上运行的算法的性质。一些算法可以轻松地转换为对数据块进行计算,而其他算法可能需要付出相当大的努力才能做到这一点。
有两个 CRAN 软件包提供了内存映射文件,以便以这种方式处理大型数据集:bigmemory和ff。我们将依次查看这些软件包。
大内存包
bigmemory CRAN 包提供了一个类似于矩阵的数据结构,称为 big.matrix。存储在 big.matrix 对象中的数据可以是 double 类型(8 字节,默认),integer 类型(4 字节),short 类型(2 字节)或 char 类型(1 字节)。big.matrix 对象可以存在于 RAM 中或以内存映射文件的形式存在,并且可以像标准 R 矩阵一样进行操作。
注意
在撰写本文时,bigmemory 在 Windows 上不受支持,但包作者正在努力解决这个问题。
要创建一个 big.matrix 对象,我们可以调用 big.matrix() 来创建一个新对象,或者调用 as.big.matrix() 将矩阵强制转换为 big.matrix。在下一个示例中,我们将创建一个包含 10 亿行和 3 列的新 big.matrix 对象,位于 R 的临时文件夹中:
library(bigmemory)
bm <- big.matrix(1e9, 3, backingfile = "bm",backingpath = tempdir())
bm
## An object of class "big.matrix"
## Slot "address":
## <pointer: 0x7fac1950a4a0>
运行这个程序可能需要一段时间,但完成后,我们将有一个新的对象 bm,它存储指向新内存映射文件的指针。我们可以在临时目录中找到名为 bm 的新文件,大小为 22 GB:
aloysius@localhost RtmpG0CQdS $ ls -lh
total 46875024
-rw-r--r-- 1 aloysius staff 22G Sep 18 08:02 bm
-rw-r--r-- 1 aloysius staff 452B Sep 18 08:02 bm.desc
这样大的数据集可能无法适应大多数计算机的主内存。还创建了一个名为 bm.desc 的文件,与数据文件一起使用。该文件用于在以后或由另一个 R 程序通过调用类似 my.bm <- attach.big.matrix(file.path(tempdir(), "bm.desc")) 的方式检索内存映射文件。
big.matrix 对象支持许多与标准 R 矩阵相同的操作:
typeof(bm)
## [1] "double"
dim(bm)
## [1] 1e+09 3e+00
nrow(bm)
## [1] 1e+09
ncol(bm)
## [1] 3
length(bm)
## [1] 3e+09
bm[1:5, ]
## [,1] [,2] [,3]
## [1,] 1 0 0
## [2,] 0 1 0
## [3,] 0 0 1
## [4,] 0 0 0
## [5,] 0 0 0
bm[1:3, ] <- diag(3)
bm[1:5, ]
## [,1] [,2] [,3]
## [1,] 1 0 0
## [2,] 0 1 0
## [3,] 0 0 1
## [4,] 0 0 0
## [5,] 0 0 0
当使用子集操作符 [ 时,bigmemory 将选定的数据部分作为矩阵加载到 RAM 中。随着 big.matrix 的不同部分被使用,bigmemory 会自动将相关数据部分加载到 RAM 中,并删除不再需要的部分。由于 [ 选定的所有内容都会加载到内存中,因此必须注意确保选定的数据可以适应可用的内存。例如 bm[, ] 的调用可能会引发内存不足错误。
现在我们来看一个 R 程序如何与 big.matrix 一起工作,一次处理一个数据块。首先我们将随机数据填充到它里面,一次一个数据块。第一列将包含均值为 1000 的泊松分布的整数。第二列将包含由一和零表示的二进制数据。第三列将包含在 0 到 100000 之间均匀分布的实数。以下代码以 100 个 10 百万行的块将随机数填充到 bm 中:
chunksize <- 1e7
start <- 1
while (start <= nrow(bm)) {end <- min(start + chunksize - 1, nrow(bm))chunksize <- end - start + 1bm[start:end, 1] <- rpois(chunksize, 1e3)bm[start:end, 2] <- sample(0:1, chunksize, TRUE,c(0.7, 0.3))bm[start:end, 3] <- runif(chunksize, 0, 1e5) start <- start + chunksize
}
分块计算的一个例子是计算每一列的标准差。调用 sd(bm[1, ]) 可能不会工作,因为即使是一列数据也可能超过可用的内存。需要通过数据两次:一次计算每一列的平均值,另一次计算与平均值的平方偏差。
数据可以分成 1000 万行的块,就像之前一样。在第一次遍历中,计算列的平均值:
col.sums <- numeric(3)
chunksize <- 1e7
start <- 1
while (start <= nrow(bm)) {end <- min(start + chunksize - 1, nrow(bm))col.sums <- col.sums + colSums(bm[start:end, ])start <- start + chunksize
}
col.means <- col.sums / nrow(bm)
代码遍历每个数据块,并使用 colSums() 函数计算每个数据块的列和。这被添加到全局列和 col.sums 中。一旦所有数据块都经过处理,通过将 col.sums 除以数据的行数来计算列均值。
在第二次遍历时,计算观测值与列均值的平方偏差:
col.sq.dev <- numeric(3)
start <- 1
while (start <= nrow(bm)) {end <- min(start + chunksize - 1, nrow(bm))col.sq.dev <- col.sq.dev +rowSums((t(bm[start:end, ]) - col.means) ^ 2)start <- start + chunksize
}
col.var <- col.sq.dev / (nrow(bm) - 1)
col.sd <- sqrt(col.var)
每个数据块首先使用 t() 进行转置,以便从转置数据的每一列中减去 col.means 来计算与均值的偏差。然后对偏差进行平方,并在转置后的数据行上求和。
一旦所有数据块都经过处理,每个列的总平方偏差然后除以 n-1 来计算每个列的方差。最后,列方差的平方根给出列标准差。
bigmemory 的作者还编写了一个配套包 biganalytics,它为 big.matrix 对象提供了常用的统计函数。我们可以将前面练习的结果与 biganalytics 的 colsd() 函数进行比较:
library(biganalytics)
col.sd
## [1] 3.162261e+01 4.582687e-01 2.886805e+04
big.col.sd <- colsd(bm)
all.equal(col.sd, big.col.sd)
## [1] TRUE
我们已经看到了如何使用 big.matrix 对象在数据块上执行计算。bigmemory 的作者还创建了其他 CRAN 包,这些包提供了在 big.matrix 对象上操作的有用函数。以下表格列出了这些函数:
| 包 | 提供的函数样本 |
|---|---|
biganalytics |
统计:colmean()、colmin()、min()、colmax()、max()、colrange()、range()、colvar()、colsd()、colsum()、sum()、colprod()、prod() 和 colna();应用:apply();线性模型:biglm.big.matrix()、bigglim.big.matrix();聚类:bigkmeans() |
bigtabulate |
表和 tapply:bigtabulate()、bigtable()、bigtsummary();分割:bigsplit() |
bigalgebra |
算术运算 |
The ff 包
虽然big.matrix对于可以强制转换为相同类型的数据很有用,但在处理异构数据类型时,有时需要更类似数据框的内存映射格式。ff CRAN 包提供了这种功能。
ff CRAN 包支持比 bigmemory 更多的数据类型。下表显示了可以在 ff 向量、数组和数据框中存储的不同数据类型,称为 vmodes:
| 数据类型或 vmode | 描述 |
|---|---|
Boolean |
无NA的 1 位逻辑 |
Logical |
2 位带NA的逻辑 |
Quad |
无NA的 2 位无符号整数 |
Nibble |
无NA的 4 位无符号整数 |
Byte |
带NA的 8 位符号整数 |
Ubyte |
无NA的 8 位无符号整数 |
Short |
16 位带NA的符号整数 |
Ushort |
无NA的 16 位无符号整数 |
Integer |
带NA的 32 位符号整数 |
Single |
32 位浮点数 |
Double |
64 位浮点数 |
Complex |
2 x 64 位浮点数 |
Raw |
8 位无符号字符 |
Factor |
因子(存储为 integer) |
Ordered |
有序因子(存储为 integer) |
POSIXct |
POSIXct(存储为 double) |
Date |
日期(存储为 double) |
可以通过将值向量传递给 ff() 函数来创建 ff 对象:
i <- ff(1:1e6)
i
## ff (open) integer length=1000000 (1000000)
## [1] [2] [3] [4] [5] [6]
## 1 2 3 4 5 6
## [7] [8] [999993] [999994] [999995]
## 7 8 : 999993 999994 999995
## [999996] [999997] [999998] [999999] [1000000]
## 999996 999997 999998 999999 1000000
filename(i)
## [1] "/private/var/folders/xw/xp2p4mjd3nb6n6h30w67jkdc0000gn/T/## RtmptxP4qw/ff449847497df9.ff"
由于没有指定文件名,ff() 自动在 R 的临时目录中创建一个新文件。文件名也可以通过 filename 参数指定,如下一个示例所示。
如果将标量传递给 ff() 并与新的 ff 对象的维度一起使用,标量值将用于初始化对象:
j <- ff(FALSE, dim = c(50, 100),filename = file.path(tempdir(), "j.ff"))
j
## ff (open) logical length=5000 (5000) dim=c(50,100)
## dimorder=c(1,2)
## [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8] [,100]
## [1,] FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE : FALSE
## [2,] FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE : FALSE
## [3,] FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE : FALSE
## [4,] FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE : FALSE
## [5,] FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE : FALSE
## : : : : : : : : : : :
## [46,] FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE : FALSE
## [47,] FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE : FALSE
## [48,] FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE : FALSE
## [49,] FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE : FALSE
## [50,] FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE : FALSE
vmode 参数设置 ff 对象的存储模式:
q <- ff(sample(0:3, 1e6, TRUE), vmode = "quad")
q
## ff (open) quad length=1000000 (1000000)
## [1] [2] [3] [4] [5] [6]
## 2 2 2 2 0 1
## [7] [8] [999993] [999994] [999995]
## 1 0 : 1 0 1
## [999996] [999997] [999998] [999999] [1000000]
## 0 1 0 0 0
数据框可以使用 ffdf() 构建出来。在这里,我们使用前面代码中创建的整数和四舍五入 ff 向量创建一个新的 ffdf 对象:
d <- ffdf(i, q)
d[1:5, ]
## i q
## 1 1 2
## 2 2 2
## 3 3 2
## 4 4 2
## 5 5 0
vmode(d)
## i q
## "integer" "quad"
ff 对象提供了方便的 chunk() 函数,可以根据可用内存将数据分割成块。使用默认参数时,chunk() 建议一次性将整个数据框 d 载入一个块中:
chunk(d)
## [[1]]
## range index (ri) from 1 to 1000000 maxindex 1000000
最大的块大小(以字节为单位)也可以使用 BATCHBYTES 参数设置。当它设置为 200 万字节时,chunk() 建议将数据分割成四个块:
ch <- chunk(d, BATCHBYTES = 2e6)
ch
## [[1]]
## range index (ri) from 1 to 250000 maxindex 1000000
##
## [[2]]
## range index (ri) from 250001 to 500000 maxindex 1000000
##
## [[3]]
## range index (ri) from 500001 to 750000 maxindex 1000000
##
## [[4]]
## range index (ri) from 750001 to 1000000 maxindex 1000000
通常,希望块的数量更少,因为每个块都会产生(通常是小的)I/O 开销,每次 R 会话需要从磁盘读取数据时都需要这些开销。
chunk() 返回的索引可以用于索引 ffdf 或 ff 对象的行。以下代码遍历每个数据块,选择具有 d[idx, ] 和 q[idx] 的块,并在该块上执行一些计算。
total <- numeric(2)
quad.table <- integer(4)
names(quad.table) <- 0:3
for (idx in ch) {total <- total + colSums(d[idx, ])quad.table <- quad.table + table(q[idx])
}
total
## i q
## 500000500000 1500191
quad.table
## 0 1 2 3
## 249939 249964 250064 250033
ff CRAN 包有一个配套包 ffbase,它提供了用于操作 ff 和 ffdf 对象的有用函数。以下是这些函数的示例:
-
数学:
abs(),sign(),sqrt(),ceiling(),floor(),log(),exp(),cos(),cosh(),sin(),sinh(),gamma() -
汇总:
all(),any(),max(),min(),cumsum(),cummin() -
唯一性:
duplicated(),unique() -
应用:
ffdfdply()
当我们完成对 ff 或 ffdf 对象的操作后,可以使用 delete() 删除文件,并使用 rm() 删除 R 变量:
delete(d)
## [1] TRUE
delete(lm)
## [1] TRUE
rm(d)
rm(lm)
由于在删除数据框 d 时也会删除底层向量 i 和 q,尝试删除这些向量将导致错误。我们可以简单地删除 R 对象:
delete(i)
## [1] FALSE
## Warning message:
## In file.remove(attr(physical, "filename")) :
## cannot remove file '/private/var/folders/xw/## xp2p4mjd3nb6n6h30w67jkdc0000gn/T/RtmptxP4qw/ff449847497df9.ff', ## reason 'No such file or directory'
rm(i)
rm(q)
概述
在本章中,我们学习了 R 如何在内存中存储向量,以及如何估计不同类型数据所需的内存量。我们还学习了如何使用更高效的数据结构,如稀疏矩阵和位向量来存储某些类型的数据,以便它们可以在内存中完全加载和处理。
对于仍然太大的数据集,我们使用了big.matrix、ff和ffdf对象,通过内存映射文件将内存存储在磁盘上,并逐块处理数据。bigmemory和ff包及其配套包提供了一套丰富的功能,用于内存映射文件,这些功能在本书中无法完全涵盖。我们鼓励您查阅这些包的文档,以了解更多关于如何利用内存映射文件处理大型数据集的方法。
在下一章中,我们将超越在单个进程或线程中运行 R,学习如何并行运行 R 计算。
第八章. 使用并行计算提高性能
在本章中,我们将学习如何编写和执行并行 R 代码,其中代码的不同部分同时运行。到目前为止,我们已经学习了各种优化串行运行的 R 程序性能的方法,即在单个进程中运行。这并没有充分利用现代 CPU 的多核计算能力。并行计算使我们能够利用所有可用的计算资源,并通过许多倍的速度加快 R 程序的执行。我们将检查不同的并行类型以及如何在 R 中实现它们,并且我们将更详细地查看设计 R 程序并行架构时的几个性能考虑因素。
本章涵盖了以下主题:
-
数据并行与任务并行
-
实现数据并行算法
-
实现任务并行算法
-
在计算机集群上并行执行任务
-
共享内存与分布式内存并行
-
优化并行性能
数据并行与任务并行
许多现代软件应用程序被设计为在几乎任何计算机上并行运行计算,以利用今天几乎任何计算机上可用的多个 CPU 核心。许多 R 程序也可以类似地编写以并行运行。然而,可能的并行程度取决于涉及的计算任务。在衡量的一端是令人尴尬的并行任务,其中并行子任务之间没有依赖关系;这些任务可以非常容易地并行运行。这种例子之一是在随机森林算法中构建决策树集合——随机决策树可以独立于彼此并行构建,并在数十或数百个 CPU 上并行运行,然后可以组合成随机森林。在衡量的另一端是无法并行化的任务,因为任务的每一步都依赖于前一步的结果。一个这样的例子是树的深度优先搜索,其中每一步要搜索的子树取决于之前步骤中采取的路径。大多数算法在衡量之间,有些步骤必须串行运行,有些可以并行运行。考虑到这一点,在设计正确且高效的并行代码时,必须仔细思考。
通常,R 程序中的一些部分必须串行运行,而其他部分可以并行运行。在努力并行化任何 R 代码之前,估计可能实现的可实现性能增益是有用的。阿姆达尔定律提供了一种方法来估计将代码从串行转换为并行执行时可能实现的最佳性能增益。它将计算任务分为其串行和潜在并行部分,并指出并行执行任务所需的时间不会少于以下公式:
T(n) = T(1)(P + (1-P)/n),其中:
-
T(n) 是使用 n 个并行进程执行任务所需的时间
-
P 是整个任务中严格串行的比例
因此,并行算法的理论最佳加速比是:
S(n) = T(1) / T(n) = 1 / (P + (1-P)/n)
例如,给定一个在一个处理器上执行需要 10 秒的任务,其中一半的任务可以并行运行,那么在四个处理器上运行的最佳时间是 T(4) = 10(0.5 + (1-0.5)/4) = 6.25 秒。
使用四个处理器的并行算法的理论最佳加速比是 1 / (0.5 + (1-0.5)/4) = 1.6x。
下图显示了随着 CPU 核心数的增加,理论最佳执行时间如何降低。请注意,执行时间达到一个略高于五秒的限制。这对应于必须串行运行的任务的一半,此时并行化没有帮助。

最佳执行时间与 CPU 核心数的关系
通常,Amdahl 定律意味着任何并行化算法的最快执行时间都受算法串行部分所需时间的限制。请注意,Amdahl 定律仅提供了一个理论估计。它不考虑并行计算的开销(如启动和协调任务),并假设算法的并行部分可以无限扩展。在实践中,这些因素可能会显著限制并行化的性能提升,因此仅使用 Amdahl 定律来获得最大加速比的大致估计。
并行化主要有两类:数据并行化和任务并行化。理解这些概念有助于确定哪些类型的任务可以被修改以并行运行。
在 数据并行化 中,数据集被分成多个分区。不同的分区被分配到多个处理器上,并在每个数据分区上执行相同的任务。以例如在向量数据集中找到最大值的任务为例,假设有一个包含十亿个数值数据点的数据集。执行此任务的串行算法如下代码所示,它按顺序遍历数据中的每个元素以搜索最大值。(此代码故意冗长,以说明算法的工作原理;在实践中,R 中的max()函数虽然本质上也是串行的,但速度要快得多。)
serialmax <- function(data) {max = -Inffor (i in data) {if (i > max)max = i}return max
}
将此算法并行化的一种方法是将数据分成分区。如果我们有一台具有八个 CPU 核心的计算机,我们可以将数据分成八个分区,每个分区包含 1.25 亿个数字。以下是并行执行相同任务的伪代码:
# Run this in parallel across 8 CPU cores
part.results <- run.in.parallel(serialmax(data.part))
# Compute global max
global.max <- serialmax(part.results)
此伪代码并行运行了八个serialmax()实例——每个数据分区一个,以在每个分区中找到局部最大值。一旦所有分区都已被处理,算法通过在局部最大值中找到最大值来找到全局最大值。这个并行算法之所以有效,是因为数据集的全局最大值必须是所有分区局部最大值中的最大值。
下图以图示方式描述了数据并行。数据并行算法背后的关键是每个数据分区可以独立于其他分区进行处理,并且所有分区的结果可以合并来计算最终结果。这与 Hadoop 的 MapReduce 框架的机制相似。数据并行允许算法随着数据量的增加而轻松扩展——当数据集增加更多数据时,可以将更多计算节点添加到集群中,以处理新的数据分区。

数据并行
其他可以以数据并行方式运行的计算和算法示例包括:
-
逐元素矩阵运算,如加法和减法: 矩阵可以被分区,并且操作应用于每一对分区。
-
均值: 每个分区中的元素总和和元素数量可以相加,以找到全局总和和元素数量,从而可以计算均值。
-
K-means 聚类: 在数据分区之后,K 个质心被分配到所有分区。在分区中并行且独立地执行寻找最近的质心操作。通过首先并行计算各自的成员的总和和计数,然后在一个单一过程中合并它们来更新质心。
-
使用分区算法进行频繁项集挖掘: 在第一次遍历中,从每个数据分区中挖掘频繁项集以生成全局候选项集集;在第二次遍历中,从每个分区中汇总候选项集的支持度以过滤掉全局不频繁的项集。
另一类主要的并行性是任务并行性,其中任务被分配到不同的处理器上并行执行。每个处理器上的任务可能是相同的或不同的,它们作用的数据也可能是相同的或不同的。任务并行性与数据并行性的关键区别在于数据没有被分成分区。一个任务并行算法的例子是在相同的数据上执行相同任务的随机森林模型的训练。随机森林是一组独立在同一数据上构建的决策树。在特定树的训练过程中,选择数据的一个随机子集作为训练集,并且在每个树的分支上考虑的变量也是随机选择的。因此,尽管使用了相同的数据,但树彼此不同。为了训练一个包含 100 个决策树的随机森林,可以将工作量分配给一个具有 100 个处理器的计算集群,每个处理器构建一棵树。所有处理器都在相同的数据(或数据的精确副本)上执行相同的任务,但数据没有被分区。
并行任务也可以不同。例如,对同一组数据计算一组汇总统计量可以通过任务并行方式完成。每个进程可以分配计算不同的统计量——平均值、标准差、分位数等等。
一个任务并行算法的伪代码可能看起来像这样:
# Run 4 tasks in parallel across 4 cores
for (task in tasks)run.in.parallel(task)
# Collect the results of the 4 tasks
results <- collect.parallel.output()
# Continue processing after all 4 tasks are complete
实现数据并行算法
几个 R 包允许代码并行执行。R 自带的parallel包为其他包中的大多数并行计算能力提供了基础。让我们通过一个例子看看它是如何工作的。
这个例子涉及到查找匹配正则表达式的文档。正则表达式匹配是一个相对计算密集型的任务,这取决于正则表达式的复杂性。这个语料库,或文档集合,是来自 tm 包的 Reuters-21578 数据集关于公司收购(acq)主题的一个样本。因为这个数据集只包含 50 个文档,所以它们被复制了 100,000 次,形成一个包含 500 万个文档的语料库,这样并行化代码将导致执行时间的有意义节省。
library(tm)
data("acq")
textdata <- rep(sapply(content(acq), content), 1e5)
任务是找到匹配正则表达式 \d+(,\d+)? mln dlrs 的文档,该表达式表示百万美元的货币金额。在这个正则表达式中,\d+ 匹配一个或多个数字的字符串,而 (,\d+)? 可选地匹配一个逗号后跟一个或多个数字。例如,字符串 12 mln dlrs、1,234 mln dlrs 和 123,456,789 mln dlrs 将匹配这个正则表达式。首先,我们将测量使用 grepl() 逐个查找这些文档的执行时间:
pattern <- "\\d+(,\\d+)? mln dlrs"
system.time(res1 <- grepl(pattern, textdata))
## user system elapsed
## 65.601 0.114 65.721
接下来,我们将修改代码以并行运行,并在具有四个 CPU 核心的计算机上测量执行时间:
library(parallel)
detectCores()
## [1] 4
cl <- makeCluster(detectCores())
part <- clusterSplit(cl, seq_along(textdata))
text.partitioned <- lapply(part, function(p) textdata[p])
system.time(res2 <- unlist(parSapply(cl, text.partitioned, grepl, pattern = pattern)
))
## user system elapsed
## 3.708 8.007 50.806
stopCluster(cl)
在此代码中,detectCores()函数揭示了在执行此代码的机器上可用的 CPU 核心数量。在运行任何并行代码之前,调用makeCluster()来创建一个包含所有四个 CPU 核心的本地处理节点集群。然后使用clusterSplit()函数将语料库分成四个分区,以确定语料库的理想分割方式,使得每个分区具有大致相同的文档数量。
在语料库的每个分区上实际执行grepl()的并行化是由parSapply()函数完成的。集群中的每个处理节点都会收到它应该处理的数据分区的副本,以及要执行的代码和其他运行代码所需的变量(在这种情况下,是pattern参数)。当所有四个处理节点完成其任务后,结果会以类似于sapply()的方式合并。
最后,通过调用stopCluster()来销毁集群。
小贴士
在生产代码中确保始终调用stopCluster()是一个好的实践,即使执行过程中发生错误也是如此。这可以通过以下方式完成:
doSomethingInParallel <- function(...) {cl <- makeCluster(...)on.exit(stopCluster(cl))# do something
}
在这个例子中,在四个处理器上并行运行任务导致执行时间减少了 23%。这并不与执行任务所使用的计算资源量成比例;如果有四个 CPU 核心在处理任务,一个完全可并行化的任务可能会体验到高达 75%的运行时间减少。然而,请记住阿姆达尔定律——并行代码的速度受限于串行部分,这包括并行化的开销。在这种情况下,使用默认参数调用makeCluster()创建了一个基于套接字的集群。当创建这样的集群时,会运行 R 的额外副本作为工作进程。工作进程通过网络套接字与主 R 进程通信,因此得名。工作进程的 R 进程使用已加载的相关包初始化,并将数据分区序列化并发送到每个工作进程。这些开销可能相当大,尤其是在需要将大量数据传输到工作进程的数据并行算法中。
注意
除了parSapply()之外,parallel还提供了parApply()和parLapply()函数;这些函数分别类似于标准的sapply()、apply()和lapply()函数。此外,parLapplyLB()和parSapplyLB()函数提供了负载均衡,这在每个并行任务执行时间可变时很有用。最后,parRapply()和parCapply()是矩阵的并行行和列apply()函数。
在非 Windows 系统上,parallel 支持另一种类型的集群,这种集群通常产生的开销更少——分叉集群。在这些集群中,新的工作进程是从父 R 进程中分叉出来的,并带有数据的副本。然而,除非子进程修改了数据,否则数据实际上并不会在内存中复制。这意味着,与基于套接字的集群相比,初始化子进程更快,内存使用率通常更低。
使用分叉集群的另一个优点是,parallel 提供了一种方便且简洁的方式来通过 mclapply()、mcmapply() 和 mcMap() 函数在这些集群上运行任务。 (这些函数以 mc 开头,因为它们最初是 multicore 包的一部分) 没有必要显式地创建和销毁集群,因为这些函数会自动完成这些操作。我们只需调用 mclapply() 并通过 mc.cores 参数指定要分叉的工作进程数量:
system.time(res3 <- unlist(mclapply(text.partitioned, grepl, pattern = pattern,mc.cores = detectCores())
))
## user system elapsed
## 127.012 0.350 33.264
这显示了与串行版本相比,执行时间减少了 49%,与基于套接字的集群并行化相比减少了 35%。在这个例子中,分叉集群提供了最佳性能。
注意
由于系统配置的不同,当你在自己的环境中尝试本章的示例时,可能会看到非常不同的结果。当你开发并行代码时,在最终运行的环境中类似的环境中测试代码是非常重要的。
实现任务并行算法
现在我们来看看如何使用基于套接字和分叉集群两种方式实现任务并行算法。我们将探讨如何在集群的工作进程中运行相同的任务和不同的任务。
在集群的工作进程中运行相同的任务
为了演示如何在集群上运行相同的任务,这个例子中的任务是生成 5 亿个泊松随机数。我们将通过使用 L'Ecuyer 的组合多重递归生成器来完成这项工作,这是唯一支持在并行中生成随机数的基 R 随机数生成器。随机数生成器是通过调用 RNGkind() 函数选择的。
注意
我们不能在并行中使用任何随机的数生成器,因为数据的随机性取决于生成随机数据的算法以及每个并行任务给出的种子值。大多数其他算法都没有设计成在多个并行流中生成随机数,可能会产生多个高度相关的数字流,或者更糟,多个相同的数字流!
首先,我们将测量串行算法的执行时间:
RNGkind("L'Ecuyer-CMRG")
nsamples <- 5e8
lambda <- 10
system.time(random1 <- rpois(nsamples, lambda))
## user system elapsed
## 51.905 0.636 52.544
要在集群上生成随机数,我们首先将任务均匀地分配给工作者。在下面的代码中,整数向量samples.per.process包含每个工作者在四核 CPU 上需要生成的随机数的数量。seq()函数产生ncores+1个在0和nsamples之间均匀分布的数字,第一个数字是0,接下来的ncores个数字表示工作者进程之间的近似累积样本数。round()函数将这些数字四舍五入为整数,diff()函数计算它们之间的差值,从而给出每个工作者进程应生成的随机数数量。
cores <- detectCores()
cl <- makeCluster(ncores)
samples.per.process <-diff(round(seq(0, nsamples, length.out = ncores+1)))
在我们能够在集群上生成随机数之前,每个工作者需要不同的种子,以便从中生成随机数流。在运行任务之前需要在所有工作者上设置种子,以确保所有工作者生成不同的随机数。
对于基于套接字的集群,我们可以调用clusterSetRNGStream()来设置工作者的种子,然后在集群上运行随机数生成任务。当任务完成后,我们调用stopCluster()来关闭集群:
clusterSetRNGStream(cl)
system.time(random2 <- unlist(parLapply(cl, samples.per.process, rpois,lambda = lambda)
))
## user system elapsed
## 5.006 3.000 27.436
stopCluster(cl)
在基于套接字的集群中使用四个并行进程可以将执行时间减少 48%。对于这个例子,这种类型集群的性能优于数据并行示例,因为需要复制到工作者进程中的数据更少——只有一个整数,表示要生成多少个随机数。
接下来,我们在派生集群上运行相同的任务(再次强调,在 Windows 上不支持此操作)。当mc.set.seed参数设置为TRUE时,mclapply()函数可以为我们设置每个工作者的随机数种子,我们不需要调用clusterSetRNGStream()。否则,代码与基于套接字的集群类似:
system.time(random3 <- unlist(mclapply(samples.per.process, rpois,lambda = lambda,mc.set.seed = TRUE, mc.cores = ncores)
))
## user system elapsed
## 76.283 7.272 25.052
在我们的测试机器上,派生集群的执行时间略快,但接近基于套接字的集群,这表明这两种类型集群的此任务开销相似。
在集群的工作者上运行不同的任务
到目前为止,我们已经在每个并行进程中执行了相同的任务。并行包还允许不同的任务在不同的工作者上执行。在这个例子中,任务不仅是要生成泊松随机数,还要生成均匀、正态和指数随机数。和之前一样,我们首先测量执行此任务串行所需的时间:
RNGkind("L'Ecuyer-CMRG")
nsamples <- 5e7
pois.lambda <- 10
system.time(random1 <- list(pois = rpois(nsamples,pois.lambda),unif = runif(nsamples),norm = rnorm(nsamples),exp = rexp(nsamples)))
## user system elapsed
## 14.180 0.384 14.570
为了在基于套接字的集群的不同工作者上运行不同的任务,必须将函数调用及其相关参数传递给parLapply()。这有点繁琐,但遗憾的是parallel包没有提供更简单的接口来在基于套接字的集群上运行不同的任务。在下面的代码中,函数调用以列表的形式表示,其中每个子列表的第一个元素是在工作者上运行的函数的名称,第二个元素包含函数参数。使用do.call()函数来调用给定参数的函数。
cores <- detectCores()
cl <- makeCluster(cores)
calls <- list(pois = list("rpois", list(n = nsamples,lambda = pois.lambda)),unif = list("runif", list(n = nsamples)),norm = list("rnorm", list(n = nsamples)),exp = list("rexp", list(n = nsamples)))
clusterSetRNGStream(cl)
system.time(random2 <- parLapply(cl, calls,function(call) {do.call(call[[1]], call[[2]])})
)
## user system elapsed
## 2.185 1.629 10.403
stopCluster(cl)
在非 Windows 机器上的分叉集群中,mcparallel()和mccollect()函数提供了更直观的方式来在不同的工作者上运行不同的任务。对于每个任务,mcparallel()将给定的任务发送到可用的工作者。一旦所有工作者都分配了任务,mccollect()将等待工作者完成任务并从所有工作者收集结果。
mc.reset.stream()
system.time({jobs <- list()jobs[[1]] <- mcparallel(rpois(nsamples, pois.lambda),"pois", mc.set.seed = TRUE)jobs[[2]] <- mcparallel(runif(nsamples),"unif", mc.set.seed = TRUE)jobs[[3]] <- mcparallel(rnorm(nsamples),"norm", mc.set.seed = TRUE)jobs[[4]] <- mcparallel(rexp(nsamples),"exp", mc.set.seed = TRUE)random3 <- mccollect(jobs)
})
## user system elapsed
## 14.535 3.569 7.972
注意,我们还需要调用mc.reset.stream()来设置每个工作者中随机数生成的种子。当我们使用mclapply()时,这并不是必需的,因为mclapply()会为我们调用mc.reset.stream()。然而,mcparallel()不会这样做,因此我们需要自己调用它。
在计算机集群上并行执行任务
通过使用parallel包,我们不仅限于在单台计算机上运行并行代码;我们还可以在计算机集群上运行。这允许执行更大的计算任务,无论我们使用数据并行还是任务并行。只能使用基于套接字的集群来完成此目的,因为进程不能在不同的计算机上分叉。
有许多方法可以将计算机集群设置为与 R 一起工作。为了简化问题,集群中的所有计算机都应该有相同的 R 配置——相同的 R 版本、安装在同一目录下、使用相同版本的任何所需包,并且运行在相同的操作系统上。本节中的示例已在运行 Ubuntu 14.04 的三个计算机集群上测试过——一个主节点和两个工作者节点。
主节点和工作节点应在同一网络中,并且能够通过 SSH(端口 22)和另一个端口(用于交换数据和代码)相互通信。此通信端口可以通过R_PARALLEL_PORT环境变量设置。如果未设置,R 将随机选择 11000 到 11999 范围内的一个端口。
默认情况下,SSH 用于在工作者节点上启动 R。首先,请确保 SSH 服务器已在所有工作者节点上设置并运行。
对于 Windows 工作节点,请从www.cygwin.com下载并安装 Cygwin。当提示安装附加包时,请安装openssh包。然后,右键单击Cygwin图标并选择以管理员身份运行。在打开的终端窗口中,运行以下代码以设置 SSH 服务器。ssh-host-config命令使用默认设置配置 SSH 服务器。chmod 400命令设置生成的安全密钥的权限,使得只有拥有密钥的用户才能读取它们,而cygrunsrv -S sshd命令启动 SSH 服务器。
$ ssh-host-config -y -c "tty ntsec"
$ chmod 400 /etc/ssh_*_key
$ cygrunsrv -S sshd
其他操作系统如 Linux 通常自带安装的 SSH 服务器。请查阅您操作系统的文档了解如何设置。
主节点应该能够通过基于密钥的认证连接到工作节点,因为使用基于密码的认证来运行集群可能并不总是有效。请使用以下命令设置基于密钥的认证。Windows 用户应在 Cygwin 终端中运行此命令。
# Run all these commands on the master node
# Generate an RSA key pair without password
$ ssh-keygen -t rsa
$ chmod 400 .ssh/id_rsa
# Copy public key to worker node (run for every worker)
$ ssh-copy-id -i .ssh/id_rsa.pub worker_username@worker_address
# Test connection (run for every worker)
$ ssh worker_username@worker_address
# You should be able to log in without entering a password
一旦所有计算机都设置完毕,我们就可以像之前一样在集群上运行并行任务。唯一需要更改的是调用makeCluster()的地方,此时必须提供工作节点的 IP 地址或域名,而不是创建本地工作进程的数量。在以下示例中,将 IP 地址替换为您主节点和工作节点的 IP 地址。
workers <- c("192.168.213.225", "192.168.213.226")
nworkers <- length(workers)
cl <- makeCluster(workers, master = "192.168.213.138")
注意
如果您在使用makeCluster()自动启动集群时遇到困难,请在调用makeCluster()时添加manual=TRUE参数,然后按照给出的说明操作,以在每个工作节点上启动工作进程。
将任务发送给工作进程执行的相关代码与之前相同:
clusterSetRNGStream(cl)
samples.per.process <- c(2.5e8, 2.5e8)
lambda <- 10
random <- unlist(parLapply(cl, samples.per.process,function(n, lambda) rpois(n, lambda),lambda)
)
stopCluster(cl)
由于计算机集群必须通过网络进行通信,因此网络连接的带宽和延迟在整个集群的性能中起着至关重要的作用。最好将节点放置在同一位置,通过高速网络连接,以便主节点和工作节点之间可以快速交换数据和代码。
共享内存与分布式内存并行处理
在我们迄今为止看到的示例中,数据是从主进程或节点复制到每个工作进程的。这被称为分布式内存并行处理,其中每个进程都有自己的内存空间。换句话说,每个进程都需要拥有它需要处理的数据的副本,即使多个进程正在处理相同的数据。这是在计算机集群中分配数据的典型方式,因为集群中的工作进程无法访问彼此的 RAM,因此它们需要自己的数据副本。
然而,当你在单台计算机上的多个进程中运行并行代码时,这可能会导致巨大的冗余。如果一个数据集占用 5 GB 的内存,那么运行四个并行进程可能会导致内存中有五个数据副本——一个用于主进程,四个用于工作进程——总共占用 25 GB。之前我们看到,派生的集群可能不会遇到这个问题,因为大多数操作系统只有在数据被工作进程修改时才会复制内存中的数据。然而,这并不保证。在基于套接字的集群中,因为创建了新的 R 实例,所以每个工作进程都会为数据创建新的副本。
与共享内存并行处理进行对比,所有工作进程共享数据的一个副本。这不仅节省了内存,还减少了初始化和关闭集群所需的时间,因为不需要复制数据。
尽管默认情况下parallel包不支持共享内存并行处理,但我们可以通过使用合适的数据结构来实现。一个例子是我们在上一章(当时在 Windows 上不可用)中了解到的big.matrix,它来自bigmemory包。在第七章《使用有限 RAM 处理大型数据集》中,我们使用big.matrix的内存映射文件功能;在本章中,我们将利用它作为并行工作者的共享内存对象。除了在磁盘上以内存映射文件的形式存在外,big.matrix对象还可以是完全内存对象,其行为与标准 R 矩阵相同。关键区别在于,big.matrix对象不是根据我们在第六章《简单调整以减少 RAM 使用》中考察的常规 R 复制对象规则进行复制。相反,只有在调用deepcopy()时才会进行复制。让我们看看这在实践中是什么样子。首先,我们将创建一个big.matrix对象a,然后创建一个新的变量b,它指向a。
library(bigmemory)
a <- big.matrix(3, 3)
a[, ]
## [,1] [,2] [,3]
## [1,] NA NA NA
## [2,] NA NA NA
## [3,] NA NA NA
b <- a
b[, ]
## [,1] [,2] [,3]
## [1,] NA NA NA
## [2,] NA NA NA
## [3,] NA NA NA
接下来,我们将修改b的内容。根据 R 的正常数据复制规则,数据应该在内存中复制,以便不修改a的内容。然而,情况并非如此:
b[, ] <- diag(3)
b[, ]
## [,1] [,2] [,3]
## [1,] 1 0 0
## [2,] 0 1 0
## [3,] 0 0 1
a[, ]
## [,1] [,2] [,3]
## [1,] 1 0 0
## [2,] 0 1 0
## [3,] 0 0 1
很明显,a和b是同一个对象。查看它们指向数据指针的内部确认了这一点:
a
## An object of class "big.matrix"
## Slot "address":
## <pointer: 0x7fab5e2b8750>
b
## An object of class "big.matrix"
## Slot "address":
## <pointer: 0x7fab5e2b8750>
现在,让我们看看这如何影响并行代码的性能。此示例使用一个包含两个变量和 5000 万个观测值的矩阵以及等效的big.matrix:
r <- 5e7
m <- matrix(rnorm(r * 2), r, 2)
bm <- as.big.matrix(m)
此处的任务是计算每对数字的绝对差值。首先,我们将使用基于套接字的集群在矩阵上测量执行时间:
cl <- makeCluster(detectCores())
part <- clusterSplit(cl, seq_len(r))
system.time(res <- unlist(parLapply(cl, part,function(part, data) {abs(data[part, 1] - data[part, 2])},m)
))
## user system elapsed
## 5.199 1.856 10.590
stopCluster(cl)
在四个 CPU 核心上耗时 10.6 秒,每个线程消耗了 1.01 GB 的 RAM,如下面的截图所示,该截图来自 Mac OS X 的活动监视器:

使用基于套接字的集群使用矩阵数据的内存消耗
现在,让我们使用big.matrix来看看速度和内存效率是否有差异。为了将big.matrix传递给每个工进程,我们需要使用describe()将big.matrix的元数据传递给每个进程。在每个进程中,必须调用attach.big.matrix()来访问big.matrix。注意,在函数内部调用了library(bigmemory)。这是必需的,因为每个工进程都是一个新的 R 进程,因此运行任务所需的任何包都必须在工进程中加载。
cl <- makeCluster(detectCores())
system.time(res2 <- unlist(parLapply(cl, part,function(part, data.desc) {library(bigmemory)data <- attach.big.matrix(data.desc)abs(data[part, 1] - data[part, 2])},describe(bm))
))
## user system elapsed
## 1.278 0.692 2.956
stopCluster(cl)
这个版本运行得更快,执行时间节省了 72%,因为没有复制矩阵!此外,每个 R 进程仅占用大约 373 MB 的内存,如下图中Private Mem列所示。774 MB 的内存是从父进程共享的,其中大部分是big.matrix对象。

使用 big.matrix 数据时派生集群的内存消耗。
在这个案例中,共享内存并行工作是因为工进程只从数据中读取,而没有写入。设计写入共享内存的并行算法要复杂得多,超出了本书的范围。必须非常小心,以避免竞争条件,这是当从和写入相同内存位置的工进程没有适当协调时出现的冲突和编程错误。这可能导致数据损坏。
优化并行性能
在本章的示例中,我们看到了影响并行代码性能的各种因素。
运行并行 R 代码的一个开销是在设置集群时。默认情况下,makeCluster()指示工进程在启动时加载methods包。这可能会花费相当多的时间,所以如果要运行的任务不需要methods,可以通过将methods=FALSE传递给makeCluster()来禁用此行为。
并行性能的最大障碍之一是主进程和工进程之间数据的复制和传输。当你在计算机集群上运行并行任务时,这个障碍可能会很大,因为许多因素,如有限的网络带宽和数据加密,甚至在没有进行任何计算之前就会减慢数据的传输速度。即使在单台计算机上,内存中不必要的复制数据也会占用宝贵的秒数,随着数据的增长,这些秒数会成倍增加。这种情况也可能反过来发生,例如在随机数生成示例中,输入数据很小,但输出数据很大。
一种减少这些数据通信开销的方法是使用共享内存对象,正如我们在上一节所看到的。在某些情况下,数据压缩也有帮助,前提是压缩和解压缩数据所需的计算时间相对较短。另一种选择是在每个工作节点上存储数据,包括任何中间计算的结果,并将节点间数据通信仅限于协调任务所需的范围。这种方法的例子是 Hadoop 中的 MapReduce,我们将在第十章“第十章。R 和大数据”中探讨。
尽管有方法可以最小化数据通信的成本,但有时这些开销远远超过了并行化的收益,因此我们最好按顺序运行代码。计算并行化代码的性能提升和增加的开销之间的权衡可能很困难。当不确定时,可以像本章所做的那样进行小规模的实验。
摘要
在本章中,我们学习了两种并行处理类别:数据并行和任务并行。数据并行适用于可以在数据集的各个分区上并行执行的任务。要处理的数据集被分割成多个分区,每个分区在不同的工作进程中进行处理。另一方面,任务并行将一组相似或不同的任务分配给工作进程。在两种情况下,Amdahl 定律指出,通过并行化代码所能实现的最大速度提升受限于可以并行化的代码比例。
R 使用parallel包支持这两种并行处理类型。我们学习了如何使用基于套接字的集群和派生集群实现数据并行和任务并行算法。我们还学习了如何在计算机集群上使用基于套接字的集群并行运行任务。
本章中的示例表明,通过并行化代码所获得的性能提升取决于众多因素——集群的类型、任务是在单台计算机上运行还是在集群上运行、节点间交换的数据量、单个子任务的复杂性等等。虽然像共享内存并行这样的技术可以缓解一些瓶颈,但并行计算是一个复杂的学科,需要丰富的经验和技能才能有效执行。如果使用得当,速度和效率的回报可以非常显著。
要深入了解 R 中的并行计算,请参阅 Q. Ethan McCallum 和 Stephen Weston 合著的《Parallel R》。
在下一章中,我们将超越 R 的边界,利用专门的数据处理平台(如分析数据库)的处理能力。
第九章:将数据处理任务卸载到数据库系统
我们已经学习了多种优化 R 代码性能的方法,以实现速度和内存效率。但有时仅使用 R 是不够的。也许,一个非常大的数据集存储在数据仓库中。将所有数据提取到 R 中进行处理是不切实际的。我们甚至可能希望利用专门设计的分析数据库的强大功能,这些数据库可以比 R 更高效地执行计算。在本章中,我们将学习如何从 R 内部利用外部数据库系统的功能,并将这种功能与 R 语言的灵活性和易用性相结合。
本章涵盖了以下内容:
-
将数据提取到 R 中与在数据库中处理数据
-
使用 SQL 在关系型数据库中预处理数据
-
将 R 表达式转换为 SQL
-
在数据库中运行统计和机器学习算法
-
使用列式数据库以提高性能
-
使用数组数据库以实现最大的科学计算性能
将数据提取到 R 中与在数据库中处理数据
大多数 R 程序员熟悉且非常舒适地使用 R 数据结构和包来操作数据。这需要将所有数据移动到 R 中,无论是在内存中还是在磁盘上,在单台计算机上还是在集群上。在某些情况下,这可能不是高效的,特别是如果数据不断变化且需要经常更新——每次分析时从数据库或数据仓库中提取数据需要大量的时间和计算资源。在某些情况下,可能根本无法将数以兆计的数据从其来源移动到 R 中。
与将数据移动到 R 中不同,另一种方法是将计算任务移动到数据所在的位置。换句话说,我们可以在数据库中处理数据,并将结果仅检索到 R 中,这些结果通常比原始数据小得多。这减少了传输数据所需的网络带宽以及处理 R 中数据所需的本地存储和内存。它还允许 R 程序员利用专为大型数据集上的分析工作负载而设计的强大数据库。
为了在数据库中执行计算和分析,需要一套新的工具。所有数据库工具的基础是 SQL 语言,大多数关系型数据库都支持 SQL。虽然本书不是关于 SQL 的,但了解如何在数据库中运行简单的 SQL 语句可以帮助加快 R 中许多任务的执行速度。其他工具,如dplyr,建立在 SQL 的基础上,提供易于使用且熟悉的接口,例如类似数据框的对象,以便在数据库中操作数据。还有其他工具,如 MonetDB.R 和 SciDB,使我们能够利用专为高性能分析工作负载(如列式和数组数据库)而设计的数据库。在接下来的几节中,我们将探讨这些工具。
使用 SQL 在关系型数据库中预处理数据
我们将首先学习如何在 R 中运行 SQL 语句,最初的几个示例将展示如何在数据库中处理数据而不是将所有数据移动到 R 中,即使是简单的操作也能带来更快的性能。
要运行本章中的示例,你需要一个 R 支持的数据库服务器。CRAN 包RJDBC提供了一个接口,用于大多数数据库附带的 JDBC 驱动程序。或者,在 CRAN 上搜索提供针对每个数据库特定功能和优化的包,如RPostgreSQL、RMySQL和ROracle。
以下示例基于 PostgreSQL 数据库和RPostgreSQL包,因为我们将在本章学习PivotalR包和 MADlib 软件时需要它们。不过,你可以自由地将代码适配到你使用的数据库。
配置 PostgreSQL 与 R 一起工作涉及设置服务器和客户端。首先,我们需要设置 PostgreSQL 数据库服务器。这可以是在运行 R 的不同计算机上,以模拟从 R 中访问现有数据库;或者它也可以在同一台计算机上以简化操作。在我们的案例中,我们将设置一个 Linux 虚拟机来托管 PostgreSQL 数据库服务器,并使用 Mac OS X 作为客户端。以下是设置数据库服务器的步骤:
-
从
www.postgresql.org/download/下载 PostgreSQL,并遵循你操作系统的安装说明。 -
通过将以下命令行添加到
pg_hba.conf(在 PostgreSQL 的data文件夹中)来在数据库服务器上启用用户名/密码认证:host all all 0.0.0.0/0 md5 -
通过运行以下命令行创建一个用户账户和密码,该账户和密码可以用于从 R 连接到数据库(你可能需要作为
root或postgres用户来运行此命令):$ createuser --pwprompt ruser -
通过运行以下命令行创建用于本章示例的数据库(你可能需要作为
root或postgres用户来运行此命令):$ createdb --owner=ruser rdb -
通过将以下行添加到
postgresql.conf(在 PostgreSQL 的data文件夹中)来确保数据库可以通过运行 R 的计算机的网络连接访问:listen_address = '*' port = 5432 -
重启 PostgreSQL 服务器以使更改生效(你可能需要作为
root用户来完成此操作)。
接下来,我们将通过在运行 R 的计算机上安装RPostgreSQL包来设置客户端:
-
非 Windows 系统:安装
libpq,PostgreSQL 的 C 库,这是安装RPostgreSQL所需的。如果你已经在同一台计算机上安装了 PostgreSQL 服务器,那么库已经存在于系统中,因此你可以跳过此步骤。否则,请确保库的版本与 PostgreSQL 服务器的版本相匹配:# On Mac OS X (using Homebrew) $ brew install postgresql # On Debian / Ubuntu $ sudo apt-get install libpq-dev # On Redhat / CentOS $ sudo yum install postgresql-devel # On Windows: this step is not needed -
运行 R 并从其源代码安装
RPostgreSQLCRAN 包:# On platforms other than Windows install.packages("RPostgreSQL", type="source") # On Windows install.packages("RPostgreSQL") -
通过用你数据库的正确信息替换细节来测试 R 中的数据库连接:
library(RPostgreSQL) db.drv <- PostgreSQL() db.conn <- dbConnect(db.drv, host = "hostname",port = 5432, dbname = "rdb",user = "ruser",password = "rpassword") dbListTables(db.conn) ## character(0)
一旦数据库设置完成,我们将为后续的示例生成一些销售数据。示例数据库有两个表,sales和trans_items。sales表包含关于零售连锁店销售交易的信息,包括交易 ID、客户 ID 和商店 ID。trans_items表记录了每个交易中的单个商品以及每个商品的总价格。一旦在 R 中生成了数据,我们将使用dbWriteTable()将数据写入数据库中的新表,如下所示:
ntrans <- 1e5
ncust <- 1e4
nstore <- 100
sales <- data.frame(trans_id = seq_len(ntrans),cust_id = sample.int(ncust, ntrans, TRUE),store_id = sample.int(nstore, ntrans, TRUE))
dbWriteTable(db.conn, "sales", sales)
trans.lengths <- rpois(ntrans, 3) + 1L
trans.items <- data.frame(trans_id = rep.int(seq_len(ntrans), trans.lengths),item_id = unlist(lapply(trans.lengths, sample.int, n = 1000)),price = exp(rnorm(sum(trans.lengths))))
dbWriteTable(db.conn, "trans_items", trans.items)
第一个任务是计算每个商店的总销售额。让我们比较两种不同的方法。第一种方法是通过连接sales和trans_items表,提取所有商店 ID 以及与每个商店相关的商品价格。一旦这些数据在 R 中,每个商店的销售额是通过使用tapply()对每个商店 ID 的商品价格进行求和来计算的。计算相同数据的第二种方法是使用 SQL 的GROUP BY子句和SUM()函数在数据库中执行聚合。我们将使用microbenchmark()来比较两种方法的执行时间:
library(microbenchmark)
microbenchmark({res <- dbGetQuery(db.conn,'SELECT store_id, priceFROM sales INNER JOIN trans_items USING (trans_id);')res <- tapply(res$price, res$store_id, sum)
}, times = 10)
## Unit: milliseconds
## min lq median uq max neval
## 740.7533 745.2563 771.3706 775.3665 780.3819 10
microbenchmark({res <- dbGetQuery(db.conn,'SELECT store_id, SUM(price) as total_salesFROM sales INNER JOIN trans_items USING (trans_id)GROUP BY store_id;')
}, times = 10)
## Unit: milliseconds
## min lq median uq max neval
## 244.779 248.6401 251.1465 255.3652 279.6666 10
在这个简单的测试中,在数据库中执行计算只需要用 R 提取数据执行相同操作所需时间的 33%。让我们看看另一个例子。第二个任务是获取花费最多钱的前十位客户的列表,按金额降序排列。同样,我们将比较在 R 中执行计算与在数据库中执行计算的速率:
microbenchmark({res <- dbGetQuery(db.conn,'SELECT cust_id, priceFROM sales INNER JOIN trans_items USING (trans_id);')res <- tapply(res$price, res$cust_id, sum)res <- sort(res, decreasing = TRUE)res <- head(res, 10L)
}, times = 10)
## Unit: milliseconds
## min lq median uq max neval
## 814.2492 828.7774 843.1869 846.4235 952.1318 10
microbenchmark({res <- dbGetQuery(db.conn,'SELECT cust_id, SUM(price) as spendingFROM sales INNER JOIN trans_items USING (trans_id)GROUP BY cust_idORDER BY spending DESCLIMIT 10;')
}, times = 10)
## Unit: milliseconds
## min lq median uq max neval
## 259.1621 260.5494 260.9566 265.1368 294.1732 10
再次,在数据库中而不是在 R 中运行计算导致执行时间减少了 70%。
一旦我们完成,我们需要从数据库断开连接:
dbDisconnect(db.conn)
这些测试是在同一台计算机上进行的,数据库服务器运行在虚拟机中。即使在如此小的数据集和非常小的网络(主机计算机和虚拟机之间的虚拟网络)上,性能差异也非常明显。这些测试清楚地表明,最小化从数据库中复制的数据量可以提供巨大的性能提升。在更大的数据集和强大的分析数据库上,性能差异可能更加明显。
将 R 表达式转换为 SQL
虽然 SQL 是一种强大且灵活的语言,用于在数据库中操作数据,但并非每个人都能熟练掌握它。幸运的是,R 社区已经开发了一些包,可以将熟悉的 R 语法转换为在数据库上执行的 SQL 语句。我们将探讨其中的两个——dplyr和PivotalR。
使用 dplyr
dplyr包是一个方便的包,旨在允许使用标准操作和转换来操作类似表的数据,无论数据存储在哪里——在数据框、数据表或数据库中。它支持 SQLite、PostgreSQL、MySQL、Amazon RedShift、Google BigQuery 和 MonetDB 数据库。
dplyr 包提供了一种方法,可以在不实际在数据库服务器上执行计算的情况下指定要执行的一组操作,通过调用 collect() 函数来指示 R 执行。通过将几个操作组合在一起(而不是逐个执行),数据库服务器可以优化执行。这反过来有助于最小化服务器的计算负载。让我们通过一个例子看看它是如何工作的。
首先,我们需要与数据库建立连接,就像之前一样。在这里,我们将使用 dplyr 提供的 src_postgres() 函数。语法与 RPostgreSQL 的 dbConnect() 略有不同,但参数相似。建立连接后,我们将使用 tbl() 函数在数据库中创建对 sales 和 trans_items 表的引用:
library(dplyr)
db.conn <- src_postgres(dbname = "rdb", host = "hostname",port = 5432, user = "ruser",password = "rpassword")
sales.tb <- tbl(db.conn, "sales")
trans_items.tb <- tbl(db.conn, "trans_items")
让我们使用 dplyr 重新创建之前的例子:
joined.tb <- inner_join(sales.tb, trans_items.tb, by = "trans_id")
cust.items <- group_by(joined.tb, cust_id)
cust.spending <- summarize(cust.items, spending = sum(price))
cust.spending <- arrange(cust.spending, desc(spending))
cust.spending <- select(cust.spending, cust_id, spending)
第一步是使用 inner_join() 将 sales 和 trans_items 表连接起来。然后,group_by() 根据客户 ID 对项目进行分组,summarize() 对每个客户的总消费进行求和。最后,我们将使用 arrange() 按消费金额降序排序客户,并使用 select() 选择我们想要的列。
每个这些步骤的输出都是一个 tbl 对象:
class(cust.spending)
## [1] "tbl_postgres" "tbl_sql" "tbl"
这些是虚拟表,是迄今为止应用的所有操作的累积。到目前为止,尚未向数据库服务器发送 SQL 语句,也没有在其上执行计算。我们可以通过检索 tbl 对象的 query 成员来检查在检索结果时将执行的 SQL 查询:
cust.spending$query
## <Query> SELECT "cust_id" AS "cust_id", "spending" AS "spending"
## FROM (SELECT "cust_id", SUM("price") AS "spending"
## FROM (SELECT "row.names" AS "row.names.x", "trans_id" AS
## "trans_id", "cust_id" AS "cust_id", "store_id" AS "store_id"
## FROM "sales") AS "nsthygziij"
##
## INNER JOIN
##
## (SELECT "row.names" AS "row.names.y", "trans_id" AS "trans_id",
## "item_id" AS "item_id", "price" AS "price"
## FROM "trans_items") AS "cuwpqadrgf"
##
## USING ("trans_id")
## GROUP BY "cust_id") AS "_W8"
## ORDER BY "spending" DESC
## <PostgreSQLConnection:(11726,2)>
通常,collect() 函数用于运行 SQL 语句并检索结果:
custs.by.spending <- collect(cust.spending)
由于我们只想获取前 10 名客户而不是所有客户,我们可以使用 head() 来最小化从数据库传输到 R 的数据:
top.custs <- head(cust.spending, 10L)
随着在 dplyr 中构建更复杂的数据操作,单个 R 语句和临时变量可能会变得难以管理。dplyr 包提供了 %>% 操作符来链接操作。前面的结构可以更简洁地重写为:
top.custs <-sales.tb %>% inner_join(trans_items.tb, by = "trans_id") %>%group_by(cust_id) %>%summarize(spending = sum(price)) %>%arrange(desc(spending)) %>%select(cust_id, spending) %>%head(10L)
dplyr 包提供了其他有用的操作,如 filter() 用于过滤行,以及 mutate() 用于定义新列作为现有列的函数。这些操作可以以许多创造性和有用的方式组合起来,在将结果检索到 R 之前在数据库中处理数据。
使用 PivotalR
PivotalR 包提供了与 dplyr 类似的特性,但语法不同。因为它是由 Pivotal Software Inc. 开发的,所以它只支持 PostgreSQL 或 Pivotal (Greenplum) 数据库。
如同往常,使用该包的第一步是建立与数据库的连接:
library(PivotalR)
db.conn <- db.connect(host = "hostname", port = 5432,dbname = "rdb", user = "ruser",password = "rpassword")
注意
如果你还没有在 PostgreSQL 数据库上安装 MADlib(请参阅本章下一节),你可能会收到一个警告,提示“数据库中不存在 MADlib。”在本节中的示例不会遇到这个问题,因为它们不涉及 MADlib 函数。
下一步是使用 db.data.frame() 创建对数据库表的引用:
sales.tb <- db.data.frame("sales", db.conn)
trans_items.tb <- db.data.frame("trans_items", db.conn)
db.data.frame 对象在许多方面与标准的 R 数据框类似,但它们是需要在数据库上执行的 SQL 查询的包装器。许多标准的 R 信息和统计函数都得到了支持。为了执行 SQL 并检索结果,请使用 lookat() 函数(或简写 lk())。例如:
dim(sales.tb)
## [1] 1e+05 4e+00
names(sales.tb)
## [1] "row.names" "trans_id" "cust_id" "store_id"
lookat(count(sales.tb$cust_id))
## [1] 1e+05
lookat(min(trans_items.tb$price))
## [1] 0.009554177
lookat(max(trans_items.tb$price))
## [1] 121.3909
要查看将在数据库服务器上执行的 SQL 查询,请使用 content() 方法:
content(max(trans_items.tb$price))
## [1] "select max(\"price\") as \"price_max\"
## from \"trans_items\""
注意
如果你收到“无效的 SciDB 对象”错误消息,这可能意味着一些 PivotalR 函数被 SciDB 包中同名函数遮蔽了,我们将在本章后面讨论这个问题。特别是,这两个包都提供了 count() 函数。为了成功运行本节中的示例,请使用 detach("package:scidb", unload=TRUE) 卸载 scidb 包。
可以通过使用熟悉的 R 语法从现有列计算新列,而不会影响数据库中的数据;相反,转换被转换为在数据库上实时计算新列的 SQL 函数。在以下示例中,我们将计算一个新的列 foreign_price,该列返回到 R 的内存中,而不是存储在数据库中:
trans_items.tb$foreign_price <- trans_items.tb$price * 1.25
content(trans_items.tb)
## [1] "select \"row.names\" as \"row.names\", \"trans_id\" as
## \"trans_id\", \"item_id\" as \"item_id\", \"price\" as
## \"price\", (\"price\") * (1.25) as \"foreign_price\" from
## \"trans_items\""
让我们来看一个如何在 PivotalR 中构建查询的完整示例。比如说,我们想要计算一些统计信息,以了解交易层面的消费者购买模式。我们必须按交易分组数据,然后再按客户分组,以计算每个客户的统计信息:
trans <- by(trans_items.tb["price"], trans_items.tb$trans_id, sum)
sales.value <- merge(sales.tb[c("trans_id", "cust_id","store_id")],trans, by = "trans_id")
cust.sales <- by(sales.value, sales.value$cust_id,function(x) {trans_count <- count(x$trans_id)total_spend <- sum(x$price_sum)stores_visited <- count(x$store_id)cbind(trans_count, total_spend,stores_visited)})
names(cust.sales) <- c("cust_id", "trans_count", "total_spend","stores_visited")
lookat(cust.sales, 5)
## cust_id trans_count total_spend stores_visited
## 1 1 9 44.73121 9
## 2 2 7 41.90196 7
## 3 3 13 87.37564 13
## 4 4 11 58.34653 11
## 5 5 15 95.09015 15
by() 的第一次调用将项目级别的销售数据聚合到交易中;计算每个交易的总价值。然后,merge() 将 sales 表与聚合的交易数据合并,以匹配客户及其花费的金额。然后,我们将再次使用 by() 来按客户聚合所有交易。对于每个客户,我们将计算他们进行的交易数量、这些交易的总价值以及他们访问的店铺数量。
除了返回结果外,还可以使用 as.db.data.frame() 将它们存储到新的数据库表中。这对于涉及许多中间步骤的长时间计算非常有用。将中间结果存储在数据库中有助于减少 R 和数据库之间传输的数据量。
cust_sales.tb <- as.db.data.frame(cust.sales, "cust_sales")
可以从中间数据中计算更多的统计信息,例如客户消费的最小值、最大值、平均值和标准差:
lookat(min(cust_sales.tb$total_spend))
## [1] 0.4961619
lookat(max(cust_sales.tb$total_spend))
## [1] 227.8077
lookat(mean(cust_sales.tb$total_spend))
## [1] 66.16597
lookat(sd(cust_sales.tb$total_spend))
## [1] 26.71887
当中间数据不再需要时,可以从数据库中删除它:
delete(cust_sales.tb)
dplyr 和 PivotalR 都提供了灵活且简单的方法,使用 R 函数和语法在数据库中操作数据。它们使我们能够利用高性能数据库的处理能力和速度来查询大型数据集,并将查询结果集成到 R 中的其他分析中。由于它们在功能上相当相似,选择两者之间的区别主要取决于与现有数据库系统的兼容性以及个人对某种语法而非另一种语法的偏好。
在数据库中运行统计和机器学习算法
到目前为止,本章中的示例已在数据库中的数据上执行了简单的计算。有时我们需要执行比这更复杂的计算。一些数据库供应商已经开始将高级统计或甚至机器学习功能构建到他们的数据库产品中,允许这些高级算法在数据库中以高度优化的代码运行,以实现最佳性能。在本章中,我们将探讨一个开源项目 MADlib (madlib.net/),其开发由 Pivotal Inc. 支持,它将高级统计和机器学习功能带给 PostgreSQL 数据库。
MADlib 为 PostgreSQL 添加了大量统计功能,包括描述性统计、假设检验、数组算术、概率函数、降维、线性模型、聚类模型、关联规则和文本分析。新模型和统计方法不断添加到库中,以扩展其功能。
注意
目前,MADlib 二进制文件仅适用于 Mac OS X 和 Red Hat/CentOS Linux。对于其他操作系统,github.com/madlib/madlib/wiki/Building-MADlib-from-Source 提供了从源代码构建 MADlib 的说明。截至写作时,MADlib 不支持 Windows。
在安装 MADlib 之前,请确保已安装 PostgreSQL 的 plpython 模块。在 Redhat/CentOS 上,通过替换与 PostgreSQL 版本匹配的包名来运行此命令:
$ yum install postgresql93-plpython
在 Mac OS X 上,检查您的 PostgreSQL 安装方法的文档。例如,使用 Homebrew,以下命令安装带有 plpython 支持的 PostgreSQL:
$ brew install postgresql --with-python
一旦使用 plpython 设置了 PostgreSQL,请按照 github.com/madlib/madlib/wiki/Installation-Guide 中的说明安装 MADlib。用于安装 MADlib 的用户账户需要超级用户权限,这可以通过在 PostgreSQL 中运行 ALTER ROLE ruser WITH SUPERUSER; 来授予。
现在,返回 R 并使用 RPostgreSQL 包连接到 PostgreSQL:
db.drv <- PostgreSQL()
db.conn <- dbConnect(db.drv, host = "hostname",port = 5432, dbname = "rdb",user = "ruser",password = "rpassword")
假设我们想要挖掘我们的销售数据库中的关联规则。正如你从第六章中记得的,《减少内存使用的小技巧》,arules包提供了挖掘频繁项集和关联规则的函数。为了使用arules包,整个trans_items表需要提取到 R 中并转换为transactions对象。如果数据集很大,这可能会花费很长时间,或者根本不可能完成。
或者,我们可以使用 MADlib 函数在数据库中挖掘关联规则。数据根本不需要从数据库中复制出来,只要数据库服务器或集群有足够的容量,所有计算都可以在数据库中完成。
运行关联规则挖掘算法就像在 SQL SELECT语句中调用madlib.assoc_rules()函数一样简单:
dbGetQuery(db.conn,"SELECT *FROM madlib.assoc_rules(0.001, -- support0.01, -- confidence'trans_id', -- tid_col'item_id', -- item_col'trans_items', -- input_table'public', -- output_schemaTRUE -- verbose);")
## INFO: finished checking parameters
## CONTEXT: PL/Python function "assoc_rules"
## INFO: finished removing duplicates
## CONTEXT: PL/Python function "assoc_rules"
## # Output truncated
## INFO: 6 Total association rules found. Time: 0.00557494163513
## CONTEXT: PL/Python function "assoc_rules"
## output_schema output_table total_rules total_time
## 1 public assoc_rules 6 00:01:21.860964
前面的代码中包含了描述madlib.assoc_rules()参数的注释。在这里,算法被要求搜索支持至少为 0.001 和置信度至少为 0.01 的关联规则。指定了输入表和列的名称,以及可以存储结果的模式的名称。在这种情况下,结果将存储在public模式中名为assoc_rules的表中。
每次函数运行时,assoc_rules表将被覆盖;所以如果你想要保留结果的一个副本,你必须复制该表。
让我们检索结果,即满足最小支持和置信度的关联规则:
dbGetQuery(db.conn,'SELECT * FROM assoc_rules;')
## ruleid pre post support confidence lift conviction
## 1 1 {353} {656} 1e-04 0.02272727 5.516328 1.019040
## 2 2 {656} {353} 1e-04 0.02427184 5.516328 1.020366
## 3 3 {770} {420} 1e-04 0.02444988 6.022137 1.020901
## 4 4 {420} {770} 1e-04 0.02463054 6.022137 1.021059
## 5 5 {755} {473} 1e-04 0.02469136 6.203859 1.021236
## 6 6 {473} {755} 1e-04 0.02512563 6.203859 1.021619
结果表明每个关联规则左右两侧的项目,以及每个规则的统计数据,如支持、置信度、提升和确信度。
大多数其他 MADlib 函数以类似的方式工作——数据以数据库表的形式提供给函数,使用适当的参数调用函数,并将结果写入指定模式的新数据库表。
注意
因为 Pivotal, Inc.开发了PivotalR包和 MADlib,所以PivotalR自然提供了对一些 MADlib 函数的接口,例如线性模型、ARIMA 时间序列模型和决策树。它还提供了从 MADlib 输出中提取信息的有用函数,例如回归系数。不幸的是,PivotalR并没有提供所有 MADlib 函数的包装器,例如前面代码中使用的madlib.assoc_rules()函数。为了在 MADlib 库中使用最大的灵活性,请使用 SQL 语句调用 MADlib 函数。
像 MADlib 这样的数据库内分析库允许我们利用大型数据库中高级分析的力量,并将算法的结果带入 R 进行进一步分析和处理。
使用列式数据库提高性能
大多数关系型数据库使用基于行的数据存储架构——数据按行存储在数据库中。每当数据库执行查询时,它都会在处理查询之前检索查询相关的相关行。这种架构非常适合业务交易用途,其中完整的记录(即包括所有列)一次写入、读取、更新或删除。然而,对于大多数统计或分析用例,通常需要读取多行数据,通常只有少数列。因此,基于行的数据库在分析任务中有时效率不高,因为它们一次读取整个记录,而不管实际分析需要多少列。以下图展示了基于行的数据库如何计算一列的和。

在基于行的数据库中计算一列的和
近年来,对数据分析平台的需求增加导致了使用替代存储架构的数据库的发展,这些架构针对数据分析进行了优化,而不是针对业务交易。这种架构之一是列式存储。列式数据库按列存储数据,而不是按行存储。这与 R 数据框非常相似,其中数据框的每一列都存储在内存中的一个连续块中,形式为一个 R 向量。当计算一列的和时,列式数据库只需要读取一列数据,如下面的图所示:

在列式数据库中计算一列的和
列式数据库的一个例子是 MonetDB,可以从www.monetdb.org/Downloads下载。按照那里的说明进行安装。安装后,请按照以下步骤初始化并启动数据库。
在 Linux 或 Mac OS X 上,在终端窗口中运行以下命令:
# Create a new database farm
# (Replace the path with a location of your choice)
$ monetdbd create /path/to/mydbfarm
# Start the database server
$ monetdbd start /path/to/mydbfarm
# Create a new database within the farm
$ monetdb create rdb
# Release the new database from administration locks
$ monetdb release rdb
在 Windows 上,通过转到开始 | 程序 | MonetDB | 启动服务器来初始化和启动服务器。
由于 MonetDB 基于 SQL,因此从 R 连接到 MonetDB 并与 MonetDB 一起工作与使用 PostgreSQL 类似。我们可以使用MonetDB.R CRAN 包执行 SQL 语句或使用dplyr。例如,我们可以使用MonetDB.R将相同的产品销售和交易数据加载到 MonetDB 中:
library(MonetDB.R)
db.drv <- MonetDB.R()
db.conn <- dbConnect(db.drv, host = "hostname",port = 50000, dbname = "rdb",user = "monetdb",password = "monetdb")
dbWriteTable(db.conn, "sales", sales)
dbWriteTable(db.conn, "trans_items", trans.items)
现在,让我们基准测试一下在RPostgreSQL示例中使用的相同 SQL 查询的查询性能:
library(microbenchmark)
microbenchmark({res <- dbGetQuery(db.conn,'SELECT store_id, SUM(price) as total_salesFROM sales INNER JOIN trans_items USING (trans_id)GROUP BY store_id;')
}, times = 10)
## Unit: milliseconds
## min lq median uq max neval
## 112.1666 113.0484 113.9021 114.4349 114.7049 10
microbenchmark({res <- dbGetQuery(db.conn,'SELECT cust_id, SUM(price) as spendingFROM sales INNER JOIN trans_items USING (trans_id)GROUP BY cust_idORDER BY spending DESCLIMIT 10;')
}, times = 10)
## Unit: milliseconds
## min lq median uq max neval
## 114.2376 115.4617 116.515 117.1967 118.4736 10
与 PostgreSQL 相比,MonetDB 在查询(如你之前所记得,PostgreSQL 对前两个查询的中位时间分别为 251.1 毫秒和 260.1 毫秒)上节省了 55% 的时间。当然,这并不是行式数据库和列式数据库之间全面或严格的比较,但它表明,通过选择适合任务的正确数据库架构,可以实现性能的提升。
使用数组数据库实现最大科学计算性能
列式数据库为类似于 R 数据框的数据集提供了良好的查询性能,例如,大多数来自商业 IT 系统的数据。这些数据集通常是二维的,可以包含异构的数据类型。另一方面,科学数据有时包含同质的数据类型,但却是多维的。例如,不同时间和空间点的气象读数。对于此类应用,一种称为数组数据库的新类型数据库提供了更好的查询和科学计算性能。SciDB 是一个例子,可在 www.scidb.org/ 下载。SciDB 提供了一个大规模并行处理(MPP)架构,可以在 PB 级的数组数据上并行执行查询。它支持数据库内线性代数、图操作、线性模型、相关性检验和统计检验。它还通过 CRAN 上的 SciDB 软件包提供了一个 R 接口。
要下载和安装 SciDB,请遵循 github.com/Paradigm4/deployment 中的说明。然后,在 SciDB 服务器上安装 shim (github.com/paradigm4/shim),这是 R 与 SciDB 通信所需的。最后,从 CRAN 安装 scidb 软件包。
使用 scidbconnect() 函数连接到 SciDB 数据库:
library(scidb)
scidbconnect(host = "hostname", port = 8080)
然后,我们可以使用 as.scidb() 将一些数据加载到数据库中:
A <- as.scidb(matrix(rnorm(1200), 40, 30), name = "A")
B <- as.scidb(matrix(rnorm(1200), 30, 40), name = "B")
scidb 提供了熟悉的 R 语法来操作 SciDB 矩阵和数组:
# Matrix multiplication
A %*% B
# Transpose and addition / subtraction
A + t(B)
# Scalar operations
A * 1.5
我们甚至可以将 SciDB 矩阵/数组与 R 矩阵/数组混合使用:
C <- matrix(rnorm(1200), 30, 40)
A %*% C
与其他数据库包一样,操作实际上只有在检索结果时才会执行。在 SciDB 的情况下,[] 操作符会导致数据库执行计算并返回结果:
# Filter only the positive elements of A, and materialize the
# results into R
(A > 0)[]
SciDB 支持许多其他常见的数组/矩阵操作,如子集、比较、过滤、应用、连接、聚合和排序。它是处理大型、多维数值数据的有力工具。
摘要
在本章中,我们游览了各种数据库系统以及允许我们与之交互的 R 包,并看到了如何在数据库中进行查询和分析可以比将数据复制到 R 中进行相同分析提供更好的性能。这对于无法在 R 中轻松处理的大型数据集来说尤其如此;使用针对查询和分析进行优化的数据库可以帮助避免 R 中的性能问题。随着技术的进步,越来越多的先进分析和算法可以在数据库中运行,为面对高效分析大型数据集挑战的 R 程序员提供更多选择。这些强大的数据处理工具可以很好地补充 R——它们提供了分析大型数据集的计算力量,而 R 提供了易于操作和分析的接口。R 还可以帮助将不同的分析线索汇集在一起,无论使用什么工具,都可以使用数据可视化等工具来呈现一个连贯且引人入胜的数据图。
在下一章和最后一章中,我们将进入大数据的前沿,看看 R 如何与大数据工具一起使用来处理极大规模的数据集。
第十章:R 与大数据
我们已经来到了这本书的最后一章,我们将深入探讨大规模数据处理。术语大数据被用来描述在互联网上不断增长的、速度和种类都在增加的数据量,这些数据在连接的设备和许多其他地方被生成。现在,许多组织拥有测量在皮字节(一个皮字节是 1,048,576 千兆字节)以上的海量数据集,比以往任何时候都要多。对于传统的数据处理工具和数据库架构来说,处理和分析大数据极具挑战性。
2005 年,Yahoo!的 Doug Cutting 和 Mike Cafarella 开发了 Hadoop,基于 Google 之前的工作,以解决这些挑战。他们着手开发一个新的数据平台,以高效地处理、索引和查询数十亿网页。有了 Hadoop,以前需要非常昂贵的超级计算机才能完成的工作现在可以在大量廉价的标准化服务器集群上完成。随着数据量的增长,只需简单地向 Hadoop 集群添加更多服务器即可增加存储容量和计算能力。从那时起,Hadoop 及其工具生态系统已经成为最受欢迎的工具套件之一,用于收集、存储、处理和分析大数据集。在本章中,我们将学习如何从 R 中利用 Hadoop 的力量。
本章涵盖了以下主题:
-
理解 Hadoop
-
在 Amazon Web Services 上设置 Hadoop
-
使用 RHadoop 批量处理大数据集
理解 Hadoop
在我们学习如何使用 Hadoop(更多信息请参阅hadoop.apache.org/)和 R 中的相关工具之前,我们需要了解 Hadoop 的基本知识。就我们的目的而言,了解 Hadoop 包含两个关键组件就足够了:Hadoop 分布式文件系统(HDFS)和用于执行数据处理任务的MapReduce框架。Hadoop 还包括许多其他组件,用于任务调度、作业管理等,但在这本书中我们不会关注这些。
如其名所示,HDFS 是一个分布在整个服务器集群上的虚拟文件系统。HDFS 以块的形式存储文件,默认块大小为 128 MB。例如,一个 1 GB 的文件会被分割成八个 128 MB 的块,这些块被分布到集群中的不同服务器上。此外,为了防止由于服务器故障导致的数据丢失,这些块会被复制。默认情况下,它们会被复制三次——集群中每个数据块的副本有三个,每个副本存储在不同的服务器上。这样,即使集群中的一些服务器失败,数据也不会丢失,并且可以被重新复制以确保高可用性。
MapReduce 是一个框架,用于以数据并行的方式处理存储在 HDFS 中的数据。注意数据存储的分布式特性如何使 Hadoop 成为适合我们已在 第八章 通过并行计算提高性能 中了解到的数据并行算法的理想选择——每个工作节点上存储的数据块会同时并行处理,然后每个节点的结果会被合并以产生最终结果。MapReduce 的工作方式与第八章 通过并行计算提高性能 中的数据并行算法非常相似,区别在于数据已经驻留在工作节点上;在运行 R 服务器集群时,每次运行任务都不需要将数据分布,正如 HDFS 和 MapReduce 所做的那样。Map 指的是在每个工作节点上对数据进行计算或映射数据到相应的输出的步骤。Reduce 指的是将工作节点的结果合并或减少到最终结果的过程。
MapReduce 中的数据表示为键值对。每个 MapReduce 操作本质上是从一组键值对到另一组键值对的转换。例如,一个 mapper 可能会从数据库中读取单个客户记录,并生成一个键值对,如 ("Alice", 32),其中包含客户的名称("Alice")作为键,以及她在给定周收集的奖励点(32)作为相应的值。在 map 步骤之后,所有键值对都会根据键进行排序,具有相同键的键值对会被分配给单个 reducer。例如,对于键 "Alice" 的所有键值对会有一个 reducer,对于键 "Bob" 另一个 reducer,对于 "Charlie" 另一个 reducer。reducer 会接受分配给它的所有键值对,对它们进行计算,并以另一个键值对的形式返回结果。
在我们的简单示例中,reducers 可以计算所有客户收集的每周奖励点的平均值。然后 MapReduce 系统收集所有 reducers 的结果作为最终输出,可能类似于 [("Alice", 26.5), ("Bob", 42.3), ("Charlie", 35.6), ...]。
虽然 HDFS 和 MapReduce 是 Hadoop 的基础,但它们并不适合所有数据处理任务。一个关键原因是存储在 HDFS 中的数据位于服务器的硬盘上。每次执行 MapReduce 任务时,数据都必须从磁盘读取,计算结果必须写回磁盘。因此,HDFS 和 MapReduce 对于需要超过读取/写入数据和其他运行 Hadoop 集群开销的计算任务时间较大的批量处理任务表现合理。
在 Amazon Web Services 上设置 Hadoop
设置 Hadoop 集群的方法有很多。我们可以在伪分布式模式下将 Hadoop 安装在单个服务器上以模拟集群,或者在真实的服务器集群或完全分布式模式下的虚拟机上安装。还有几个 Hadoop 发行版可供选择,从 Apache 基金会提供的纯开源版本到商业发行版,如 Cloudera、Hortonworks 和 MapR。涵盖所有不同的 Hadoop 设置方法超出了本书的范围。我们提供了一种设置 Hadoop 以及本章示例中所需的其他相关工具的说明。如果您正在使用现有的 Hadoop 集群或以不同的方式设置,可能需要修改一些步骤。
注意
由于 Hadoop 及其相关工具主要是为基于 Linux/Unix 的操作系统开发的,因此本章中的代码可能在 Windows 上无法运行。如果您是 Windows 用户,请按照本章中的说明在亚马逊网络服务(Amazon Web Services)上设置 Hadoop、R 和所需的软件包。
亚马逊网络服务(AWS)提供一项名为 弹性 MapReduce(EMR)的服务,允许我们按小时租用和运行 Hadoop 集群。创建 Hadoop 集群就像指定集群中的服务器数量、每台服务器的规模以及在每个服务器上设置所需工具的说明一样简单。要设置 AWS 账户,请遵循 前言 中的说明。在本章中运行 AWS 上的示例将产生一些费用,只要 EMR 集群运行,就会产生费用。查看此链接了解最新的 EMR 价格:aws.amazon.com/elasticmapreduce/pricing/。
我们还需要一个脚本,用于在每个服务器上设置所需的工具。将以下脚本保存为 emr-bootstrap.sh。此脚本会在 Hadoop 集群中的每个服务器上安装本章所需的 R 软件包,包括 rhdfs、rmr2 和 R.utils。
#!/bin/bash
# Set unix environment variables
cat << EOF >> $HADOOP_HOME/.bashrc
export HADOOP_CMD=$HADOOP_HOME/bin/hadoop
export HADOOP_STREAMING=$HADOOP_HOME/contrib/streaming/hadoop-streaming.jar
EOF
. $HADOOP_HOME/.bashrc
# Fix hadoop tmp permission
sudo chmod 777 -R /mnt/var/lib/hadoop/tmp
# Install dependencies
sudo yum install -y libcurl-devel
# Install R packages
sudo –E R CMD javareconf
sudo –E R --no-save << EOF
install.packages("R.utils", repos="http://cran.rstudio.com")
EOF
# Install HadoopR dependencies
sudo –E R --no-save << EOF
install.packages(c("bitops", "caTools", "digest", "functional", "plyr", "Rcpp","reshape2", "rJava", "RJSONIO", "stringr"),repos="http://cran.rstudio.com")
EOF
# Install rhdfs package
wget https://raw.githubusercontent.com/RevolutionAnalytics/rhdfs/master/build/rhdfs_1.0.8.tar.gz
sudo -E R CMD INSTALL rhdfs_1.0.8.tar.gz
# Install rmr2 package
wget https://raw.githubusercontent.com/RevolutionAnalytics/rmr2/master/build/rmr2_3.2.0.tar.gz
sudo –E R CMD INSTALL rmr2_3.2.0.tar.gz
将 emr-bootstrap.sh 上传到 AWS 简单存储服务(S3),以便 EMR 服务器在首次运行时可以获取它。为此:
-
前往 AWS 控制台,点击 S3。
-
通过点击 创建存储桶 创建一个新的存储桶来存储脚本。
-
点击刚刚创建的存储桶,然后点击 上传 以上传脚本。
接下来,按照以下步骤创建 Hadoop 集群:
-
前往 AWS 控制台,点击 EMR。
-
点击 创建集群。
-
在 软件配置 下,选择亚马逊 Hadoop 发行版(本章中的示例使用的是亚马逊机器镜像(AMI)版本 3.2.1)。
-
从应用程序列表中删除 Hive 和 Pig,因为它们不是必需的。
-
在硬件配置下,选择 Hadoop 服务器的实例类型。主节点和核心节点的实例类型至少应有 15 GB 的 RAM,例如
m1.xlarge或m3.xlarge实例类型。输入您希望在集群中使用的节点数量。考虑到默认的 HDFS 复制因子为三,至少应有三个核心节点。任务节点是可选的。 -
在安全和访问下,选择用于登录集群的 EC2 密钥对。
-
在引导操作下,选择自定义操作,然后点击配置和添加。在S3 位置下出现的对话框中,输入或浏览上传
emr-bootstrap.sh的 S3 位置。 -
(可选) 在集群配置下启用日志记录,以便将所有 Hadoop 日志自动存储在 S3 存储桶中。要使用此选项,首先创建一个用于存储日志的 S3 存储桶,并在日志文件夹 S3 位置字段中输入存储桶的名称。虽然这是可选的,但存储 Hadoop 日志对于追踪错误和调试很有用,没有日志的话,这些操作可能会很具挑战性,因为执行程序会在 Hadoop 的多个进程和计算机节点上启动。
-
点击创建集群,等待几分钟,直到集群设置完成。
当 EMR 集群准备就绪后,从集群详细信息页面获取主公共 DNS,然后使用您的 AWS EC2 安全密钥从命令行登录到主服务器(将 hadoop.pem 替换为您的密钥名称):
$ ssh –i hadoop.pem root@master-public-dns
登录后,运行 R,它是 EMR 集群预安装的:
$ R
使用 Hadoop 批处理处理大型数据集
批处理是 HDFS 和 MapReduce 可以执行的最基本类型的任务。类似于第八章第八章.通过并行计算提高性能中的数据并行算法,主节点向工作节点发送一组指令,这些节点在其存储的数据块上执行指令。然后,结果被写入 HDFS 的磁盘。
当需要聚合结果时,对数据进行映射和减少步骤。例如,为了计算分布式数据集的平均值,工作节点上的映射器首先计算每个本地数据块中的总和和元素数量。然后,减少器将这些结果相加以计算全局平均值。
在其他时候,如果不需要聚合,则只执行映射步骤。这在数据转换或清理操作中很常见,其中数据只是从一种格式转换到另一种格式。一个例子是从一组文档中提取电子邮件地址。在这种情况下,工作节点上映射器的结果作为新的数据集存储在 HDFS 中,不需要减少器。
R 社区已经开发了几个包来从 R 执行 MapReduce 任务。其中之一是由 Revolution Analytics 开发的 RHadoop 系列包(更多信息请参阅github.com/RevolutionAnalytics/RHadoop)。RHadoop 包括rhdfs包,它提供了在 HDFS 中操作文件和目录的功能,以及rmr2包,它将 MapReduce 的功能作为 R 函数暴露出来。这些函数使得在不使用 Hadoop Java API 编程的情况下使用 MapReduce 变得容易。相反,rmr2在每个工作节点上运行 R 的一个副本,映射器和归约器作为 R 函数编写,以应用于每个数据块。
如果你没有使用前一个部分中的 Hadoop 设置说明,请按照github.com/RevolutionAnalytics/RHadoop/wiki中的rhdfs和rmr2的安装说明进行操作。
将数据上传到 HDFS
首先要做的是将数据放入 HDFS。对于本章,我们将使用 Google Books Ngrams 数据(更多信息请参阅storage.googleapis.com/books/ngrams/books/datasetsv2.html)。在这里,n-gram 是文本中连续出现的单词序列,其中n代表短语中的单词数量——1-gram 只是一个单词(例如,“Batman”),2-gram 是两个连续的单词(例如,“Darth Vader”),而 6-gram 是六个连续的单词(例如,“Humpty Dumpty sat on a wall”)。我们将使用 1-gram 的数据作为我们的示例。
注意
本章的数据集足够大,可以测试 Hadoop 在小集群上的性能,但与许多其他现实世界的数据集相比,它仍然相对较小。例如,ffdf(第七章, 使用有限 RAM 处理大型数据集) 工具可能被用来在单机上处理这个数据集。但当数据量变得很大时,Hadoop 或其他大数据工具可能是处理数据的唯一方式。
以下代码下载 1-gram 数据并将其上传到 HDFS。Google 提供了单独的文件,每个文件包含以字母表中的每个字母开头的单词。在这段代码中,hdfs.init()首先初始化到 HDFS 的连接。然后,hdfs.mkdir()在 HDFS 中创建用于存储数据的目录/ngrams/data。for循环中的代码下载每个文件,解压缩它,并使用hdfs.put()将其上传到 HDFS:
library(rhdfs)
library(R.utils)
hdfs.init()
hdfs.mkdir("/ngrams/data")
files <- paste0("googlebooks-eng-all-1gram-20120701-", letters)
for (f in files) {gzfile <- paste0(f, ".gz")url <- paste0("http://storage.googleapis.com/","books/ngrams/books/",gzfile)download.file(url, destfile = gzfile)gunzip(gzfile)hdfs.put(f, paste0("/ngrams/data/", f))file.remove(f)
}
我们可以检查所有文件是否已成功上传到 HDFS:
hdfs.ls("/ngrams/data")
## permission owner group size modtime
## 1 -rw-r--r-- hadoop supergroup 1801526075 2014-10-05 09:59
## 2 -rw-r--r-- hadoop supergroup 1268392934 2014-10-05 10:00
## 3 -rw-r--r-- hadoop supergroup 2090710388 2014-10-05 10:01
## 4 -rw-r--r-- hadoop supergroup 1252213884 2014-10-05 10:01
## 5 -rw-r--r-- hadoop supergroup 1085415448 2014-10-05 10:02
## file
## 1 /ngrams/data/googlebooks-eng-all-1gram-20120701-a
## 2 /ngrams/data/googlebooks-eng-all-1gram-20120701-b
## 3 /ngrams/data/googlebooks-eng-all-1gram-20120701-c
## 4 /ngrams/data/googlebooks-eng-all-1gram-20120701-d
## 5 /ngrams/data/googlebooks-eng-all-1gram-20120701-e
## $ hdfs dfs -du -h /ngrams
# Output truncated
使用 RHadoop 分析 HDFS 数据
现在数据已加载到 HDFS 中,我们可以使用 MapReduce 来分析数据。比如说,我们想要比较自 1950 年代以来蝙蝠侠与超人的人气。Google Ngrams 数据可能对此提供一些见解。
Ngrams 数据中的每一行都是一个制表符分隔的值列表,以 Ngram 开头,然后是年份,该 Ngram 的出现次数以及该 Ngram 出现在其中的书籍数量。例如,以下命令行表示在 1978 年,单词"mountain"在 Google Books 图书馆的 1,453 本书中出现了 1,435,642 次。
mountain 1978 1435642 1453
要比较蝙蝠侠和超人的人气,我们需要找到代表这两个单词从 1950 年及以后的代码行,并整理出现值。
由于数据由制表符分隔的文本文件组成,我们需要指定输入格式,以便rmr2函数知道如何读取文件。这可以通过使用make.input.format()函数来完成:
library(rmr2)
input.format <- make.input.format(format = "csv", sep = "\t",col.names = c("ngram", "year", "occurrences", "books"),colClasses = c("character", "integer", "integer", "integer"))
对于如逗号分隔值或制表符分隔值的分隔文本文件,make.input.format()接受与read.table()大多数相同的参数。事实上,rmr2使用read.table()将每个数据块读取到一个数据框中。
除了分隔文本文件外,rmr2还可以以原始文本(format = "text")、JSON("json")、R 的内部数据序列化格式("native")、Hadoop SequenceFiles("sequence.typedbytes")、HBase 表("hbase")和 Hive 或 Pig("pig.hive")的形式读取/写入数据。有关这些数据类型的参数,请参阅包文档。
我们分析中的地图步骤涉及过滤每行数据以找到相关记录。我们将定义一个mapper函数,如下面的代码所示,该函数接受一组键和一组值作为参数。由于 Ngrams 数据不包含键,因此keys参数为NULL。values参数是一个包含数据块的数据框。mapper函数查找数据框中包含我们感兴趣的单词的行,对于 1950 年或之后的年份。如果找到任何相关行,则调用keyval()函数以返回将传递给 reduce 函数的键值对。在这种情况下,键是单词,值是对应的年份和出现次数:
mapper <- function(keys, values) {values$ngram <- tolower(values$ngram)superheroes <- values$ngram %in% c("batman", "superman") &values$year >= 1950Lif (any(superheroes)) {keyval(values$ngram[superheroes],values[superheroes, c("year", "occurrences")])}
}
注意
如果你熟悉 MapReduce,你可能已经注意到rmr2允许 mapper 接受和作为列表和表示整个数据块的数据框发出键值对,而不是每次一个记录,就像经典 MapReduce 那样。这可以帮助提高 R 的性能;可以使用向量化 R 操作来处理整个数据块。
下一个步骤发生在幕后,MapReduce 收集所有由 mapper 发出的数据,并按键分组。在这个例子中,它将找到两个组,分别对应于"batman"和"superman"键。然后 MapReduce 调用 reducer 函数一次处理一组数据。
给定特定超级英雄的数据,reducer 的职责是使用tapply()函数按年汇总这位超级英雄名字出现的次数。这是必需的,因为 Ngrams 数据集中的单词是区分大小写的。例如,我们需要将"Batman"、"batman"和"BATMAN"在每个年份出现的次数加起来。然后,reducer 返回超级英雄的名字作为键,以及包含按年汇总的总出现次数的数据框作为值。reducer 的代码如下所示:
reducer <- function(key, values) {val <- tapply(values$occurrences, values$year, sum)val <- data.frame(year = as.integer(names(val)),occurrences = val)keyval(key, val)
}
现在我们已经定义了我们的 mapper 和 reducer 函数,我们可以使用mapreduce()函数执行 MapReduce 作业。我们将调用此函数,指定输入目录和数据格式,结果输出目录,以及 mapper 和 reducer 函数。
job <- mapreduce(input = "/ngrams/data",input.format = input.format,output = "/ngrams/batmanVsuperman",map = mapper, reduce = reducer)
当这个 MapReduce 作业执行时,结果键值对将被写入 HDFS 的/ngrams/batmanVsuperman文件夹。我们可以使用from.dfs()函数从 HDFS 检索结果到 R 对象。此函数返回一个包含两个组件的列表:key和value。在这种情况下,key是一个字符向量,指定了每行数据的超级英雄名字,而val是一个包含相应年份和出现次数的数据框。
results <- from.dfs(job)
batman <- results$val[results$key == "batman", ]
head(batman)
## year occurrences
## 1950 1950 153
## 1951 1951 105
## 1952 1952 173
## 1953 1953 133
## 1954 1954 359
## 1955 1955 150
superman <- results$val[results$key == "superman", ]
head(superman)
## year occurrences
## 1950 1950 1270
## 1951 1951 1130
## 1952 1952 1122
## 1953 1953 917
## 1954 1954 1222
## 1955 1955 1087
让我们绘制结果,以便比较这两位超级英雄在多年来的受欢迎程度:

根据 Google Books,自 1950 年代以来蝙蝠侠与超人的受欢迎程度对比
虽然这两位超级英雄的受欢迎程度在多年稳步上升,但在 20 世纪 70 年代,关于超人被提及的次数有一个有趣的峰值。这可能是因为 1978 年上映的多届奥斯卡获奖电影《超人》,由克里斯托弗·里夫主演。然而,这种受欢迎程度的激增是短暂的。
完成 MapReduce 算法所需的时间取决于数据的大小、任务的复杂性和集群中的节点数。我们使用m1.xlarge AWS 服务器测试了这个示例,每个服务器有 4 个 CPU 和 15 GB 的 RAM,集群大小从 4 到 32 个核心节点(在 EMR 术语中,这些是存储数据并处理数据的节点)。以下图表显示了随着集群中节点数量的增加,执行时间如何减少:

随着集群大小增加的执行时间
由于rmr2在每个 Hadoop 节点上启动一个 R 实例来处理数据,因此 MapReduce 任务的效率取决于 mapper 和 reducer 函数的 R 代码效率。本书中许多用于提高串行 R 程序性能的技术也可以在您设计 mapper 和 reducer 函数时应用。此外,每个 MapReduce 作业都会产生启动新作业、从磁盘读取数据以及在集群中协调作业执行的开销。在可能的情况下,将单个任务组合成可以一次性执行的大 MapReduce 任务,通过减少这些开销来提高整体性能。
一旦你完成对 Hadoop 集群的使用,请记住从 AWS EMR 控制台终止集群,以防止意外收费。
其他用于 R 的 Hadoop 包
尽管本书的范围仅允许我们介绍一些与 Hadoop 接口的 R 包,但社区已经开发了更多包,以将 Hadoop 的力量带给 R。以下是一些可能有用的更多包:
除了rhdfs和rmr2之外,RHadoop 还提供了其他包:
-
plyrmr:它提供了类似于plyr在 MapReduce 上的功能 -
rhbase:它提供了与 HBase 数据一起工作的函数 -
ravro:它提供了在 Avro 格式中读取/写入数据的功能
另一个名为RHIPE(更多信息请参阅www.datadr.org/)的包族提供了类似 MapReduce 的功能,但语法略有不同:
-
RHIPE:此包提供了核心的 HDFS 和 MapReduce 功能 -
datadr:它提供了类似于plyr/dplyr的数据操作功能 -
Trelliscope:它提供了在 HDFS 中可视化大数据集的功能
在撰写本文时,RHIPE不支持 YARN 或 MapReduce 2.0。需要使用较旧版本的 Hadoop 才能使用RHIPE包,直到这个问题得到解决。
另一个名为Segue(更多信息请参阅code.google.com/p/segue/)的包采取了不同的方法。它不提供完整的 MapReduce 功能。相反,它将亚马逊的 EMR 视为计算密集型 R 任务的额外计算资源。这与第八章中提到的集群计算类似,即通过并行计算提高性能,但使用 EMR 作为计算集群。Segue包提供了emrlapply()函数,该函数在 EMR 集群上执行并行lapply操作;这类似于parallel包中的mclapply()。
摘要
在本章中,我们学习了如何在 Amazon Elastic MapReduce 上设置 Hadoop 集群,以及如何使用 RHadoop 系列包来使用 MapReduce 分析 HDFS 中的数据。我们看到了随着更多服务器被添加到 Hadoop 集群中,MapReduce 任务的性能如何显著提高,但最终由于 Amdahl 定律(第八章,通过并行计算提高性能)的限制,性能最终会达到一个极限。
Hadoop 及其工具生态系统正在迅速发展。其他工具正在积极开发中,以使 Hadoop 的性能更加出色。例如,Apache Spark (spark.apache.org/) 提供了弹性分布式数据集 (RDDs),可以在 Hadoop 集群中存储数据。这使得数据可以从 HDFS 中读取一次,并多次使用,从而显著提高数据探索和梯度下降或 k-means 聚类等迭代任务等交互式任务的性能。另一个例子是 Apache Storm (storm.incubator.apache.org/),它允许您处理实时数据流。因为这些工具及其相关的 R 接口正在积极开发中,它们在你阅读这本书的时候可能会发生变化,所以我们决定在这里不包含它们。但如果你有内存分析或实时数据处理等特定需求,它们是值得研究的。
我们已经来到了这本书的结尾。回顾过去,我们探索了整个光谱的技术,旨在提高 R 程序的性能,从优化内存利用率和计算速度,到通过并行编程和集群计算来增加计算能力。在这里我们所涵盖的仅仅是基础;关于编写更高效的 R 代码,还有许多东西需要学习。有其他资源会深入探讨比这里更具体的话题。虽然有时有些晦涩难懂,但阅读包文档总是有用的;有时,了解什么可行唯一的方法就是尝试。当然,还有庞大的 R 用户社区,他们在在线论坛、邮件列表和其他地方,总是乐于提供答案和建议。
我们希望您喜欢这本书,并且从中学到了我们撰写它时所学到的东西。感谢您加入我们的这次旅程,并祝愿您在探索 R 高性能计算的世界中一切顺利。

