Python xử lý hàng triệu dữ liệu như thế nào?
Thiếu CPU, chương trình của bạn chạy chậm hơn; . Nhưng bạn có thể xử lý bộ dữ liệu lớn hơn RAM bằng Python, như bạn sẽ tìm hiểu trong loạt bài viết sau Show
cấu trúc mãKỹ thuật quản lý dữ liệugấu trúcNumPyĐo mức sử dụng bộ nhớTìm hiểu các kỹ năng kỹ thuật phần mềm Python thực tế mà bạn có thể sử dụng trong công việc của mìnhĐăng ký nhận bản tin của tôi và tham gia cùng hơn 6500 nhà phát triển Python và nhà khoa học dữ liệu học các công cụ và kỹ thuật thực tế, từ hiệu suất Python đến đóng gói Docker, với một bài viết mới miễn phí trong hộp thư đến của bạn mỗi tuần Hướng dẫn này giới thiệu việc xử lý một tập dữ liệu khổng lồ trong python. Nó cho phép bạn làm việc với một lượng lớn dữ liệu với máy tính xách tay của riêng bạn. Với phương pháp này, bạn có thể sử dụng các hàm tổng hợp trên tập dữ liệu mà bạn không thể nhập trong DataFrame Trong ví dụ của chúng tôi, máy có 32 nhân với 17GB Ram. Về dữ liệu tệp có tên user_log. csv, số hàng của tập dữ liệu là 400 Triệu (6. 7 GB được nén) và nó tương ứng với nhật ký người dùng hàng ngày mô tả hành vi nghe của người dùng Về các tính năng
– ngày. định dạng %Y%m%d – num_25. # bài hát được phát dưới 25% thời lượng bài hát – num_50. Số bài hát được phát từ 25% đến 50% thời lượng bài hát – num_75. Số bài hát được phát từ 50% đến 75% thời lượng bài hát – num_985. # bài hát được phát từ 75% đến 98. 5% thời lượng bài hát – num_100. # bài hát được phát trên 98. 5% thời lượng bài hát – num_unq. Số bài hát duy nhất được phát – total_sec. tổng số giây đã phát Hướng dẫn của chúng tôi bao gồm hai phần. Phần đầu tiên sẽ tập trung vào tổng hợp dữ liệu. Không thể nhập tất cả dữ liệu trong khung dữ liệu và sau đó thực hiện tổng hợp. # Load the required packages import time import psutil import numpy as np import pandas as pd import multiprocessing as mp # Check the number of cores and memory usage num_cores = mp.cpu_count() print("This kernel has ",num_cores,"cores and you can find the information regarding the memory usage:",psutil.virtual_memory()) This kernel has 32 cores and you can find the information regarding the memory usage: svmem(total=121466597376, available=91923750912, percent=24.3, used=27252092928, free=63477522432, active=46262022144, inactive=9810317312, buffers=1972326400, cached=28764655616, shared=1120292864) Gói # Writing as a function def process_user_log(chunk): grouped_object = chunk.groupby(chunk.index,sort = False) # not sorting results in a minor speedup func = {'date':['min','max','count'],'num_25':['sum'],'num_50':['sum'], 'num_75':['sum'],'num_985':['sum'], 'num_100':['sum'],'num_unq':['sum'],'total_secs':['sum']} answer = grouped_object.agg(func) return answer6 hiển thị cho bạn số lượng lõi của máy trong khi gói # Writing as a function def process_user_log(chunk): grouped_object = chunk.groupby(chunk.index,sort = False) # not sorting results in a minor speedup func = {'date':['min','max','count'],'num_25':['sum'],'num_50':['sum'], 'num_75':['sum'],'num_985':['sum'], 'num_100':['sum'],'num_unq':['sum'],'total_secs':['sum']} answer = grouped_object.agg(func) return answer7 hiển thị thông tin khác trên bộ nhớ máy của bạn. Các gói duy nhất mà chúng tôi cần xử lý là # Writing as a function def process_user_log(chunk): grouped_object = chunk.groupby(chunk.index,sort = False) # not sorting results in a minor speedup func = {'date':['min','max','count'],'num_25':['sum'],'num_50':['sum'], 'num_75':['sum'],'num_985':['sum'], 'num_100':['sum'],'num_unq':['sum'],'total_secs':['sum']} answer = grouped_object.agg(func) return answer8 và # Writing as a function def process_user_log(chunk): grouped_object = chunk.groupby(chunk.index,sort = False) # not sorting results in a minor speedup func = {'date':['min','max','count'],'num_25':['sum'],'num_50':['sum'], 'num_75':['sum'],'num_985':['sum'], 'num_100':['sum'],'num_unq':['sum'],'total_secs':['sum']} answer = grouped_object.agg(func) return answer0. # Writing as a function def process_user_log(chunk): grouped_object = chunk.groupby(chunk.index,sort = False) # not sorting results in a minor speedup func = {'date':['min','max','count'],'num_25':['sum'],'num_50':['sum'], 'num_75':['sum'],'num_985':['sum'], 'num_100':['sum'],'num_unq':['sum'],'total_secs':['sum']} answer = grouped_object.agg(func) return answer1 sẽ chỉ được sử dụng để hiển thị thời lượng cho mỗi lần lặp lại tổng hợpCác hàm tổng hợp được chọn là # Writing as a function def process_user_log(chunk): grouped_object = chunk.groupby(chunk.index,sort = False) # not sorting results in a minor speedup func = {'date':['min','max','count'],'num_25':['sum'],'num_50':['sum'], 'num_75':['sum'],'num_985':['sum'], 'num_100':['sum'],'num_unq':['sum'],'total_secs':['sum']} answer = grouped_object.agg(func) return answer2, # Writing as a function def process_user_log(chunk): grouped_object = chunk.groupby(chunk.index,sort = False) # not sorting results in a minor speedup func = {'date':['min','max','count'],'num_25':['sum'],'num_50':['sum'], 'num_75':['sum'],'num_985':['sum'], 'num_100':['sum'],'num_unq':['sum'],'total_secs':['sum']} answer = grouped_object.agg(func) return answer3 và # Writing as a function def process_user_log(chunk): grouped_object = chunk.groupby(chunk.index,sort = False) # not sorting results in a minor speedup func = {'date':['min','max','count'],'num_25':['sum'],'num_50':['sum'], 'num_75':['sum'],'num_985':['sum'], 'num_100':['sum'],'num_unq':['sum'],'total_secs':['sum']} answer = grouped_object.agg(func) return answer4 cho đối tượng địa lý “ngày” và # Writing as a function def process_user_log(chunk): grouped_object = chunk.groupby(chunk.index,sort = False) # not sorting results in a minor speedup func = {'date':['min','max','count'],'num_25':['sum'],'num_50':['sum'], 'num_75':['sum'],'num_985':['sum'], 'num_100':['sum'],'num_unq':['sum'],'total_secs':['sum']} answer = grouped_object.agg(func) return answer5 đối với đối tượng địa lý “num_25”, “num_50”, “num_75”, “num_985”, “num_100”, “num_unq” và “totalc_secs”. Do đó với mỗi khách hàng chúng tôi sẽ có ngày đầu tiên, ngày cuối cùng và số lần sử dụng dịch vụ. Cuối cùng, chúng tôi sẽ thu thập số lượng bài hát đã phát theo độ dài # Writing as a function def process_user_log(chunk): grouped_object = chunk.groupby(chunk.index,sort = False) # not sorting results in a minor speedup func = {'date':['min','max','count'],'num_25':['sum'],'num_50':['sum'], 'num_75':['sum'],'num_985':['sum'], 'num_100':['sum'],'num_unq':['sum'],'total_secs':['sum']} answer = grouped_object.agg(func) return answer Để tổng hợp dữ liệu của chúng tôi, chúng tôi phải sử dụng chunksize. Tùy chọn này của read_csv cho phép bạn tải tệp lớn dưới dạng các phần nhỏ trong # Writing as a function def process_user_log(chunk): grouped_object = chunk.groupby(chunk.index,sort = False) # not sorting results in a minor speedup func = {'date':['min','max','count'],'num_25':['sum'],'num_50':['sum'], 'num_75':['sum'],'num_985':['sum'], 'num_100':['sum'],'num_unq':['sum'],'total_secs':['sum']} answer = grouped_object.agg(func) return answer6. Chúng tôi quyết định lấy 10% tổng chiều dài cho chunksize tương ứng với 40 triệu hàng. Hãy cẩn thận, không nhất thiết phải thú vị khi lấy một giá trị nhỏ. Thời gian giữa mỗi lần lặp lại có thể quá dài với kích thước chaunk nhỏ. Để tìm ra sự đánh đổi tốt nhất “Mức sử dụng bộ nhớ – Thời gian”, bạn có thể thử các kích thước khối khác nhau và chọn cái tốt nhất sẽ tiêu tốn ít bộ nhớ hơn và cái nào sẽ nhanh hơn. # Number of rows for each chunk size = 4e7 # 40 Millions reader = pd.read_csv('user_logs.csv', chunksize = size, index_col = ['msno']) start_time = time.time() for i in range(10): user_log_chunk = next(reader) if(i==0): result = process_user_log(user_log_chunk) print("Number of rows ",result.shape[0]) print("Loop ",i,"took %s seconds" % (time.time() - start_time)) else: result = result.append(process_user_log(user_log_chunk)) print("Number of rows ",result.shape[0]) print("Loop ",i,"took %s seconds" % (time.time() - start_time)) del(user_log_chunk) # Unique users vs Number of rows after the first computation print("size of result:", len(result)) check = result.index.unique() print("unique user in result:", len(check)) result.columns = ['_'.join(col).strip() for col in result.columns.values] Number of rows 1925303 Loop 0 took 76.11969661712646 seconds Number of rows 3849608 Loop 1 took 150.54171466827393 seconds Number of rows 5774168 Loop 2 took 225.91669702529907 seconds Number of rows 7698020 Loop 3 took 301.34390926361084 seconds Number of rows 9623341 Loop 4 took 379.118084192276 seconds Number of rows 11547939 Loop 5 took 456.7346053123474 seconds Number of rows 13472137 Loop 6 took 533.522665977478 seconds Number of rows 15397016 Loop 7 took 609.7849867343903 seconds Number of rows 17322397 Loop 8 took 686.7019085884094 seconds Number of rows 19166671 Loop 9 took 747.1662466526031 seconds size of result: 19166671 unique user in result: 5234111 Với tính toán đầu tiên của chúng tôi, chúng tôi đã bao phủ dữ liệu 40 triệu hàng trên 40 triệu hàng nhưng có thể một khách hàng nằm trong nhiều mẫu con. Tổng thời gian tính toán là khoảng mười hai phút. Kết quả tập dữ liệu mới được tạo bởi 19 triệu hàng cho 5 triệu người dùng duy nhất. Vì vậy, cần phải tính toán lần thứ hai các hàm tổng hợp của chúng ta. Nhưng bây giờ có thể làm điều đó trên toàn bộ dữ liệu vì chúng tôi chỉ có 19 triệu hàng so với 400 triệu lúc đầu Đối với phép tính thứ hai, không cần thiết phải sử dụng chunksize, chúng ta có bộ nhớ cần thiết để thực hiện phép tính trên toàn bộ kết quả. Nếu bạn không thể làm điều đó trên toàn bộ dữ liệu, bạn có thể chạy mã trước đó với kích thước khối khác và dẫn đến đầu vào để giảm dữ liệu lần thứ hai # Writing as a function def process_user_log(chunk): grouped_object = chunk.groupby(chunk.index,sort = False) # not sorting results in a minor speedup func = {'date':['min','max','count'],'num_25':['sum'],'num_50':['sum'], 'num_75':['sum'],'num_985':['sum'], 'num_100':['sum'],'num_unq':['sum'],'total_secs':['sum']} answer = grouped_object.agg(func) return answer0 # Writing as a function def process_user_log(chunk): grouped_object = chunk.groupby(chunk.index,sort = False) # not sorting results in a minor speedup func = {'date':['min','max','count'],'num_25':['sum'],'num_50':['sum'], 'num_75':['sum'],'num_985':['sum'], 'num_100':['sum'],'num_unq':['sum'],'total_secs':['sum']} answer = grouped_object.agg(func) return answer1 Cuối cùng, chúng tôi có khung dữ liệu mới với 5 Hàng triệu hàng và một người dùng khác nhau theo hàng. Với dữ liệu này, chúng tôi đã mất đi tính tạm thời mà chúng tôi có trong dữ liệu đầu vào nhưng chúng tôi có thể làm việc với dữ liệu này. Thật thú vị cho cách tiếp cận dạng bảng đối với máy học Giảm mức sử dụng bộ nhớTrong phần này chúng ta sẽ quan tâm đến việc sử dụng bộ nhớ. Chúng ta có thể thấy rằng tất cả các cột ngoại trừ “date_min” và “total_secs_sum” đều là # Writing as a function def process_user_log(chunk): grouped_object = chunk.groupby(chunk.index,sort = False) # not sorting results in a minor speedup func = {'date':['min','max','count'],'num_25':['sum'],'num_50':['sum'], 'num_75':['sum'],'num_985':['sum'], 'num_100':['sum'],'num_unq':['sum'],'total_secs':['sum']} answer = grouped_object.agg(func) return answer7. Nó không phải lúc nào cũng hợp lý và nó sử dụng rất nhiều bộ nhớ để làm gì. với hàm # Writing as a function def process_user_log(chunk): grouped_object = chunk.groupby(chunk.index,sort = False) # not sorting results in a minor speedup func = {'date':['min','max','count'],'num_25':['sum'],'num_50':['sum'], 'num_75':['sum'],'num_985':['sum'], 'num_100':['sum'],'num_unq':['sum'],'total_secs':['sum']} answer = grouped_object.agg(func) return answer8 ta thấy chỉ có đặc trưng “total_secs_sum” là đúng kiểu. Chúng tôi đã thay đổi loại cho từng tính năng để giảm mức sử dụng bộ nhớ # Writing as a function def process_user_log(chunk): grouped_object = chunk.groupby(chunk.index,sort = False) # not sorting results in a minor speedup func = {'date':['min','max','count'],'num_25':['sum'],'num_50':['sum'], 'num_75':['sum'],'num_985':['sum'], 'num_100':['sum'],'num_unq':['sum'],'total_secs':['sum']} answer = grouped_object.agg(func) return answer4 # Writing as a function def process_user_log(chunk): grouped_object = chunk.groupby(chunk.index,sort = False) # not sorting results in a minor speedup func = {'date':['min','max','count'],'num_25':['sum'],'num_50':['sum'], 'num_75':['sum'],'num_985':['sum'], 'num_100':['sum'],'num_unq':['sum'],'total_secs':['sum']} answer = grouped_object.agg(func) return answer5 Với loại phù hợp cho từng tính năng, chúng tôi đã giảm 44% mức sử dụng. Nó không phải là không thể bỏ qua, đặc biệt là khi chúng tôi có hạn chế về phần cứng hoặc khi bạn cần bộ nhớ để triển khai mô hình máy học. Nó tồn tại các phương pháp khác để giảm mức sử dụng bộ nhớ. Bạn phải cẩn thận về loại của từng tính năng nếu bạn muốn tối ưu hóa thao tác dữ liệu Python xử lý dữ liệu lớn như thế nào?3 cách xử lý tập dữ liệu lớn trong Python. Là một nhà khoa học dữ liệu, tôi thấy mình ngày càng phải đối mặt với “dữ liệu lớn”. . Giảm mức sử dụng bộ nhớ bằng cách tối ưu hóa các loại dữ liệu. . Chia dữ liệu thành nhiều khối. . Tận dụng đánh giá lười biếng Python có thể xử lý hàng triệu bản ghi không?Câu trả lời là CÓ. Bạn có thể xử lý các tập dữ liệu lớn trong python bằng Pandas bằng một số kỹ thuật .
Python có thể xử lý 1 tỷ hàng không?Giới thiệu về Vaex. Vaex là một thư viện python là khung dữ liệu ngoài lõi, có thể xử lý tới 1 tỷ hàng mỗi giây . 1 tỷ hàng.
Pandas có thể xử lý 20 triệu hàng không?Thông thường, Pandas tìm thấy điểm thích hợp của nó khi sử dụng trong các bộ dữ liệu có kích thước từ thấp đến trung bình lên đến vài triệu hàng. Ngoài ra, các khung phân tán hơn như Spark hoặc Dask thường được ưu tiên hơn. Tuy nhiên, có thể mở rộng quy mô gấu trúc ra ngoài điểm này . |