1.概述
Apache Spark是一个快速的分布式数据处理系统。它执行内存中的数据处理,并使用内存中的缓存和优化的执行,从而实现了快速的性能。它为流行的编程语言(例如Scala,Python,Java和R)提供了高级API。
在本快速教程中,我们将介绍Spark的三个基本概念:数据帧,数据集和RDD。
2.数据框
从Spark 1.3开始,Spark SQL引入了表格形式的数据抽象,称为DataFrame。从那时起,它已成为Spark中最重要的功能之一。当我们要处理结构化和半结构化的分布式数据时,此API很有用。
在第3节中,我们将讨论弹性分布式数据集(RDD)。 DataFrame以比RDD更有效的方式存储数据,这是因为DataFrame使用RDD的不变,内存,弹性,分布式和并行功能,但它们也将架构应用于数据。 DataFrames还可以将SQL代码转换为优化的低级RDD操作。
我们可以通过三种方式创建DataFrames:
转换现有的RDD
运行SQL查询
加载外部数据
Spark团队SparkSession ,它统一了所有不同的上下文,从而确保开发人员无需担心创建不同的上下文:
SparkSession session = SparkSession.builder() .appName("TouristDataFrameExample") .master("local[*]") .getOrCreate(); DataFrameReader dataFrameReader = session.read();
我们将分析Tourist.csv文件:
Dataset<Row> data = dataFrameReader.option("header", "true") .csv("data/Tourist.csv");
由于Spark 2.0 DataFrame成为Row类型Dataset ,因此我们可以将DataFrame用作**Dataset<Row>** .
我们可以选择感兴趣的特定列。我们还可以对给定的列进行过滤和分组:
data.select(col("country"), col("year"), col("value")) .show(); data.filter(col("country").equalTo("Mexico")) .show(); data.groupBy(col("country")) .count() .show();
3. Datasets
数据集是一组强类型的结构化数据。它们提供了熟悉的面向对象编程风格以及类型安全性的好处,因为数据集可以在编译时检查语法并捕获错误。
Dataset是DataFrame的扩展,因此我们可以将DataFrame视为数据集的无类型视图。
Spark团队Dataset API,正如他们提到的那样:“ Spark Datasets的目标是提供一个API,使用户可以轻松地表达对象域上的转换,同时还提供Spark SQL执行的性能和鲁棒性优势。引擎”。
首先,我们需要创建一个类型TouristData的类:
public class TouristData { private String region; private String country; private String year; private String series; private Double value; private String footnotes; private String source; // ... getters and setters }
要将每个记录映射到指定的类型,我们将需要使用编码器。编码器在Java对象和Spark的内部二进制格式之间进行转换:
// SparkSession initialization and data load Dataset<Row> responseWithSelectedColumns = data.select(col("region"), col("country"), col("year"), col("series"), col("value").cast("double"), col("footnotes"), col("source")); Dataset<TouristData> typedDataset = responseWithSelectedColumns .as(Encoders.bean(TouristData.class));
与DataFrame一样,我们可以按特定的列进行过滤和分组:
typedDataset.filter((FilterFunction) record -> record.getCountry() .equals("Norway")) .show(); typedDataset.groupBy(typedDataset.col("country")) .count() .show();
我们还可以进行操作,例如按列匹配特定范围进行过滤或计算特定列的总和,以获取其总值:
typedDataset.filter((FilterFunction) record -> record.getYear() != null && (Long.valueOf(record.getYear()) > 2010 && Long.valueOf(record.getYear()) < 2017)).show(); typedDataset.filter((FilterFunction) record -> record.getValue() != null && record.getSeries() .contains("expenditure")) .groupBy("country") .agg(sum("value")) .show();
4. RDD
弹性分布式数据集或RDD是Spark的主要编程抽象。它表示元素的集合,这些元素是不可变的,有弹性的和分布式的。
一个RDD封装了一个大型数据集,Spark将自动在整个集群中分布RDD中包含的数据,并使我们对它们执行的操作并行化。
我们只能通过稳定存储中的数据操作或其他RDD上的操作来创建RDD。
当我们处理大量数据并且数据分布在群集计算机上时,容错能力至关重要。由于Spark内置的故障恢复机制,因此RDD具有弹性。 Spark依赖于以下事实:RDD会记住它们的创建方式,以便我们可以轻松地追溯沿袭来恢复分区。
我们可以对RDD执行两种类型的操作: Transformations和Actions 。
4.1 转变
我们可以将转换应用于RDD以操纵其数据。执行完此操作后,我们将获得全新的RDD,因为RDD是不可变的对象。
我们将检查如何实现Map和Filter,这是两种最常见的转换。
首先,我们需要创建一个JavaSparkContext Tourist.csv文件中将数据作为RDD加载:
SparkConf conf = new SparkConf().setAppName("uppercaseCountries") .setMaster("local[*]"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> tourists = sc.textFile("data/Tourist.csv");
接下来,让我们应用map函数从每个记录中获取国家的名称,并将名称转换为大写。我们可以将此新生成的数据集保存为磁盘上的文本文件:
JavaRDD<String> upperCaseCountries = tourists.map(line -> { String[] columns = line.split(COMMA_DELIMITER); return columns[1].toUpperCase(); }).distinct(); upperCaseCountries.saveAsTextFile("data/output/uppercase.txt");
如果只想选择一个特定国家/地区,则可以对原始游客RDD应用过滤功能:
JavaRDD<String> touristsInMexico = tourists .filter(line -> line.split(COMMA_DELIMITER)[1].equals("Mexico")); touristsInMexico.saveAsTextFile("data/output/touristInMexico.txt");
4.2 动作
在对数据进行一些计算之后,动作将返回最终值或将结果保存到磁盘。
Spark中经常使用的两个动作是Count和Reduce。
让我们在CSV文件中计算国家总数:
// Spark Context initialization and data load JavaRDD<String> countries = tourists.map(line -> { String[] columns = line.split(COMMA_DELIMITER); return columns[1]; }).distinct(); Long numberOfCountries = countries.count();
现在,我们将按国家/地区计算总支出。我们需要过滤描述中包含支出的记录。
如果不使用的JavaRDD ,我们将使用JavaPairRDD 。一对RDD是可以存储键值对的一种RDD 。接下来让我们检查一下:
JavaRDD<String> touristsExpenditure = tourists .filter(line -> line.split(COMMA_DELIMITER)[3].contains("expenditure")); JavaPairRDD<String, Double> expenditurePairRdd = touristsExpenditure .mapToPair(line -> { String[] columns = line.split(COMMA_DELIMITER); return new Tuple2<>(columns[1], Double.valueOf(columns[6])); }); List<Tuple2<String, Double>> totalByCountry = expenditurePairRdd .reduceByKey((x, y) -> x + y) .collect();
5.结论
综上所述,当我们需要特定于域的API,需要聚合,求和或SQL查询等高级表达式时,应使用DataFrames或Datasets。或者,当我们想要在编译时进行类型安全时。
另一方面,当数据是非结构化的并且不需要实现特定的架构时,或者在需要低级的转换和操作时,我们应该使用RDD。
0 评论