Python viết dòng

Hướng dẫn này giới thiệu nhanh về cách sử dụng Spark. Trước tiên, chúng tôi sẽ giới thiệu API thông qua shell tương tác của Spark [bằng Python hoặc Scala], sau đó trình bày cách viết ứng dụng bằng Java, Scala và Python

Để làm theo hướng dẫn này, trước tiên, hãy tải xuống bản phát hành đóng gói của Spark từ trang web Spark. Vì chúng tôi sẽ không sử dụng HDFS, bạn có thể tải xuống gói cho bất kỳ phiên bản Hadoop nào

Lưu ý rằng, trước Spark 2. 0, giao diện lập trình chính của Spark là Bộ dữ liệu phân tán đàn hồi [RDD]. Sau tia lửa 2. 0, RDD được thay thế bằng Bộ dữ liệu, được đánh máy mạnh mẽ giống như RDD, nhưng với khả năng tối ưu hóa phong phú hơn. Giao diện RDD vẫn được hỗ trợ và bạn có thể tham khảo chi tiết hơn tại hướng dẫn lập trình RDD. Tuy nhiên, chúng tôi thực sự khuyên bạn nên chuyển sang sử dụng Bộ dữ liệu, có hiệu suất tốt hơn RDD. Xem hướng dẫn lập trình SQL để biết thêm thông tin về Dataset

Khái niệm cơ bản

Shell của Spark cung cấp một cách đơn giản để tìm hiểu API, cũng như một công cụ mạnh mẽ để phân tích dữ liệu một cách tương tác. Nó có sẵn trong Scala [chạy trên máy ảo Java và do đó là một cách tốt để sử dụng các thư viện Java hiện có] hoặc Python. Bắt đầu nó bằng cách chạy phần sau trong thư mục Spark

Sự trừu tượng hóa chính của Spark là một tập hợp các mục phân tán được gọi là Bộ dữ liệu. Bộ dữ liệu có thể được tạo từ Hadoop InputFormats [chẳng hạn như tệp HDFS] hoặc bằng cách chuyển đổi Bộ dữ liệu khác. Hãy tạo một Bộ dữ liệu mới từ văn bản của tệp README trong thư mục nguồn Spark

scala> val textFile = spark.read.textFile["README.md"]
textFile: org.apache.spark.sql.Dataset[String] = [value: string]

Bạn có thể nhận trực tiếp các giá trị từ Tập dữ liệu, bằng cách gọi một số hành động hoặc chuyển đổi Tập dữ liệu để nhận một tập dữ liệu mới. Để biết thêm chi tiết, vui lòng đọc tài liệu API

scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark

Bây giờ, hãy chuyển đổi Bộ dữ liệu này thành một bộ dữ liệu mới. Chúng tôi gọi

scala> val linesWithSpark = textFile.filter[line => line.contains["Spark"]]
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
9 để trả về một Tập dữ liệu mới với một tập hợp con các mục trong tệp

scala> val linesWithSpark = textFile.filter[line => line.contains["Spark"]]
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]

Chúng ta có thể xâu chuỗi các chuyển đổi và hành động lại với nhau

scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15

Hoặc nếu PySpark được cài đặt với pip trong môi trường hiện tại của bạn

Sự trừu tượng hóa chính của Spark là một tập hợp các mục phân tán được gọi là Bộ dữ liệu. Bộ dữ liệu có thể được tạo từ Hadoop InputFormats [chẳng hạn như tệp HDFS] hoặc bằng cách chuyển đổi Bộ dữ liệu khác. Do tính chất động của Python, chúng tôi không cần Bộ dữ liệu được gõ mạnh bằng Python. Do đó, tất cả Bộ dữ liệu trong Python đều là Bộ dữ liệu [Hàng] và chúng tôi gọi nó là

scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
0 để phù hợp với khái niệm khung dữ liệu trong Pandas và R. Hãy tạo một DataFrame mới từ văn bản của tệp README trong thư mục nguồn Spark

>>> textFile = spark.read.text["README.md"]

Bạn có thể nhận trực tiếp các giá trị từ DataFrame, bằng cách gọi một số hành động hoặc chuyển đổi DataFrame để nhận giá trị mới. Để biết thêm chi tiết, vui lòng đọc tài liệu API

>>> textFile.count[]  # Number of rows in this DataFrame
126

>>> textFile.first[]  # First row in this DataFrame
Row[value=u'# Apache Spark']

