拨开荷叶行,寻梦已然成。仙女莲花里,翩翩白鹭情。
IMG-LOGO
主页 文章列表 Apache Spark:数据帧,数据集和RDD之间的差异

Apache Spark:数据帧,数据集和RDD之间的差异

白鹭 - 2021-11-08 2253 0 2

1.概述

Apache Spark是一个快速的分布式数据处理系统。它执行内存中的数据处理,并使用内存中的缓存和优化的执行,从而实现了快速的性能。它为流行的编程语言(例如Scala,Python,Java和R)提供了高级API。


在本快速教程中,我们将介绍Spark的三个基本概念:数据帧,数据集和RDD。

2.数据框

从Spark 1.3开始,Spark SQL引入了表格形式的数据抽象,称为DataFrame。从那时起,它已成为Spark中最重要的功能之一。当我们要处理结构化和半结构化的分布式数据时,此API很有用。


在第3节中,我们将讨论弹性分布式数据集(RDD)。 DataFrame以比RDD更有效的方式存储数据,这是因为DataFrame使用RDD的不变,内存,弹性,分布式和并行功能,但它们也将架构应用于数据。 DataFrames还可以将SQL代码转换为优化的低级RDD操作。


我们可以通过三种方式创建DataFrames:

  • 转换现有的RDD

  • 运行SQL查询

  • 加载外部数据

Spark团队SparkSession ,它统一了所有不同的上下文,从而确保开发人员无需担心创建不同的上下文:

SparkSession session = SparkSession.builder()

 .appName("TouristDataFrameExample")

 .master("local[*]")

 .getOrCreate();



 DataFrameReader dataFrameReader = session.read();

我们将分析Tourist.csv文件:

Dataset<Row> data = dataFrameReader.option("header", "true")

 .csv("data/Tourist.csv");

由于Spark 2.0 DataFrame成为Row类型Dataset ,因此我们可以将DataFrame用作**Dataset<Row>** .

我们可以选择感兴趣的特定列。我们还可以对给定的列进行过滤和分组:

data.select(col("country"), col("year"), col("value"))

 .show();



 data.filter(col("country").equalTo("Mexico"))

 .show();



 data.groupBy(col("country"))

 .count()

 .show();

3. Datasets

数据集是一组强类型的结构化数据它们提供了熟悉的面向对象编程风格以及类型安全性的好处,因为数据集可以在编译时检查语法并捕获错误。


Dataset是DataFrame的扩展,因此我们可以将DataFrame视为数据集的无类型视图。


Spark团队Dataset API,正如他们提到的那样:“ Spark Datasets的目标是提供一个API,使用户可以轻松地表达对象域上的转换,同时还提供Spark SQL执行的性能和鲁棒性优势。引擎”。


首先,我们需要创建一个类型TouristData的类:

public class TouristData { private String region; private String country; private String year; private String series; private Double value; private String footnotes; private String source; // ... getters and setters

 }

要将每个记录映射到指定的类型,我们将需要使用编码器。编码器在Java对象和Spark的内部二进制格式之间进行转换:

// SparkSession initialization and data load

 Dataset<Row> responseWithSelectedColumns = data.select(col("region"),

 col("country"), col("year"), col("series"), col("value").cast("double"),

 col("footnotes"), col("source"));



 Dataset<TouristData> typedDataset = responseWithSelectedColumns

 .as(Encoders.bean(TouristData.class));

与DataFrame一样,我们可以按特定的列进行过滤和分组:

typedDataset.filter((FilterFunction) record -> record.getCountry()

 .equals("Norway"))

 .show();



 typedDataset.groupBy(typedDataset.col("country"))

 .count()

 .show();

我们还可以进行操作,例如按列匹配特定范围进行过滤或计算特定列的总和,以获取其总值:

