Hướng dẫn này giới thiệu nhanh về cách sử dụng Spark. Trước tiên, chúng tôi sẽ giới thiệu API thông qua shell tương tác của Spark [bằng Python hoặc Scala], sau đó trình bày cách viết ứng dụng bằng Java, Scala và Python
Để làm theo hướng dẫn này, trước tiên, hãy tải xuống bản phát hành đóng gói của Spark từ trang web Spark. Vì chúng tôi sẽ không sử dụng HDFS, bạn có thể tải xuống gói cho bất kỳ phiên bản Hadoop nào
Lưu ý rằng, trước Spark 2. 0, giao diện lập trình chính của Spark là Bộ dữ liệu phân tán đàn hồi [RDD]. Sau tia lửa 2. 0, RDD được thay thế bằng Bộ dữ liệu, được đánh máy mạnh mẽ giống như RDD, nhưng với khả năng tối ưu hóa phong phú hơn. Giao diện RDD vẫn được hỗ trợ và bạn có thể tham khảo chi tiết hơn tại hướng dẫn lập trình RDD. Tuy nhiên, chúng tôi thực sự khuyên bạn nên chuyển sang sử dụng Bộ dữ liệu, có hiệu suất tốt hơn RDD. Xem hướng dẫn lập trình SQL để biết thêm thông tin về Dataset
Khái niệm cơ bản
Shell của Spark cung cấp một cách đơn giản để tìm hiểu API, cũng như một công cụ mạnh mẽ để phân tích dữ liệu một cách tương tác. Nó có sẵn trong Scala [chạy trên máy ảo Java và do đó là một cách tốt để sử dụng các thư viện Java hiện có] hoặc Python. Bắt đầu nó bằng cách chạy phần sau trong thư mục Spark
Sự trừu tượng hóa chính của Spark là một tập hợp các mục phân tán được gọi là Bộ dữ liệu. Bộ dữ liệu có thể được tạo từ Hadoop InputFormats [chẳng hạn như tệp HDFS] hoặc bằng cách chuyển đổi Bộ dữ liệu khác. Hãy tạo một Bộ dữ liệu mới từ văn bản của tệp README trong thư mục nguồn Spark
scala> val textFile = spark.read.textFile["README.md"]
textFile: org.apache.spark.sql.Dataset[String] = [value: string]
Bạn có thể nhận trực tiếp các giá trị từ Tập dữ liệu, bằng cách gọi một số hành động hoặc chuyển đổi Tập dữ liệu để nhận một tập dữ liệu mới. Để biết thêm chi tiết, vui lòng đọc tài liệu API
scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
Bây giờ, hãy chuyển đổi Bộ dữ liệu này thành một bộ dữ liệu mới. Chúng tôi gọi
scala> val linesWithSpark = textFile.filter[line => line.contains["Spark"]]
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
9 để trả về một Tập dữ liệu mới với một tập hợp con các mục trong tệpscala> val linesWithSpark = textFile.filter[line => line.contains["Spark"]]
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
Chúng ta có thể xâu chuỗi các chuyển đổi và hành động lại với nhau
scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
Hoặc nếu PySpark được cài đặt với pip trong môi trường hiện tại của bạn
Sự trừu tượng hóa chính của Spark là một tập hợp các mục phân tán được gọi là Bộ dữ liệu. Bộ dữ liệu có thể được tạo từ Hadoop InputFormats [chẳng hạn như tệp HDFS] hoặc bằng cách chuyển đổi Bộ dữ liệu khác. Do tính chất động của Python, chúng tôi không cần Bộ dữ liệu được gõ mạnh bằng Python. Do đó, tất cả Bộ dữ liệu trong Python đều là Bộ dữ liệu [Hàng] và chúng tôi gọi nó là
scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
0 để phù hợp với khái niệm khung dữ liệu trong Pandas và R. Hãy tạo một DataFrame mới từ văn bản của tệp README trong thư mục nguồn Spark>>> textFile = spark.read.text["README.md"]
Bạn có thể nhận trực tiếp các giá trị từ DataFrame, bằng cách gọi một số hành động hoặc chuyển đổi DataFrame để nhận giá trị mới. Để biết thêm chi tiết, vui lòng đọc tài liệu API
>>> textFile.count[] # Number of rows in this DataFrame
126
>>> textFile.first[] # First row in this DataFrame
Row[value=u'# Apache Spark']
Bây giờ, hãy chuyển đổi DataFrame này sang một DataFrame mới. Chúng tôi gọi
scala> val linesWithSpark = textFile.filter[line => line.contains["Spark"]]
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
9 để trả về một DataFrame mới với một tập hợp con các dòng trong tệp>>> linesWithSpark = textFile.filter[textFile.value.contains["Spark"]]
Chúng ta có thể xâu chuỗi các chuyển đổi và hành động lại với nhau
scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
0Thông tin thêm về Hoạt động tập dữ liệu
Các hành động và biến đổi tập dữ liệu có thể được sử dụng cho các tính toán phức tạp hơn. Giả sử chúng ta muốn tìm dòng có nhiều từ nhất
scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
1Điều này đầu tiên ánh xạ một dòng thành một giá trị số nguyên, tạo một Bộ dữ liệu mới.
scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
2 được gọi trên Tập dữ liệu đó để tìm số lượng từ lớn nhất. Các đối số của scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
3 và scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
2 là các ký tự hàm Scala [các bao đóng] và có thể sử dụng bất kỳ tính năng ngôn ngữ hoặc thư viện Scala/Java nào. Ví dụ: chúng ta có thể dễ dàng gọi các hàm được khai báo ở nơi khác. Chúng tôi sẽ sử dụng chức năng scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
5 để làm cho mã này dễ hiểu hơnscala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
6Một mẫu luồng dữ liệu phổ biến là MapReduce, được phổ biến bởi Hadoop. Spark có thể triển khai các luồng MapReduce một cách dễ dàng
scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
0Ở đây, chúng tôi gọi
scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
6 để chuyển đổi Tập dữ liệu gồm các dòng thành Tập dữ liệu gồm các từ, sau đó kết hợp scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
7 và scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
8 để tính toán số lượng mỗi từ trong tệp dưới dạng Tập dữ liệu gồm các cặp [Chuỗi, Dài]. Để thu thập số lượng từ trong vỏ của chúng tôi, chúng tôi có thể gọi scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
9scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
1scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
2Điều này đầu tiên ánh xạ một dòng tới một giá trị số nguyên và đặt bí danh cho nó là “numWords”, tạo một DataFrame mới.
>>> textFile = spark.read.text["README.md"]
0 được gọi trên DataFrame đó để tìm số lượng từ lớn nhất. Các đối số của >>> textFile = spark.read.text["README.md"]
1 và >>> textFile = spark.read.text["README.md"]
0 đều là Cột, chúng ta có thể sử dụng >>> textFile = spark.read.text["README.md"]
3 để lấy một cột từ Khung dữ liệu. Chúng tôi cũng có thể nhập pyspark. sql. chức năng, cung cấp nhiều chức năng thuận tiện để tạo Cột mới từ cột cũMột mẫu luồng dữ liệu phổ biến là MapReduce, được phổ biến bởi Hadoop. Spark có thể triển khai các luồng MapReduce một cách dễ dàng
scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
3Ở đây, chúng tôi sử dụng hàm
>>> textFile = spark.read.text["README.md"]
4 trong >>> textFile = spark.read.text["README.md"]
1, để chuyển đổi Tập dữ liệu gồm các dòng thành Tập dữ liệu từ, sau đó kết hợp >>> textFile = spark.read.text["README.md"]
6 và scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
8 để tính số lượng mỗi từ trong tệp dưới dạng Khung dữ liệu gồm 2 cột. “từ” và “đếm”. Để thu thập số lượng từ trong vỏ của chúng tôi, chúng tôi có thể gọi scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
9scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
4Bộ nhớ đệm
Spark cũng hỗ trợ kéo các tập dữ liệu vào bộ đệm trong bộ nhớ toàn cụm. Điều này rất hữu ích khi dữ liệu được truy cập lặp đi lặp lại, chẳng hạn như khi truy vấn một tập dữ liệu “nóng” nhỏ hoặc khi chạy một thuật toán lặp như PageRank. Ví dụ đơn giản, hãy đánh dấu bộ dữ liệu
>>> textFile = spark.read.text["README.md"]
9 của chúng tôi sẽ được lưu vào bộ đệmscala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
5Có vẻ ngớ ngẩn khi sử dụng Spark để khám phá và lưu trữ tệp văn bản 100 dòng. Điều thú vị là các chức năng tương tự này có thể được sử dụng trên các tập dữ liệu rất lớn, ngay cả khi chúng được chia thành hàng chục hoặc hàng trăm nút. Bạn cũng có thể thực hiện điều này một cách tương tác bằng cách kết nối
>>> textFile.count[] # Number of rows in this DataFrame
126
>>> textFile.first[] # First row in this DataFrame
Row[value=u'# Apache Spark']
0 với một cụm, như được mô tả trong hướng dẫn lập trình RDDscala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
6Có vẻ ngớ ngẩn khi sử dụng Spark để khám phá và lưu trữ tệp văn bản 100 dòng. Điều thú vị là các chức năng tương tự này có thể được sử dụng trên các tập dữ liệu rất lớn, ngay cả khi chúng được chia thành hàng chục hoặc hàng trăm nút. Bạn cũng có thể thực hiện điều này một cách tương tác bằng cách kết nối
>>> textFile.count[] # Number of rows in this DataFrame
126
>>> textFile.first[] # First row in this DataFrame
Row[value=u'# Apache Spark']
1 với một cụm, như được mô tả trong hướng dẫn lập trình RDDGiả sử chúng ta muốn viết một ứng dụng độc lập bằng Spark API. Chúng ta sẽ xem qua một ứng dụng đơn giản trong Scala [với sbt], Java [với Maven] và Python [pip]
Chúng ta sẽ tạo một ứng dụng Spark rất đơn giản trong Scala–đơn giản đến mức nó được đặt tên là
>>> textFile.count[] # Number of rows in this DataFrame
126
>>> textFile.first[] # First row in this DataFrame
Row[value=u'# Apache Spark']
2scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
7Lưu ý rằng các ứng dụng nên xác định phương thức
>>> textFile.count[] # Number of rows in this DataFrame
126
>>> textFile.first[] # First row in this DataFrame
Row[value=u'# Apache Spark']
3 thay vì mở rộng >>> textFile.count[] # Number of rows in this DataFrame
126
>>> textFile.first[] # First row in this DataFrame
Row[value=u'# Apache Spark']
4. Các lớp con của >>> textFile.count[] # Number of rows in this DataFrame
126
>>> textFile.first[] # First row in this DataFrame
Row[value=u'# Apache Spark']
4 có thể không hoạt động chính xácChương trình này chỉ đếm số dòng chứa 'a' và số chứa 'b' trong Spark README. Lưu ý rằng bạn sẽ cần thay thế YOUR_SPARK_HOME bằng vị trí đã cài đặt Spark. Không giống như các ví dụ trước với trình bao Spark khởi tạo SparkSession của chính nó, chúng tôi khởi tạo SparkSession như một phần của chương trình
Chúng tôi gọi
>>> textFile.count[] # Number of rows in this DataFrame
126
>>> textFile.first[] # First row in this DataFrame
Row[value=u'# Apache Spark']
6 để xây dựng một >>> textFile.count[] # Number of rows in this DataFrame
126
>>> textFile.first[] # First row in this DataFrame
Row[value=u'# Apache Spark']
7, sau đó đặt tên ứng dụng và cuối cùng gọi >>> textFile.count[] # Number of rows in this DataFrame
126
>>> textFile.first[] # First row in this DataFrame
Row[value=u'# Apache Spark']
8 để lấy phiên bản >>> textFile.count[] # Number of rows in this DataFrame
126
>>> textFile.first[] # First row in this DataFrame
Row[value=u'# Apache Spark']
7Ứng dụng của chúng tôi phụ thuộc vào API Spark, vì vậy chúng tôi cũng sẽ bao gồm tệp cấu hình sbt,
>>> linesWithSpark = textFile.filter[textFile.value.contains["Spark"]]
0, giải thích rằng Spark là một phần phụ thuộc. Tệp này cũng thêm một kho lưu trữ mà Spark phụ thuộc vàoscala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
8Để sbt hoạt động chính xác, chúng ta cần bố trí
>>> textFile.count[] # Number of rows in this DataFrame
126
>>> textFile.first[] # First row in this DataFrame
Row[value=u'# Apache Spark']
2 và >>> linesWithSpark = textFile.filter[textFile.value.contains["Spark"]]
0 theo cấu trúc thư mục điển hình. Khi đã sẵn sàng, chúng ta có thể tạo gói JAR chứa mã của ứng dụng, sau đó sử dụng tập lệnh >>> linesWithSpark = textFile.filter[textFile.value.contains["Spark"]]
3 để chạy chương trình của chúng tascala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
9Ví dụ này sẽ sử dụng Maven để biên dịch JAR ứng dụng, nhưng bất kỳ hệ thống xây dựng tương tự nào cũng sẽ hoạt động
Chúng ta sẽ tạo một ứng dụng Spark rất đơn giản,
>>> linesWithSpark = textFile.filter[textFile.value.contains["Spark"]]
4scala> val linesWithSpark = textFile.filter[line => line.contains["Spark"]]
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
0Chương trình này chỉ đếm số dòng chứa 'a' và số chứa 'b' trong Spark README. Lưu ý rằng bạn sẽ cần thay thế YOUR_SPARK_HOME bằng vị trí đã cài đặt Spark. Không giống như các ví dụ trước với trình bao Spark khởi tạo SparkSession của chính nó, chúng tôi khởi tạo SparkSession như một phần của chương trình
Để xây dựng chương trình, chúng tôi cũng viết tệp Maven
>>> linesWithSpark = textFile.filter[textFile.value.contains["Spark"]]
5 liệt kê Spark dưới dạng phụ thuộc. Lưu ý rằng các tạo phẩm Spark được gắn thẻ với phiên bản Scalascala> val linesWithSpark = textFile.filter[line => line.contains["Spark"]]
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
1Chúng tôi sắp xếp các tệp này theo cấu trúc thư mục Maven chuẩn
scala> val linesWithSpark = textFile.filter[line => line.contains["Spark"]]
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
2Bây giờ, chúng ta có thể đóng gói ứng dụng bằng Maven và thực thi nó với
>>> linesWithSpark = textFile.filter[textFile.value.contains["Spark"]]
6scala> val linesWithSpark = textFile.filter[line => line.contains["Spark"]]
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
3Bây giờ chúng tôi sẽ trình bày cách viết một ứng dụng bằng API Python [PySpark]
Nếu bạn đang xây dựng một ứng dụng hoặc thư viện PySpark đóng gói, bạn có thể thêm nó vào thiết lập của mình. tệp py dưới dạng
scala> val linesWithSpark = textFile.filter[line => line.contains["Spark"]]
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
4Ví dụ, chúng ta sẽ tạo một ứng dụng Spark đơn giản,
>>> linesWithSpark = textFile.filter[textFile.value.contains["Spark"]]
7scala> val linesWithSpark = textFile.filter[line => line.contains["Spark"]]
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
5Chương trình này chỉ đếm số dòng chứa 'a' và số chứa 'b' trong một tệp văn bản. Lưu ý rằng bạn sẽ cần thay thế YOUR_SPARK_HOME bằng vị trí đã cài đặt Spark. Như với các ví dụ về Scala và Java, chúng tôi sử dụng SparkSession để tạo Bộ dữ liệu. Đối với các ứng dụng sử dụng các lớp tùy chỉnh hoặc thư viện của bên thứ ba, chúng tôi cũng có thể thêm các phụ thuộc mã vào
>>> linesWithSpark = textFile.filter[textFile.value.contains["Spark"]]
3 thông qua đối số >>> linesWithSpark = textFile.filter[textFile.value.contains["Spark"]]
9 của nó bằng cách đóng gói chúng vào một. zip [xem scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
00 để biết chi tiết]. scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
01 đủ đơn giản để chúng tôi không cần chỉ định bất kỳ mã phụ thuộc nàoChúng tôi có thể chạy ứng dụng này bằng tập lệnh
scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
02scala> val linesWithSpark = textFile.filter[line => line.contains["Spark"]]
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
6Nếu bạn đã cài đặt PySpark pip vào môi trường của mình [e. g. ,
scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
03], bạn có thể chạy ứng dụng của mình bằng trình thông dịch Python thông thường hoặc sử dụng tùy chọn 'spark-submit' được cung cấp tùy ýscala> val linesWithSpark = textFile.filter[line => line.contains["Spark"]]
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
7Các công cụ quản lý phụ thuộc khác như Conda và pip cũng có thể được sử dụng cho các lớp tùy chỉnh hoặc thư viện của bên thứ ba. Xem thêm Quản lý gói Python