Bây giờ, hãy chuyển đổi DataFrame này sang một DataFrame mới. Chúng tôi gọi

scala> val linesWithSpark = textFile.filter[line => line.contains["Spark"]]
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
9 để trả về một DataFrame mới với một tập hợp con các dòng trong tệp

>>> linesWithSpark = textFile.filter[textFile.value.contains["Spark"]]

Chúng ta có thể xâu chuỗi các chuyển đổi và hành động lại với nhau

scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
0

Thông tin thêm về Hoạt động tập dữ liệu

Các hành động và biến đổi tập dữ liệu có thể được sử dụng cho các tính toán phức tạp hơn. Giả sử chúng ta muốn tìm dòng có nhiều từ nhất

scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
1

Điều này đầu tiên ánh xạ một dòng thành một giá trị số nguyên, tạo một Bộ dữ liệu mới.

scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
2 được gọi trên Tập dữ liệu đó để tìm số lượng từ lớn nhất. Các đối số của
scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
3 và
scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
2 là các ký tự hàm Scala [các bao đóng] và có thể sử dụng bất kỳ tính năng ngôn ngữ hoặc thư viện Scala/Java nào. Ví dụ: chúng ta có thể dễ dàng gọi các hàm được khai báo ở nơi khác. Chúng tôi sẽ sử dụng chức năng
scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
5 để làm cho mã này dễ hiểu hơn

scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
6

Một mẫu luồng dữ liệu phổ biến là MapReduce, được phổ biến bởi Hadoop. Spark có thể triển khai các luồng MapReduce một cách dễ dàng

scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
0

Ở đây, chúng tôi gọi

scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
6 để chuyển đổi Tập dữ liệu gồm các dòng thành Tập dữ liệu gồm các từ, sau đó kết hợp
scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
7 và
scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
8 để tính toán số lượng mỗi từ trong tệp dưới dạng Tập dữ liệu gồm các cặp [Chuỗi, Dài]. Để thu thập số lượng từ trong vỏ của chúng tôi, chúng tôi có thể gọi
scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
9

scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
1

scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
2

Điều này đầu tiên ánh xạ một dòng tới một giá trị số nguyên và đặt bí danh cho nó là “numWords”, tạo một DataFrame mới.

>>> textFile = spark.read.text["README.md"]
0 được gọi trên DataFrame đó để tìm số lượng từ lớn nhất. Các đối số của
>>> textFile = spark.read.text["README.md"]
1 và
>>> textFile = spark.read.text["README.md"]
0 đều là Cột, chúng ta có thể sử dụng
>>> textFile = spark.read.text["README.md"]
3 để lấy một cột từ Khung dữ liệu. Chúng tôi cũng có thể nhập pyspark. sql. chức năng, cung cấp nhiều chức năng thuận tiện để tạo Cột mới từ cột cũ

Một mẫu luồng dữ liệu phổ biến là MapReduce, được phổ biến bởi Hadoop. Spark có thể triển khai các luồng MapReduce một cách dễ dàng

scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
3

Ở đây, chúng tôi sử dụng hàm

>>> textFile = spark.read.text["README.md"]
4 trong
>>> textFile = spark.read.text["README.md"]
1, để chuyển đổi Tập dữ liệu gồm các dòng thành Tập dữ liệu từ, sau đó kết hợp
>>> textFile = spark.read.text["README.md"]
6 và
scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
8 để tính số lượng mỗi từ trong tệp dưới dạng Khung dữ liệu gồm 2 cột. “từ” và “đếm”. Để thu thập số lượng từ trong vỏ của chúng tôi, chúng tôi có thể gọi
scala> textFile.filter[line => line.contains["Spark"]].count[] // How many lines contain "Spark"?
res3: Long = 15
9

scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
4

Bộ nhớ đệm

Spark cũng hỗ trợ kéo các tập dữ liệu vào bộ đệm trong bộ nhớ toàn cụm. Điều này rất hữu ích khi dữ liệu được truy cập lặp đi lặp lại, chẳng hạn như khi truy vấn một tập dữ liệu “nóng” nhỏ hoặc khi chạy một thuật toán lặp như PageRank. Ví dụ đơn giản, hãy đánh dấu bộ dữ liệu

>>> textFile = spark.read.text["README.md"]
9 của chúng tôi sẽ được lưu vào bộ đệm

scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
5

