Scala và Python là những API phổ biến nhất. Bài đăng trên blog này thực hiện so sánh chi tiết cách viết Spark với Scala và Python và giúp người dùng chọn API ngôn ngữ phù hợp nhất với nhóm của họ
Cả hai API ngôn ngữ đều là những tùy chọn tuyệt vời cho hầu hết các quy trình công việc. Spark cho phép bạn viết mã thanh lịch để chạy công việc trên bộ dữ liệu lớn – đó là một công nghệ tuyệt vời
Chọn API ngôn ngữ phù hợp là rất quan trọng. Hãy suy nghĩ và thử nghiệm rộng rãi trước khi đưa ra quyết định cuối cùng
Python và Apache Spark là những từ thông dụng nhất trong ngành phân tích. Apache Spark là một khung mã nguồn mở phổ biến, đảm bảo xử lý dữ liệu với tốc độ cực nhanh và hỗ trợ nhiều ngôn ngữ khác nhau như Scala, Python, Java và R. Sau đó, nó tập trung vào tùy chọn ngôn ngữ và phạm vi công việc của bạn. Thông qua bài viết lập trình PySpark này, tôi sẽ nói về Spark với Python để chứng minh cách Python tận dụng các chức năng của Apache Spark
Trước khi chúng ta bắt đầu hành trình Lập trình PySpark, hãy để tôi liệt kê các chủ đề mà tôi sẽ đề cập trong bài viết này
Vì vậy, hãy bắt đầu với chủ đề đầu tiên trong danh sách của chúng tôi, tôi. e. , Lập trình PySpark
PySpark là sự hợp tác của Apache Spark và Python
Apache Spark là một khung điện toán cụm nguồn mở, được xây dựng dựa trên tốc độ, tính dễ sử dụng và phân tích luồng trong khi Python là ngôn ngữ lập trình cấp cao, có mục đích chung. Nó cung cấp một loạt các thư viện và được sử dụng chủ yếu cho Machine Learning và Real-Time Streaming Analytics
Nói cách khác, đó là API Python dành cho Spark cho phép bạn khai thác tính đơn giản của Python và sức mạnh của Apache Spark để chế ngự Dữ liệu lớn. Tìm hiểu thêm về Dữ liệu lớn và các ứng dụng của nó từ Cộng tác viên kỹ sư dữ liệu Azure
Bạn có thể thắc mắc, tại sao tôi lại chọn Python để làm việc với Spark khi có sẵn các ngôn ngữ khác. Để trả lời câu hỏi này, tôi đã liệt kê một số lợi ích mà bạn sẽ thích với Python
- Python rất dễ học và thực hiện
- Nó cung cấp API đơn giản và toàn diện
- Với Python, khả năng đọc mã, bảo trì và mức độ quen thuộc tốt hơn nhiều
- Nó cung cấp các tùy chọn khác nhau để trực quan hóa dữ liệu, điều khó sử dụng Scala hoặc Java
- Python đi kèm với một loạt các thư viện như numpy, pandas, scikit-learning, seaborn, matplotlib, v.v.
- Nó được hỗ trợ bởi một cộng đồng lớn và tích cực
Bây giờ bạn đã biết những ưu điểm của lập trình PySpark, chúng ta hãy đi sâu vào các nguyên tắc cơ bản của PySpark
Lập trình PySpark. Đào tạo PySpark. Edureka
Video này sẽ cung cấp cho bạn thông tin chi tiết về các khái niệm cơ bản của PySpark
Bộ dữ liệu phân tán đàn hồi [RDD]
RDD là khối xây dựng của bất kỳ ứng dụng Spark nào. RDD là viết tắt của
- Đàn hồi. Nó có khả năng chịu lỗi và có khả năng xây dựng lại dữ liệu khi bị lỗi
- phân phối. Dữ liệu được phân phối giữa nhiều nút trong một cụm
- tập dữ liệu. Thu thập dữ liệu được phân vùng với các giá trị
Đó là một lớp dữ liệu trừu tượng trên bộ sưu tập được phân phối. Nó không thay đổi về bản chất và tuân theo sự biến đổi lười biếng.
Với RDD, bạn có thể thực hiện hai loại thao tác
- phép biến hình. Các thao tác này được áp dụng để tạo RDD mới
- hành động. Các thao tác này được áp dụng trên RDD để hướng dẫn Apache Spark áp dụng tính toán và chuyển kết quả trở lại trình điều khiển
Khung dữ liệu
Dataframe trong PySpark là tập hợp dữ liệu có cấu trúc hoặc bán cấu trúc được phân phối. Dữ liệu này trong Dataframe được lưu trữ trong các hàng dưới các cột được đặt tên tương tự như các bảng cơ sở dữ liệu quan hệ hoặc trang tính excel.
Nó cũng chia sẻ một số thuộc tính chung với RDD như Bản chất không thay đổi, tuân theo các đánh giá lười biếng và được phân phối trong tự nhiên. Nó hỗ trợ nhiều định dạng như JSON, CSV, TXT và nhiều định dạng khác. Ngoài ra, bạn có thể tải nó từ các RDD hiện có hoặc bằng cách chỉ định lược đồ theo chương trình.
PySpark SQL
PySpark SQL là mô-đun trừu tượng cấp cao hơn trên PySpark Core. Nó chủ yếu được sử dụng để xử lý các tập dữ liệu có cấu trúc và bán cấu trúc. Nó cũng cung cấp một API được tối ưu hóa có thể đọc dữ liệu từ nhiều nguồn dữ liệu chứa các định dạng tệp khác nhau. Do đó, với PySpark, bạn có thể xử lý dữ liệu bằng cách sử dụng SQL cũng như HiveQL. Do tính năng này, PySparkSQL đang dần trở nên phổ biến đối với các lập trình viên cơ sở dữ liệu và người dùng Apache Hive
Đăng ký kênh YouTube của chúng tôi để nhận thông tin cập nhật mới
Phát trực tuyến PySpark
PySpark Streaming là một hệ thống có khả năng mở rộng, chịu lỗi tuân theo mô hình lô RDD. Về cơ bản, nó được vận hành theo các đợt nhỏ hoặc khoảng thời gian hàng loạt có thể nằm trong khoảng từ 500 mili giây đến các cửa sổ khoảng thời gian lớn hơn
Trong trường hợp này, Spark Streaming nhận luồng dữ liệu đầu vào liên tục từ các nguồn như Apache Flume, Kinesis, Kafka, ổ cắm TCP, v.v. Những dữ liệu được truyền phát này sau đó được chia nhỏ bên trong thành nhiều lô nhỏ hơn dựa trên khoảng thời gian của lô và được chuyển tiếp đến Spark Engine. Spark Engine xử lý các lô dữ liệu này bằng thuật toán phức tạp được thể hiện bằng các chức năng cấp cao như bản đồ, thu nhỏ, nối và cửa sổ. Sau khi quá trình xử lý hoàn tất, các lô đã xử lý sẽ được đẩy ra cơ sở dữ liệu, hệ thống tệp và trang tổng quan trực tiếp
Sự trừu tượng hóa chính cho Spark Streaming là Luồng rời rạc [DStream]. DStream được xây dựng trên RDD tạo điều kiện cho các nhà phát triển Spark làm việc trong cùng bối cảnh của RDD và lô để giải quyết các sự cố phát trực tuyến. Hơn nữa, Spark Streaming cũng tích hợp với MLlib, SQL, DataFrames và GraphX giúp mở rộng phạm vi chức năng của bạn. Là một API cấp cao, Spark Streaming cung cấp khả năng chịu lỗi "chính xác một lần" ngữ nghĩa cho các hoạt động có trạng thái.
GHI CHÚ. Ngữ nghĩa “chính xác một lần” có nghĩa là các sự kiện sẽ được xử lý “chính xác một lần” bởi tất cả các toán tử trong ứng dụng luồng, ngay cả khi có bất kỳ lỗi nào xảy ra
Sơ đồ bên dưới, đại diện cho các thành phần cơ bản của Spark Streaming
Như bạn có thể thấy, Dữ liệu được nhập vào Spark Stream từ nhiều nguồn khác nhau như ổ cắm Kafka, Flume, Twitter, ZeroMQ, Kinesis hoặc TCP, v.v. Hơn nữa, dữ liệu này được xử lý bằng các thuật toán phức tạp được thể hiện bằng các chức năng cấp cao như bản đồ, rút gọn, nối và cửa sổ. Cuối cùng, dữ liệu đã xử lý này được đẩy ra các hệ thống tệp, cơ sở dữ liệu và bảng điều khiển trực tiếp khác nhau để sử dụng thêm
Tôi hy vọng điều này cung cấp cho bạn một bức tranh rõ ràng về cách hoạt động của Spark Streaming. Bây giờ chúng ta hãy chuyển sang chủ đề cuối cùng nhưng hấp dẫn nhất của bài viết Lập trình PySpark này, tôi. e. Học máy
Tìm hiểu thêm về Dữ liệu lớn và các ứng dụng của nó từ Khóa học Kỹ thuật dữ liệu Azure ở Mumbai
Học máy
Như bạn đã biết, Python là một ngôn ngữ trưởng thành đang được sử dụng nhiều cho khoa học dữ liệu và máy học từ rất lâu đời. Trong PySpark, máy học được hỗ trợ bởi một thư viện Python có tên MLlib [Machine Learning Library]. Không có gì khác ngoài một trình bao bọc trên PySpark Core, thực hiện phân tích dữ liệu bằng thuật toán học máy như phân loại, phân cụm, hồi quy tuyến tính và một số tính năng khác.
Một trong những tính năng hấp dẫn của máy học với PySpark là nó hoạt động trên các hệ thống phân tán và có khả năng mở rộng cao
MLlib hiển thị ba chức năng học máy cốt lõi với PySpark
- Chuẩn bị dữ liệu. Nó cung cấp nhiều tính năng khác nhau như trích xuất, chuyển đổi, lựa chọn, băm, v.v.
- thuật toán máy học. Nó sử dụng một số thuật toán hồi quy, phân loại và phân cụm phổ biến và nâng cao cho máy học
- tiện ích. Nó có các phương pháp thống kê như kiểm định chi bình phương, thống kê mô tả, đại số tuyến tính và phương pháp đánh giá mô hình
Hãy để tôi chỉ cho bạn cách triển khai máy học bằng cách sử dụng phân loại thông qua hồi quy logistic
Ở đây, tôi sẽ thực hiện một phân tích dự đoán đơn giản về dữ liệu kiểm tra thực phẩm của Thành phố Chicago
##Importing the required libraries from pyspark.ml import Pipeline from pyspark.ml.classification import LogisticRegression from pyspark.ml.feature import HashingTF, Tokenizer from pyspark.sql import Row from pyspark.sql.functions import UserDefinedFunction from pyspark.sql.types import * ##creating a RDD by importing and parsing the input data def csvParse[s]: import csv from StringIO import StringIO sio = StringIO[s] value = csv.reader[sio].next[] sio.close[] return value food_inspections = sc.textFile['file:////home/edureka/Downloads/Food_Inspections_Chicago_data.csv'] .map[csvParse] ##Display data format food_inspections.take[1]
#Structuring the data schema = StructType[[ StructField["id", IntegerType[], False], StructField["name", StringType[], False], StructField["results", StringType[], False], StructField["violations", StringType[], True]]] #creating a dataframe and a temporary table [Results] required for the predictive analysis. ##sqlContext is used to perform transformations on structured data ins_df = spark.createDataFrame[food_inspections.map[lambda l: [int[l[0]], l[1], l[12], l[13]]] , schema] ins_df.registerTempTable['Count_Results'] ins_df.show[]
##Let's now understand our dataset #show the distinct values in the results column result_data = ins_df.select['results'].distinct[].show[]
##converting the existing dataframe into a new dataframe ###each inspection is represented as a label-violations pair. ####Here 0.0 represents a failure, 1.0 represents a success, and -1.0 represents some results besides those two def label_Results[s]: if s == 'Fail': return 0.0 elif s == 'Pass with Conditions' or s == 'Pass': return 1.0 else: return -1.0 ins_label = UserDefinedFunction[label_Results, DoubleType[]] labeled_Data = ins_df.select[ins_label[ins_df.results].alias['label'], ins_df.violations].where['label >= 0'] labeled_Data.take[1]
##Creating a logistic regression model from the input dataframe tokenizer = Tokenizer[inputCol="violations", outputCol="words"] hashingTF = HashingTF[inputCol=tokenizer.getOutputCol[], outputCol="features"] lr = LogisticRegression[maxIter=10, regParam=0.01] pipeline = Pipeline[stages=[tokenizer, hashingTF, lr]] model = pipeline.fit[labeled_Data] ## Evaluating with Test Data test_Data = sc.textFile['file:////home/edureka/Downloads/Food_Inspections_test.csv'] .map[csvParse] .map[lambda l: [int[l[0]], l[1], l[12], l[13]]] test_df = spark.createDataFrame[test_Data, schema].where["results = 'Fail' OR results = 'Pass' OR results = 'Pass with Conditions'"] predict_Df = model.transform[test_df] predict_Df.registerTempTable['Predictions'] predict_Df.columns
## Printing 1st row predict_Df.take[1]
## Predicting the final result numOfSuccess = predict_Df.where["""[prediction = 0 AND results = 'Fail'] OR [prediction = 1 AND [results = 'Pass' OR results = 'Pass with Conditions']]"""].count[] numOfInspections = predict_Df.count[] print "There were", numOfInspections, "inspections and there were", numOfSuccess, "successful predictions" print "This is a", str[[float[numOfSuccess] / float[numOfInspections]] * 100] + "%", "success rate"
Với điều này, chúng ta đi đến cuối blog này về Lập trình PySpark. Hy vọng nó đã giúp thêm một số giá trị cho kiến thức của bạn
Tìm hiểu cách phân tích dữ liệu bằng Python với Spark. ĐĂNG KÝ NGAY BÂY GIỜ
Nếu bạn thấy blog Lập trình PySpark này phù hợp, hãy xem Khóa đào tạo PySpark của Edureka, một công ty học trực tuyến đáng tin cậy với mạng lưới hơn 250.000 người học hài lòng trải khắp toàn cầu.