typedDataset.filter((FilterFunction) record -> record.getYear() != null

 && (Long.valueOf(record.getYear()) > 2010

 && Long.valueOf(record.getYear()) < 2017)).show();



 typedDataset.filter((FilterFunction) record -> record.getValue() != null

 && record.getSeries()

 .contains("expenditure"))

 .groupBy("country")

 .agg(sum("value"))

 .show();

4. RDD

弹性分布式数据集或RDD是Spark的主要编程抽象。它表示元素的集合,这些元素是不可变的,有弹性的和分布式的。


一个RDD封装了一个大型数据集,Spark将自动在整个集群中分布RDD中包含的数据,并使我们对它们执行的操作并行化。


我们只能通过稳定存储中的数据操作或其他RDD上的操作来创建RDD。


当我们处理大量数据并且数据分布在群集计算机上时,容错能力至关重要。由于Spark内置的故障恢复机制,因此RDD具有弹性。 Spark依赖于以下事实:RDD会记住它们的创建方式,以便我们可以轻松地追溯沿袭来恢复分区。


我们可以对RDD执行两种类型的操作: Transformations和Actions 。

4.1 转变

我们可以将转换应用于RDD以操纵其数据。执行完此操作后,我们将获得全新的RDD,因为RDD是不可变的对象。


我们将检查如何实现Map和Filter,这是两种最常见的转换。


首先,我们需要创建一个JavaSparkContext Tourist.csv文件中将数据作为RDD加载:

SparkConf conf = new SparkConf().setAppName("uppercaseCountries")

 .setMaster("local[*]");

 JavaSparkContext sc = new JavaSparkContext(conf);



 JavaRDD<String> tourists = sc.textFile("data/Tourist.csv");

接下来,让我们应用map函数从每个记录中获取国家的名称,并将名称转换为大写。我们可以将此新生成的数据集保存为磁盘上的文本文件:

JavaRDD<String> upperCaseCountries = tourists.map(line -> {

 String[] columns = line.split(COMMA_DELIMITER); return columns[1].toUpperCase();

 }).distinct();



 upperCaseCountries.saveAsTextFile("data/output/uppercase.txt");

如果只想选择一个特定国家/地区,则可以对原始游客RDD应用过滤功能:

JavaRDD<String> touristsInMexico = tourists

 .filter(line -> line.split(COMMA_DELIMITER)[1].equals("Mexico"));



 touristsInMexico.saveAsTextFile("data/output/touristInMexico.txt");

4.2 动作

在对数据进行一些计算之后,动作将返回最终值或将结果保存到磁盘。


Spark中经常使用的两个动作是Count和Reduce。


让我们在CSV文件中计算国家总数:

// Spark Context initialization and data load

 JavaRDD<String> countries = tourists.map(line -> {

 String[] columns = line.split(COMMA_DELIMITER); return columns[1];

 }).distinct();



 Long numberOfCountries = countries.count();

现在,我们将按国家/地区计算总支出。我们需要过滤描述中包含支出的记录。


如果不使用的JavaRDD ,我们将使用JavaPairRDD 。一对RDD是可以存储键值对的一种RDD 。接下来让我们检查一下:


JavaRDD<String> touristsExpenditure = tourists

 .filter(line -> line.split(COMMA_DELIMITER)[3].contains("expenditure"));



 JavaPairRDD<String, Double> expenditurePairRdd = touristsExpenditure

 .mapToPair(line -> {

 String[] columns = line.split(COMMA_DELIMITER); return new Tuple2<>(columns[1], Double.valueOf(columns[6]));

 });



 List<Tuple2<String, Double>> totalByCountry = expenditurePairRdd

 .reduceByKey((x, y) -> x + y)

 .collect();

5.结论

综上所述,当我们需要特定于域的API,需要聚合,求和或SQL查询等高级表达式时,应使用DataFrames或Datasets。或者,当我们想要在编译时进行类型安全时。


另一方面,当数据是非结构化的并且不需要实现特定的架构时,或者在需要低级的转换和操作时,我们应该使用RDD。


0 评论

发表评论

您的电子邮件地址不会被公开。 必填的字段已做标记 *