Có vẻ ngớ ngẩn khi sử dụng Spark để khám phá và lưu trữ tệp văn bản 100 dòng. Điều thú vị là các chức năng tương tự này có thể được sử dụng trên các tập dữ liệu rất lớn, ngay cả khi chúng được chia thành hàng chục hoặc hàng trăm nút. Bạn cũng có thể thực hiện điều này một cách tương tác bằng cách kết nối

>>> textFile.count[]  # Number of rows in this DataFrame
126

>>> textFile.first[]  # First row in this DataFrame
Row[value=u'# Apache Spark']
0 với một cụm, như được mô tả trong hướng dẫn lập trình RDD

scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
6

Có vẻ ngớ ngẩn khi sử dụng Spark để khám phá và lưu trữ tệp văn bản 100 dòng. Điều thú vị là các chức năng tương tự này có thể được sử dụng trên các tập dữ liệu rất lớn, ngay cả khi chúng được chia thành hàng chục hoặc hàng trăm nút. Bạn cũng có thể thực hiện điều này một cách tương tác bằng cách kết nối

>>> textFile.count[]  # Number of rows in this DataFrame
126

>>> textFile.first[]  # First row in this DataFrame
Row[value=u'# Apache Spark']
1 với một cụm, như được mô tả trong hướng dẫn lập trình RDD

Giả sử chúng ta muốn viết một ứng dụng độc lập bằng Spark API. Chúng ta sẽ xem qua một ứng dụng đơn giản trong Scala [với sbt], Java [với Maven] và Python [pip]

Chúng ta sẽ tạo một ứng dụng Spark rất đơn giản trong Scala–đơn giản đến mức nó được đặt tên là

>>> textFile.count[]  # Number of rows in this DataFrame
126

>>> textFile.first[]  # First row in this DataFrame
Row[value=u'# Apache Spark']
2

scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
7

Lưu ý rằng các ứng dụng nên xác định phương thức

>>> textFile.count[]  # Number of rows in this DataFrame
126

>>> textFile.first[]  # First row in this DataFrame
Row[value=u'# Apache Spark']
3 thay vì mở rộng
>>> textFile.count[]  # Number of rows in this DataFrame
126

>>> textFile.first[]  # First row in this DataFrame
Row[value=u'# Apache Spark']
4. Các lớp con của
>>> textFile.count[]  # Number of rows in this DataFrame
126

>>> textFile.first[]  # First row in this DataFrame
Row[value=u'# Apache Spark']
4 có thể không hoạt động chính xác

Chương trình này chỉ đếm số dòng chứa 'a' và số chứa 'b' trong Spark README. Lưu ý rằng bạn sẽ cần thay thế YOUR_SPARK_HOME bằng vị trí đã cài đặt Spark. Không giống như các ví dụ trước với trình bao Spark khởi tạo SparkSession của chính nó, chúng tôi khởi tạo SparkSession như một phần của chương trình

Chúng tôi gọi

>>> textFile.count[]  # Number of rows in this DataFrame
126

>>> textFile.first[]  # First row in this DataFrame
Row[value=u'# Apache Spark']
6 để xây dựng một
>>> textFile.count[]  # Number of rows in this DataFrame
126

>>> textFile.first[]  # First row in this DataFrame
Row[value=u'# Apache Spark']
7, sau đó đặt tên ứng dụng và cuối cùng gọi
>>> textFile.count[]  # Number of rows in this DataFrame
126

>>> textFile.first[]  # First row in this DataFrame
Row[value=u'# Apache Spark']
8 để lấy phiên bản
>>> textFile.count[]  # Number of rows in this DataFrame
126

>>> textFile.first[]  # First row in this DataFrame
Row[value=u'# Apache Spark']
7

Ứng dụng của chúng tôi phụ thuộc vào API Spark, vì vậy chúng tôi cũng sẽ bao gồm tệp cấu hình sbt,

>>> linesWithSpark = textFile.filter[textFile.value.contains["Spark"]]
0, giải thích rằng Spark là một phần phụ thuộc. Tệp này cũng thêm một kho lưu trữ mà Spark phụ thuộc vào

scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
8

Để sbt hoạt động chính xác, chúng ta cần bố trí

>>> textFile.count[]  # Number of rows in this DataFrame
126

>>> textFile.first[]  # First row in this DataFrame
Row[value=u'# Apache Spark']
2 và
>>> linesWithSpark = textFile.filter[textFile.value.contains["Spark"]]
0 theo cấu trúc thư mục điển hình. Khi đã sẵn sàng, chúng ta có thể tạo gói JAR chứa mã của ứng dụng, sau đó sử dụng tập lệnh
>>> linesWithSpark = textFile.filter[textFile.value.contains["Spark"]]
3 để chạy chương trình của chúng ta

scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
9

Ví dụ này sẽ sử dụng Maven để biên dịch JAR ứng dụng, nhưng bất kỳ hệ thống xây dựng tương tự nào cũng sẽ hoạt động

Chúng ta sẽ tạo một ứng dụng Spark rất đơn giản,

>>> linesWithSpark = textFile.filter[textFile.value.contains["Spark"]]
4

scala> val linesWithSpark = textFile.filter[line => line.contains["Spark"]]
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
0

Chương trình này chỉ đếm số dòng chứa 'a' và số chứa 'b' trong Spark README. Lưu ý rằng bạn sẽ cần thay thế YOUR_SPARK_HOME bằng vị trí đã cài đặt Spark. Không giống như các ví dụ trước với trình bao Spark khởi tạo SparkSession của chính nó, chúng tôi khởi tạo SparkSession như một phần của chương trình

Để xây dựng chương trình, chúng tôi cũng viết tệp Maven

>>> linesWithSpark = textFile.filter[textFile.value.contains["Spark"]]
5 liệt kê Spark dưới dạng phụ thuộc. Lưu ý rằng các tạo phẩm Spark được gắn thẻ với phiên bản Scala

scala> val linesWithSpark = textFile.filter[line => line.contains["Spark"]]
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
1

Chúng tôi sắp xếp các tệp này theo cấu trúc thư mục Maven chuẩn

scala> val linesWithSpark = textFile.filter[line => line.contains["Spark"]]
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
2

Bây giờ, chúng ta có thể đóng gói ứng dụng bằng Maven và thực thi nó với

>>> linesWithSpark = textFile.filter[textFile.value.contains["Spark"]]
6

scala> val linesWithSpark = textFile.filter[line => line.contains["Spark"]]
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
3

Bây giờ chúng tôi sẽ trình bày cách viết một ứng dụng bằng API Python [PySpark]

Nếu bạn đang xây dựng một ứng dụng hoặc thư viện PySpark đóng gói, bạn có thể thêm nó vào thiết lập của mình. tệp py dưới dạng

scala> val linesWithSpark = textFile.filter[line => line.contains["Spark"]]
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
4

Ví dụ, chúng ta sẽ tạo một ứng dụng Spark đơn giản,

>>> linesWithSpark = textFile.filter[textFile.value.contains["Spark"]]
7

scala> val linesWithSpark = textFile.filter[line => line.contains["Spark"]]
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
5

Chương trình này chỉ đếm số dòng chứa 'a' và số chứa 'b' trong một tệp văn bản. Lưu ý rằng bạn sẽ cần thay thế YOUR_SPARK_HOME bằng vị trí đã cài đặt Spark. Như với các ví dụ về Scala và Java, chúng tôi sử dụng SparkSession để tạo Bộ dữ liệu. Đối với các ứng dụng sử dụng các lớp tùy chỉnh hoặc thư viện của bên thứ ba, chúng tôi cũng có thể thêm các phụ thuộc mã vào

>>> linesWithSpark = textFile.filter[textFile.value.contains["Spark"]]
3 thông qua đối số
>>> linesWithSpark = textFile.filter[textFile.value.contains["Spark"]]
9 của nó bằng cách đóng gói chúng vào một. zip [xem
scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
00 để biết chi tiết].
scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
01 đủ đơn giản để chúng tôi không cần chỉ định bất kỳ mã phụ thuộc nào

Chúng tôi có thể chạy ứng dụng này bằng tập lệnh

scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
02

scala> val linesWithSpark = textFile.filter[line => line.contains["Spark"]]
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
6

Nếu bạn đã cài đặt PySpark pip vào môi trường của mình [e. g. ,

scala> textFile.count[] // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first[] // First item in this Dataset
res1: String = # Apache Spark
03], bạn có thể chạy ứng dụng của mình bằng trình thông dịch Python thông thường hoặc sử dụng tùy chọn 'spark-submit' được cung cấp tùy ý

scala> val linesWithSpark = textFile.filter[line => line.contains["Spark"]]
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
7

Các công cụ quản lý phụ thuộc khác như Conda và pip cũng có thể được sử dụng cho các lớp tùy chỉnh hoặc thư viện của bên thứ ba. Xem thêm Quản lý gói Python

Chủ Đề